PyMySQL 批量更新 MySQL 数据:方法对比与性能优化

一、基础更新方法对比

1. 简单循环更新(不推荐)

import pymysql

# 连接数据库
# Connect to the database (module-level connection shared by the
# simple examples in section 一; credentials are placeholders).
conn = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='test_db'
)
# Shared cursor reused by update_one_by_one / update_with_executemany below.
cursor = conn.cursor()

# 低效方法:逐条更新
def update_one_by_one(data_list):
    """Update rows one at a time — the slow baseline (one round-trip per row).

    Kept as the anti-pattern example; prefer update_with_executemany().
    """
    sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
    for record in data_list:
        cursor.execute(sql, (record['name'], record['age'], record['id']))
    conn.commit()

2. executemany() 批量更新

def update_with_executemany(data_list):
    """Update all rows with a single executemany() call, then commit."""
    sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
    # One parameter tuple per row: (name, age, id).
    rows = [(row['name'], row['age'], row['id']) for row in data_list]
    cursor.executemany(sql, rows)
    conn.commit()

二、高效更新策略

1. 批量更新 + 事务控制

import pymysql
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """Yield a pymysql connection with autocommit off; close it on exit.

    Callers own transaction control (commit/rollback); this manager only
    guarantees the connection is closed.
    """
    settings = dict(
        host='localhost',
        user='root',
        password='password',
        database='test_db',
        autocommit=False,  # callers commit/rollback explicitly
    )
    connection = pymysql.connect(**settings)
    try:
        yield connection
    finally:
        connection.close()

def batch_update_efficient(data_list, batch_size=1000):
    """Update rows in batches, committing after every batch.

    Each batch is its own transaction: on failure only the current
    (uncommitted) batch is rolled back — batches committed earlier
    remain in the database.

    Args:
        data_list: list of dicts with 'name', 'age' and 'id' keys.
        batch_size: rows per transaction (default 1000).
    """
    with get_db_connection() as conn:
        cursor = conn.cursor()
        sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
        committed = 0  # rows already durable; a rollback cannot undo these

        try:
            for i in range(0, len(data_list), batch_size):
                batch = data_list[i:i + batch_size]
                params = [(item['name'], item['age'], item['id'])
                          for item in batch]

                cursor.executemany(sql, params)
                conn.commit()  # commit once per batch
                committed += len(batch)

            print(f"成功更新 {len(data_list)} 条记录")

        except Exception as e:
            conn.rollback()  # undoes only the current, uncommitted batch
            print(f"更新失败: {e}")
        finally:
            cursor.close()  # fix: cursor was previously leaked

2. INSERT ... ON DUPLICATE KEY UPDATE

适用于批量插入或更新

def upsert_data_bulk(data_list):
    """Insert rows, or update name/age/email when the primary key exists."""
    sql = """
    INSERT INTO users (id, name, age, email) 
    VALUES (%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE 
    name=VALUES(name), 
    age=VALUES(age), 
    email=VALUES(email)
    """
    # Build one parameter tuple per record, in column order.
    rows = []
    for record in data_list:
        rows.append((record['id'], record['name'],
                     record['age'], record['email']))
    cursor.executemany(sql, rows)
    conn.commit()

3. CASE WHEN 批量条件更新

def batch_update_case_when(data_list):
    """Update many rows in ONE statement via CASE WHEN dispatch on id.

    Useful when different rows need different values but you want a
    single round-trip.

    Fixes vs. the original:
      * values and ids are bound as %s parameters instead of being
        f-string-interpolated into the SQL (SQL-injection vector, and
        broke on names containing quotes);
      * empty input returns early instead of emitting invalid `IN ()`.
    """
    if not data_list:
        return

    name_whens = []
    age_whens = []
    params = []

    # CASE id WHEN %s THEN %s ... for the name column.
    for item in data_list:
        name_whens.append("WHEN %s THEN %s")
        params.extend((item['id'], item['name']))

    # Same shape for the age column; its parameters follow the name ones.
    for item in data_list:
        age_whens.append("WHEN %s THEN %s")
        params.extend((item['id'], item['age']))

    ids = [item['id'] for item in data_list]
    params.extend(ids)

    id_placeholders = ','.join(['%s'] * len(ids))

    # Only %s placeholders are interpolated here — never user data.
    sql = f"""
    UPDATE users 
    SET name = CASE id {' '.join(name_whens)} ELSE name END,
        age = CASE id {' '.join(age_whens)} ELSE age END
    WHERE id IN ({id_placeholders})
    """

    cursor.execute(sql, params)
    conn.commit()

三、高级优化技巧

1. 使用连接池

