RocketMQ 5学习笔记:普通消息之外要看什么

发表于 2024-05-21 21:36 1993 字 10 min read

Spring AI学习笔记(四)工具调用和MCPSpring AI学习笔记(三)RAG从文档入库到回答Spring AI学习笔记(二)ChatClient从怎么调到怎么封装Spring AI学习笔记(一)它到底解决什么问题java新版本-java25学习笔记(四)用JFR和GC日志做一次体检java新版本-java25学习笔记(三)虚拟线程要和资源边界一起看java新版本-java25学习笔记(二)运行时基线先统一java新版本-java25学习笔记(一)LTS版本对比和学习路线主流AI Agent能力对比与工程选型我用Kiro做了个自己的工具站盘一盘虚拟线程我用Trae做了个AstrBot的AI角色扮演插件Python初学笔记(六)常用标准库先学这几个Python初学笔记(五)读写文件和处理异常Python初学笔记(四)函数让代码开始有结构Python初学笔记(三)条件、循环和推导式Python初学笔记(二)变量和基础类型比想象中重要Python初学笔记(一)先把环境和运行方式弄明白主流AI大模型能力对比Java 21和Spring Boot 3升级笔记(五)日志指标与可观测性Java 21和Spring Boot 3升级笔记(四)数据访问层适配Java 21和Spring Boot 3升级笔记(三)虚拟线程使用边界Java 21和Spring Boot 3升级笔记(二)Jakarta迁移要点Java 21和Spring Boot 3升级笔记(一)工程基线整理魔法值怎么能忍!JPA的Specification大改造!处理生僻字乱码:JPA框架对于Oracle的NVarchar2,NChar,NClob类型支持Redis Stream能不能当轻量消息队列用RocketMQ 5学习笔记:普通消息之外要看什么事件流不是换个消息队列这么简单Kubernetes学习笔记04:发布、排障和观测Kubernetes学习笔记03:配置、密钥和存储Kubernetes学习笔记02:Deployment、Service和IngressKubernetes学习笔记01:Pod和控制器mysql索引原理02--存储引擎索引的实现mysql索引原理01--索引的数据结构
  现在看 Java 后端消息中间件,RocketMQ 5 仍然是一个很值得学习的方向。它在国内业务系统里出现频率很高,顺序消息、延时消息、事务消息这些能力也很贴近常见业务。   但学习 RocketMQ 时,如果只会发一条普通消息、消费一条普通消息,其实还不太够。真正落到项目里,更多问题会出现在消息分类、消费失败、延时补偿、事务边界和监控上。...

前言

  现在看 Java 后端消息中间件,RocketMQ 5 仍然是一个很值得学习的方向。它在国内业务系统里出现频率很高,顺序消息、延时消息、事务消息这些能力也很贴近常见业务。

  但学习 RocketMQ 时,如果只会发一条普通消息、消费一条普通消息,其实还不太够。真正落到项目里,更多问题会出现在消息分类、消费失败、延时补偿、事务边界和监控上。

  这篇算是我重新梳理 RocketMQ 5 时的一篇学习笔记,不追求把所有源码讲完,主要记录 Java 项目里比较容易用到、也比较容易踩坑的点。

先把消息类型分清楚

  很多项目刚接入消息队列时,所有消息都按普通消息处理。后来业务一复杂,就开始在普通消息上补各种逻辑。

比如订单超时取消,用普通消息加定时任务扫库也能做,但延时消息会更直接。比如支付成功后扣库存,如果要保证同一订单状态变更顺序,就要考虑顺序消息。比如本地事务和消息发送需要保持一致,就会想到事务消息。

我会先按场景分:

1.普通消息:异步通知、日志同步、轻量解耦。 2.顺序消息:同一订单、同一账户、同一设备的状态流转。 3.延时消息:超时取消、延迟提醒、补偿检查。 4.事务消息:本地数据库事务和消息发送的一致性。

分清楚类型以后,代码才不会把所有问题都塞进消费者里。

普通消息也要有规范

  普通消息并不代表可以随便发。

我比较推荐消息体外面套一层统一信封。这样事件 ID、发生时间、业务类型、版本号都能有固定位置。

public record MessageEnvelope<T>(
        String messageId,
        String eventType,
        Integer version,
        LocalDateTime occurredAt,
        T payload) {
}

很多人会觉得这层包装麻烦,但等到排查问题时就知道它很有用。没有 messageId,就很难追踪一条消息从生产到消费的全过程。没有 version,事件升级时也会很被动。

生产者发送时,也不要只把 JSON 字符串扔出去。至少要把 topic、tag、key 的规则写进一个地方,不要散落在业务代码里。

Tag 不要当成万能分类器

  RocketMQ 的 tag 很好用,但也容易被滥用。

