前言
现在看 Java 后端消息中间件,RocketMQ 5 仍然是一个很值得学习的方向。它在国内业务系统里出现频率很高,顺序消息、延时消息、事务消息这些能力也很贴近常见业务。
但学习 RocketMQ 时,如果只会发一条普通消息、消费一条普通消息,其实还不太够。真正落到项目里,更多问题会出现在消息分类、消费失败、延时补偿、事务边界和监控上。
这篇算是我重新梳理 RocketMQ 5 时的一篇学习笔记,不追求把所有源码讲完,主要记录 Java 项目里比较容易用到、也比较容易踩坑的点。
先把消息类型分清楚
很多项目刚接入消息队列时,所有消息都按普通消息处理。后来业务一复杂,就开始在普通消息上补各种逻辑。
比如订单超时取消,用普通消息加定时任务扫库也能做,但延时消息会更直接。比如支付成功后扣库存,如果要保证同一订单状态变更顺序,就要考虑顺序消息。比如本地事务和消息发送需要保持一致,就会想到事务消息。
我会先按场景分:
1.普通消息:异步通知、日志同步、轻量解耦。 2.顺序消息:同一订单、同一账户、同一设备的状态流转。 3.延时消息:超时取消、延迟提醒、补偿检查。 4.事务消息:本地数据库事务和消息发送的一致性。
分清楚类型以后,代码才不会把所有问题都塞进消费者里。
普通消息也要有规范
普通消息并不代表可以随便发。
我比较推荐消息体外面套一层统一信封。这样事件 ID、发生时间、业务类型、版本号都能有固定位置。
public record MessageEnvelope<T>(
String messageId,
String eventType,
Integer version,
LocalDateTime occurredAt,
T payload) {
}
很多人会觉得这层包装麻烦,但等到排查问题时就知道它很有用。没有 messageId,就很难追踪一条消息从生产到消费的全过程。没有 version,事件升级时也会很被动。
生产者发送时,也不要只把 JSON 字符串扔出去。至少要把 topic、tag、key 的规则写进一个地方,不要散落在业务代码里。
Tag 不要当成万能分类器
RocketMQ 的 tag 很好用,但也容易被滥用。
有的项目会把业务状态、地区、渠道、租户都放进 tag,最后消费者过滤规则越来越复杂。这样会让消息模型变得很别扭。
我的理解是,tag 更适合表达同一个 topic 下比较稳定的大类。比如订单领域 topic 里,PAID、CANCELED、REFUNDED可以作为 tag。不要把高度动态的业务字段放进去,比如用户 ID、订单号、门店号。
动态字段应该放在消息体里,或者作为 key 用于路由和排查。tag 设计得越克制,后面越容易维护。
顺序消息的核心是 key
顺序消息不是声明一下“我要顺序”就完事了。它的关键是同一类需要有序的数据必须进入同一个队列。
比如订单状态事件,通常会用订单号做 sharding key。这样同一订单的创建、支付、取消、退款事件会被路由到同一个队列。
String shardingKey = order.getOrderNo();
messageClient.sendOrderly("order-event", shardingKey, event);
但顺序也有代价。同一个队列内要保持顺序,消费并发就会受到限制。如果某个 key 事件量特别大,可能会拖慢整个队列。
所以顺序消息应该只用在真正需要顺序的地方。不是所有消息都要顺序,很多通知类、日志类、统计类消息其实不需要。
延时消息适合补偿,不适合替代调度系统
延时消息很好用,比如订单 30 分钟未支付自动取消。
用户下单时发送一条延时消息,30 分钟后消费者检查订单状态,如果仍然未支付,就取消订单。如果已经支付,就忽略。
这里有一个重点:延时消息到达后仍然要查业务状态,不能直接执行取消。
Order order = orderRepository.findByOrderNo(event.orderNo());
if (order.isWaitingPayment()) {
order.cancelByTimeout();
}
因为延时消息只是提醒你“该检查了”,不是证明业务一定还处在某个状态。
我不建议把延时消息当成复杂调度系统来用。如果任务需要复杂日历、批量调度、人工暂停、失败重跑,那还是应该用专门的任务调度方案。
事务消息不要神化
RocketMQ 的事务消息经常被拿来解决“数据库提交成功但消息发送失败”的问题。
它的思路是先发送半消息,再执行本地事务,最后提交或回滚消息。如果生产者异常,Broker 会回查本地事务状态。
这个机制很有用,但它不是分布式事务万能药。它解决的是本地事务和消息发送之间的一致性,不负责保证下游消费一定成功。
所以事务消息后面仍然要配合消费者幂等、失败重试和补偿。
本地事务状态也要能被回查。比如订单表里要有订单状态,或者单独有事务记录表。不能让回查逻辑只能依赖内存变量。
消费失败要分清可重试和不可重试
消费者里最常见的坏味道是 catch 住所有异常,然后统一重试。
有些异常是暂时性的,比如数据库连接闪断、下游接口超时、Redis 短暂不可用。这种可以重试。
但有些异常是数据本身有问题,比如消息字段缺失、枚举值不认识、业务状态已经不允许处理。重试多少次都不会好。
所以消费者最好区分异常类型。不可重试的问题要记录清楚,进入人工处理或死信流程。可重试的问题再交给重试机制。
try {
handler.handle(event);
} catch (InvalidMessageException e) {
log.error("invalid message, skip. messageId={}", event.messageId(), e);
} catch (Exception e) {
throw e;
}
这段代码的重点不是示例本身,而是提醒自己不要把所有失败都当成系统抖动。
观测要从生产者开始
排查消息问题时,很多人只看消费者日志。但消息链路其实从生产者就开始了。
我希望生产者至少打印这些信息:topic、tag、key、messageId、业务单号、发送耗时、发送结果。
消费者也要打印:消费组、topic、tag、messageId、业务单号、消费耗时、处理结果。
如果系统接了链路追踪,可以把 traceId 放进消息属性里,消费时再恢复上下文。这样一次 HTTP 请求触发的异步处理也能串起来。
没有这些信息时,线上排查会很痛苦。只知道“用户说订单没有更新”,但不知道消息发没发、谁消费了、失败在哪里。
小结
RocketMQ 5 在 Java 后端里不是只负责“把消息发出去”。真正要用好,需要先分清消息类型,再设计好 topic、tag、key、事件体、幂等、重试和观测。
我的理解是,消息中间件越强,越不能把业务一致性的责任全部甩给它。它能提供可靠投递、延时、顺序、事务消息这些能力,但业务状态、幂等处理、失败补偿仍然要由系统自己设计清楚。
把这些边界想明白,再去写生产者和消费者,代码会稳很多。
気に入ったならばコメントを残してくださいね~