Python3 多线程(连接池)操作MySQL插入数据

  

下面我将详细介绍如何使用Python3多线程(连接池)操作MySQL插入数据。

准备工作

首先,我们需要安装Python3以及对应的MySQL库。可以使用以下命令进行安装:

pip install pymysql

创建数据库连接池

使用连接池可以最大化利用已经建立的连接,提高程序的性能和并发能力。下面是创建连接池的示例代码:

import pymysql
from DBUtils.PooledDB import PooledDB

pool = PooledDB(
    creator=pymysql, 
    mincached=1, 
    maxcached=10, 
    host='localhost', 
    port=3306, 
    user='root', 
    password='password', 
    database='mydb', 
    charset='utf8'
)

在这个示例中,我们使用了DBUtils.PooledDB库来创建连接池。mincached参数表示连接池中最少保持的连接数,maxcached参数表示连接池中最大的连接数。使用该库创建的pool对象可以用来获取数据库连接。

连接池实现多线程插入数据

下面是连接池实现多线程插入数据的示例代码:

from concurrent.futures import ThreadPoolExecutor


def insert(data):
    query = "INSERT INTO mytable (name, age) VALUES (%s, %s)"
    conn = pool.connection()
    cursor = conn.cursor()
    cursor.execute(query, data)
    conn.commit()
    cursor.close()
    conn.close()

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(insert, ('Alice', 25)) for _ in range(50)]

在这个示例中,我们使用了Python的concurrent.futures库来实现多线程操作MySQL。ThreadPoolExecutor对象可以用来创建一个线程池,其中max_workers表示线程池中线程的最大数量。

executor.submit()方法用于提交一个任务,该方法会立即返回一个Future对象。在这个示例中,我们提交了50个插入数据的任务,并指定了参数值('Alice', 25)

insert函数中,我们使用连接池来获取数据库连接,执行插入操作并关闭数据库连接。使用连接池可以显著提高程序的性能和并发能力。

示例说明

以下是两个使用连接池多线程插入数据的示例:

示例1:批量插入数据

from concurrent.futures import ThreadPoolExecutor


def insert_many(data_list):
    query = "INSERT INTO mytable (name, age) VALUES (%s, %s)"
    conn = pool.connection()
    cursor = conn.cursor()
    cursor.executemany(query, data_list)
    conn.commit()
    cursor.close()
    conn.close()

data = [('Alice', 25), ('Bob', 30), ('Charlie', 35), ('David', 40)]
with ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(insert_many, (data[i:i+2])) for i in range(0, len(data), 2)]

在这个示例中,我们使用executemany()方法批量插入数据。在insert_many函数中,我们传入了一个包含多个数据的列表data_list,使用executemany()方法批量插入数据。

在主程序中,我们使用一个线程处理相邻的两个数据项,通过循环遍历将数据分组并提交到线程池处理。

示例2:数据插入失败重试

from concurrent.futures import ThreadPoolExecutor, as_completed


def insert_with_retry(data):
    query = "INSERT INTO mytable (name, age) VALUES (%s, %s)"
    conn = pool.connection()
    cursor = conn.cursor()
    for i in range(3):
        try:
            cursor.execute(query, data)
            conn.commit()
            break
        except Exception as e:
            print(f"Insert failed, retrying ({i+1}/3)")
    cursor.close()
    conn.close()

data = [('Alice', 25), ('Bob', 30), ('Charlie', 'invalid'), ('David', 40)]
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(insert_with_retry, d) for d in data]

for future in as_completed(futures):
    try:
        future.result()
    except Exception as e:
        print(f"Insert failed: {e}")

在这个示例中,我们在insert_with_retry函数中增加了数据插入失败重试的逻辑。在主程序中,我们提交了包含多个数据的任务列表,并使用as_completed()函数迭代多个Future对象并尝试获取其结果。

由于数据中包含了一个无效的年龄值,在插入操作中将会抛出异常。在这种情况下,我们将尝试最多三次执行插入操作。

以上就是Python3多线程(连接池)操作MySQL插入数据的完整攻略。

相关文章