核心概念
锁优化与并发控制是数据库性能调优的核心内容。合理的锁策略可以:
- 提高并发度:减少锁等待,提升吞吐量
- 降低死锁概率:优化访问模式,减少冲突
- 提升响应速度:缩短锁持有时间,加快事务处理
并发控制的目标
ACID 特性中的隔离性(Isolation)通过并发控制实现:
- 防止丢失更新
- 防止脏读
- 防止不可重复读
- 防止幻读
锁优化的核心原则
- 锁粒度最小化:只锁必要的资源
- 锁时间最短化:快速提交事务
- 锁冲突最小化:减少并发冲突
- 死锁概率最低化:固定访问顺序
锁优化底层原理
InnoDB 锁优化机制
1. 自适应哈希索引
自适应哈希索引(Adaptive Hash Index, AHI):
- InnoDB 自动为热点数据创建哈希索引
- 等值查询直接从哈希表获取,跳过 B+ 树遍历
- 减少锁扫描范围,提升并发
-- 查看自适应哈希索引状态
SHOW VARIABLES LIKE 'innodb_adaptive_hash_index';
-- 关闭自适应哈希索引(特定场景)
SET GLOBAL innodb_adaptive_hash_index = OFF;
2. 插入缓冲(Insert Buffer)
插入缓冲优化:
- 对非唯一索引的插入操作进行缓冲
- 批量合并插入,减少随机 IO
- 降低页分裂和锁竞争
-- 查看插入缓冲状态
SHOW ENGINE INNODB STATUS\G
-- 输出包含:
INSERT BUFFER AND ADAPTIVE HASH INDEX
IBUF: size 1, free list len 0, seg size 2, 1 merges
3. 锁等待优化
锁等待优化策略:
- 自旋等待:短时等待避免上下文切换
- 优先级调整:长事务优先,减少回滚代价
- 批量唤醒:减少线程调度开销
多版本并发控制(MVCC)
MVCC 通过维护数据的历史版本,实现读写不阻塞:
MVCC 实现机制:
1. 隐藏列:每行记录包含 DB_TRX_ID、DB_ROLL_PTR
2. Undo Log:记录数据的历史版本链
3. Read View:事务可见性判断依据
-- MVCC 在 READ COMMITTED 和 REPEATABLE READ 下生效
-- SELECT 操作不加锁,读取历史版本
-- 示例:读写不阻塞
-- 事务 A:更新数据(持有 X 锁)
UPDATE users SET name = 'new' WHERE id = 1;
-- 事务 B:读取数据(不加锁,读取旧版本)
SELECT name FROM users WHERE id = 1; -- 不阻塞
锁优化策略
策略 1:优化索引减少锁范围
-- 优化前:无索引,全表锁
UPDATE orders SET status = 1 WHERE user_id = 100;
-- 优化后:添加索引,行锁
CREATE INDEX idx_user_id ON orders(user_id);
UPDATE orders SET status = 1 WHERE user_id = 100;
-- 对比:
-- 无索引:锁 100 万行
-- 有索引:锁 10 行(假设 user_id=100 有 10 条记录)
覆盖索引优化:
-- 优化前:回表查询,锁数据行
SELECT * FROM users WHERE email = 'test@example.com' FOR UPDATE;
-- 优化后:覆盖索引,只锁索引
CREATE INDEX idx_email ON users(email);
SELECT email FROM users WHERE email = 'test@example.com' FOR UPDATE;
策略 2:缩短事务持有时间
-- 错误示例:长事务
BEGIN;
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
-- 执行复杂业务逻辑(500ms)
-- 调用外部 API(1000ms)
-- 写日志文件(200ms)
UPDATE orders SET status = 1 WHERE id = 1;
COMMIT;
-- 总耗时:1700ms,锁持有 1700ms
-- 正确示例:短事务
BEGIN;
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
UPDATE orders SET status = 1 WHERE id = 1;
COMMIT;
-- 总耗时:10ms,锁持有 10ms
-- 再执行其他业务逻辑
应用层优化:
// 错误示例
@Transactional
public void processOrder(Long orderId) {
Order order = orderRepository.findById(orderId);
order.setStatus(1);
orderRepository.save(order);
// 以下操作不应在事务内
emailService.sendConfirmation(order); // 发送邮件
logService.logOrder(order); // 写日志
kafkaTemplate.send("order-topic", order); // 发消息
}
// 正确示例
@Transactional
public void processOrder(Long orderId) {
Order order = orderRepository.findById(orderId);
order.setStatus(1);
orderRepository.save(order);
}
// 事务已提交,锁已释放
emailService.sendConfirmation(order);
logService.logOrder(order);
kafkaTemplate.send("order-topic", order);
策略 3:减少锁粒度
-- 优化前:表锁
UPDATE orders SET status = 1; -- 锁全表
-- 优化后:分批更新
UPDATE orders SET status = 1 WHERE id BETWEEN 1 AND 1000;
UPDATE orders SET status = 1 WHERE id BETWEEN 1001 AND 2000;
UPDATE orders SET status = 1 WHERE id BETWEEN 2001 AND 3000;
使用 LIMIT 分批:
// 大批量更新
public void batchUpdateStatus() {
int batchSize = 500;
int affected;
do {
affected = orderMapper.updateStatusBatch(batchSize);
// 每批提交,释放锁
} while (affected > 0);
}
// Mapper XML
<update id="updateStatusBatch">
UPDATE orders
SET status = 1
WHERE status = 0
LIMIT #{batchSize}
</update>
策略 4:调整隔离级别
-- READ UNCOMMITTED:最低隔离,几乎不锁(不推荐)
-- READ COMMITTED:读已提交,减少间隙锁(推荐)
-- REPEATABLE READ:可重复读(MySQL 默认)
-- SERIALIZABLE:最高隔离,几乎全锁(性能最差)
-- 高并发读多写少场景
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 对比:
-- REPEATABLE READ:范围查询使用临键锁,锁间隙
-- READ COMMITTED:范围查询只锁记录,不锁间隙
隔离级别选择指南:
| 业务场景 | 推荐隔离级别 | 理由 |
|---|---|---|
| 电商下单 | READ COMMITTED | 减少间隙锁,提高并发 |
| 金融转账 | REPEATABLE READ | 需要可重复读保证 |
| 报表统计 | READ COMMITTED | 允许脏读,提升性能 |
| 库存扣减 | READ COMMITTED + 行锁 | 精确控制锁范围 |
策略 5:使用乐观锁
-- 悲观锁:SELECT ... FOR UPDATE
SELECT version FROM products WHERE id = 1 FOR UPDATE;
UPDATE products SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 1;
-- 乐观锁:CAS 操作
UPDATE products SET stock = stock - 1, version = version + 1
WHERE id = 1 AND version = 1;
-- 检查影响行数
-- rows_affected = 1: 成功
-- rows_affected = 0: 失败(版本号已变),重试或放弃
应用层实现:
// 乐观锁实现
public boolean decreaseStock(Long productId, int quantity) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
Product product = productMapper.selectById(productId);
if (product.getStock() < quantity) {
return false; // 库存不足
}
int updated = productMapper.updateStock(
productId, quantity, product.getVersion()
);
if (updated > 0) {
return true; // 更新成功
}
// 更新失败,重试
}
return false; // 超过最大重试次数
}
策略 6:避免锁升级
-- 锁升级:行锁 → 表锁(当锁数量超过阈值)
-- 优化前:可能触发锁升级
DELETE FROM logs WHERE create_time < '2024-01-01'; -- 100 万行
-- 优化后:分批删除,避免锁升级
DELETE FROM logs WHERE create_time < '2024-01-01' LIMIT 1000;
-- 循环执行直到影响行数为 0
配置参数:
-- 查看锁相关配置
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_lock_wait_timeout';
SHOW VARIABLES LIKE 'innodb_status_output';
-- 调整 innodb_buffer_pool_size 减少锁竞争
SET GLOBAL innodb_buffer_pool_size = 4G; -- 根据内存调整
并发控制实践
场景 1:秒杀系统
-- 方案 1:悲观锁(性能差,不推荐)
BEGIN;
SELECT stock FROM products WHERE id = 1 FOR UPDATE;
UPDATE products SET stock = stock - 1 WHERE id = 1;
COMMIT;
-- 方案 2:乐观锁(推荐)
UPDATE products SET stock = stock - 1
WHERE id = 1 AND stock > 0;
-- 方案 3:Redis 预扣减(高并发推荐)
-- Redis: DECR product:stock:1
-- MySQL 异步同步
完整实现:
// Redis + MySQL 方案
@Service
public class SeckillService {
@Resource
private RedisTemplate<String, Integer> redisTemplate;
@Resource
private ProductMapper productMapper;
public boolean seckill(Long productId) {
String stockKey = "seckill:stock:" + productId;
// 1. Redis 预扣减
Integer stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
redisTemplate.opsForValue().increment(stockKey); // 回滚
return false;
}
// 2. 异步写入 MySQL
seckillQueue.add(productId);
return true;
}
}
场景 2:账户转账
-- 优化前:可能死锁
-- 事务 A:A → B
UPDATE accounts SET balance = balance - 100 WHERE id = 'A';
UPDATE accounts SET balance = balance + 100 WHERE id = 'B';
-- 事务 B:B → A
UPDATE accounts SET balance = balance - 50 WHERE id = 'B';
UPDATE accounts SET balance = balance + 50 WHERE id = 'A';
-- 优化后:固定顺序
-- 所有事务都按 id 升序更新
UPDATE accounts SET balance = balance - 100 WHERE id IN ('A', 'B');
UPDATE accounts SET balance = balance + 100 WHERE id IN ('A', 'B');
// 代码实现
@Transactional
public void transfer(String from, String to, BigDecimal amount) {
List<String> ids = Arrays.asList(from, to);
Collections.sort(ids); // 固定顺序
// 批量查询
List<Account> accounts = accountMapper.selectByIds(ids);
// 计算新余额
for (Account account : accounts) {
if (account.getId().equals(from)) {
account.setBalance(account.getBalance().subtract(amount));
} else if (account.getId().equals(to)) {
account.setBalance(account.getBalance().add(amount));
}
}
// 批量更新
accountMapper.batchUpdate(accounts);
}
场景 3:库存扣减
-- 方案 1:单条更新(简单场景)
UPDATE inventory
SET stock = stock - #{quantity}
WHERE product_id = #{productId} AND stock >= #{quantity};
-- 方案 2:分仓库扣减(复杂场景)
-- 优先扣减主仓库,不足再扣减分仓库
UPDATE inventory
SET stock = stock - #{quantity}
WHERE warehouse_id = 'main' AND product_id = #{productId};
-- 如果影响行数不足,扣减分仓库
UPDATE inventory
SET stock = stock - #{remaining}
WHERE warehouse_id = 'sub' AND product_id = #{productId};
场景 4:分布式锁
// Redis 分布式锁实现
@Component
public class DistributedLockService {
@Resource
private RedisTemplate<String, String> redisTemplate;
public boolean tryLock(String key, String value, long expireTime) {
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
return Boolean.TRUE.equals(success);
}
public void unlock(String key, String value) {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";
RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
redisTemplate.execute(redisScript, Collections.singletonList(key), value);
}
}
// 使用示例
public void processWithLock(String orderId) {
String lockKey = "lock:order:" + orderId;
String lockValue = UUID.randomUUID().toString();
if (distributedLockService.tryLock(lockKey, lockValue, 10000)) {
try {
// 执行业务逻辑
processOrder(orderId);
} finally {
distributedLockService.unlock(lockKey, lockValue);
}
} else {
throw new BusinessException("获取锁失败");
}
}
性能监控与诊断
监控指标
-- 1. 锁等待次数
SHOW GLOBAL STATUS LIKE 'Innodb_lock_wait_requests';
-- 2. 死锁次数
SHOW GLOBAL STATUS LIKE 'Innodb_deadlocks';
-- 3. 当前锁等待
SELECT * FROM sys.innodb_lock_waits;
-- 4. 长事务(超过 60 秒)
SELECT * FROM information_schema.INNODB_TRX
WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > 60;
-- 5. 锁统计信息
SELECT
engine_transaction_id,
lock_table,
lock_type,
lock_mode,
COUNT(*) as lock_count
FROM performance_schema.data_locks
GROUP BY engine_transaction_id, lock_table, lock_type, lock_mode;
性能分析工具
-- 1. performance_schema 锁监控
SELECT
event_name,
COUNT_STAR,
SUM_TIMER_WAIT
FROM performance_schema.events_waits_summary_by_event_name
WHERE event_name LIKE '%innodb%lock%'
ORDER BY SUM_TIMER_WAIT DESC;
-- 2. 慢查询日志(包含锁等待)
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1; -- 超过 1 秒记录
SET GLOBAL log_queries_not_using_indexes = 'ON';
-- 3. 实时锁监控
CREATE PROCEDURE show_locks()
BEGIN
SELECT
r.trx_id waiting_trx_id,
r.trx_mysql_thread_id waiting_thread,
r.trx_query waiting_query,
b.trx_id blocking_trx_id,
b.trx_mysql_thread_id blocking_thread,
b.trx_query blocking_query
FROM information_schema.INNODB_LOCK_WAITS w
INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;
END;
性能调优参数
# my.cnf 配置建议
[mysqld]
# 缓冲池大小(物理内存的 50-80%)
innodb_buffer_pool_size = 8G
# 缓冲池实例数(减少锁竞争)
innodb_buffer_pool_instances = 8
# 锁等待超时(秒)
innodb_lock_wait_timeout = 50
# 死锁检测
innodb_deadlock_detect = ON
# 自增锁模式(高并发)
innodb_autoinc_lock_mode = 2
# 日志缓冲区大小
innodb_log_buffer_size = 64M
# 刷新日志策略
innodb_flush_log_at_trx_commit = 2
# 并发插入
innodb_concurrency_tickets = 5000
高并发场景优化方案
方案 1:热点行优化
-- 问题:热点行(如计数器)锁竞争激烈
UPDATE counters SET value = value + 1 WHERE id = 1;
-- 优化 1:拆分热点行
UPDATE counters SET value = value + 1 WHERE id = 1 AND shard = 0;
UPDATE counters SET value = value + 1 WHERE id = 1 AND shard = 1;
-- 读取时汇总:SELECT SUM(value) FROM counters WHERE id = 1;
-- 优化 2:Redis 缓存 + 异步落库
-- Redis: INCR counter:1
-- 定期批量写入 MySQL
方案 2:队列化串行处理
// 使用消息队列串行化同一资源的更新
@Component
public class AccountUpdateService {
@Resource
private KafkaTemplate<String, AccountUpdate> kafkaTemplate;
public void update(Long accountId, BigDecimal amount) {
// 同一账户进入同一分区,保证顺序
kafkaTemplate.send(
"account-updates",
accountId.hashCode(), // 分区键
new AccountUpdate(accountId, amount)
);
}
}
// 消费者串行处理
@KafkaListener(topics = "account-updates")
public void consume(AccountUpdate update) {
accountMapper.update(update.getAccountId(), update.getAmount());
}
方案 3:分库分表
-- 水平分表:按用户 ID 分表
-- users_0, users_1, ..., users_9
UPDATE users_3 SET balance = balance - 100 WHERE id = 123;
-- 垂直分表:冷热数据分离
-- users_main (热点): id, name, balance
-- users_extra (冷点): id, address, bio
UPDATE users_main SET balance = balance - 100 WHERE id = 123;
方案 4:读写分离
// 主库写,从库读
@Configuration
public class DataSourceConfig {
@Bean
public DataSource dataSource() {
// 配置主从数据源
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put("master", masterDataSource());
targetDataSources.put("slave1", slaveDataSource1());
targetDataSources.put("slave2", slaveDataSource2());
RoutingDataSource routingDataSource = new RoutingDataSource();
routingDataSource.setTargetDataSources(targetDataSources);
routingDataSource.setDefaultTargetDataSource(masterDataSource());
return routingDataSource;
}
}
// 使用
@Transactional // 主库
public void updateAccount(Account account) {
accountMapper.update(account);
}
@ReadOnly // 从库
public Account getAccount(Long id) {
return accountMapper.selectById(id);
}
最佳实践总结
开发规范
-
事务控制
- 事务内只包含必要的数据库操作
- 避免在事务中执行 RPC、HTTP 请求
- 大事务拆分为小事务
-
SQL 编写
- WHERE 条件必须使用索引
- 避免 SELECT *,使用覆盖索引
- 批量操作使用 LIMIT 分批
-
锁的使用
- 优先使用乐观锁
- 必须使用悲观锁时,固定访问顺序
- 避免长事务持有锁
监控告警
# Prometheus 监控配置
groups:
- name: mysql_locks
rules:
- alert: HighLockWait
expr: mysql_innodb_lock_wait_timeout_total > 10
for: 5m
labels:
severity: warning
annotations:
summary: "MySQL 锁等待过高"
- alert: DeadlockDetected
expr: increase(mysql_innodb_deadlocks_total[5m]) > 0
labels:
severity: critical
annotations:
summary: "MySQL 检测到死锁"
配置建议
# 生产环境推荐配置
[mysqld]
# 基础配置
innodb_buffer_pool_size = 物理内存的 70%
innodb_buffer_pool_instances = 8
innodb_log_file_size = 512M
# 锁相关
innodb_lock_wait_timeout = 50
innodb_deadlock_detect = ON
innodb_autoinc_lock_mode = 2
# 并发优化
innodb_read_io_threads = 8
innodb_write_io_threads = 8
innodb_purge_threads = 4
# 日志
slow_query_log = ON
long_query_time = 1
log_queries_not_using_indexes = ON
总结
核心要点
- 锁优化原则:粒度最小、时间最短、冲突最少
- 索引优化:减少锁范围,避免全表锁
- 事务优化:缩短持有时间,避免长事务
- 隔离级别:根据业务选择,READ COMMITTED 减少间隙锁
- 并发控制:MVCC 实现读写分离,乐观锁减少锁竞争
优化策略
- 应用层:固定顺序、重试机制、分批处理、乐观锁
- 数据库层:索引优化、隔离级别调整、参数调优
- 架构层:队列化、分布式锁、分库分表、读写分离
参考资料
- MySQL 官方文档 - InnoDB 锁优化
- MySQL 官方文档 - 事务隔离
- 《高性能 MySQL》第 7-9 章
- 《MySQL 技术内幕:InnoDB 存储引擎》第 6-7 章