有的项目会把业务状态、地区、渠道、租户都放进 tag,最后消费者过滤规则越来越复杂。这样会让消息模型变得很别扭。

我的理解是,tag 更适合表达同一个 topic 下比较稳定的大类。比如订单领域 topic 里,PAIDCANCELEDREFUNDED可以作为 tag。不要把高度动态的业务字段放进去,比如用户 ID、订单号、门店号。

动态字段应该放在消息体里,或者作为 key 用于路由和排查。tag 设计得越克制,后面越容易维护。

顺序消息的核心是 key

  顺序消息不是声明一下“我要顺序”就完事了。它的关键是同一类需要有序的数据必须进入同一个队列。

比如订单状态事件,通常会用订单号做 sharding key。这样同一订单的创建、支付、取消、退款事件会被路由到同一个队列。

String shardingKey = order.getOrderNo();
messageClient.sendOrderly("order-event", shardingKey, event);

但顺序也有代价。同一个队列内要保持顺序,消费并发就会受到限制。如果某个 key 事件量特别大,可能会拖慢整个队列。

所以顺序消息应该只用在真正需要顺序的地方。不是所有消息都要顺序,很多通知类、日志类、统计类消息其实不需要。

延时消息适合补偿,不适合替代调度系统

  延时消息很好用,比如订单 30 分钟未支付自动取消。

用户下单时发送一条延时消息,30 分钟后消费者检查订单状态,如果仍然未支付,就取消订单。如果已经支付,就忽略。

这里有一个重点:延时消息到达后仍然要查业务状态,不能直接执行取消。

Order order = orderRepository.findByOrderNo(event.orderNo());
if (order.isWaitingPayment()) {
    order.cancelByTimeout();
}

因为延时消息只是提醒你“该检查了”,不是证明业务一定还处在某个状态。

我不建议把延时消息当成复杂调度系统来用。如果任务需要复杂日历、批量调度、人工暂停、失败重跑,那还是应该用专门的任务调度方案。

事务消息不要神化

  RocketMQ 的事务消息经常被拿来解决“数据库提交成功但消息发送失败”的问题。

它的思路是先发送半消息,再执行本地事务,最后提交或回滚消息。如果生产者异常,Broker 会回查本地事务状态。

这个机制很有用,但它不是分布式事务万能药。它解决的是本地事务和消息发送之间的一致性,不负责保证下游消费一定成功。

所以事务消息后面仍然要配合消费者幂等、失败重试和补偿。

本地事务状态也要能被回查。比如订单表里要有订单状态,或者单独有事务记录表。不能让回查逻辑只能依赖内存变量。

消费失败要分清可重试和不可重试

  消费者里最常见的坏味道是 catch 住所有异常,然后统一重试。

有些异常是暂时性的,比如数据库连接闪断、下游接口超时、Redis 短暂不可用。这种可以重试。

但有些异常是数据本身有问题,比如消息字段缺失、枚举值不认识、业务状态已经不允许处理。重试多少次都不会好。

所以消费者最好区分异常类型。不可重试的问题要记录清楚,进入人工处理或死信流程。可重试的问题再交给重试机制。

try {
    handler.handle(event);
} catch (InvalidMessageException e) {
    log.error("invalid message, skip. messageId={}", event.messageId(), e);
} catch (Exception e) {
    throw e;
}

这段代码的重点不是示例本身,而是提醒自己不要把所有失败都当成系统抖动。

观测要从生产者开始

  排查消息问题时,很多人只看消费者日志。但消息链路其实从生产者就开始了。

我希望生产者至少打印这些信息:topic、tag、key、messageId、业务单号、发送耗时、发送结果。

消费者也要打印:消费组、topic、tag、messageId、业务单号、消费耗时、处理结果。

如果系统接了链路追踪,可以把 traceId 放进消息属性里,消费时再恢复上下文。这样一次 HTTP 请求触发的异步处理也能串起来。

没有这些信息时,线上排查会很痛苦。只知道“用户说订单没有更新”,但不知道消息发没发、谁消费了、失败在哪里。

小结

  RocketMQ 5 在 Java 后端里不是只负责“把消息发出去”。真正要用好,需要先分清消息类型,再设计好 topic、tag、key、事件体、幂等、重试和观测。

  我的理解是,消息中间件越强,越不能把业务一致性的责任全部甩给它。它能提供可靠投递、延时、顺序、事务消息这些能力,但业务状态、幂等处理、失败补偿仍然要由系统自己设计清楚。

  把这些边界想明白,再去写生产者和消费者,代码会稳很多。

喜欢的话,留下你的评论吧~

© 2019 - 2026 VincentHo @VincentHo
Powered by theme astro-koharu · Inspired by Shoka