RocketMQ是阿里巴巴开源的分布式消息中间件,基于Java开发,专为高吞吐、高可靠的分布式系统设计。

领域模型

核心特点

①高吞吐量(支持千万级TPS);

②低延迟(毫秒级响应);

③支持多种消息模式(普通消息、顺序消息、事务消息等);

④完善的重试和死信机制;

⑤分布式架构,支持水平扩展;

⑥提供丰富的监控和运维工具。

消息队列优势

  • 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死;大多数MQ支撑多语言客户端,可兼容多语言发开发;
  • 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
  • 异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
  • 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
  • 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
  • 数据冗余:有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。MQ把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多MQ所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

核心组件

  • **生产者(Producer):**Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。

  • 消息存储

    • 主题(Topic):Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。
    • 队列(MessageQueue):Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。
    • 消息(Message):Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。
  • 消息消费

    • 消费者分组(ConsumerGroup):Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。
    • 消费者(Consumer):Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。
    • 订阅关系(Subscription):

    Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。

    Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

  • Broker:消息服务器,存储消息并处理收发请求,由多个节点组成集群,分为Master和Slave(Master负责读写,Slave同步数据并提供读服务)。

  • NameServer:轻量级注册中心,存储Broker的路由信息(如Topic与Broker的映射关系),支持动态扩容,无状态且节点间互不通信。

    • NameServer是RocketMQ的“路由中枢”,核心作用是存储和更新集群的路由信息,为生产者和消费者提供Broker的地址发现服务。
    • 与Broker的交互机制:
      • ①Broker启动时向所有NameServer注册自身信息(如IP、端口、Topic配置等);
      • ②Broker定期(默认30秒)向NameServer发送心跳包,维持在线状态;
      • ③NameServer在120秒内未收到Broker心跳,则将其从路由信息中移除,保证路由的实时性。

保证消息的可靠性

RocketMQ的消息可靠性保证贯穿于消息生产、Broker存储、消息消费全链路,通过多层次机制确保消息不丢失、不重复,具体如下:

一、生产端:确保消息成功发送到Broker

生产者(Producer)发送消息时,通过“重试机制+确认机制”避免因网络波动、Broker临时故障导致的消息丢失。

  1. 发送确认机制

    RocketMQ支持三种发送方式,均通过Broker的响应确认消息是否成功送达:

    • 同步发送:Producer发送消息后,等待Broker返回“发送成功”确认(包含消息ID和存储位置),才视为发送完成;若超时未收到确认,触发重试。
    • 异步发送:Producer发送消息后立即返回,通过回调函数接收Broker的确认结果;若失败,在回调中处理重试。
    • 单向发送:仅发送消息不等待确认(适用于日志等非核心场景),但核心业务一般不使用,避免丢失。
  2. 失败重试机制

    当发送失败(如网络超时、Broker繁忙)时,Producer会自动重试,可通过参数配置:

    • retryTimesWhenSendFailed:同步发送失败重试次数(默认2次)。
    • retryTimesWhenSendAsyncFailed:异步发送失败重试次数(默认2次)。
      重试时会选择其他Broker节点(通过NameServer获取路由信息),避免单节点故障影响。

二、Broker端:确保消息持久化与集群可靠性

