Skip to content
清晨的一缕阳光
返回

RocketMQ CommitLog 存储结构详解

RocketMQ 采用独特的 CommitLog 存储结构,将所有消息顺序写入同一文件,实现高吞吐量和低延迟。本文将深入探讨 CommitLog 的设计原理、存储结构以及刷盘策略。

一、存储架构概览

1.1 整体架构

RocketMQ 的存储结构由三部分组成:

graph TB
    subgraph Broker
        subgraph 存储层
            CL[CommitLog]
            CQ[ConsumeQueue]
            IQ[IndexFile]
        end
        
        CL -->|消息 | CQ
        CL -->|Key 索引 | IQ
    end
    
    subgraph Producer
        P[Producer]
    end
    
    subgraph Consumer
        C[Consumer]
    end
    
    P -->|写入 | CL
    C -->|读取 | CQ
    C -->|查询 | IQ

1.2 核心文件

文件类型说明作用
CommitLog消息主体文件存储所有消息内容
ConsumeQueue消费队列索引按 Topic+Queue 组织的索引
IndexFile消息索引按 Key 查询消息

1.3 目录结构

$HOME/store/
├── commitlog/
│   ├── 00000000000000000000  (1GB)
│   ├── 00000000001073741824  (1GB)
│   └── 00000000002147483648  (1GB)
├── consumequeue/
│   └── topic/
│       ├── 0/
│       │   ├── 00000000000000000000  (20 字节 * 条数)
│       │   └── ...
│       └── 1/
│           └── ...
├── index/
│   ├── 20260408100000000.index
│   └── ...
└── checkpoint  (检查点文件)

二、CommitLog 设计

2.1 文件结构

所有消息顺序写入 CommitLog:

graph LR
    subgraph CommitLog
        F1[File 1<br/>0-1GB]
        F2[File 2<br/>1-2GB]
        F3[File 3<br/>2-3GB]
    end
    
    F1 -->|顺序写入 | F2
    F2 -->|顺序写入 | F3

2.2 消息格式

┌─────────────────────────────────────────┐
│           Total Size (4 bytes)          │
├─────────────────────────────────────────┤
│            Magic Code (4 bytes)         │
├─────────────────────────────────────────┤
│             CRC32 (4 bytes)             │
├─────────────────────────────────────────┤
│             Queue ID (4 bytes)          │
├─────────────────────────────────────────┤
│              Flag (4 bytes)             │
├─────────────────────────────────────────┤
│         Queue Offset (8 bytes)          │
├─────────────────────────────────────────┤
│         Sys Flag (4 bytes)              │
├─────────────────────────────────────────┤
│      Born Timestamp (8 bytes)           │
├─────────────────────────────────────────┤
│        Born Host (8 bytes)              │
├─────────────────────────────────────────┤
│     Store Timestamp (8 bytes)           │
├─────────────────────────────────────────┤
│       Store Host (8 bytes)              │
├─────────────────────────────────────────┤
│     Reconsume Times (4 bytes)           │
├─────────────────────────────────────────┤
│    Prepared Transaction Offset (8 bytes)│
├─────────────────────────────────────────┤
│           Body Size (4 bytes)           │
├─────────────────────────────────────────┤
│              Body (variable)            │
├─────────────────────────────────────────┤
│          Topic Length (2 bytes)         │
├─────────────────────────────────────────┤
│            Topic (variable)             │
├─────────────────────────────────────────┤
│         Properties Length (2 bytes)     │
├─────────────────────────────────────────┤
│           Properties (variable)         │
└─────────────────────────────────────────┘

2.3 固定长度消息

// 消息头固定 128 字节
public static final int MSG_HEADER_SIZE = 128;

// 消息总长度 = 消息头 + Body + Topic + Properties
int totalSize = MSG_HEADER_SIZE + bodyLength + topicLength + propertiesLength;

2.4 文件滚动

# Broker 配置
flushCommitLogLeastPoints=4  # 最少 4 页刷盘
flushCommitLogThoroughInterval=200  # 200ms 强制刷盘
commitCommitLogLeastPages=4  # 最少 4 页提交
commitCommitLogThoroughInterval=200  # 200ms 强制提交

三、ConsumeQueue 设计

