pip install mysql-connector-python
import mysql.connector
from mysql.connector import Error
import datetime
def create_connection():
"""创建数据库连接"""
try:
connection = mysql.connector.connect(
host='localhost', # 主机地址
user='your_username', # 用户名
password='your_password', # 密码
database='your_database' # 数据库名
)
return connection
except Error as e:
print(f"连接数据库时出错: {e}")
return None
def insert_single_data(connection):
"""插入单条数据"""
try:
cursor = connection.cursor()
# SQL插入语句
insert_query = """
INSERT INTO users (name, email, age, created_at)
VALUES (%s, %s, %s, %s)
"""
# 要插入的数据
data = ('张三', 'zhangsan@example.com', 25, datetime.datetime.now())
cursor.execute(insert_query, data)
connection.commit()
print(f"插入成功,ID: {cursor.lastrowid}")
except Error as e:
print(f"插入数据时出错: {e}")
connection.rollback()
finally:
if cursor:
cursor.close()
def insert_multiple_data(connection):
"""批量插入数据"""
try:
cursor = connection.cursor()
# SQL插入语句
insert_query = """
INSERT INTO products (product_name, price, quantity)
VALUES (%s, %s, %s)
"""
# 批量数据
products = [
('iPhone 13', 6999.00, 100),
('MacBook Pro', 12999.00, 50),
('iPad Air', 4799.00, 80),
('AirPods Pro', 1999.00, 150)
]
cursor.executemany(insert_query, products)
connection.commit()
print(f"批量插入成功,影响行数: {cursor.rowcount}")
except Error as e:
print(f"批量插入时出错: {e}")
connection.rollback()
finally:
if cursor:
cursor.close()
def insert_with_return(connection):
"""插入并返回结果"""
try:
cursor = connection.cursor()
# 插入数据
insert_query = """
INSERT INTO orders (customer_name, total_amount, order_date)
VALUES (%s, %s, %s)
"""
data = ('李四', 5000.00, datetime.datetime.now())
cursor.execute(insert_query, data)
connection.commit()
# 获取刚插入的数据
select_query = "SELECT * FROM orders WHERE id = %s"
cursor.execute(select_query, (cursor.lastrowid,))
result = cursor.fetchone()
print("插入的数据:")
print(f"订单ID: {result[0]}")
print(f"客户名: {result[1]}")
print(f"总金额: {result[2]}")
print(f"订单时间: {result[3]}")
except Error as e:
print(f"操作出错: {e}")
connection.rollback()
finally:
if cursor:
cursor.close()
def main():
# 创建连接
connection = create_connection()
if connection:
try:
# 插入单条数据
print("=== 插入单条数据 ===")
insert_single_data(connection)
# 批量插入数据
print("\n=== 批量插入数据 ===")
insert_multiple_data(connection)
# 插入并返回
print("\n=== 插入并返回结果 ===")
insert_with_return(connection)
finally:
# 关闭连接
connection.close()
print("\n数据库连接已关闭")
if __name__ == "__main__":
main()
config.py# config.py
DB_CONFIG = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database',
'port': 3306,
'charset': 'utf8mb4',
'use_unicode': True,
'pool_size': 5, # 连接池大小
'pool_name': 'mypool'
}
# db_pool.py
import mysql.connector
from mysql.connector import pooling
from config import DB_CONFIG
class DatabasePool:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.init_pool()
return cls._instance
def init_pool(self):
try:
self.pool = pooling.MySQLConnectionPool(
pool_name=DB_CONFIG['pool_name'],
pool_size=DB_CONFIG['pool_size'],
host=DB_CONFIG['host'],
user=DB_CONFIG['user'],
password=DB_CONFIG['password'],
database=DB_CONFIG['database'],
port=DB_CONFIG['port'],
charset=DB_CONFIG['charset']
)
print("数据库连接池创建成功")
except Exception as e:
print(f"创建连接池失败: {e}")
self.pool = None
def get_connection(self):
"""获取数据库连接"""
if self.pool:
return self.pool.get_connection()
return None
def close_all(self):
"""关闭所有连接"""
if hasattr(self, 'pool'):
self.pool._remove_connections()
# models.py
from db_pool import DatabasePool
import datetime
from typing import List, Dict, Any, Optional
class UserDAO:
def __init__(self):
self.db_pool = DatabasePool()
def insert_user(self, user_data: Dict[str, Any]) -> Optional[int]:
"""插入用户数据"""
connection = self.db_pool.get_connection()
if not connection:
return None
cursor = None
try:
cursor = connection.cursor(dictionary=True)
insert_query = """
INSERT INTO users (name, email, age, created_at, updated_at)
VALUES (%(name)s, %(email)s, %(age)s, %(created_at)s, %(updated_at)s)
"""
# 添加时间戳
now = datetime.datetime.now()
user_data['created_at'] = now
user_data['updated_at'] = now
cursor.execute(insert_query, user_data)
connection.commit()
return cursor.lastrowid
except Exception as e:
print(f"插入用户失败: {e}")
if connection:
connection.rollback()
return None
finally:
if cursor:
cursor.close()
if connection:
connection.close()
def bulk_insert_users(self, users: List[Dict[str, Any]]) -> int:
"""批量插入用户"""
connection = self.db_pool.get_connection()
if not connection:
return 0
cursor = None
try:
cursor = connection.cursor()
insert_query = """
INSERT INTO users (name, email, age, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s)
"""
# 准备数据
now = datetime.datetime.now()
data = []
for user in users:
data.append((
user.get('name'),
user.get('email'),
user.get('age'),
now,
now
))
cursor.executemany(insert_query, data)
connection.commit()
return cursor.rowcount
except Exception as e:
print(f"批量插入失败: {e}")
if connection:
connection.rollback()
return 0
finally:
if cursor:
cursor.close()
if connection:
connection.close()
# main.py
import json
from models import UserDAO
from datetime import datetime
def demo_insert_single():
"""演示插入单条数据"""
dao = UserDAO()
user_data = {
'name': '王五',
'email': 'wangwu@example.com',
'age': 30
}
user_id = dao.insert_user(user_data)
if user_id:
print(f"用户插入成功,ID: {user_id}")
else:
print("用户插入失败")
def demo_bulk_insert():
"""演示批量插入"""
dao = UserDAO()
users = [
{'name': '用户1', 'email': 'user1@example.com', 'age': 20},
{'name': '用户2', 'email': 'user2@example.com', 'age': 25},
{'name': '用户3', 'email': 'user3@example.com', 'age': 30},
{'name': '用户4', 'email': 'user4@example.com', 'age': 35},
{'name': '用户5', 'email': 'user5@example.com', 'age': 40}
]
count = dao.bulk_insert_users(users)
print(f"批量插入成功,影响行数: {count}")
def demo_from_json_file():
"""从JSON文件读取并插入数据"""
dao = UserDAO()
try:
# 读取JSON文件
with open('users.json', 'r', encoding='utf-8') as f:
users = json.load(f)
# 插入数据
count = dao.bulk_insert_users(users)
print(f"从JSON文件插入成功,影响行数: {count}")
except FileNotFoundError:
print("找不到users.json文件")
except json.JSONDecodeError:
print("JSON文件格式错误")
def create_table_if_not_exists():
"""创建表(如果不存在)"""
from db_pool import DatabasePool
connection = DatabasePool().get_connection()
if not connection:
return
cursor = None
try:
cursor = connection.cursor()
create_table_query = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE NOT NULL,
age INT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_email (email),
INDEX idx_created_at (created_at)
)
"""
cursor.execute(create_table_query)
print("用户表创建/检查完成")
except Exception as e:
print(f"创建表失败: {e}")
finally:
if cursor:
cursor.close()
if connection:
connection.close()
def main():
print("=== MySQL数据插入演示 ===")
# 创建表
create_table_if_not_exists()
# 演示各种插入方式
print("\n1. 插入单条数据:")
demo_insert_single()
print("\n2. 批量插入数据:")
demo_bulk_insert()
print("\n3. 从JSON文件插入:")
demo_from_json_file()
if __name__ == "__main__":
main()
users.json[
{
"name": "赵六",
"email": "zhaoliu@example.com",
"age": 28
},
{
"name": "钱七",
"email": "qianqi@example.com",
"age": 32
},
{
"name": "孙八",
"email": "sunba@example.com",
"age": 45
}
]
如果你更喜欢使用PyMySQL:
# 安装
# pip install pymysql
import pymysql
from pymysql import Error
def connect_with_pymysql():
"""使用PyMySQL连接"""
try:
connection = pymysql.connect(
host='localhost',
user='your_username',
password='your_password',
database='your_database',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典格式
)
with connection.cursor() as cursor:
# 插入数据
sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
cursor.execute(sql, ('测试用户', 'test@example.com'))
connection.commit()
print("插入成功")
except Error as e:
print(f"错误: {e}")
finally:
if connection:
connection.close()
根据你的具体需求选择合适的版本,基础版本适合简单场景,高级版本适合生产环境使用。