
MySQL Lock Optimization and Concurrency Control

Core Concepts

Lock optimization and concurrency control sit at the heart of database performance tuning. A sound locking strategy raises throughput while keeping data consistent.

Goals of Concurrency Control

The Isolation property of ACID is implemented through concurrency control, which prevents:
- Lost updates
- Dirty reads
- Non-repeatable reads
- Phantom reads

Core Principles of Lock Optimization

  1. Minimize lock granularity: lock only the resources you must
  2. Minimize lock duration: commit transactions quickly
  3. Minimize lock conflicts: reduce concurrent contention
  4. Minimize deadlock probability: access resources in a fixed order
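
Principle 4 can be sketched in plain Java (an illustration of the technique, not MySQL internals; all names here are invented for the example): whatever direction a transfer runs, acquire the per-account locks in one canonical sorted order, so two opposing transfers can never wait on each other in a cycle.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of principle 4: always lock resources in a fixed (sorted) order.
public class OrderedLocking {
    static final Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
    static final Map<String, Long> balances = new ConcurrentHashMap<>();

    static ReentrantLock lockFor(String id) {
        return locks.computeIfAbsent(id, k -> new ReentrantLock());
    }

    // Lock 'from' and 'to' in lexicographic order, regardless of direction,
    // so A->B and B->A transfers can never deadlock against each other.
    public static void transfer(String from, String to, long amount) {
        String first = from.compareTo(to) < 0 ? from : to;
        String second = first.equals(from) ? to : from;
        lockFor(first).lock();
        try {
            lockFor(second).lock();
            try {
                balances.merge(from, -amount, Long::sum);
                balances.merge(to, amount, Long::sum);
            } finally {
                lockFor(second).unlock();
            }
        } finally {
            lockFor(first).unlock();
        }
    }

    public static void main(String[] args) {
        balances.put("A", 100L);
        balances.put("B", 100L);
        transfer("A", "B", 30);  // locks A then B
        transfer("B", "A", 10);  // also locks A then B
        System.out.println(balances.get("A") + " " + balances.get("B")); // 80 120
    }
}
```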

Lock Optimization Internals

InnoDB Lock Optimization Mechanisms

1. Adaptive Hash Index

The Adaptive Hash Index (AHI):
- InnoDB automatically builds a hash index over hot data
- Equality lookups hit the hash table directly, skipping the B+ tree traversal
- Shorter lookups shrink the lock scan range and improve concurrency
-- Check adaptive hash index status
SHOW VARIABLES LIKE 'innodb_adaptive_hash_index';

-- Disable the adaptive hash index (for specific workloads)
SET GLOBAL innodb_adaptive_hash_index = OFF;

2. Insert Buffer (Change Buffer)

Insert buffering (called the change buffer in modern MySQL):
- Buffers inserts into non-unique secondary indexes
- Merges them in batches, reducing random IO
- Lowers page splits and lock contention
-- Check insert/change buffer status
SHOW ENGINE INNODB STATUS\G

-- The output includes:
INSERT BUFFER AND ADAPTIVE HASH INDEX
IBUF: size 1, free list len 0, seg size 2, 1 merges

3. Lock Wait Optimization

Lock wait strategies:
- Spin waiting: spin briefly for short waits, avoiding a context switch
- Priority adjustment: favor long-running transactions (e.g. as deadlock survivors) to reduce rollback cost
- Batched wakeup: wake waiters in groups to cut thread-scheduling overhead
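
The spin-wait idea can be illustrated in plain Java (a sketch of the general technique, not InnoDB's actual sync layer): poll in a tight loop first, and only fall back to parking the thread when the wait turns out to be long.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Spin-then-park waiting: short waits are absorbed by the spin phase
// (no context switch); long waits park the thread instead of burning CPU.
public class SpinThenPark {

    public static boolean await(AtomicBoolean ready, int spins, long parkNanos, int parks) {
        for (int i = 0; i < spins; i++) {  // phase 1: busy spin
            if (ready.get()) return true;
            Thread.onSpinWait();           // CPU pause hint (JDK 9+)
        }
        for (int i = 0; i < parks; i++) {  // phase 2: park between polls
            if (ready.get()) return true;
            LockSupport.parkNanos(parkNanos);
        }
        return ready.get();
    }

    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean();
        new Thread(() -> flag.set(true)).start();  // "lock holder" releases
        System.out.println(await(flag, 10_000, 100_000, 1_000));
    }
}
```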

Multi-Version Concurrency Control (MVCC)

MVCC keeps historical versions of each row so that readers and writers do not block each other.

How MVCC is implemented:
1. Hidden columns: every row carries DB_TRX_ID and DB_ROLL_PTR
2. Undo log: stores the chain of historical row versions
3. Read View: the basis for deciding which version a transaction can see
-- MVCC applies under READ COMMITTED and REPEATABLE READ
-- Plain SELECTs take no locks; they read a historical version

-- Example: reads do not block writes
-- Transaction A: update a row (holds an X lock)
UPDATE users SET name = 'new' WHERE id = 1;

-- Transaction B: read the row (no lock; reads the old version)
SELECT name FROM users WHERE id = 1;  -- does not block
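
The Read View rules can be modeled in a few lines of Java (a simplified sketch of InnoDB's visibility check, not its actual code; field names only loosely follow InnoDB's): a row version is visible if its creating transaction had committed when the view was taken, or is the reader itself.

```java
import java.util.Set;

// Simplified model of an InnoDB Read View. A row version stamped with
// DB_TRX_ID = trxId is visible to this view iff the creating transaction
// had committed when the view was created (or is the reader itself).
public class ReadView {
    final long creatorTrxId;    // the reading transaction
    final long upLimitId;       // smallest trx id active when the view was taken
    final long lowLimitId;      // next trx id to be assigned (high-water mark)
    final Set<Long> activeIds;  // transactions active when the view was taken

    ReadView(long creatorTrxId, long upLimitId, long lowLimitId, Set<Long> activeIds) {
        this.creatorTrxId = creatorTrxId;
        this.upLimitId = upLimitId;
        this.lowLimitId = lowLimitId;
        this.activeIds = activeIds;
    }

    boolean isVisible(long trxId) {
        if (trxId == creatorTrxId) return true; // our own changes
        if (trxId < upLimitId) return true;     // committed before any active trx
        if (trxId >= lowLimitId) return false;  // started after the view was taken
        return !activeIds.contains(trxId);      // in between: visible iff committed
    }

    public static void main(String[] args) {
        // View taken by trx 100 while trx 90 and 95 were still running.
        ReadView view = new ReadView(100, 90, 110, Set.of(90L, 95L));
        System.out.println(view.isVisible(80));  // true: committed long ago
        System.out.println(view.isVisible(95));  // false: still active
        System.out.println(view.isVisible(92));  // true: committed in the window
        System.out.println(view.isVisible(120)); // false: started after the view
    }
}
```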

Lock Optimization Strategies

Strategy 1: Use Indexes to Shrink the Lock Range

-- Before: no index, every scanned row is locked
UPDATE orders SET status = 1 WHERE user_id = 100;

-- After: with an index, only matching rows are locked
CREATE INDEX idx_user_id ON orders(user_id);
UPDATE orders SET status = 1 WHERE user_id = 100;

-- Comparison:
-- No index: locks 1,000,000 rows (full scan)
-- With index: locks 10 rows (assuming user_id = 100 matches 10 records)

Covering index optimization:

-- Before: without an index on email, the scan touches (and locks) unrelated rows
SELECT * FROM users WHERE email = 'test@example.com' FOR UPDATE;

-- After: an index narrows the scan to exact matches
CREATE INDEX idx_email ON users(email);
SELECT email FROM users WHERE email = 'test@example.com' FOR UPDATE;
-- Note: a locking read still locks the matching clustered-index record,
-- but the indexed lookup avoids scanning (and locking) non-matching rows

Strategy 2: Shorten Lock Hold Time

-- Bad: long transaction
BEGIN;
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
-- run complex business logic (500ms)
-- call an external API (1000ms)
-- write log files (200ms)
UPDATE orders SET status = 1 WHERE id = 1;
COMMIT;
-- Total: 1700ms, lock held for 1700ms

-- Good: short transaction
BEGIN;
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
UPDATE orders SET status = 1 WHERE id = 1;
COMMIT;
-- Total: 10ms, lock held for 10ms
-- Run the remaining business logic after the commit
Application-layer optimization:

// Bad: slow side effects run inside the transaction
@Transactional
public void processOrder(Long orderId) {
    Order order = orderRepository.findById(orderId);
    order.setStatus(1);
    orderRepository.save(order);
    
    // None of the following belongs inside the transaction
    emailService.sendConfirmation(order);      // send email
    logService.logOrder(order);                // write logs
    kafkaTemplate.send("order-topic", order);  // publish message
}

// Good: keep the transaction minimal
@Transactional
public void processOrder(Long orderId) {
    Order order = orderRepository.findById(orderId);
    order.setStatus(1);
    orderRepository.save(order);
}

// After the transaction commits (locks released), the caller runs the side effects.
// (Invoke processOrder through the Spring proxy, e.g. from another bean, so
// @Transactional actually applies.)
public void handleOrder(Long orderId) {
    processOrder(orderId);
    Order order = orderRepository.findById(orderId);
    emailService.sendConfirmation(order);
    logService.logOrder(order);
    kafkaTemplate.send("order-topic", order);
}

Strategy 3: Reduce Lock Granularity

-- Before: one statement locks every row
UPDATE orders SET status = 1;  -- locks all rows in the table

-- After: update in batches
UPDATE orders SET status = 1 WHERE id BETWEEN 1 AND 1000;
UPDATE orders SET status = 1 WHERE id BETWEEN 1001 AND 2000;
UPDATE orders SET status = 1 WHERE id BETWEEN 2001 AND 3000;

Batching with LIMIT:

// Large batch update
public void batchUpdateStatus() {
    int batchSize = 500;
    int affected;
    
    do {
        affected = orderMapper.updateStatusBatch(batchSize);
        // each batch commits, releasing its locks
    } while (affected > 0);
}

// Mapper XML
<update id="updateStatusBatch">
    UPDATE orders 
    SET status = 1 
    WHERE status = 0 
    LIMIT #{batchSize}
</update>

Strategy 4: Tune the Isolation Level

-- READ UNCOMMITTED: lowest isolation, minimal read locking (not recommended)
-- READ COMMITTED: fewer gap locks (often a good fit)
-- REPEATABLE READ: MySQL's default
-- SERIALIZABLE: highest isolation, locks nearly everything (worst performance)

-- For read-heavy, write-light high-concurrency workloads
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;

-- Comparison:
-- REPEATABLE READ: range queries take next-key locks, locking the gaps
-- READ COMMITTED: range queries lock only matching records, not the gaps

Isolation level selection guide:

Scenario           | Recommended level           | Rationale
E-commerce orders  | READ COMMITTED              | fewer gap locks, higher concurrency
Bank transfers     | REPEATABLE READ             | repeatable reads required
Report queries     | READ COMMITTED              | tolerates non-repeatable reads for throughput
Stock deduction    | READ COMMITTED + row locks  | precise control over the lock range

Strategy 5: Use Optimistic Locking

-- Pessimistic locking: SELECT ... FOR UPDATE holds the lock for the whole read-modify-write
SELECT stock FROM products WHERE id = 1 FOR UPDATE;
UPDATE products SET stock = stock - 1 WHERE id = 1;

-- Optimistic locking: a CAS-style conditional update
UPDATE products SET stock = stock - 1, version = version + 1 
WHERE id = 1 AND version = 1;

-- Check the affected row count
-- rows_affected = 1: success
-- rows_affected = 0: failure (the version changed); retry or give up

Application-layer implementation:

// Optimistic locking with bounded retries
public boolean decreaseStock(Long productId, int quantity) {
    int maxRetries = 3;
    for (int i = 0; i < maxRetries; i++) {
        Product product = productMapper.selectById(productId);
        if (product.getStock() < quantity) {
            return false;  // insufficient stock
        }
        
        int updated = productMapper.updateStock(
            productId, quantity, product.getVersion()
        );
        
        if (updated > 0) {
            return true;  // update succeeded
        }
        // version changed underneath us; retry
    }
    return false;  // exceeded max retries
}

Strategy 6: Avoid Oversized Lock Sets

-- Note: unlike some databases, InnoDB does not escalate row locks into a
-- table lock. But a huge statement still acquires (and holds) a lock per
-- scanned row, consuming lock memory and blocking others for its whole duration.

-- Before: one statement locks ~1,000,000 rows
DELETE FROM logs WHERE create_time < '2024-01-01';

-- After: delete in batches, keeping each transaction's lock set small
DELETE FROM logs WHERE create_time < '2024-01-01' LIMIT 1000;
-- Loop until the affected row count is 0
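
The "loop until 0 rows affected" control flow looks like this in application code (a sketch: the in-memory counter stands in for the real DELETE ... LIMIT statement, which in production would be a JDBC executeUpdate per batch):

```java
// Control flow of batched deletion: each iteration deletes at most
// `batchSize` rows in its own short transaction, so no statement ever
// holds locks on millions of rows at once.
public class BatchPurge {
    private int matchingRows;  // stand-in for rows matching the WHERE clause

    public BatchPurge(int matchingRows) { this.matchingRows = matchingRows; }

    // Stand-in for: DELETE FROM logs WHERE create_time < ? LIMIT ?  (then COMMIT)
    private int deleteBatch(int limit) {
        int affected = Math.min(matchingRows, limit);
        matchingRows -= affected;
        return affected;
    }

    // Loop until a batch affects no rows.
    public int purge(int batchSize) {
        int total = 0;
        int affected;
        do {
            affected = deleteBatch(batchSize);  // each batch commits, releasing its locks
            total += affected;
        } while (affected > 0);
        return total;
    }

    public static void main(String[] args) {
        System.out.println(new BatchPurge(2500).purge(1000)); // 2500
    }
}
```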

Configuration parameters:

-- Check lock-related settings
SHOW VARIABLES LIKE 'innodb_buffer_pool_size';
SHOW VARIABLES LIKE 'innodb_lock_wait_timeout';
SHOW VARIABLES LIKE 'innodb_status_output';

-- A larger buffer pool reduces IO stalls while locks are held
SET GLOBAL innodb_buffer_pool_size = 4294967296;  -- 4G, in bytes; size to your RAM

Concurrency Control in Practice

Scenario 1: Flash Sales (Seckill)

-- Option 1: pessimistic locking (poor throughput, not recommended)
BEGIN;
SELECT stock FROM products WHERE id = 1 FOR UPDATE;
UPDATE products SET stock = stock - 1 WHERE id = 1;
COMMIT;

-- Option 2: optimistic conditional update (recommended)
UPDATE products SET stock = stock - 1 
WHERE id = 1 AND stock > 0;

-- Option 3: pre-deduct in Redis (recommended at high concurrency)
-- Redis: DECR product:stock:1
-- Sync to MySQL asynchronously

Full implementation:

// Redis + MySQL approach
@Service
public class SeckillService {
    
    @Resource
    private RedisTemplate<String, Integer> redisTemplate;
    
    @Resource
    private ProductMapper productMapper;
    
    public boolean seckill(Long productId) {
        String stockKey = "seckill:stock:" + productId;
        
        // 1. Pre-deduct in Redis (decrement() returns a Long)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            redisTemplate.opsForValue().increment(stockKey);  // roll back
            return false;
        }
        
        // 2. Enqueue for asynchronous MySQL write-back
        seckillQueue.add(productId);
        
        return true;
    }
}

Scenario 2: Account Transfers

-- Before: deadlock-prone
-- Transaction A: A → B
UPDATE accounts SET balance = balance - 100 WHERE id = 'A';
UPDATE accounts SET balance = balance + 100 WHERE id = 'B';

-- Transaction B: B → A
UPDATE accounts SET balance = balance - 50 WHERE id = 'B';
UPDATE accounts SET balance = balance + 50 WHERE id = 'A';

-- After: fixed order
-- Every transaction touches accounts in ascending id order,
-- so a B → A transfer still updates 'A' first:
UPDATE accounts SET balance = balance + 50 WHERE id = 'A';
UPDATE accounts SET balance = balance - 50 WHERE id = 'B';
// Code implementation
@Transactional
public void transfer(String from, String to, BigDecimal amount) {
    List<String> ids = Arrays.asList(from, to);
    Collections.sort(ids);  // fixed lock order
    
    // Load both accounts in one query (selectByIds and batchUpdate are
    // assumed to process ids in the given, sorted order)
    List<Account> accounts = accountMapper.selectByIds(ids);
    
    // Compute the new balances
    for (Account account : accounts) {
        if (account.getId().equals(from)) {
            account.setBalance(account.getBalance().subtract(amount));
        } else if (account.getId().equals(to)) {
            account.setBalance(account.getBalance().add(amount));
        }
    }
    
    // Persist both in one batch
    accountMapper.batchUpdate(accounts);
}

Scenario 3: Stock Deduction

-- Option 1: single conditional update (simple cases)
UPDATE inventory 
SET stock = stock - #{quantity} 
WHERE product_id = #{productId} AND stock >= #{quantity};

-- Option 2: per-warehouse deduction (complex cases)
-- Deduct from the main warehouse first, then fall back to sub-warehouses
UPDATE inventory 
SET stock = stock - #{quantity} 
WHERE warehouse_id = 'main' AND product_id = #{productId};

-- If the affected row count falls short, deduct the remainder from a sub-warehouse
UPDATE inventory 
SET stock = stock - #{remaining} 
WHERE warehouse_id = 'sub' AND product_id = #{productId};

Scenario 4: Distributed Locks

// Redis-based distributed lock
@Component
public class DistributedLockService {
    
    @Resource
    private RedisTemplate<String, String> redisTemplate;
    
    public boolean tryLock(String key, String value, long expireTime) {
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS);
        return Boolean.TRUE.equals(success);
    }
    
    public void unlock(String key, String value) {
        // Delete only if we still own the lock (atomic check-and-delete)
        String script = 
            "if redis.call('get', KEYS[1]) == ARGV[1] then " +
            "return redis.call('del', KEYS[1]) " +
            "else return 0 end";
        
        RedisScript<Long> redisScript = RedisScript.of(script, Long.class);
        redisTemplate.execute(redisScript, Collections.singletonList(key), value);
    }
}

// Usage
public void processWithLock(String orderId) {
    String lockKey = "lock:order:" + orderId;
    String lockValue = UUID.randomUUID().toString();
    
    if (distributedLockService.tryLock(lockKey, lockValue, 10000)) {
        try {
            // business logic
            processOrder(orderId);
        } finally {
            distributedLockService.unlock(lockKey, lockValue);
        }
    } else {
        throw new BusinessException("failed to acquire lock");
    }
}

Performance Monitoring and Diagnostics

Monitoring Metrics

-- 1. Row lock wait count
SHOW GLOBAL STATUS LIKE 'Innodb_row_lock_waits';

-- 2. Deadlocks (MySQL has no dedicated status counter; inspect the
--    LATEST DETECTED DEADLOCK section of SHOW ENGINE INNODB STATUS)
SHOW ENGINE INNODB STATUS\G

-- 3. Current lock waits
SELECT * FROM sys.innodb_lock_waits;

-- 4. Long transactions (running over 60 seconds)
SELECT * FROM information_schema.INNODB_TRX 
WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > 60;

-- 5. Lock statistics
SELECT 
    engine_transaction_id,
    object_name,
    lock_type,
    lock_mode,
    COUNT(*) AS lock_count
FROM performance_schema.data_locks
GROUP BY engine_transaction_id, object_name, lock_type, lock_mode;

Performance Analysis Tools

-- 1. Lock waits in performance_schema
SELECT 
    event_name,
    COUNT_STAR,
    SUM_TIMER_WAIT
FROM performance_schema.events_waits_summary_by_event_name
WHERE event_name LIKE '%innodb%lock%'
ORDER BY SUM_TIMER_WAIT DESC;

-- 2. Slow query log (lock waits show up in query time)
SET GLOBAL slow_query_log = 'ON';
SET GLOBAL long_query_time = 1;  -- log queries slower than 1 second
SET GLOBAL log_queries_not_using_indexes = 'ON';

-- 3. Live lock-wait view (MySQL 5.7; INNODB_LOCK_WAITS was removed in 8.0,
--    where sys.innodb_lock_waits provides the same join)
DELIMITER //
CREATE PROCEDURE show_locks()
BEGIN
    SELECT 
        r.trx_id waiting_trx_id,
        r.trx_mysql_thread_id waiting_thread,
        r.trx_query waiting_query,
        b.trx_id blocking_trx_id,
        b.trx_mysql_thread_id blocking_thread,
        b.trx_query blocking_query
    FROM information_schema.INNODB_LOCK_WAITS w
    INNER JOIN information_schema.INNODB_TRX b ON b.trx_id = w.blocking_trx_id
    INNER JOIN information_schema.INNODB_TRX r ON r.trx_id = w.requesting_trx_id;
END //
DELIMITER ;

Tuning Parameters

# Suggested my.cnf settings

[mysqld]
# Buffer pool size (50-80% of physical RAM)
innodb_buffer_pool_size = 8G

# Buffer pool instances (reduces mutex contention)
innodb_buffer_pool_instances = 8

# Lock wait timeout (seconds)
innodb_lock_wait_timeout = 50

# Deadlock detection
innodb_deadlock_detect = ON

# Auto-increment lock mode (interleaved; best for high concurrency)
innodb_autoinc_lock_mode = 2

# Log buffer size
innodb_log_buffer_size = 64M

# Log flush policy (2 = flush once per second; may lose ~1s of commits on a crash)
innodb_flush_log_at_trx_commit = 2

# Concurrency tickets (used together with innodb_thread_concurrency)
innodb_concurrency_tickets = 5000

High-Concurrency Optimization Patterns

Pattern 1: Hot Row Optimization

-- Problem: a hot row (e.g. a counter) is a single point of lock contention
UPDATE counters SET value = value + 1 WHERE id = 1;

-- Option 1: split the hot row into shards
UPDATE counters SET value = value + 1 WHERE id = 1 AND shard = 0;
UPDATE counters SET value = value + 1 WHERE id = 1 AND shard = 1;
-- On read, aggregate: SELECT SUM(value) FROM counters WHERE id = 1;

-- Option 2: cache in Redis, flush asynchronously
-- Redis: INCR counter:1
-- Periodically batch-write back to MySQL
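
The sharded-counter idea maps directly onto an in-memory sketch (illustrative only; the shard count is an arbitrary assumption): writes pick a shard at random, spreading contention across slots, while reads sum all shards.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;

// In-memory sketch of hot-row splitting: each increment lands on a
// random shard (like "UPDATE ... WHERE id = 1 AND shard = ?"), and
// reads sum all shards (like "SELECT SUM(value) ... WHERE id = 1").
public class ShardedCounter {
    private final AtomicLongArray shards;

    public ShardedCounter(int shardCount) {
        shards = new AtomicLongArray(shardCount);
    }

    public void increment() {
        int s = ThreadLocalRandom.current().nextInt(shards.length());
        shards.incrementAndGet(s);  // contention spread across shardCount slots
    }

    public long sum() {
        long total = 0;
        for (int i = 0; i < shards.length(); i++) total += shards.get(i);
        return total;
    }

    public static void main(String[] args) {
        ShardedCounter counter = new ShardedCounter(8);
        for (int i = 0; i < 1000; i++) counter.increment();
        System.out.println(counter.sum()); // 1000
    }
}
```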

Pattern 2: Serialize via a Queue

// Serialize updates to the same resource through a message queue
@Component
public class AccountUpdateService {
    
    @Resource
    private KafkaTemplate<String, AccountUpdate> kafkaTemplate;
    
    public void update(Long accountId, BigDecimal amount) {
        // Same account id -> same message key -> same partition, preserving order
        kafkaTemplate.send(
            "account-updates",
            accountId.toString(),  // partitioning key (K = String here)
            new AccountUpdate(accountId, amount)
        );
    }
}

// The consumer processes each partition serially
@KafkaListener(topics = "account-updates")
public void consume(AccountUpdate update) {
    accountMapper.update(update.getAccountId(), update.getAmount());
}

Pattern 3: Sharding

-- Horizontal sharding: split users by user id
-- users_0, users_1, ..., users_9
UPDATE users_3 SET balance = balance - 100 WHERE id = 123;

-- Vertical splitting: separate hot and cold columns
-- users_main (hot): id, name, balance
-- users_extra (cold): id, address, bio

UPDATE users_main SET balance = balance - 100 WHERE id = 123;
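
Routing a statement to the right shard table is a one-liner in application code (a sketch matching the users_0 .. users_9 layout above; the modulus-based naming is an assumption for illustration):

```java
// Hash-based table routing for the users_0 .. users_9 layout:
// the shard is chosen by id modulo the shard count.
public class ShardRouter {
    private final int shardCount;

    public ShardRouter(int shardCount) { this.shardCount = shardCount; }

    public String tableFor(long userId) {
        long shard = Math.floorMod(userId, (long) shardCount); // non-negative remainder
        return "users_" + shard;
    }

    public static void main(String[] args) {
        ShardRouter router = new ShardRouter(10);
        System.out.println(router.tableFor(123)); // users_3
        System.out.println(router.tableFor(40));  // users_0
    }
}
```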

Pattern 4: Read/Write Splitting

// Writes go to the primary, reads to replicas
@Configuration
public class DataSourceConfig {
    
    @Bean
    public DataSource dataSource() {
        // Register primary and replica data sources
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put("master", masterDataSource());
        targetDataSources.put("slave1", slaveDataSource1());
        targetDataSources.put("slave2", slaveDataSource2());
        
        RoutingDataSource routingDataSource = new RoutingDataSource();
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(masterDataSource());
        
        return routingDataSource;
    }
}

// Usage
@Transactional  // routed to the primary
public void updateAccount(Account account) {
    accountMapper.update(account);
}

@ReadOnly  // custom annotation routing to a replica
public Account getAccount(Long id) {
    return accountMapper.selectById(id);
}

Best Practices

Development Guidelines

  1. Transaction control

    • Keep only the necessary database operations inside a transaction
    • Never make RPC or HTTP calls inside a transaction
    • Split large transactions into small ones
  2. SQL style

    • WHERE clauses must be able to use an index
    • Avoid SELECT *; prefer covering indexes
    • Split bulk operations into LIMIT batches
  3. Lock usage

    • Prefer optimistic locking
    • When pessimistic locks are required, fix the access order
    • Never let a long transaction hold locks

Monitoring and Alerting

# Prometheus alert rules (metric names assume mysqld_exporter's
# mysql_global_status_* mapping of SHOW GLOBAL STATUS counters)
groups:
  - name: mysql_locks
    rules:
      - alert: HighLockWait
        expr: rate(mysql_global_status_innodb_row_lock_waits[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL row lock waits are too high"
          
      - alert: DeadlockDetected
        # Innodb_deadlocks is exposed by MariaDB/Percona builds; stock MySQL
        # needs SHOW ENGINE INNODB STATUS or a custom collector
        expr: increase(mysql_global_status_innodb_deadlocks[5m]) > 0
        labels:
          severity: critical
        annotations:
          summary: "MySQL deadlock detected"

Configuration Recommendations

# Recommended production settings
[mysqld]
# Basics
innodb_buffer_pool_size = 8G  # roughly 70% of physical RAM
innodb_buffer_pool_instances = 8
innodb_log_file_size = 512M

# Locking
innodb_lock_wait_timeout = 50
innodb_deadlock_detect = ON
innodb_autoinc_lock_mode = 2

# Concurrency
innodb_read_io_threads = 8
innodb_write_io_threads = 8
innodb_purge_threads = 4

# Logging
slow_query_log = ON
long_query_time = 1
log_queries_not_using_indexes = ON

Summary

Key Takeaways

  1. Locking principles: smallest granularity, shortest duration, fewest conflicts
  2. Indexing: shrink the lock range; never lock a full scan
  3. Transactions: keep hold times short; avoid long transactions
  4. Isolation level: choose per workload; READ COMMITTED reduces gap locks
  5. Concurrency control: MVCC keeps reads from blocking writes; optimistic locking cuts contention


