博客
关于我
RocketMQ 源码分析 —— Message 顺序发送与消费
阅读量:799 次
发布时间:2023-03-22

本文共 2416 字,大约阅读时间需要 8 分钟。

RocketMQ严格顺序消息的实现机制

1. 概述

RocketMQ 是 Apache 移动式数据流(Apache Apache RocketMQ)项目的产物,主要用于高效的消息中继和分布式计算。它 提供了两种顺序级别的消息:普通顺序消息和完全严格顺序消息。普通顺序消息适用于大部分场景,而完全严格顺序消息则用于高强一致性的场景,如数据库binlog同步。

2. 生产者(Producer)的严格顺序消息发送

2.1 消息队列选择

生产者发送严格顺序消息时,通过 MessageQueueSelector 接口选择消息队列。该接口根据传递的参数(如订单 ID)将消息发送到与参数相关联的消息队列。例如,传递相同的订单 ID,消息会发送到相同的消息队列,确保消息的严格顺序。

2.2 生产者代码分析

public class Producer {
public static void main(String[] args) throws Exception {
MQProducer producer = new DefaultMQProducer("group_name");
producer.start();
String[] tags = {"TagA", "TagB"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg = new Message("topic", tags[i % tags.length], "KEY" + i, "Hello RocketMQ " + i);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List
mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
// 处理发送结果
}
producer.shutdown();
}
}
  • MessageQueueSelector 实现:根据传递的 orderId 将消息发送到与 orderId 相关联的消息队列。
  • DefaultMQProducer:负责发送消息,内部调用 sendSelectImpl 方法进行消息队列选择和发送。

2.3 发送流程

  • 选择消息队列:通过 MessageQueueSelector 根据参数选择消息队列。
  • 发送消息:调用 send 方法,将消息发送至选定的消息队列。
  • 处理结果:根据发送结果进行相应的处理,如日志记录或重试。
  • 3. 消费者(Consumer)的严格顺序消息消费

    3.1 锁机制

    • Broker 消息队列锁:在集群模式下,消费者从 Broker 获得消息队列锁,确保严格顺序消费。
    • 消费者消息队列锁:消费者获得该锁后才能操作消息队列。
    • 消费者消息处理队列锁:消费者获得该锁后才能消费消息队列。

    3.2 消息队列锁定与解锁

    • 锁定消息队列:消费者向 Broker 解锁消息队列,确保该消息队列归属其自身。
    • 解锁消息队列:消费者在不再处理消息时,向 Broker 解锁消息队列。集群模式下需要解锁,而广播模式下无需解锁。

    3.3 消息消费流程

  • 获取锁:消费者先获取消息队列锁,再获取消息处理队列锁,确保无并发消费。
  • 获取消息:从消息处理队列中获取消息。
  • 处理消息:根据消息处理逻辑处理消息,更新消费进度。
  • 提交进度:提交消费进度,标记消息已成功消费。
  • 处理异常:在处理过程中遇到异常时,回滚消费进度,等待下次重新消费。
  • 3.4 消息处理队列核心方法

    • commit():提交消费进度,更新消息队列的消费进度。
    • rollback():回滚消费进度,撤销已提交的消费。
    • makeMessageToCosumeAgain():将消息重新加入消息处理队列,支持延迟消费。

    4. 消息处理结果处理

    消费者在处理消息结果时,可能会遇到以下几种状态:

    • SUCCESS:消息成功消费但未提交。
    • ROLLBACK:消息消费失败,回滚消费。
    • COMMIT:消息成功消费并提交。
    • SUSPEND_CURRENT_QUEUE_A_MOMENT:消息消费失败,暂停当前队列一段时间后重新消费。

    5. 错误处理与重试机制

    在严格顺序消费中,消费者在处理失败时会将消息发送到死信队列,稍后重新消费。这种机制确保了消息的可靠性,但需要设置重试次数的上限,以防止死循环。

    6. 源码分析总结

    • 生产者:通过 MessageQueueSelector 实现严格顺序消息的队列选择,确保消息按顺序发送。
    • 消费者:通过多层锁机制和消息处理队列的核心方法,确保严格顺序消费的高效性和可靠性。
    • 锁机制:多层锁确保了消息的同步性和互斥性,避免并发消费带来的问题。
    • 消息处理:支持消息的持久化存储和回滚处理,确保消息的严格顺序和高可靠性。

    通过理解这些机制,可以更好地掌握RocketMQ的高级特性,提升消息系统的性能和可靠性。

    转载地址:http://qiqfk.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现多尺度MSR算法(附完整源码)
    查看>>
    Objective-C实现多种方法求解定积分(附完整源码)
    查看>>
    Objective-C实现多组输入(附完整源码)
    查看>>
    Objective-C实现多项式函数在某个点的评估算法(附完整源码)
    查看>>
    Objective-C实现多项式哈希算法(附完整源码)
    查看>>
    Objective-C实现大位数乘法(附完整源码)
    查看>>
    Objective-C实现大根堆(附完整源码)
    查看>>
    Objective-C实现奇偶检验码(附完整源码)
    查看>>
    Objective-C实现奇偶转置排序算法(附完整源码)
    查看>>
    Objective-C实现奇异值分解SVD(附完整源码)
    查看>>
    Objective-C实现子集总和算法(附完整源码)
    查看>>
    Objective-C实现字符串autocomplete using trie(使用 trie 自动完成)算法(附完整源码)
    查看>>
    Objective-C实现字符串boyer moore search博耶摩尔搜索算法(附完整源码)
    查看>>
    Objective-C实现字符串IP地址转DWORD地址(附完整源码)
    查看>>
    Objective-C实现字符串jaro winkler算法(附完整源码)
    查看>>
    Objective-C实现字符串manacher马拉车算法(附完整源码)
    查看>>
    Objective-C实现字符串wildcard pattern matching通配符模式匹配算法(附完整源码)
    查看>>
    Objective-C实现字符串word patterns单词模式算法(附完整源码)
    查看>>
    Objective-C实现字符串Z 函数或 Z 算法(附完整源码)
    查看>>
    Objective-C实现字符串加解密(附完整源码)
    查看>>