欢迎光临殡葬网
详情描述

基础版本(使用mysql-connector-python)

1. 首先安装MySQL驱动

pip install mysql-connector-python

2. 基础脚本示例

import mysql.connector
from mysql.connector import Error
import datetime

def create_connection():
    """创建数据库连接"""
    try:
        connection = mysql.connector.connect(
            host='localhost',          # 主机地址
            user='your_username',      # 用户名
            password='your_password',  # 密码
            database='your_database'   # 数据库名
        )
        return connection
    except Error as e:
        print(f"连接数据库时出错: {e}")
        return None

def insert_single_data(connection):
    """插入单条数据"""
    try:
        cursor = connection.cursor()

        # SQL插入语句
        insert_query = """
        INSERT INTO users (name, email, age, created_at)
        VALUES (%s, %s, %s, %s)
        """

        # 要插入的数据
        data = ('张三', 'zhangsan@example.com', 25, datetime.datetime.now())

        cursor.execute(insert_query, data)
        connection.commit()
        print(f"插入成功,ID: {cursor.lastrowid}")

    except Error as e:
        print(f"插入数据时出错: {e}")
        connection.rollback()
    finally:
        if cursor:
            cursor.close()

def insert_multiple_data(connection):
    """批量插入数据"""
    try:
        cursor = connection.cursor()

        # SQL插入语句
        insert_query = """
        INSERT INTO products (product_name, price, quantity)
        VALUES (%s, %s, %s)
        """

        # 批量数据
        products = [
            ('iPhone 13', 6999.00, 100),
            ('MacBook Pro', 12999.00, 50),
            ('iPad Air', 4799.00, 80),
            ('AirPods Pro', 1999.00, 150)
        ]

        cursor.executemany(insert_query, products)
        connection.commit()
        print(f"批量插入成功,影响行数: {cursor.rowcount}")

    except Error as e:
        print(f"批量插入时出错: {e}")
        connection.rollback()
    finally:
        if cursor:
            cursor.close()

def insert_with_return(connection):
    """插入并返回结果"""
    try:
        cursor = connection.cursor()

        # 插入数据
        insert_query = """
        INSERT INTO orders (customer_name, total_amount, order_date)
        VALUES (%s, %s, %s)
        """

        data = ('李四', 5000.00, datetime.datetime.now())
        cursor.execute(insert_query, data)
        connection.commit()

        # 获取刚插入的数据
        select_query = "SELECT * FROM orders WHERE id = %s"
        cursor.execute(select_query, (cursor.lastrowid,))
        result = cursor.fetchone()

        print("插入的数据:")
        print(f"订单ID: {result[0]}")
        print(f"客户名: {result[1]}")
        print(f"总金额: {result[2]}")
        print(f"订单时间: {result[3]}")

    except Error as e:
        print(f"操作出错: {e}")
        connection.rollback()
    finally:
        if cursor:
            cursor.close()

def main():
    # 创建连接
    connection = create_connection()

    if connection:
        try:
            # 插入单条数据
            print("=== 插入单条数据 ===")
            insert_single_data(connection)

            # 批量插入数据
            print("\n=== 批量插入数据 ===")
            insert_multiple_data(connection)

            # 插入并返回
            print("\n=== 插入并返回结果 ===")
            insert_with_return(connection)

        finally:
            # 关闭连接
            connection.close()
            print("\n数据库连接已关闭")

if __name__ == "__main__":
    main()

高级版本(使用连接池和配置文件)

1. 配置文件 config.py

# config.py
DB_CONFIG = {
    'host': 'localhost',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database',
    'port': 3306,
    'charset': 'utf8mb4',
    'use_unicode': True,
    'pool_size': 5,  # 连接池大小
    'pool_name': 'mypool'
}

2. 数据库连接池类

# db_pool.py
import mysql.connector
from mysql.connector import pooling
from config import DB_CONFIG

class DatabasePool:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.init_pool()
        return cls._instance

    def init_pool(self):
        try:
            self.pool = pooling.MySQLConnectionPool(
                pool_name=DB_CONFIG['pool_name'],
                pool_size=DB_CONFIG['pool_size'],
                host=DB_CONFIG['host'],
                user=DB_CONFIG['user'],
                password=DB_CONFIG['password'],
                database=DB_CONFIG['database'],
                port=DB_CONFIG['port'],
                charset=DB_CONFIG['charset']
            )
            print("数据库连接池创建成功")
        except Exception as e:
            print(f"创建连接池失败: {e}")
            self.pool = None

    def get_connection(self):
        """获取数据库连接"""
        if self.pool:
            return self.pool.get_connection()
        return None

    def close_all(self):
        """关闭所有连接"""
        if hasattr(self, 'pool'):
            self.pool._remove_connections()

3. 数据模型类

# models.py
from db_pool import DatabasePool
import datetime
from typing import List, Dict, Any, Optional

