import pymysql

# Connect to the database
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test_db'
)
cursor = conn.cursor()

# Inefficient approach: update rows one at a time
def update_one_by_one(data_list):
    for item in data_list:
        sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
        cursor.execute(sql, (item['name'], item['age'], item['id']))
    conn.commit()

# Better: send all parameter sets in a single executemany() call
def update_with_executemany(data_list):
    sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
    # Prepare the list of parameter tuples
    params = [(item['name'], item['age'], item['id']) for item in data_list]
    cursor.executemany(sql, params)
    conn.commit()
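
A minimal usage sketch for comparing the two functions above, assuming the users table from these examples already contains rows with the matching ids:

import time

sample_data = [
    {'id': i, 'name': f'user_{i}', 'age': 20 + i % 30}
    for i in range(1, 1001)
]

start = time.time()
update_one_by_one(sample_data)
print(f"one-by-one: {time.time() - start:.2f}s")

start = time.time()
update_with_executemany(sample_data)
print(f"executemany: {time.time() - start:.2f}s")
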
import pymysql
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        autocommit=False  # disable autocommit; we commit explicitly per batch
    )
    try:
        yield conn
    finally:
        conn.close()

def batch_update_efficient(data_list, batch_size=1000):
    with get_db_connection() as conn:
        cursor = conn.cursor()
        sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
        try:
            for i in range(0, len(data_list), batch_size):
                batch = data_list[i:i+batch_size]
                params = [(item['name'], item['age'], item['id'])
                          for item in batch]
                cursor.executemany(sql, params)
                conn.commit()  # commit once per batch
            print(f"Successfully updated {len(data_list)} records")
        except Exception as e:
            conn.rollback()
            print(f"Update failed: {e}")

# INSERT ... ON DUPLICATE KEY UPDATE: suitable for bulk insert-or-update (upsert)
def upsert_data_bulk(data_list):
    sql = """
        INSERT INTO users (id, name, age, email)
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            name=VALUES(name),
            age=VALUES(age),
            email=VALUES(email)
    """
    params = [
        (item['id'], item['name'], item['age'], item['email'])
        for item in data_list
    ]
    cursor.executemany(sql, params)
    conn.commit()
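
A quick usage sketch; it assumes id is the primary key (or carries a unique index), since that is what ON DUPLICATE KEY UPDATE keys on:

upsert_rows = [
    {'id': 1, 'name': 'alice', 'age': 30, 'email': 'alice@example.com'},
    {'id': 2, 'name': 'bob', 'age': 25, 'email': 'bob@example.com'},
]
upsert_data_bulk(upsert_rows)

PyMySQL's executemany() is documented to optimize multi-row INSERT and REPLACE statements (including this ON DUPLICATE KEY UPDATE form) by folding them into a single multi-row statement, which is a large part of why this upsert approach is fast.
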
def batch_update_case_when(data_list):
    """
    Update multiple rows with a single statement using CASE WHEN.
    Useful when different rows need different new values.
    """
    # Build the CASE WHEN expressions (values are interpolated directly into the SQL)
    case_name = "CASE id "
    case_age = "CASE id "
    id_list = []
    for item in data_list:
        case_name += f"WHEN {item['id']} THEN '{item['name']}' "
        case_age += f"WHEN {item['id']} THEN {item['age']} "
        id_list.append(str(item['id']))
    case_name += "ELSE name END"
    case_age += "ELSE age END"
    ids = ','.join(id_list)
    sql = f"""
        UPDATE users
        SET name = {case_name},
            age = {case_age}
        WHERE id IN ({ids})
    """
    cursor.execute(sql)
    conn.commit()
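
Note that the function above interpolates ids and names directly into the SQL string, which is only safe for trusted data. Below is a sketch of the same CASE WHEN technique with parameterized values, reusing the module-level cursor and conn from the earlier examples:

def batch_update_case_when_safe(data_list):
    # Same technique, but every value is passed as a parameter
    name_cases = " ".join("WHEN %s THEN %s" for _ in data_list)
    age_cases = " ".join("WHEN %s THEN %s" for _ in data_list)
    id_placeholders = ", ".join("%s" for _ in data_list)
    sql = (
        f"UPDATE users SET "
        f"name = CASE id {name_cases} ELSE name END, "
        f"age = CASE id {age_cases} ELSE age END "
        f"WHERE id IN ({id_placeholders})"
    )
    params = []
    for item in data_list:                              # parameters for the name CASE
        params.extend([item['id'], item['name']])
    for item in data_list:                              # parameters for the age CASE
        params.extend([item['id'], item['age']])
    params.extend(item['id'] for item in data_list)     # parameters for the IN list
    cursor.execute(sql, params)
    conn.commit()
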
import pymysql
from dbutils.pooled_db import PooledDB

# Create a connection pool
pool = PooledDB(
    creator=pymysql,
    maxconnections=10,  # maximum number of connections
    mincached=2,        # idle connections created at startup
    host='localhost',
    user='root',
    password='password',
    database='test_db',
    autocommit=False
)

def update_with_pool(data_list):
    conn = pool.connection()
    cursor = conn.cursor()
    try:
        sql = "UPDATE users SET name=%s WHERE id=%s"
        params = [(item['name'], item['id']) for item in data_list]
        cursor.executemany(sql, params)
        conn.commit()
    finally:
        cursor.close()
        conn.close()  # returns the connection to the pool

import asyncio
import aiomysql

async def async_batch_update(data_list):
    # Create an async connection pool
    pool = await aiomysql.create_pool(
        host='localhost',
        user='root',
        password='password',
        db='test_db',
        autocommit=False,
        pool_recycle=3600
    )
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
            params = [(item['name'], item['age'], item['id'])
                      for item in data_list]
            await cursor.executemany(sql, params)
            await conn.commit()
    pool.close()
    await pool.wait_closed()
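
The coroutine above has to be driven by an event loop; a minimal way to run it (data shape as in the earlier examples):

if __name__ == "__main__":
    data = [
        {'id': i, 'name': f'user_{i}', 'age': 20 + i % 30}
        for i in range(1, 1001)
    ]
    asyncio.run(async_batch_update(data))
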
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.orm import sessionmaker
engine = create_engine('mysql+pymysql://user:password@localhost/test_db')
Session = sessionmaker(bind=engine)
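
The function below refers to a User model that is not defined in this snippet. Here is a minimal sketch of what it might look like, assuming a users table with id, name, age and email columns (declarative_base lives in sqlalchemy.orm from SQLAlchemy 1.4 on; older versions import it from sqlalchemy.ext.declarative):

from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    age = Column(Integer)
    email = Column(String(100))
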
def sqlalchemy_batch_update(data_list):
    session = Session()
    try:
        # Option 1: bulk_update_mappings (each dict must include the primary key)
        session.bulk_update_mappings(User, data_list)
        # Option 2 (alternative to Option 1): query the rows, then update the objects.
        # Both are shown for illustration; in real code pick one of the two.
        ids = [item['id'] for item in data_list]
        users = session.query(User).filter(User.id.in_(ids)).all()
        # Map id -> ORM object
        user_dict = {user.id: user for user in users}
        for item in data_list:
            if item['id'] in user_dict:
                user = user_dict[item['id']]
                user.name = item['name']
                user.age = item['age']
        session.commit()
    except Exception as e:
        session.rollback()
        print(f"Update failed: {e}")
    finally:
        session.close()

def adaptive_batch_update(data_list):
    """
    Adaptive batch size:
    pick the batch size based on how many records need updating.
    """
    total_records = len(data_list)
    if total_records < 100:
        batch_size = total_records
    elif total_records < 10000:
        batch_size = 500
    else:
        batch_size = 1000
    return batch_update_efficient(data_list, batch_size)

Make sure the columns used in the update conditions are indexed:

-- Index the columns referenced in WHERE clauses (a primary key already has one)
CREATE INDEX idx_user_id ON users(id);
CREATE INDEX idx_update_condition ON users(status, update_time);
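
To verify that a batch UPDATE actually hits an index, inspect its execution plan. A small sketch using EXPLAIN over the same pymysql connection (MySQL 5.6+ supports EXPLAIN on UPDATE statements; the function name and sample values are just for illustration):

def show_update_plan(conn):
    # The 'key' column of the EXPLAIN output shows which index, if any, is used
    with conn.cursor() as cursor:
        cursor.execute("EXPLAIN UPDATE users SET name=%s WHERE id=%s", ('test', 1))
        for row in cursor.fetchall():
            print(row)
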
import time
import logging

logging.basicConfig(level=logging.INFO)

def monitored_batch_update(data_list, batch_size=1000):
    start_time = time.time()
    total = len(data_list)
    updated = 0
    for i in range(0, total, batch_size):
        batch = data_list[i:i+batch_size]
        batch_start = time.time()
        # Perform the actual batch update here, e.g.:
        # update_batch(batch)
        updated += len(batch)
        batch_time = time.time() - batch_start
        logging.info(
            f"Batch {i//batch_size + 1}: "
            f"updated {len(batch)} records, "
            f"took {batch_time:.2f}s, "
            f"progress {updated}/{total}"
        )
    total_time = time.time() - start_time
    logging.info(f"Total time: {total_time:.2f}s, average: {total_time/total:.4f}s per record")

import pymysql
from typing import List, Dict
import time

class MySQLBatchUpdater:
    def __init__(self, host, user, password, database):
        self.conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            autocommit=False,
            cursorclass=pymysql.cursors.DictCursor
        )

    def update_batch_executemany(self, table: str,
                                 data: List[Dict],
                                 key_field: str = 'id',
                                 batch_size: int = 1000) -> bool:
        """
        Generic batch-update method.
        Args:
            table: table name
            data: list of row dicts to update
            key_field: name of the primary-key field
            batch_size: number of rows per batch
        """
        if not data:
            return True
        # All fields except the key are treated as update targets
        sample = data[0]
        update_fields = [f for f in sample.keys() if f != key_field]
        if not update_fields:
            return False
        # Build the SQL statement
        set_clause = ', '.join([f"{field}=%s" for field in update_fields])
        sql = f"UPDATE {table} SET {set_clause} WHERE {key_field}=%s"
        cursor = self.conn.cursor()
        try:
            for i in range(0, len(data), batch_size):
                batch = data[i:i+batch_size]
                # Parameters: update-field values followed by the key value
                params = []
                for item in batch:
                    param = tuple(item[field] for field in update_fields)
                    param = param + (item[key_field],)
                    params.append(param)
                cursor.executemany(sql, params)
                self.conn.commit()
                print(f"Committed batch {i//batch_size + 1}, "
                      f"{len(batch)} records")
            return True
        except Exception as e:
            self.conn.rollback()
            print(f"Batch update failed: {e}")
            return False
        finally:
            cursor.close()

    def close(self):
        if self.conn:
            self.conn.close()

# Usage example
if __name__ == "__main__":
    updater = MySQLBatchUpdater(
        host='localhost',
        user='root',
        password='password',
        database='test_db'
    )
    # Sample data
    test_data = [
        {'id': i, 'name': f'user_{i}', 'age': 20 + i % 30}
        for i in range(1, 10001)
    ]
    # Run the batch update
    success = updater.update_batch_executemany(
        table='users',
        data=test_data,
        key_field='id',
        batch_size=500
    )
    updater.close()

Which approach to choose depends on the scenario: data volume, update frequency, consistency requirements, and so on. In general, start with executemany() and a suitable batch size, and reach for the more advanced strategies only if that becomes a performance bottleneck.