Broker作为消息存储核心,通过“持久化存储+主从复制+故障转移”保证消息不丢失。

  1. 消息持久化存储

    消息到达Broker后,会写入CommitLog(全局顺序写入的日志文件),并同步生成ConsumeQueue(消息消费的索引文件),最终持久化到磁盘:

    • 刷盘机制:控制消息从内存写入磁盘的时机,确保断电不丢失:
      • 同步刷盘(flushDiskType=SYNC_FLUSH):消息写入内存后,立即触发磁盘写入,成功后才返回确认(可靠性最高,性能略低)。
      • 异步刷盘(flushDiskType=ASYNC_FLUSH):消息先写入内存,定期(默认500ms)或累计到一定量后批量刷盘(性能高,极端情况可能丢失未刷盘消息,默认方式)。
  2. 主从复制机制

    Broker集群采用“Master-Slave”架构,每个Master可配置多个Slave,通过数据复制实现冗余:

    • 同步复制(SYNC_MASTER):Master接收消息后,需等待至少一个Slave同步完成(消息写入Slave的CommitLog),才返回“发送成功”确认(即使Master宕机,Slave有完整数据,无丢失风险)。
    • 异步复制(ASYNC_MASTER):Master接收消息后立即返回确认,Slave异步拉取并同步数据(性能高,Master宕机可能丢失未同步的消息)。
      核心业务通常采用“同步复制+同步刷盘”组合,牺牲部分性能换取最高可靠性。
  3. 故障转移能力

    • 当Master宕机时,Slave可通过手动或工具(如DLedger)升级为新Master,继续提供服务(需提前配置主从切换机制)。
    • 多Master集群部署(每个Topic的Queue分散在不同Master),单个Master故障仅影响其负责的Queue,其他节点正常工作。

三、消费端:确保消息被正确处理

消费者(Consumer)通过“消费确认+重试机制+Offset管理”保证消息不重复、不遗漏。

  1. 消费确认机制

    Consumer处理消息后,需向Broker返回明确的处理结果:

    • 返回CONSUME_SUCCESS:Broker标记消息为“已消费”,不再重新发送。
    • 返回其他结果(如RECONSUME_LATER)或超时未返回:Broker判定消费失败,将消息放入重试队列(%RETRY%ConsumerGroup),等待重新发送。
  2. 消费重试机制

    消费失败的消息会按“指数退避”策略重试(默认16次),间隔从1秒逐渐延长至2小时,确保临时故障(如依赖服务超时)恢复后能重新处理。

    超过最大重试次数后,消息进入死信队列(%DLQ%ConsumerGroup),由人工介入处理(避免无限重试浪费资源)。

  3. Offset持久化

    Consumer的消费进度(Offset,即已消费到的消息位置)会定期持久化到Broker的consumerOffset.json文件或远程存储(如数据库):

    • 即使Consumer重启,也能从上次记录的Offset继续消费,避免重复消费或漏消费。
    • 集群消费模式下,Offset由Broker统一管理;广播消费模式下,Offset由Consumer本地存储。

四、特殊场景的可靠性保障

  1. 事务消息的一致性

    针对分布式事务场景,RocketMQ通过“半事务消息+两阶段提交+回查机制”确保消息发送与本地事务的原子性:

    • 半事务消息:先发送到Broker,标记为“暂不可消费”,等待本地事务结果。
    • 事务确认:本地事务成功则提交消息(变为可消费),失败则回滚(删除消息)。
    • 回查补偿:若Broker未收到确认(如Producer宕机),会定期回查Producer的事务状态,避免消息状态不一致。
  2. 消息轨迹追踪

    通过开启消息轨迹(traceTopicEnable=true),记录消息从生产到消费的全链路日志(发送时间、Broker存储位置、消费时间等),便于问题排查(如消息丢失时可追溯路径)。

总结

RocketMQ的消息可靠性是“生产端重试确认+Broker端持久化与主从复制+消费端确认重试+特殊场景补偿”的综合结果:

  • 生产端通过重试和确认确保消息送达Broker;
  • Broker通过持久化、主从同步确保消息不丢失;
  • 消费端通过确认、重试和Offset管理确保消息被正确处理;
  • 事务消息、死信队列等机制进一步覆盖边缘场景,最终实现生产级的消息可靠性保障。

RocketMQ与Kafka对比

答案:相同点:均为分布式消息中间件,支持高吞吐、分区存储、发布-订阅模式。

不同点主要体现在功能和设计上:

特性 RocketMQ Kafka
消息模式 支持普通、顺序、事务等多种模式 主要支持普通消息,事务消息需额外实现
部署依赖 无外部依赖(NameServer轻量) 早期依赖Zookeeper
重试与死信 内置完善的重试和死信机制 需自定义实现
吞吐量 千万级TPS,适合高并发场景 百万级TPS,更侧重流式处理
生态适配 更适配Java生态和阿里云服务 与大数据生态(Spark、Flink)集成更优