class UserDAO:
    def __init__(self):
        self.db_pool = DatabasePool()

    def insert_user(self, user_data: Dict[str, Any]) -> Optional[int]:
        """插入用户数据"""
        connection = self.db_pool.get_connection()
        if not connection:
            return None

        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)

            insert_query = """
            INSERT INTO users (name, email, age, created_at, updated_at)
            VALUES (%(name)s, %(email)s, %(age)s, %(created_at)s, %(updated_at)s)
            """

            # 添加时间戳
            now = datetime.datetime.now()
            user_data['created_at'] = now
            user_data['updated_at'] = now

            cursor.execute(insert_query, user_data)
            connection.commit()

            return cursor.lastrowid

        except Exception as e:
            print(f"插入用户失败: {e}")
            if connection:
                connection.rollback()
            return None
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()

    def bulk_insert_users(self, users: List[Dict[str, Any]]) -> int:
        """批量插入用户"""
        connection = self.db_pool.get_connection()
        if not connection:
            return 0

        cursor = None
        try:
            cursor = connection.cursor()

            insert_query = """
            INSERT INTO users (name, email, age, created_at, updated_at)
            VALUES (%s, %s, %s, %s, %s)
            """

            # 准备数据
            now = datetime.datetime.now()
            data = []
            for user in users:
                data.append((
                    user.get('name'),
                    user.get('email'),
                    user.get('age'),
                    now,
                    now
                ))

            cursor.executemany(insert_query, data)
            connection.commit()

            return cursor.rowcount

        except Exception as e:
            print(f"批量插入失败: {e}")
            if connection:
                connection.rollback()
            return 0
        finally:
            if cursor:
                cursor.close()
            if connection:
                connection.close()

4. 主程序

# main.py
import json
from models import UserDAO
from datetime import datetime

def demo_insert_single():
    """演示插入单条数据"""
    dao = UserDAO()

    user_data = {
        'name': '王五',
        'email': 'wangwu@example.com',
        'age': 30
    }

    user_id = dao.insert_user(user_data)
    if user_id:
        print(f"用户插入成功,ID: {user_id}")
    else:
        print("用户插入失败")

def demo_bulk_insert():
    """演示批量插入"""
    dao = UserDAO()

    users = [
        {'name': '用户1', 'email': 'user1@example.com', 'age': 20},
        {'name': '用户2', 'email': 'user2@example.com', 'age': 25},
        {'name': '用户3', 'email': 'user3@example.com', 'age': 30},
        {'name': '用户4', 'email': 'user4@example.com', 'age': 35},
        {'name': '用户5', 'email': 'user5@example.com', 'age': 40}
    ]

    count = dao.bulk_insert_users(users)
    print(f"批量插入成功,影响行数: {count}")

def demo_from_json_file():
    """从JSON文件读取并插入数据"""
    dao = UserDAO()

    try:
        # 读取JSON文件
        with open('users.json', 'r', encoding='utf-8') as f:
            users = json.load(f)

        # 插入数据
        count = dao.bulk_insert_users(users)
        print(f"从JSON文件插入成功,影响行数: {count}")

    except FileNotFoundError:
        print("找不到users.json文件")
    except json.JSONDecodeError:
        print("JSON文件格式错误")

def create_table_if_not_exists():
    """创建表(如果不存在)"""
    from db_pool import DatabasePool

    connection = DatabasePool().get_connection()
    if not connection:
        return

    cursor = None
    try:
        cursor = connection.cursor()

        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id INT AUTO_INCREMENT PRIMARY KEY,
            name VARCHAR(100) NOT NULL,
            email VARCHAR(100) UNIQUE NOT NULL,
            age INT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_email (email),
            INDEX idx_created_at (created_at)
        )
        """

        cursor.execute(create_table_query)
        print("用户表创建/检查完成")

    except Exception as e:
        print(f"创建表失败: {e}")
    finally:
        if cursor:
            cursor.close()
        if connection:
            connection.close()

def main():
    print("=== MySQL数据插入演示 ===")

    # 创建表
    create_table_if_not_exists()

    # 演示各种插入方式
    print("\n1. 插入单条数据:")
    demo_insert_single()

    print("\n2. 批量插入数据:")
    demo_bulk_insert()

    print("\n3. 从JSON文件插入:")
    demo_from_json_file()

if __name__ == "__main__":
    main()

5. 使用示例的JSON文件 users.json

[
    {
        "name": "赵六",
        "email": "zhaoliu@example.com",
        "age": 28
    },
    {
        "name": "钱七",
        "email": "qianqi@example.com",
        "age": 32
    },
    {
        "name": "孙八",
        "email": "sunba@example.com",
        "age": 45
    }
]

使用其他驱动(PyMySQL版本)

如果你更喜欢使用PyMySQL:

# 安装
# pip install pymysql

import pymysql
from pymysql import Error

def connect_with_pymysql():
    """使用PyMySQL连接"""
    try:
        connection = pymysql.connect(
            host='localhost',
            user='your_username',
            password='your_password',
            database='your_database',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor  # 返回字典格式
        )

        with connection.cursor() as cursor:
            # 插入数据
            sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
            cursor.execute(sql, ('测试用户', 'test@example.com'))

        connection.commit()
        print("插入成功")

    except Error as e:
        print(f"错误: {e}")
    finally:
        if connection:
            connection.close()

最佳实践建议

使用连接池:对于频繁的数据库操作,使用连接池可以提高性能 参数化查询:使用参数化查询防止SQL注入 异常处理:妥善处理数据库异常,确保连接关闭 配置分离:将数据库配置放在单独的配置文件中 使用上下文管理器:确保资源正确释放 日志记录:添加适当的日志记录 批量操作:大量数据时使用批量插入

根据你的具体需求选择合适的版本,基础版本适合简单场景,高级版本适合生产环境使用。