RocketMQ 采用独特的 CommitLog 存储结构,将所有消息顺序写入同一文件,实现高吞吐量和低延迟。本文将深入探讨 CommitLog 的设计原理、存储结构以及刷盘策略。
一、存储架构概览
1.1 整体架构
RocketMQ 的存储结构由三部分组成:
graph TB
subgraph Broker
subgraph 存储层
CL[CommitLog]
CQ[ConsumeQueue]
IQ[IndexFile]
end
CL -->|消息 | CQ
CL -->|Key 索引 | IQ
end
subgraph Producer
P[Producer]
end
subgraph Consumer
C[Consumer]
end
P -->|写入 | CL
C -->|读取 | CQ
C -->|查询 | IQ
1.2 核心文件
| 文件类型 | 说明 | 作用 |
|---|---|---|
| CommitLog | 消息主体文件 | 存储所有消息内容 |
| ConsumeQueue | 消费队列索引 | 按 Topic+Queue 组织的索引 |
| IndexFile | 消息索引 | 按 Key 查询消息 |
1.3 目录结构
$HOME/store/
├── commitlog/
│ ├── 00000000000000000000 (1GB)
│ ├── 00000000001073741824 (1GB)
│ └── 00000000002147483648 (1GB)
├── consumequeue/
│ └── topic/
│ ├── 0/
│ │ ├── 00000000000000000000 (20 字节 * 条数)
│ │ └── ...
│ └── 1/
│ └── ...
├── index/
│ ├── 20260408100000000.index
│ └── ...
└── checkpoint (检查点文件)
二、CommitLog 设计
2.1 文件结构
所有消息顺序写入 CommitLog:
graph LR
subgraph CommitLog
F1[File 1<br/>0-1GB]
F2[File 2<br/>1-2GB]
F3[File 3<br/>2-3GB]
end
F1 -->|顺序写入 | F2
F2 -->|顺序写入 | F3
2.2 消息格式
┌─────────────────────────────────────────┐
│ Total Size (4 bytes) │
├─────────────────────────────────────────┤
│ Magic Code (4 bytes) │
├─────────────────────────────────────────┤
│ CRC32 (4 bytes) │
├─────────────────────────────────────────┤
│ Queue ID (4 bytes) │
├─────────────────────────────────────────┤
│ Flag (4 bytes) │
├─────────────────────────────────────────┤
│ Queue Offset (8 bytes) │
├─────────────────────────────────────────┤
│ Sys Flag (4 bytes) │
├─────────────────────────────────────────┤
│ Born Timestamp (8 bytes) │
├─────────────────────────────────────────┤
│ Born Host (8 bytes) │
├─────────────────────────────────────────┤
│ Store Timestamp (8 bytes) │
├─────────────────────────────────────────┤
│ Store Host (8 bytes) │
├─────────────────────────────────────────┤
│ Reconsume Times (4 bytes) │
├─────────────────────────────────────────┤
│ Prepared Transaction Offset (8 bytes)│
├─────────────────────────────────────────┤
│ Body Size (4 bytes) │
├─────────────────────────────────────────┤
│ Body (variable) │
├─────────────────────────────────────────┤
│ Topic Length (2 bytes) │
├─────────────────────────────────────────┤
│ Topic (variable) │
├─────────────────────────────────────────┤
│ Properties Length (2 bytes) │
├─────────────────────────────────────────┤
│ Properties (variable) │
└─────────────────────────────────────────┘
2.3 固定长度消息
// 消息头固定 128 字节
public static final int MSG_HEADER_SIZE = 128;
// 消息总长度 = 消息头 + Body + Topic + Properties
int totalSize = MSG_HEADER_SIZE + bodyLength + topicLength + propertiesLength;
2.4 文件滚动
# Broker 配置
flushCommitLogLeastPoints=4 # 最少 4 页刷盘
flushCommitLogThoroughInterval=200 # 200ms 强制刷盘
commitCommitLogLeastPages=4 # 最少 4 页提交
commitCommitLogThoroughInterval=200 # 200ms 强制提交
三、ConsumeQueue 设计
3.1 索引结构
ConsumeQueue 是 Topic+Queue 的逻辑视图:
graph TB
subgraph CommitLog
CL[所有消息顺序存储]
end
subgraph ConsumeQueue
subgraph Topic-1
Q0[Queue 0]
Q1[Queue 1]
end
Q0 -->|指向 | M1[消息 1]
Q0 -->|指向 | M2[消息 2]
Q1 -->|指向 | M3[消息 3]
end
CL -.-> Q0
CL -.-> Q1
3.2 索引条目
每个条目固定 20 字节:
┌─────────────────────────────────┐
│ CommitLog Offset (8 bytes) │ ← 消息在 CommitLog 中的位置
├─────────────────────────────────┤
│ Size (4 bytes) │ ← 消息大小
├─────────────────────────────────┤
│ Topic Hash Code (8 bytes) │ ← Topic 哈希
└─────────────────────────────────┘
3.3 查询流程
sequenceDiagram
participant C as Consumer
participant CQ as ConsumeQueue
participant CL as CommitLog
C->>CQ: 请求消息
CQ-->>C: 返回 (offset, size)
C->>CL: 读取消息
CL-->>C: 返回消息内容
3.4 代码实现
// 从 ConsumeQueue 获取消息位置
public DispatchRequest buildDispatchRequest(MessageExtBrokerInner msg) {
DispatchRequest request = new DispatchRequest(
msg.getTopic(),
msg.getQueueId(),
msg.getQueueOffset(), // ConsumeQueue 偏移
msg.getCommitLogOffset(), // CommitLog 位置
msg.getBody().length, // 消息大小
msg.getTagsCode(), // Tag 哈希
msg.getStoreTimestamp()
);
return request;
}
// 构建 ConsumeQueue 索引
public void putRequest(DispatchRequest request) {
ByteBuffer buffer = ByteBuffer.allocate(20);
buffer.putLong(request.getCommitLogOffset());
buffer.putInt(request.getMsgSize());
buffer.putLong(request.getTagsCode());
// 写入 ConsumeQueue 文件
consumeQueue.putQueueData(request.getQueueOffset(), buffer);
}
四、IndexFile 设计
4.1 索引结构
按 Key 查询消息的索引:
graph TB
subgraph IndexFile
H[Header<br/>40 bytes]
S1[Slot 1]
S2[Slot 2]
S3[Slot 3]
end
S1 --> I1[Index 1]
S2 --> I2[Index 2]
S3 --> I3[Index 3]
I1 --> M1[消息 1]
I2 --> M2[消息 2]
I3 --> M3[消息 3]
4.2 文件结构
┌─────────────────────────────────┐
│ Header (40 bytes) │
│ - beginTimestamp │
│ - endTimestamp │
│ - beginOffset │
│ - endOffset │
│ - hashSlotCount │
│ - indexCount │
├─────────────────────────────────┤
│ Hash Slot (5000 * 4B) │
│ (默认 5000 个槽位) │
├─────────────────────────────────┤
│ Index (300000 * 20B) │
│ (默认 30 万个索引) │
└─────────────────────────────────┘
4.3 索引条目
┌─────────────────────────────────┐
│ Key Hash (4 bytes) │
├─────────────────────────────────┤
│ CommitLog Offset (8 bytes) │
├─────────────────────────────────┤
│ Timestamp (8 bytes) │
└─────────────────────────────────┘
4.4 查询示例
// 按 Key 查询消息
public QueryMessage queryMessage(String topic, String key,
int maxNum, long begin, long end) {
QueryMessage queryMessage = new QueryMessage(topic, key, maxNum);
// 1. 计算 Key 哈希
int hashCode = KeyBuilder.buildKey(topic, key).hashCode();
// 2. 查找 IndexFile
for (IndexFile indexFile : indexFileList) {
if (indexFile.isTimeMatch(begin, end)) {
List<Long> offsets = indexFile.selectOffset(hashCode);
// 3. 从 CommitLog 读取消息
for (Long offset : offsets) {
MessageExt msg = commitLog.getMessage(offset);
queryMessage.addMessage(msg);
}
}
}
return queryMessage;
}
五、刷盘策略
5.1 同步刷盘
// 同步刷盘配置
brokerConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
// 刷盘流程
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
// 1. 写入 CommitLog
commitLog.append(msg);
// 2. 等待刷盘完成
flushOffset = commitLog.flush(msg.getQueueOffset());
// 3. 返回结果
return PutMessageResult.PUT_OK;
}
特点:
- 可靠性高
- 延迟较高
- 适合金融场景
5.2 异步刷盘
// 异步刷盘配置(默认)
brokerConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
// 刷盘线程
public class FlushCommitLogService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
try {
// 等待刷盘条件
this.waitForRunning(500);
// 批量刷盘
commitLog.flush(0);
} catch (Exception e) {
log.error("刷盘失败", e);
}
}
}
}
特点:
- 性能好
- 可能丢失数据(断电)
- 适合一般业务
5.3 刷盘配置
# 刷盘类型
flushDiskType=ASYNC_FLUSH # SYNC_FLUSH / ASYNC_FLUSH
# 刷盘间隔
flushCommitLogLeastPoints=4 # 最少 4 页
flushCommitLogThoroughInterval=200 # 200ms
# 提交间隔
commitCommitLogLeastPages=4
commitCommitLogThoroughInterval=200
5.4 刷盘性能对比
| 刷盘方式 | 吞吐量 | 延迟 | 可靠性 |
|---|---|---|---|
| 同步刷盘 | 1 万 TPS | 10ms | 高 |
| 异步刷盘 | 10 万 TPS | 1ms | 中 |
六、主从复制
6.1 复制模式
graph TB
subgraph Master
M1[Master Broker]
CL1[CommitLog]
end
subgraph Slave
S1[Slave Broker]
CL2[CommitLog]
end
M1 -->|同步复制 | S1
CL1 -->|数据 | CL2
6.2 同步复制
# 同步复制配置
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
特点:
- 数据不丢失
- 延迟较高
- 适合金融场景
6.3 异步复制
# 异步复制配置(默认)
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
特点:
- 性能好
- 可能丢失少量数据
- 适合一般业务
6.4 从节点配置
# 从节点
brokerRole=SLAVE
slaveReadEnable=true # 允许从节点读
七、性能优化
7.1 mmap 优化
// 使用 mmap 映射文件
public class MappedFile {
private MappedByteBuffer mappedByteBuffer;
public MappedFile(String filePath, int size) {
RandomAccessFile file = new RandomAccessFile(filePath, "rw");
file.setLength(size);
// 内存映射
mappedByteBuffer = file.getChannel()
.map(FileChannel.MapMode.READ_WRITE, 0, size);
}
public void write(byte[] data) {
mappedByteBuffer.put(data);
}
}
7.2 预分配文件
// 预分配 CommitLog 文件
public class AllocateMappedFileService {
@Override
public void run() {
while (!this.isStopped()) {
// 预分配下一个文件
MappedFile nextFile = commitLog.allocateNextFile();
// 预热 mmap
nextFile.warmMappedFile();
this.waitForRunning(200);
}
}
}
7.3 批量提交
// 批量提交配置
commitCommitLogLeastPages=4 # 最少 4 页
commitCommitLogThoroughInterval=200 # 200ms
八、常见问题排查
8.1 磁盘空间不足
原因:
- 保留时间过长
- 消息量过大
解决:
# 缩短保留时间
fileReservedTime=72 # 72 小时
# 删除时间间隔
deleteWhen=02 # 凌晨 2 点
8.2 刷盘延迟
原因:
- 磁盘 IO 瓶颈
- 刷盘配置不当
解决:
# 使用 SSD
# 调整刷盘间隔
flushCommitLogThoroughInterval=500 # 500ms
8.3 主从延迟
原因:
- 网络延迟
- 从节点负载高
解决:
# 监控主从差距
haSlaveFallbehindMax=104857600 # 100MB
总结
RocketMQ CommitLog 的核心机制:
- CommitLog:所有消息顺序写入同一文件
- ConsumeQueue:按 Topic+Queue 的索引
- IndexFile:按 Key 查询的索引
- 刷盘策略:同步/异步刷盘
- 主从复制:同步/异步复制
核心要点:
- 理解 CommitLog 的顺序写盘设计
- 掌握 ConsumeQueue 的索引机制
- 根据场景选择刷盘和复制策略
- 利用 mmap 和预分配提升性能
参考资料
- RocketMQ 存储设计
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 5 章