python连接数据库mysql失败_python操作MySQL数据库报错问题解决
#coding:utf8
'''定义对mysql数据库操作的封装
1、包括基本的单条语句操作,如删除、修改、插入
2、独立地查询单条、多条数据
3、独立地添加多条数据'''
importlogging, os, pymysqlfrom public importconfigclassOperationDbInterface(object):#定义初始化数据库连接
def __init__(self, host_db='127.0.0.1', user_db='root', password_db='123456',
name_db='interface_test', port_db=3306, link_type=0):''':param host_db: 数据库服务主机IP
:param user_db: 数据库连接用户名
:param password_db: 数据库密码
:param name_db: 数据库名称
:param port_db: 数据库端口号,整型数据
:param link_type: 连接类型,用于设置输出数据是元祖还是字典,默认是字典,link_type=0
:return:游标'''
try:if link_type ==0:#创建数据,返回字典
self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,
charset='utf8', cursorclass=pymysql.cursors.DictCursor)else:#创建数据库,返回元祖
self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,
charset='utf8')
self.cur=self.conn.cursor()print("输出:%s" %self.cur)exceptpymysql.Error as e:print("创建数据库连接失败|Mysql Error %d: %s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
logger= logging.getLogger(__name__)
logger.exception(e)#定义单条数据操作,包含删除和更新操作
defop_sql(self, condition):''':param condition: sql语句,该通用方法可用来替代updateone,deleteone
:return: 字典形式'''
try:
self.cur.execute(condition)#执行sql语句
self.conn.commit() #提交游标数据
result = {'code': '0000', 'message': '执行通用操作成功', 'data': []}exceptpymysql.Error as e:
self.conn.rollback()#执行回滚操作
result = {'code': '9999', 'message': '执行通用操作异常', 'data': []}print("数据库错误|op_sql %d:%s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
logger= logging.getLogger(__name__)
logger.exception(e)returnresult#查询表中单条数据
defselect_one(self, condition):''':param condition: 查询单条sql语句
:return: 字典形式的单条查询语句'''
try:
rows_affect=self.cur.execute(condition)if rows_affect >0:
results= self.cur.fetchone() #获取一条数据
result = {'code': '0000', 'message': '执行单条查询操作成功', 'data': results}else:
result= {'code': '0000', 'message': '执行单条查询操作成功', 'data': []}exceptpymysql.Error as e:
self.conn.rollback()#执行回滚操作
result = {'code': '9999', 'message': '执行单条查询异常', 'data': []}print("数据库错误|select_one %d: %s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')
logger= logging.getLogger(__name__)
logger.exception(e)returnresult#查询表中多条数据
defselect_all(self, condition):''':param condition: sql语句
:return: 字典形式的批量查询结果'''
try:
rows_affect=self.cur.execute(condition)if rows_affect >0:#将鼠标光标放回到初始位置
self.cur.scroll(0, mode='absolute')#返回游标中所有结果
results =self.cur.fetchall()
result= {'code': '0000', 'message': '执行批量查询操作成功', 'data': results}else:
result= {'code': '0000', 'message': '执行批量查询操作成功', 'data': []}exceptpymysql.Error as e:#执行回滚操作
self.conn.rollback()
result= {'code': '9999', 'message': '执行批量查询异常', 'data': []}
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')
logger= logging.getLogger(__name__)
logger.exception(e)returnresult#定义表中插入数据操作的方法
definsert_data(self, condition, params):''':param condition: 插入语句
:param params: 插入数据,列表形式[(,,,),(,,,)]
:return: 字典形式批量插入数据结果'''
try:
results= self.cur.executemany(condition, params) #返回插入数据的条数
self.conn.commit()
result= {'code': '0000', 'message': '执行批量插入数据成功', 'data': results}exceptpymysql.Error as e:
self.conn.rollback()#执行回滚操作
result = {'code': '9999', 'message': '执行批量插入数据异常', 'data': []}print("数据库错误|insert_data %d: %s" % (e.args[0], e.args[1]))
logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,
format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(massage)s')
logger= logging.getLogger(__name__)
logger.exception(e)returnresult#关闭数据库
def __del__(self):if self.cur is notNone:
self.cur.close()#关闭游标
if self.conn is notNone:
self.conn.close()#释放数据库资源
if __name__ == "__main__":
test= OperationDbInterface() #实例化类
result_select_all = test.select_all("select * from config_test") #查询所有数据
result_select_one = test.select_one("select * from config_test where id=1") #查询单条数据
result_oP_sql = test.op_sql("UPDATE config_test SET value_config='修改后的valuetest' WHERE id=1") #通用操作,修改数据库
result__insert_sql =test.insert_data("insert into config_test (key_config, value_config, description, status) values (%s, %s, %s, %s)",
[('mytest1', 'mytestvalue1', '我插入的测试数据1', 1), ('mytest2', 'mytestvalue2', '我插入的测试数据2', 0)])print(result_select_all['data'], result_select_all['message'])print(result_select_one['data'], result_select_one['message'])print(result_oP_sql['data'], result_oP_sql['message'])print(result__insert_sql['data'], result__insert_sql['message'])