import pymysql
from dbutils.pooled_db import PooledDB

# 创建连接池
# Module-level connection pool shared by update_with_pool().
# NOTE(review): created at import time — assumes the DB is reachable
# when this module is loaded; verify for production use.
pool = PooledDB(
    creator=pymysql,
    maxconnections=10,  # maximum number of concurrent connections
    mincached=2,        # connections opened eagerly at pool creation
    host='localhost',
    user='root',
    password='password',
    database='test_db',
    autocommit=False
)

def update_with_pool(data_list):
    """Batch-update user names using a pooled connection.

    Fix: roll back on failure before the connection goes back to the
    pool — with autocommit off, a half-done transaction would otherwise
    be handed to the next borrower of this pooled connection.

    Args:
        data_list: list of dicts with 'name' and 'id' keys.

    Raises:
        Re-raises whatever the driver raises after rolling back.
    """
    conn = pool.connection()
    cursor = conn.cursor()

    try:
        sql = "UPDATE users SET name=%s WHERE id=%s"
        params = [(item['name'], item['id']) for item in data_list]

        cursor.executemany(sql, params)
        conn.commit()
    except Exception:
        conn.rollback()  # return the connection to the pool in a clean state
        raise
    finally:
        cursor.close()
        conn.close()  # with PooledDB this returns the connection to the pool

2. 异步更新(使用aiomysql)

import asyncio
import aiomysql

async def async_batch_update(data_list):
    """Batch-update rows asynchronously via a temporary aiomysql pool.

    Fixes vs. the original:
      * the pool is always closed and awaited, even when the update
        raises (previously an exception leaked the pool);
      * the transaction is rolled back on failure (autocommit is off,
        so a dirty connection must not be released back to the pool).

    Args:
        data_list: list of dicts with 'name', 'age' and 'id' keys.
    """
    pool = await aiomysql.create_pool(
        host='localhost',
        user='root',
        password='password',
        db='test_db',
        autocommit=False,
        pool_recycle=3600  # recycle connections older than 1 hour
    )

    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                sql = "UPDATE users SET name=%s, age=%s WHERE id=%s"
                params = [(item['name'], item['age'], item['id'])
                          for item in data_list]
                try:
                    await cursor.executemany(sql, params)
                    await conn.commit()
                except Exception:
                    await conn.rollback()
                    raise
    finally:
        pool.close()
        await pool.wait_closed()

3. 使用SQLAlchemy ORM批量更新

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String
from sqlalchemy.orm import sessionmaker

engine = create_engine('mysql+pymysql://user:password@localhost/test_db')
Session = sessionmaker(bind=engine)

def sqlalchemy_batch_update(data_list):
    """Batch-update users through SQLAlchemy.

    Fix: the original executed BOTH bulk_update_mappings() AND a
    query-then-assign loop over the same ids, updating every row twice
    in one transaction. Only the bulk-mapping path is kept — it emits
    UPDATE statements without loading ORM objects, which is the fastest
    ORM option for this shape of data.

    Args:
        data_list: list of dicts keyed by the User columns, each
            including the primary key 'id'.
    """
    session = Session()

    try:
        session.bulk_update_mappings(User, data_list)
        session.commit()

    except Exception as e:
        session.rollback()
        print(f"更新失败: {e}")
    finally:
        session.close()

四、性能优化建议

1. 批量大小调优

def adaptive_batch_update(data_list):
    """Choose a batch size from the data volume, then delegate.

    Under 100 rows: a single batch; under 10,000 rows: 500 per batch;
    otherwise 1,000 per batch.
    """
    count = len(data_list)

    if count < 100:
        chosen_size = count          # small jobs: one batch
    elif count < 10000:
        chosen_size = 500            # medium jobs
    else:
        chosen_size = 1000           # large jobs

    return batch_update_efficient(data_list, chosen_size)

2. 索引优化

确保更新条件字段有索引:

-- 为更新条件字段创建索引
CREATE INDEX idx_user_id ON users(id);
CREATE INDEX idx_update_condition ON users(status, update_time);

3. 监控和日志

import time
import logging

logging.basicConfig(level=logging.INFO)