选择建议:金融、电商等需事务和复杂消息模式的场景优先选RocketMQ;日志收集、流式处理场景可选Kafka。

RocketMQ常见面试题

  1. RocketMQ如何保证消息的顺序性?

    答案:RocketMQ通过“分区单写+消费顺序”实现顺序性:

    • 生产端:同一业务键(如订单ID)的消息通过哈希路由到同一个Queue(Topic的分区),因单个Queue是串行写入的,保证消息在Queue内有序。
    • 消费端:一个Queue只能被同一个Consumer线程消费(同组内的Consumer通过负载均衡分配Queue,每个Queue对应一个消费线程),确保消费顺序与写入顺序一致。
    • 注意:仅支持“分区内顺序”,全局顺序需将Topic的Queue数设为1(牺牲吞吐量)。
  2. RocketMQ的事务消息是如何实现的?

    答案:RocketMQ通过“两阶段提交+回查机制”实现事务消息,确保本地事务与消息发送的原子性:

    • 第一阶段:Producer发送“半事务消息”(Half Message)到Broker,Broker标记其为暂不可消费。
    • 第二阶段:Producer执行本地事务,根据结果向Broker发送“Commit”或“Rollback”指令:
      • 若发送Commit,Broker将半事务消息标记为可消费;
      • 若发送Rollback,Broker删除半事务消息。
    • 回查机制:若Broker未收到Commit/Rollback(如Producer宕机),会定期向Producer发送回查请求,根据本地事务实际状态决定消息是否提交。
  3. RocketMQ的消息重试机制是怎样的?什么情况下会触发重试?

    答案:消息重试指Consumer消费消息失败后,Broker重新发送消息的机制,确保消息被正确处理。

    • 触发条件:Consumer消费消息时抛出异常(未返回CONSUME_SUCCESS),或消费超时(默认15分钟)。
    • 重试策略:
      • ①重试次数可配置(默认16次),间隔时间逐渐延长(从1秒到2小时);
      • ②重试消息存储在原Topic的重试队列(如%RETRY%ConsumerGroup);
      • ③超过最大重试次数后,消息进入死信队列(%DLQ%ConsumerGroup),需人工处理。
  4. RocketMQ中Master和Slave的区别是什么?如何实现数据同步?

    答案:Master和Slave是Broker的两种角色,用于保证数据可靠性:

    • Master:可读写,接收Producer的消息写入和Consumer的读取请求,是Broker的主节点。

    • Slave:只读,不接收写入请求,仅同步Master的数据,当Master故障时可切换为Master(需手动或通过工具配置)。

      数据同步方式:

      ①同步复制(Sync Replication):Master写入消息后,需等待Slave同步完成才返回成功,可靠性高但性能略低;

      ②异步复制(Async Replication):Master写入后立即返回,Slave异步同步数据,性能高但可能丢失数据(默认方式)。

  5. RocketMQ的消息过滤机制有哪些?如何实现?

    答案:消息过滤指Consumer只接收符合条件的消息,减少无效消费,主要有两种方式:

    • Tag过滤:发送消息时指定Tag(如"TagA"),Consumer订阅时指定需要的Tag(如"TagA || TagB"),由Broker在发送时过滤,效率高。
    • SQL92过滤:基于消息的属性(Properties)进行更复杂的过滤(如"a > 10 AND b = 'test'"),需Consumer在订阅时开启,由Broker执行过滤,支持多条件组合。
  6. 什么是死信队列?RocketMQ中死信队列的作用是什么?

    答案:死信队列(Dead-Letter Queue)是存储无法正常消费的消息的特殊队列,命名格式为%DLQ%ConsumerGroup

    作用:

    ①保存消费失败且超过最大重试次数的消息,避免消息丢失;

    ②提供数据回溯能力,可通过重新消费死信队列的消息排查问题(如业务逻辑错误、依赖服务故障);

    ③死信队列中的消息不会再被重试,需人工介入处理。

参考网站

https://rocketmq.apache.org/zh/