3.1 索引结构

ConsumeQueue 是 Topic+Queue 的逻辑视图:

graph TB
    subgraph CommitLog
        CL[所有消息顺序存储]
    end
    
    subgraph ConsumeQueue
        subgraph Topic-1
            Q0[Queue 0]
            Q1[Queue 1]
        end
        
        Q0 -->|指向 | M1[消息 1]
        Q0 -->|指向 | M2[消息 2]
        Q1 -->|指向 | M3[消息 3]
    end
    
    CL -.-> Q0
    CL -.-> Q1

3.2 索引条目

每个条目固定 20 字节:

┌─────────────────────────────────┐
│    CommitLog Offset (8 bytes)   │  ← 消息在 CommitLog 中的位置
├─────────────────────────────────┤
│        Size (4 bytes)           │  ← 消息大小
├─────────────────────────────────┤
│      Topic Hash Code (8 bytes)  │  ← Topic 哈希
└─────────────────────────────────┘

3.3 查询流程

sequenceDiagram
    participant C as Consumer
    participant CQ as ConsumeQueue
    participant CL as CommitLog
    
    C->>CQ: 请求消息
    CQ-->>C: 返回 (offset, size)
    C->>CL: 读取消息
    CL-->>C: 返回消息内容

3.4 代码实现

// 从 ConsumeQueue 获取消息位置
public DispatchRequest buildDispatchRequest(MessageExtBrokerInner msg) {
    DispatchRequest request = new DispatchRequest(
        msg.getTopic(),
        msg.getQueueId(),
        msg.getQueueOffset(),  // ConsumeQueue 偏移
        msg.getCommitLogOffset(),  // CommitLog 位置
        msg.getBody().length,  // 消息大小
        msg.getTagsCode(),  // Tag 哈希
        msg.getStoreTimestamp()
    );
    return request;
}

// 构建 ConsumeQueue 索引
public void putRequest(DispatchRequest request) {
    ByteBuffer buffer = ByteBuffer.allocate(20);
    buffer.putLong(request.getCommitLogOffset());
    buffer.putInt(request.getMsgSize());
    buffer.putLong(request.getTagsCode());
    
    // 写入 ConsumeQueue 文件
    consumeQueue.putQueueData(request.getQueueOffset(), buffer);
}

四、IndexFile 设计

4.1 索引结构

按 Key 查询消息的索引:

graph TB
    subgraph IndexFile
        H[Header<br/>40 bytes]
        S1[Slot 1]
        S2[Slot 2]
        S3[Slot 3]
    end
    
    S1 --> I1[Index 1]
    S2 --> I2[Index 2]
    S3 --> I3[Index 3]
    
    I1 --> M1[消息 1]
    I2 --> M2[消息 2]
    I3 --> M3[消息 3]

4.2 文件结构

┌─────────────────────────────────┐
│         Header (40 bytes)       │
│  - beginTimestamp               │
│  - endTimestamp                 │
│  - beginOffset                  │
│  - endOffset                    │
│  - hashSlotCount                │
│  - indexCount                   │
├─────────────────────────────────┤
│      Hash Slot (5000 * 4B)      │
│      (默认 5000 个槽位)           │
├─────────────────────────────────┤
│       Index (300000 * 20B)      │
│      (默认 30 万个索引)           │
└─────────────────────────────────┘

4.3 索引条目

┌─────────────────────────────────┐
│     Key Hash (4 bytes)          │
├─────────────────────────────────┤
│  CommitLog Offset (8 bytes)     │
├─────────────────────────────────┤
│     Timestamp (8 bytes)         │
└─────────────────────────────────┘

4.4 查询示例

// 按 Key 查询消息
public QueryMessage queryMessage(String topic, String key, 
                                 int maxNum, long begin, long end) {
    QueryMessage queryMessage = new QueryMessage(topic, key, maxNum);
    
    // 1. 计算 Key 哈希
    int hashCode = KeyBuilder.buildKey(topic, key).hashCode();
    
    // 2. 查找 IndexFile
    for (IndexFile indexFile : indexFileList) {
        if (indexFile.isTimeMatch(begin, end)) {
            List<Long> offsets = indexFile.selectOffset(hashCode);
            
            // 3. 从 CommitLog 读取消息
            for (Long offset : offsets) {
                MessageExt msg = commitLog.getMessage(offset);
                queryMessage.addMessage(msg);
            }
        }
    }
    
    return queryMessage;
}