def monitored_batch_update(data_list, batch_size=1000):
    """Run the batch-update loop with per-batch timing and progress logs.

    The actual DB call is left as a placeholder; this wrapper measures
    and logs throughput per batch and overall.

    Args:
        data_list: records to update.
        batch_size: records per batch (default 1000).
    """
    start_time = time.time()

    total = len(data_list)
    updated = 0

    for i in range(0, total, batch_size):
        batch = data_list[i:i+batch_size]
        batch_start = time.time()

        # Placeholder for the real batch update call:
        # update_batch(batch)

        updated += len(batch)
        batch_time = time.time() - batch_start

        logging.info(
            f"批次 {i//batch_size + 1}: "
            f"更新 {len(batch)} 条, "
            f"耗时 {batch_time:.2f}秒, "
            f"进度 {updated}/{total}"
        )

    total_time = time.time() - start_time
    # Fix: the original divided by `total` unconditionally and raised
    # ZeroDivisionError on an empty data_list.
    avg = total_time / total if total else 0.0
    logging.info(f"总耗时: {total_time:.2f}秒, 平均: {avg:.4f}秒/条")

五、完整示例

import pymysql
from typing import List, Dict
import time

class MySQLBatchUpdater:
    """Reusable helper for batched UPDATEs against a MySQL table.

    Opens one pymysql connection with autocommit off; every batch is
    committed explicitly. Also usable as a context manager so the
    connection is always closed (a backward-compatible addition —
    explicit close() still works as before).
    """

    def __init__(self, host, user, password, database):
        """Connect to MySQL; rows come back as dicts (DictCursor)."""
        self.conn = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            autocommit=False,
            cursorclass=pymysql.cursors.DictCursor
        )

    def __enter__(self):
        """Enable `with MySQLBatchUpdater(...) as updater:` usage."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the connection on context exit; never suppress errors."""
        self.close()
        return False

    def update_batch_executemany(self, table: str,
                                 data: List[Dict],
                                 key_field: str = 'id',
                                 batch_size: int = 1000) -> bool:
        """Generic batched UPDATE.

        Builds ``UPDATE <table> SET f1=%s, ... WHERE <key_field>=%s``
        from the keys of the first record, then applies it with
        executemany() in batches, committing after each batch.

        Args:
            table: table name. NOTE: interpolated into the SQL —
                must be a trusted identifier, never user input.
            data: list of dicts; every dict must contain key_field and
                the same update fields as data[0].
            key_field: primary-key field name (default 'id').
            batch_size: rows per commit (default 1000).

        Returns:
            True on success (or empty input); False when there is
            nothing to update or a batch fails. A failed batch is
            rolled back, but batches committed earlier remain applied.
        """
        if not data:
            return True

        # Derive the SET column list from the first record.
        sample = data[0]
        update_fields = [f for f in sample.keys() if f != key_field]

        if not update_fields:
            return False

        set_clause = ', '.join([f"{field}=%s" for field in update_fields])
        sql = f"UPDATE {table} SET {set_clause} WHERE {key_field}=%s"

        cursor = self.conn.cursor()

        try:
            for i in range(0, len(data), batch_size):
                batch = data[i:i+batch_size]

                # Parameter order per row: update-field values, then key.
                params = []
                for item in batch:
                    param = tuple(item[field] for field in update_fields)
                    param = param + (item[key_field],)
                    params.append(param)

                cursor.executemany(sql, params)
                self.conn.commit()

                print(f"已更新批次 {i//batch_size + 1}, "
                      f"共 {len(batch)} 条记录")

            return True

        except Exception as e:
            self.conn.rollback()
            print(f"批量更新失败: {e}")
            return False
        finally:
            cursor.close()

    def close(self):
        """Close the underlying connection (no-op when conn is falsy)."""
        if self.conn:
            self.conn.close()

# Usage example: connect, build synthetic rows, batch-update, close.
if __name__ == "__main__":
    updater = MySQLBatchUpdater(
        host='localhost',
        user='root',
        password='password',
        database='test_db'
    )

    # 10,000 synthetic rows: user_1 .. user_10000, ages cycling 20-49.
    test_data = [{'id': n, 'name': f'user_{n}', 'age': 20 + n % 30}
                 for n in range(1, 10001)]

    # Run the batched update, 500 rows per commit.
    success = updater.update_batch_executemany(
        table='users',
        data=test_data,
        key_field='id',
        batch_size=500
    )

    updater.close()

六、最佳实践总结

1. 优先使用 executemany() 而不是循环单条更新;
2. 合理设置批量大小(通常 500-2000 条/批);
3. 关闭 autocommit,手动控制事务提交;
4. 使用连接池管理数据库连接;
5. 确保更新条件字段有索引;
6. 监控性能,根据实际情况调整策略;
7. 考虑使用 ON DUPLICATE KEY UPDATE 替代先查询后更新;
8. 大数据量更新考虑分时段执行,避免锁表时间过长。

选择哪种方法取决于具体场景:数据量大小、更新频率、一致性要求等。通常建议先使用executemany()配合适当批量大小,如遇性能瓶颈再考虑更高级的优化策略。