import pymysql from dbutils.pooled_db import PooledDB class MysqlUtils(): # 连接池对象 __pool = None def __init__(self,**config): self.host = config["host"] self.port = int(config["port"]) self.user = config["user"] self.passwd = config["passwd"] self.database = None if "database" in config: self.database = config["database"] self.pool = self.get_conn() # self.pool = None def get_conn(self): if MysqlUtils.__pool is None: __pool = PooledDB(pymysql, mincached=1, maxcached=5, maxconnections=5, host=self.host, port=self.port, user=self.user, passwd=self.passwd, database=self.database, use_unicode=False, blocking=False, charset="utf8") self.pool = __pool return __pool def query(self,sql): con = None cur = None try: con = self.pool.connection() cur = con.cursor() count = cur.execute(sql) if count > 0: result = cur.fetchall() result = [[j.decode() if isinstance(j,bytes) else j for j in i] for i in result] print(result) else: result = False return result except Exception as e: print(e) finally: if cur: cur.close() if con: con.close() def update(self,sql): con = None cur = None try: con = self.pool.connection() cur = con.cursor() cur.execute(sql) con.commit() except Exception as e: con.rollback() # 事务回滚 print(e) finally: if cur: cur.close() if con: con.close() def close(self): if MysqlUtils.__pool: MysqlUtils.__pool.close()