五、刷盘策略

5.1 同步刷盘

// 同步刷盘配置
brokerConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);

// 刷盘流程
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 1. 写入 CommitLog
    commitLog.append(msg);
    
    // 2. 等待刷盘完成
    flushOffset = commitLog.flush(msg.getQueueOffset());
    
    // 3. 返回结果
    return PutMessageResult.PUT_OK;
}

特点

5.2 异步刷盘

// 异步刷盘配置(默认)
brokerConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);

// 刷盘线程
public class FlushCommitLogService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                // 等待刷盘条件
                this.waitForRunning(500);
                
                // 批量刷盘
                commitLog.flush(0);
            } catch (Exception e) {
                log.error("刷盘失败", e);
            }
        }
    }
}

特点

5.3 刷盘配置

# 刷盘类型
flushDiskType=ASYNC_FLUSH  # SYNC_FLUSH / ASYNC_FLUSH

# 刷盘间隔
flushCommitLogLeastPoints=4  # 最少 4 页
flushCommitLogThoroughInterval=200  # 200ms

# 提交间隔
commitCommitLogLeastPages=4
commitCommitLogThoroughInterval=200

5.4 刷盘性能对比

刷盘方式吞吐量延迟可靠性
同步刷盘1 万 TPS10ms
异步刷盘10 万 TPS1ms

六、主从复制

6.1 复制模式

graph TB
    subgraph Master
        M1[Master Broker]
        CL1[CommitLog]
    end
    
    subgraph Slave
        S1[Slave Broker]
        CL2[CommitLog]
    end
    
    M1 -->|同步复制 | S1
    CL1 -->|数据 | CL2

6.2 同步复制

# 同步复制配置
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH

特点

6.3 异步复制

# 异步复制配置(默认)
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

特点

6.4 从节点配置

# 从节点
brokerRole=SLAVE
slaveReadEnable=true  # 允许从节点读

七、性能优化

7.1 mmap 优化

// 使用 mmap 映射文件
public class MappedFile {
    private MappedByteBuffer mappedByteBuffer;
    
    public MappedFile(String filePath, int size) {
        RandomAccessFile file = new RandomAccessFile(filePath, "rw");
        file.setLength(size);
        
        // 内存映射
        mappedByteBuffer = file.getChannel()
            .map(FileChannel.MapMode.READ_WRITE, 0, size);
    }
    
    public void write(byte[] data) {
        mappedByteBuffer.put(data);
    }
}

7.2 预分配文件

// 预分配 CommitLog 文件
public class AllocateMappedFileService {
    @Override
    public void run() {
        while (!this.isStopped()) {
            // 预分配下一个文件
            MappedFile nextFile = commitLog.allocateNextFile();
            
            // 预热 mmap
            nextFile.warmMappedFile();
            
            this.waitForRunning(200);
        }
    }
}

7.3 批量提交

// 批量提交配置
commitCommitLogLeastPages=4  # 最少 4
commitCommitLogThoroughInterval=200  # 200ms

八、常见问题排查

8.1 磁盘空间不足

原因

解决

# 缩短保留时间
fileReservedTime=72  # 72 小时

# 删除时间间隔
deleteWhen=02  # 凌晨 2 点

8.2 刷盘延迟

原因

解决

# 使用 SSD
# 调整刷盘间隔
flushCommitLogThoroughInterval=500  # 500ms

8.3 主从延迟

原因

解决

# 监控主从差距
haSlaveFallbehindMax=104857600  # 100MB

总结

RocketMQ CommitLog 的核心机制:

  1. CommitLog:所有消息顺序写入同一文件
  2. ConsumeQueue:按 Topic+Queue 的索引
  3. IndexFile:按 Key 查询的索引
  4. 刷盘策略:同步/异步刷盘
  5. 主从复制:同步/异步复制

核心要点

参考资料


分享这篇文章到:

上一篇文章
Redis 会话管理实战
下一篇文章
SDD 规范驱动开发详解