python连接数据库mysql失败_python操作MySQL数据库报错问题解决

#coding:utf8

'''定义对mysql数据库操作的封装

1、包括基本的单条语句操作,如删除、修改、插入

2、独立地查询单条、多条数据

3、独立地添加多条数据'''

importlogging, os, pymysqlfrom public importconfigclassOperationDbInterface(object):#定义初始化数据库连接

def __init__(self, host_db='127.0.0.1', user_db='root', password_db='123456',

name_db='interface_test', port_db=3306, link_type=0):''':param host_db: 数据库服务主机IP

:param user_db: 数据库连接用户名

:param password_db: 数据库密码

:param name_db: 数据库名称

:param port_db: 数据库端口号,整型数据

:param link_type: 连接类型,用于设置输出数据是元祖还是字典,默认是字典,link_type=0

:return:游标'''

try:if link_type ==0:#创建数据,返回字典

self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,

charset='utf8', cursorclass=pymysql.cursors.DictCursor)else:#创建数据库,返回元祖

self.conn = pymysql.connect(host=host_db, user=user_db, password=password_db, db=name_db, port=port_db,

charset='utf8')

self.cur=self.conn.cursor()print("输出:%s" %self.cur)exceptpymysql.Error as e:print("创建数据库连接失败|Mysql Error %d: %s" % (e.args[0], e.args[1]))

logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,

format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

logger= logging.getLogger(__name__)

logger.exception(e)#定义单条数据操作,包含删除和更新操作

defop_sql(self, condition):''':param condition: sql语句,该通用方法可用来替代updateone,deleteone

:return: 字典形式'''

try:

self.cur.execute(condition)#执行sql语句

self.conn.commit() #提交游标数据

result = {'code': '0000', 'message': '执行通用操作成功', 'data': []}exceptpymysql.Error as e:

self.conn.rollback()#执行回滚操作

result = {'code': '9999', 'message': '执行通用操作异常', 'data': []}print("数据库错误|op_sql %d:%s" % (e.args[0], e.args[1]))

logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,

format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')

logger= logging.getLogger(__name__)

logger.exception(e)returnresult#查询表中单条数据

defselect_one(self, condition):''':param condition: 查询单条sql语句

:return: 字典形式的单条查询语句'''

try:

rows_affect=self.cur.execute(condition)if rows_affect >0:

results= self.cur.fetchone() #获取一条数据

result = {'code': '0000', 'message': '执行单条查询操作成功', 'data': results}else:

result= {'code': '0000', 'message': '执行单条查询操作成功', 'data': []}exceptpymysql.Error as e:

self.conn.rollback()#执行回滚操作

result = {'code': '9999', 'message': '执行单条查询异常', 'data': []}print("数据库错误|select_one %d: %s" % (e.args[0], e.args[1]))

logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,

format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')

logger= logging.getLogger(__name__)

logger.exception(e)returnresult#查询表中多条数据

defselect_all(self, condition):''':param condition: sql语句

:return: 字典形式的批量查询结果'''

try:

rows_affect=self.cur.execute(condition)if rows_affect >0:#将鼠标光标放回到初始位置

self.cur.scroll(0, mode='absolute')#返回游标中所有结果

results =self.cur.fetchall()

result= {'code': '0000', 'message': '执行批量查询操作成功', 'data': results}else:

result= {'code': '0000', 'message': '执行批量查询操作成功', 'data': []}exceptpymysql.Error as e:#执行回滚操作

self.conn.rollback()

result= {'code': '9999', 'message': '执行批量查询异常', 'data': []}

logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,

format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s')

logger= logging.getLogger(__name__)

logger.exception(e)returnresult#定义表中插入数据操作的方法

definsert_data(self, condition, params):''':param condition: 插入语句

:param params: 插入数据,列表形式[(,,,),(,,,)]

:return: 字典形式批量插入数据结果'''

try:

results= self.cur.executemany(condition, params) #返回插入数据的条数

self.conn.commit()

result= {'code': '0000', 'message': '执行批量插入数据成功', 'data': results}exceptpymysql.Error as e:

self.conn.rollback()#执行回滚操作

result = {'code': '9999', 'message': '执行批量插入数据异常', 'data': []}print("数据库错误|insert_data %d: %s" % (e.args[0], e.args[1]))

logging.basicConfig(filename=config.src_path + '/log/syserror.log', level=logging.DEBUG,

format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(massage)s')

logger= logging.getLogger(__name__)

logger.exception(e)returnresult#关闭数据库

def __del__(self):if self.cur is notNone:

self.cur.close()#关闭游标

if self.conn is notNone:

self.conn.close()#释放数据库资源

if __name__ == "__main__":

test= OperationDbInterface() #实例化类

result_select_all = test.select_all("select * from config_test") #查询所有数据

result_select_one = test.select_one("select * from config_test where id=1") #查询单条数据

result_oP_sql = test.op_sql("UPDATE config_test SET value_config='修改后的valuetest' WHERE id=1") #通用操作,修改数据库

result__insert_sql =test.insert_data("insert into config_test (key_config, value_config, description, status) values (%s, %s, %s, %s)",

[('mytest1', 'mytestvalue1', '我插入的测试数据1', 1), ('mytest2', 'mytestvalue2', '我插入的测试数据2', 0)])print(result_select_all['data'], result_select_all['message'])print(result_select_one['data'], result_select_one['message'])print(result_oP_sql['data'], result_oP_sql['message'])print(result__insert_sql['data'], result__insert_sql['message'])