加快政务公开网站建设,专业网站建设最便宜,青海做网站找谁,网站制作哪个好一些前些天发现了一个巨牛的人工智能学习网站#xff0c;通俗易懂#xff0c;风趣幽默#xff0c;忍不住分享一下给大家。点击跳转到教程。
RocketMQ 是出自 A 公司的开源产品#xff0c;用 Java 语言实现#xff0c;在设计时参考了 Kafka#xff0c;并做出了自己的一些改进…前些天发现了一个巨牛的人工智能学习网站通俗易懂风趣幽默忍不住分享一下给大家。点击跳转到教程。
RocketMQ 是出自 A 公司的开源产品用 Java 语言实现在设计时参考了 Kafka并做出了自己的一些改进消息可靠性上比 Kafka 更好目前RocketMQ 的文档仍然不够丰富 1 2社区仍然无法与 Kafka 比肩但 A 公司已经推出了基于 RocketMQ 的云产品 3相信未来 RocketMQ 也会有不错的发展。本文采用 RocketMQ 3.2.6 进行实验由于 RocketMQ 与 Kafka 很相似本文很多地方对两者做出了比较。
基本概念
RocketMQ 由于借鉴了 Kafka 的设计包括组件的命名也很多与 Kafka 相似下面摘抄一段《RocketMQ 原理简介》中的介绍可以与 Kafka 的命名比对一下
Producer消息生产者负责产生消息一般由业务系统负责产生消息。Consumer消息消费者负责消费消息一般是后台系统负责异步消费。Push ConsumerConsumer 的一种应用通常向 Consumer 对象注册一个 Listener 接口一旦收到消息Consumer 对象立 刻回调 Listener 接口方法。Pull ConsumerConsumer 的一种应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息主动权由应用控制。Producer Group一类 Producer 的集合名称这类 Producer 通常发送一类消息且发送逻辑一致。Consumer Group一类 Consumer 的集合名称这类 Consumer 通常消费一类消息且消费逻辑一致。Broker消息中转角色负责存储消息转发消息一般也称为 Server。在 JMS 规范中称为 Provider。
《RocketMQ 原理简介》中还介绍了一些其他的概念例如广播消费和集群消费广播消费是 Consumer Group 中对于同一条消息每个 Consumer 都消费集群消费是 Consumer Group 中对于同一条消息只有一个 Consumer 消费。Kafka 采用的是集群消费不支持广播消费好吧是我没有找到。再例如普通顺序消息和严格顺序消息普通顺序消息在 Broker 重启情况下不会保证消息顺序性严格顺序消息即使在异常情况下也会保证消息的顺序性。个人理解所谓普通顺序消息应该就是 Kafka 中的 Partition 级别有序严格顺序消息应该是 Topic 级别有序但文中也提到这样的有序级别是要付出代价的Broker 集群中只要有一台机器不可用则整个集群都不可用降低服务可用性。使用这种模式需要依赖同步双写主备自动切换但自动切换功能目前还未实现我猜自动切换仅仅是没开源吧。说白了严格顺序消息不具备生产可用性自己玩玩还行其应用场景主要是数据库 binlog 同步。
关于 RocketMQ 和 Kafka 的对比可以参考 RocketMQ Wiki 中的文章 4看看就行不必较真。
关于顺序和分区
顺序性的话题刚才已经提到了一些RocketMQ 的实现应该不弱于 Kafka。对于分区RocketMQ 似乎有意弱化了这个概念只有在 Producer 中有一个参数 defaultTopicQueueNums分区在 RocketMQ 中有时被称为队列。RocketMQ 的普通顺序消息模式应该就是分区顺序性这点与 Kafka 一致。
关于高可用
RocketMQ 实现高可用的方式有多种《RocketMQ 用户指南》文档中提到的有多主模式、多主多从异步复制模式、多主多从同步复制模式。多主模式下性能较好但是在 Broker 宕机的时候该 Broker 上未消费的交易不可消费多主多从异步复制模式与 Kafka 的副本模式比较类似主 Broker 宕机后会自动切换到从 Broker消息的消费不会出现间断多主多从同步复制模式更进一步采用同步刷盘的方式避免了主 Broker 宕机带来的消息丢失但是目前不支持自动切换。
虽然 RocketMQ 提供了多种高可用方式但是目前能生产使用的就只有多主多从异步复制模式即使在这个模式上其实现也比 Kafka 要差。因为 RocketMQ 的机制中主从关系是人为指定的主 Broker 上承担所有的消息派发而 Kafka 的主从关系是通过选举的方式选出来的每个分区的主节点都是不一样的可以从不同的节点派发消息。Kafka 的模式是分散模式有利于负载均衡而且当一个 Broker 宕机的时候只影响部分 Topic而 RocketMQ 一旦主 Broker 宕机会影响所有的 Topic。另外Kafka 可以支持 Broker 间同步复制通过设置 Broker 的 acks 参数这样比的话RocketMQ 就差太多了。
关于 RocketMQ 的介绍网上的文章不算太多也比较杂《分布式开放消息系统(RocketMQ)的原理与实践》5 6 7这篇原理介绍的不错推荐。
RocketMQ 的工具和编程接口
RocketMQ 的工具
相比较 Kafka 而言RocketMQ 提供的工具要少一些如下
bin/mqadminbin/mqbrokerbin/mqbroker.numanode0bin/mqbroker.numanode1bin/mqbroker.numanode2bin/mqbroker.numanode3bin/mqfiltersrvbin/mqnamesrvbin/mqshutdown
除了进程启停之外常用的运维命令都在 mqadmin 中详见《RocketMQ 运维指令》文档。我实验中常用的一些命令如下
sh mqnamesrv sh mqbroker -c async-broker-a.properties sh mqbroker -c async-broker-a-s.properties sh mqadmin topicList -n 192.168.232.23:9876sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin clusterList -n 192.168.232.23:9876sh mqadmin deleteTopic -c DefaultCluster -n 192.168.232.23:9876 -t TopicTestjjjsh mqadmin consumerProgress -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin deleteSubGroup -c DefaultCluster -n 192.168.232.23:9876 -g ConsumerGroupNamecc4sh mqadmin consumerConnection -n 192.168.232.23:9876 -g ConsumerGroupNamecc4
RocketMQ 使用了自己的 name server 来做调度Kafka 用了 Zookeeper使用 sh mqnamesrv 来启动默认监听端口9876sh mqnamesrv -m 可以查看所有默认参数使用 -c xxxx.properties 参数来指定自定义配置。sh mqbroker 是用于启动 Broker 的命令参数比较多详细可以通过 sh mqbroker -m 查看默认参数配置项细节后文再说。sh mqadmin 是运维命令入口topicList 是列出所有 TopictopicRoute 是列出单个 Topic 的详细信息clusterList 是列出集群的信息deleteTopic 是删除 Topic。consumerProgress 是查看消费者消费进度deleteSubGroup 是删除消费者的订阅consumerConnection 是查询消费者订阅的情况。
Broker 的配置是最多的实验中我修改到的部分如下其他使用默认
brokerClusterNameDefaultClusterbrokerIP1192.168.232.23brokerNamebroker-abrokerId0namesrvAddr192.168.232.23:9876listenPort10911deleteWhen04fileReservedTime120storePathRootDir/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRoleASYNC_MASTERflushDiskTypeASYNC_FLUSH
配置文件中的多数配置看例子就可以知道意思挑几个说一下。brokerName 和 brokerId 同名的 BrokerID 是0的是主节点其他是从节点deleteWhen删除文件时间点默认凌晨4点fileReservedTime文件保留时间设置为120小时brokerRoleBroker 的角色ASYNC_MASTER 是异步复制主节点SYNC_MASTER 是同步双写主节点SLAVE 是备节点。
其实这些工具的写法也基本一致都是先做一些检查最后运行 Java 程序JVM 系统上的应用应该差不多都这样。
RocketMQ 的 Java API
RocketMQ 是用 Java 语言开发的因此其 Java API 相对是比较丰富的当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供的功能包括
广播消费这个在之前已经提到过消息过滤支持简单的 Message Tag 过滤也支持按 Message Header、body 过滤顺序消费和乱序消费之前也提到过这里的顺序消费应该指的是普通顺序性这一点与 Kafka 相同Pull 模式消费这个是相对 Push 模式来说的Kafka 就是 Pull 模式消费事务消息这个好像没有开源但是 example 代码中有示例总之不推荐用TagRocketMQ 在 Topic 下面又分了一层 Tag用于表示消息类别可以用来过滤但是顺序性还是以 Topic 来看
单看功能的话即使不算事务消息也不算 TagRocketMQ 也远超 KafkaKafka 应该只实现了 Pull 模式消费 顺序消费这2个功能。RocketMQ 的代码示例在 rocketmq-example 中注意代码是不能直接运行的因为所有的代码都少了设置 name server 的部分需要自己手动加上例如producer.setNamesrvAddr(192.168.232.23:9876);。
先来看一下生产者的 API比较简单只有一种如下
import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.client.producer.DefaultMQProducer;import com.alibaba.rocketmq.client.producer.MessageQueueSelector;import com.alibaba.rocketmq.client.producer.SendResult;import com.alibaba.rocketmq.common.message.Message;import com.alibaba.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer new DefaultMQProducer(ProducerGroupName);producer.setNamesrvAddr(192.168.232.23:9876);producer.start();for (int i 0; i 10; i)try {{Message msg new Message(TopicTest1,// topicTagA,// tagOrderID188,// key(RocketMQ String.format(%05d, i)).getBytes());// bodySendResult sendResult producer.send(msg, new MessageQueueSelector() {Overridepublic MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) {Integer id (Integer) arg;int index id % mqs.size();return mqs.get(index);}}, i));System.out.println(String.format(%05d, i)sendResult);}}catch (Exception e) {e.printStackTrace();}producer.shutdown();}}
可以发现相比 Kafka 的 API只多了 Tag但实际上行为有很大不同。Kafka 的生产者客户端有同步和异步两种模式但都是阻塞模式send 方法返回发送状态的 Future可以通过 Future 的 get 方法阻塞获得发送状态。而 RocketMQ 采用的是同步非阻塞模式发送之后立刻返回发送状态而不是 Future。正常情况下两者使用上差别不大但是在高可用场景中发生主备切换的时候Kafka 的同步可以等待切换完成并重连最后返回而 RocketMQ 只能立刻报错由生产者选择是否重发。所以在生产者的 API 上其实 Kafka 是要强一些的。
另外RocketMQ 可以通过指定 MessageQueueSelector 类的实现来指定将消息发送到哪个分区去Kafka 是通过指定生产者的 partitioner.class 参数来实现的灵活性上 RocketMQ 略胜一筹。
再来看消费者的API由于 RocketMQ 的功能比较多我们先看 Pull 模式消费的API如下
import java.util.HashMap;import java.util.Map;import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;import com.alibaba.rocketmq.client.consumer.PullResult;import com.alibaba.rocketmq.client.consumer.store.OffsetStore;import com.alibaba.rocketmq.client.consumer.store.RemoteBrokerOffsetStore;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;import com.alibaba.rocketmq.common.message.MessageQueue;public class PullConsumer {private static final MapMessageQueue, Long offseTable new HashMapMessageQueue, Long();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer new DefaultMQPullConsumer(please_rename_unique_group_name_5);consumer.setNamesrvAddr(192.168.232.23:9876);consumer.start();SetMessageQueue mqs consumer.fetchSubscribeMessageQueues(TopicTest1);for (MessageQueue mq : mqs) {System.out.println(Consume from the queue: mq);SINGLE_MQ: while (true) {try {long offset consumer.fetchConsumeOffset(mq, true);PullResult pullResult consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);if (null ! pullResult.getMsgFoundList()) {for (MessageExt messageExt : pullResult.getMsgFoundList()) {System.out.print(new String(messageExt.getBody()));System.out.print(pullResult);System.out.println(messageExt);}}putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset offseTable.get(mq);if (offset ! null)return offset;return 0;}}
这部分的 API 其实是和 Kafka 很相似的唯一不同的是RocketMQ 需要手工管理 offset 和指定分区而 Kafka 可以自动管理当然也可以手动管理并且不需要指定分区分区是在 Kafka 订阅的时候指定的。例子中RocketMQ 使用 HashMap 自行管理也可以用 OffsetStore 接口提供了两种管理方式本地文件和远程 Broker。这部分感觉两者差不多。
下面再看看 Push 模式顺序消费代码如下
import java.util.List;import java.util.concurrent.atomic.AtomicLong;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(please_rename_unique_group_name_3);consumer.setNamesrvAddr(192.168.232.23:9876);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe(TopicTest1, TagA || TagC || TagD);consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes new AtomicLong(0);Overridepublic ConsumeOrderlyStatus consumeMessage(ListMessageExt msgs, ConsumeOrderlyContext context) {context.setAutoCommit(false);System.out.println(Thread.currentThread().getName() Receive New Messages: msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) 0) {return ConsumeOrderlyStatus.SUCCESS;}else if ((this.consumeTimes.get() % 3) 0) {return ConsumeOrderlyStatus.ROLLBACK;}else if ((this.consumeTimes.get() % 4) 0) {return ConsumeOrderlyStatus.COMMIT;}else if ((this.consumeTimes.get() % 5) 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.println(Consumer Started.);}}
虽然提供了 Push 模式RocketMQ 内部实际上还是 Pull 模式的 MQPush 模式的实现应该采用的是长轮询这点与 Kafka 一样。使用该方式有几个注意的地方
接收消息的监听类要使用 MessageListenerOrderlyConsumeFromWhere 有几个参数表示从头开始消费从尾开始消费还是从某个 TimeStamp 开始消费可以控制 offset 的提交应该就是 context.setAutoCommit(false); 的作用
控制 offset 提交这个特性非常有用某种程度上扩展一下就可以当做事务来用了看代码 ConsumeMessageOrderlyService 的实现其实并没有那么复杂在不启用 AutoCommit 的时候只有返回 COMMIT 才 commit offset启用 AutoCommit 的时候返回 COMMIT、ROLLBACK这个比较扯、SUCCESS 的时候都 commit offset。 后来发现commit offset 功能在 Kafka 里面也有提供使用新的 API调用 consumer.commitSync。 再看一个 Push 模式乱序消费 消息过滤的例子消费者的代码如下
import java.util.List;import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;import com.alibaba.rocketmq.client.exception.MQClientException;import com.alibaba.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(ConsumerGroupNamecc4);consumer.setNamesrvAddr(192.168.232.23:9876);consumer.subscribe(TopicTest1, MessageFilterImpl.class.getCanonicalName());consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() Receive New Messages: msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println(Consumer Started.);}} 接收消息的监听类使用的是 MessageListenerConcurrently这个例子与之前顺序消费不同的地方在于
回调方法中使用的是自动 offset commit订阅的时候增加了消息过滤类 MessageFilterImpl
消息过滤类 MessageFilterImpl 的代码如下
import com.alibaba.rocketmq.common.filter.MessageFilter;import com.alibaba.rocketmq.common.message.MessageExt;public class MessageFilterImpl implements MessageFilter {Overridepublic boolean match(MessageExt msg) {String property msg.getUserProperty(SequenceId);if (property ! null) {int id Integer.parseInt(property);if ((id % 3) 0 (id 10)) {return true;}}return false;}}
RocketMQ 执行过滤是在 Broker 端Broker 所在的机器会启动多个 FilterServer 过滤进程Consumer 启动后会向 FilterServer 上传一个过滤的 Java 类Consumer 从 FilterServer 拉消息FilterServer 将请求转发给 BrokerFilterServer 从 Broker 收到消息后按照 Consumer 上传的 Java 过滤程序做过滤过滤完成后返回给 Consumer。这种过滤方法可以节省网络流量但是增加了 Broker 的负担。可惜我没有实验出来使用过滤的效果即使是用 github wiki 上的例子8也没成功不纠结了。RocketMQ 的按 Tag 过滤的功能也是在 Broker 上做的过滤能用是个很方便的功能。
还有一种广播消费模式比较简单可以去看代码不再列出。
总之RocketMQ 提供的功能比较多比 Kafka 多很多易用的 API。
RocketMQ 的主备模式
按之前所说只有 RocketMQ 的多主多从异步复制是可以生产使用的因此只在这个场景下测试。另外消息采用 Push 顺序模式消费。
假设集群采用2主2备的模式需要启动4个 Broker配置文件如下
brokerNamebroker-abrokerId0listenPort10911storePathRootDir/home/arnes/alibaba-rocketmq/data/store-a-asyncstorePathCommitLog/home/arnes/alibaba-rocketmq/data/store-a-async/commitlogbrokerRoleASYNC_MASTERbrokerNamebroker-abrokerId1listenPort10921storePathRootDir/home/arnes/alibaba-rocketmq/data/store-a-async-slavestorePathCommitLog/home/arnes/alibaba-rocketmq/data/store-a-async-slave/commitlogbrokerRoleSLAVEbrokerNamebroker-bbrokerId0listenPort20911storePathRootDir/home/arnes/alibaba-rocketmq/data/store-b-asyncstorePathCommitLog/home/arnes/alibaba-rocketmq/data/store-b-async/commitlogbrokerRoleASYNC_MASTERbrokerRoleASYNC_MASTERbrokerNamebroker-bbrokerId1listenPort20921storePathRootDir/home/arnes/alibaba-rocketmq/data/store-b-async-slavestorePathCommitLog/home/arnes/alibaba-rocketmq/data/store-b-async-slave/commitlogbrokerRoleSLAVE
另外每个机构共通的配置项如下
brokerClusterNameDefaultClusterbrokerIP1192.168.232.23namesrvAddr192.168.232.23:9876deleteWhen04fileReservedTime120flushDiskTypeASYNC_FLUSH
其他设置均采用默认。启动 NameServer 和所有 Broker并试运行一下 Producer然后看一下 TestTopic1 当前的情况
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{brokerDatas:[{brokerAddrs:{0:192.168.232.23:20911,1:192.168.232.23:20921},brokerName:broker-b},{brokerAddrs:{0:192.168.232.23:10911,1:192.168.232.23:10921},brokerName:broker-a}],filterServerTable:{},queueDatas:[{brokerName:broker-a,perm:6,readQueueNums:4,topicSynFlag:0,writeQueueNums:4},{brokerName:broker-b,perm:6,readQueueNums:4,topicSynFlag:0,writeQueueNums:4}]}
可见TestTopic1 在2个 Broker 上且每个 Broker 备机也在运行。下面开始主备切换的实验分别启动 Consumer 和 Producer 进程消息采用 Pull 顺序模式消费。在消息发送接收过程中使用 kill -9 停掉 broker-a 的主进程模拟突然宕机。此时TestTopic1 的状态如下
$ sh mqadmin topicRoute -n 192.168.232.23:9876 -t TopicTest1{brokerDatas:[{brokerAddrs:{0:192.168.232.23:20911,1:192.168.232.23:20921},brokerName:broker-b},{brokerAddrs:{1:192.168.232.23:10921},brokerName:broker-a}],filterServerTable:{},queueDatas:[{brokerName:broker-a,perm:6,readQueueNums:4,topicSynFlag:0,writeQueueNums:4},{brokerName:broker-b,perm:6,readQueueNums:4,topicSynFlag:0,writeQueueNums:4}]}
此时RocketMQ 已经恢复。
再来看看 Producer 和 Consumer 的日志先看 Producer 的如下
......00578SendResult [sendStatusSEND_OK, msgIdC0A8E81700002A9F0000000000126F08, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-a, queueId2], queueOffset141]00579SendResult [sendStatusSEND_OK, msgIdC0A8E81700002A9F0000000000126F9F, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-a, queueId3], queueOffset141]00580SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000078D47, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId0], queueOffset700]00581SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000078DDE, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId1], queueOffset700]00582SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000078E75, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId2], queueOffset699]00583SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000078F0C, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId3], queueOffset699]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00588SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000078FA3, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId0], queueOffset701]00589SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF000000000007903A, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId1], queueOffset701]00590SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF00000000000790D1, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId2], queueOffset700]00591SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000079168, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId3], queueOffset700]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessageSync(MQClientAPIImpl.java:306)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.sendMessage(MQClientAPIImpl.java:289)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendKernelImpl(DefaultMQProducerImpl.java:679)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendSelectImpl(DefaultMQProducerImpl.java:867)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:842)at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:836)at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:164)at com.comstar.demo.rocketmq.simple.Producer.main(Producer.java:61)00596SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF00000000000791FF, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId0], queueOffset702]00597SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000079296, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId1], queueOffset702]00598SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF000000000007932D, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId2], queueOffset701]00599SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF00000000000793C4, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId3], queueOffset701]00600SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF000000000007945B, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId0], queueOffset703]00601SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF00000000000794F2, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId1], queueOffset703]00602SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000079589, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId2], queueOffset702]00603SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000079620, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId3], queueOffset702]......01389SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF00000000000965BE, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId1], queueOffset900]01390SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000096655, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId2], queueOffset899]01391SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF00000000000966EC, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId3], queueOffset899]01392SendResult [sendStatusSEND_OK, msgIdC0A8E81700002A9F0000000000127036, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-a, queueId0], queueOffset143]01393SendResult [sendStatusSEND_OK, msgIdC0A8E81700002A9F00000000001270CD, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-a, queueId1], queueOffset141]01394SendResult [sendStatusSEND_OK, msgIdC0A8E81700002A9F0000000000127164, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-a, queueId2], queueOffset142]01395SendResult [sendStatusSEND_OK, msgIdC0A8E81700002A9F00000000001271FB, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-a, queueId3], queueOffset142]01396SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF0000000000096783, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId0], queueOffset901]01397SendResult [sendStatusSEND_OK, msgIdC0A8E817000051AF000000000009681A, messageQueueMessageQueue [topicTopicTest1, brokerNamebroker-b, queueId1], queueOffset901]
日志中显示在发送完00583条消息之后开始发生异常 connect to 192.168.232.23:10911 failed原因应该是 broker-a 的主节点被 kill 掉。之后从00596条消息开始RocketMQ 又恢复正常原因是 broker-b 已经开始提供服务承担了所有的工作。然后又重新启动了 broker-a 主节点由于该节点的加入从01392条消息开始broker-a 又开始恢复工作。实验中可以验证RocketMQ 所谓的多主多备模式实际上备机被弱化到无以复加在主节点宕机的时候备机无法接替主机的工作而只是将尚未发送的数据发送出去由剩下的主节点接替工作。也就是说N 主 N 备的 RocketMQ 集群中总共有 2N 台机器实际工作的只有 N 台如果有一台挂了就只有 N-1 台工作了机器的利用率太低了。
再来看一下 Consumer 的日志如下
RocketMQ 00551PullResult [pullStatusFOUND, nextBeginOffset696, minOffset0, maxOffset696, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset695, sysFlag0, bornTimestamp1469175032446, bornHost/192.168.234.98:51987, storeTimestamp1469175020973, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000007859C, commitLogOffset492956, bodyCRC943070764, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET696, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00559PullResult [pullStatusFOUND, nextBeginOffset697, minOffset0, maxOffset697, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset696, sysFlag0, bornTimestamp1469175032720, bornHost/192.168.234.98:51987, storeTimestamp1469175021247, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF00000000000787F8, commitLogOffset493560, bodyCRC921540126, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET697, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00567PullResult [pullStatusFOUND, nextBeginOffset698, minOffset0, maxOffset698, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset697, sysFlag0, bornTimestamp1469175033005, bornHost/192.168.234.98:51987, storeTimestamp1469175021533, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000078A54, commitLogOffset494164, bodyCRC2054744282, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET698, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00575PullResult [pullStatusFOUND, nextBeginOffset699, minOffset0, maxOffset699, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset698, sysFlag0, bornTimestamp1469175033286, bornHost/192.168.234.98:51987, storeTimestamp1469175021814, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000078CB0, commitLogOffset494768, bodyCRC225294519, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET699, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00583PullResult [pullStatusFOUND, nextBeginOffset700, minOffset0, maxOffset700, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset699, sysFlag0, bornTimestamp1469175033586, bornHost/192.168.234.98:51987, storeTimestamp1469175022113, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000078F0C, commitLogOffset495372, bodyCRC1670775117, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET700, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00591PullResult [pullStatusFOUND, nextBeginOffset701, minOffset0, maxOffset701, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset700, sysFlag0, bornTimestamp1469175037890, bornHost/192.168.234.98:51987, storeTimestamp1469175026418, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000079168, commitLogOffset495976, bodyCRC344150304, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET701, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00599PullResult [pullStatusFOUND, nextBeginOffset702, minOffset0, maxOffset702, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset701, sysFlag0, bornTimestamp1469175042200, bornHost/192.168.234.98:51987, storeTimestamp1469175030734, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF00000000000793C4, commitLogOffset496580, bodyCRC442030354, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET702, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00603PullResult [pullStatusFOUND, nextBeginOffset703, minOffset0, maxOffset703, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset702, sysFlag0, bornTimestamp1469175042345, bornHost/192.168.234.98:51987, storeTimestamp1469175030872, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000079620, commitLogOffset497184, bodyCRC688469276, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET703, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00607PullResult [pullStatusFOUND, nextBeginOffset704, minOffset0, maxOffset704, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset703, sysFlag0, bornTimestamp1469175042481, bornHost/192.168.234.98:51987, storeTimestamp1469175031008, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000007987C, commitLogOffset497788, bodyCRC778367237, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET704, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00611PullResult [pullStatusFOUND, nextBeginOffset705, minOffset0, maxOffset705, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset704, sysFlag0, bornTimestamp1469175042615, bornHost/192.168.234.98:51987, storeTimestamp1469175031143, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000079AD8, commitLogOffset498392, bodyCRC1578919281, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET705, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00615PullResult [pullStatusFOUND, nextBeginOffset706, minOffset0, maxOffset706, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset705, sysFlag0, bornTimestamp1469175042753, bornHost/192.168.234.98:51987, storeTimestamp1469175031280, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000079D34, commitLogOffset498996, bodyCRC1500619112, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET706, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00619PullResult [pullStatusFOUND, nextBeginOffset707, minOffset0, maxOffset707, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset706, sysFlag0, bornTimestamp1469175042887, bornHost/192.168.234.98:51987, storeTimestamp1469175031414, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000079F90, commitLogOffset499600, bodyCRC1355279683, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET707, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00623PullResult [pullStatusFOUND, nextBeginOffset708, minOffset0, maxOffset708, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset707, sysFlag0, bornTimestamp1469175043021, bornHost/192.168.234.98:51987, storeTimestamp1469175031548, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000007A1EC, commitLogOffset500204, bodyCRC457136030, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET708, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00627PullResult [pullStatusFOUND, nextBeginOffset709, minOffset0, maxOffset709, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset708, sysFlag0, bornTimestamp1469175043154, bornHost/192.168.234.98:51987, storeTimestamp1469175031681, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000007A448, commitLogOffset500808, bodyCRC475173767, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET709, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00631PullResult [pullStatusFOUND, nextBeginOffset710, minOffset0, maxOffset710, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset709, sysFlag0, bornTimestamp1469175043299, bornHost/192.168.234.98:51987, storeTimestamp1469175031826, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000007A6A4, commitLogOffset501412, bodyCRC1814693875, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET710, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00635PullResult [pullStatusFOUND, nextBeginOffset711, minOffset0, maxOffset711, msgFoundList1]MessageExt [queueId3, storeSize151, queueOffset710, sysFlag0, bornTimestamp1469175043435, bornHost/192.168.234.98:51987, storeTimestamp1469175031962, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000007A900, commitLogOffset502016, bodyCRC1799865322, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET711, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to 192.168.232.23:10911 failedat com.alibaba.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:641)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessageSync(MQClientAPIImpl.java:518)at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.pullMessage(MQClientAPIImpl.java:433)at com.alibaba.rocketmq.client.impl.consumer.PullAPIWrapper.pullKernelImpl(PullAPIWrapper.java:237)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullSyncImpl(DefaultMQPullConsumerImpl.java:304)at com.alibaba.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl.pullBlockIfNotFound(DefaultMQPullConsumerImpl.java:425)at com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer.pullBlockIfNotFound(DefaultMQPullConsumer.java:321)at com.comstar.demo.rocketmq.simple.PullConsumer.main(PullConsumer.java:56)Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize148, queueOffset0, sysFlag0, bornTimestamp1468572196808, bornHost/192.168.234.98:56837, storeTimestamp1468572191827, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000011C60, commitLogOffset72800, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize148, queueOffset1, sysFlag0, bornTimestamp1468572196876, bornHost/192.168.234.98:56837, storeTimestamp1468572191895, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000011EB0, commitLogOffset73392, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize148, queueOffset2, sysFlag0, bornTimestamp1468572196903, bornHost/192.168.234.98:56837, storeTimestamp1468572191928, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000012100, commitLogOffset73984, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]RocketMQ 00001PullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize151, queueOffset3, sysFlag0, bornTimestamp1468572718149, bornHost/192.168.234.98:57165, storeTimestamp1468572713175, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000001222B, commitLogOffset74283, bodyCRC1133127810, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagA}, body14]]RocketMQ 00005PullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize151, queueOffset4, sysFlag0, bornTimestamp1468572718178, bornHost/192.168.234.98:57165, storeTimestamp1468572713210, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000012487, commitLogOffset74887, bodyCRC1156050075, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagA}, body14]]......[queueId1, storeSize151, queueOffset22, sysFlag0, bornTimestamp1469170324786, bornHost/192.168.234.98:49814, storeTimestamp1469170313333, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000010D3AA, commitLogOffset1102762, bodyCRC1707898805, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00477PullResult [pullStatusFOUND, nextBeginOffset62, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize151, queueOffset23, sysFlag0, bornTimestamp1469170325237, bornHost/192.168.234.98:49814, storeTimestamp1469170313771, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000010D606, commitLogOffset1103366, bodyCRC1654764460, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00481PullResult [pullStatusFOUND, nextBeginOffset62, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize151, queueOffset24, sysFlag0, bornTimestamp1469170325652, bornHost/192.168.234.98:49814, storeTimestamp1469170314163, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000010D862, commitLogOffset1103970, bodyCRC207227478, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00485PullResult [pullStatusFOUND, nextBeginOffset62, minOffset0, maxOffset205, msgFoundList31]MessageExt [queueId1, storeSize151, queueOffset25, sysFlag0, bornTimestamp1469170326066, bornHost/192.168.234.98:49814, storeTimestamp1469170314595, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000010DABE, commitLogOffset1104574, bodyCRC188206671, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET205, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]......RocketMQ 01370PullResult [pullStatusFOUND, nextBeginOffset895, minOffset0, maxOffset895, msgFoundList1]MessageExt [queueId2, storeSize151, queueOffset894, sysFlag0, bornTimestamp1469175070573, bornHost/192.168.234.98:51987, storeTimestamp1469175059101, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000095A89, commitLogOffset613001, bodyCRC1094080495, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET895, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01374PullResult [pullStatusFOUND, nextBeginOffset896, minOffset0, maxOffset896, msgFoundList1]MessageExt [queueId2, storeSize151, queueOffset895, sysFlag0, bornTimestamp1469175070712, bornHost/192.168.234.98:51987, storeTimestamp1469175059251, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000095CE5, commitLogOffset613605, bodyCRC1180406774, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET896, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01378PullResult [pullStatusFOUND, nextBeginOffset897, minOffset0, maxOffset897, msgFoundList1]MessageExt [queueId2, storeSize151, queueOffset896, sysFlag0, bornTimestamp1469175070899, bornHost/192.168.234.98:51987, storeTimestamp1469175059427, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000095F41, commitLogOffset614209, bodyCRC1340989405, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET897, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01382PullResult [pullStatusFOUND, nextBeginOffset898, minOffset0, maxOffset898, msgFoundList1]MessageExt [queueId2, storeSize151, queueOffset897, sysFlag0, bornTimestamp1469175071054, bornHost/192.168.234.98:51987, storeTimestamp1469175059582, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF000000000009619D, commitLogOffset614813, bodyCRC681585164, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET898, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01386PullResult [pullStatusFOUND, nextBeginOffset899, minOffset0, maxOffset899, msgFoundList1]MessageExt [queueId2, storeSize151, queueOffset898, sysFlag0, bornTimestamp1469175071203, bornHost/192.168.234.98:51987, storeTimestamp1469175059731, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF00000000000963F9, commitLogOffset615417, bodyCRC802024981, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET899, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01390PullResult [pullStatusFOUND, nextBeginOffset900, minOffset0, maxOffset900, msgFoundList1]MessageExt [queueId2, storeSize151, queueOffset899, sysFlag0, bornTimestamp1469175071338, bornHost/192.168.234.98:51987, storeTimestamp1469175059866, storeHost/192.168.232.23:20911, msgIdC0A8E817000051AF0000000000096655, commitLogOffset616021, bodyCRC1605728865, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET900, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset209, msgFoundList31]MessageExt [queueId0, storeSize148, queueOffset0, sysFlag0, bornTimestamp1468571752640, bornHost/192.168.234.98:56433, storeTimestamp1468571747895, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000011B38, commitLogOffset72504, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET209, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset209, msgFoundList31]MessageExt [queueId0, storeSize148, queueOffset1, sysFlag0, bornTimestamp1468572196772, bornHost/192.168.234.98:56837, storeTimestamp1468572191803, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000011BCC, commitLogOffset72652, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET209, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset209, msgFoundList31]MessageExt [queueId0, storeSize148, queueOffset2, sysFlag0, bornTimestamp1468572196865, bornHost/192.168.234.98:56837, storeTimestamp1468572191886, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000011E1C, commitLogOffset73244, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET209, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]Hello MetaQPullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset209, msgFoundList31]MessageExt [queueId0, storeSize148, queueOffset3, sysFlag0, bornTimestamp1468572196899, bornHost/192.168.234.98:56837, storeTimestamp1468572191917, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000001206C, commitLogOffset73836, bodyCRC1751783629, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET209, KEYSOrderID188, WAITtrue, TAGSTagA}, body11]]RocketMQ 00000PullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset209, msgFoundList31]MessageExt [queueId0, storeSize151, queueOffset4, sysFlag0, bornTimestamp1468572718127, bornHost/192.168.234.98:57165, storeTimestamp1468572713166, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000012194, commitLogOffset74132, bodyCRC881661972, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET209, KEYSOrderID188, WAITtrue, TAGSTagA}, body14]]RocketMQ 00004PullResult [pullStatusFOUND, nextBeginOffset31, minOffset0, maxOffset209, msgFoundList31]MessageExt [queueId0, storeSize151, queueOffset5, sysFlag0, bornTimestamp1468572718170, bornHost/192.168.234.98:57165, storeTimestamp1468572713197, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F00000000000123F0, commitLogOffset74736, bodyCRC870374413, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET209, KEYSOrderID188, WAITtrue, TAGSTagA}, body14]]......RocketMQ 00560PullResult [pullStatusFOUND, nextBeginOffset210, minOffset0, maxOffset210, msgFoundList24]MessageExt [queueId0, storeSize151, queueOffset140, sysFlag0, bornTimestamp1469175032756, bornHost/192.168.234.98:51986, storeTimestamp1469175021285, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000126922, commitLogOffset1206562, bodyCRC1679588729, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET210, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00568PullResult [pullStatusFOUND, nextBeginOffset210, minOffset0, maxOffset210, msgFoundList24]MessageExt [queueId0, storeSize151, queueOffset141, sysFlag0, bornTimestamp1469175033043, bornHost/192.168.234.98:51986, storeTimestamp1469175021570, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000126B7E, commitLogOffset1207166, bodyCRC1791489355, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET210, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 00576PullResult [pullStatusFOUND, nextBeginOffset210, minOffset0, maxOffset210, msgFoundList24]MessageExt [queueId0, storeSize151, queueOffset142, sysFlag0, bornTimestamp1469175033320, bornHost/192.168.234.98:51986, storeTimestamp1469175021848, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000126DDA, commitLogOffset1207770, bodyCRC342157581, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET210, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01392PullResult [pullStatusFOUND, nextBeginOffset210, minOffset0, maxOffset210, msgFoundList24]MessageExt [queueId0, storeSize151, queueOffset143, sysFlag0, bornTimestamp1469175071411, bornHost/192.168.234.98:52034, storeTimestamp1469175059951, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000127036, commitLogOffset1208374, bodyCRC834345805, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET210, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01400PullResult [pullStatusFOUND, nextBeginOffset210, minOffset0, maxOffset210, msgFoundList24]MessageExt [queueId0, storeSize151, queueOffset144, sysFlag0, bornTimestamp1469175071746, bornHost/192.168.234.98:52034, storeTimestamp1469175060289, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F0000000000127292, commitLogOffset1208978, bodyCRC188274605, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET210, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01408PullResult [pullStatusFOUND, nextBeginOffset211, minOffset0, maxOffset211, msgFoundList1]MessageExt [queueId0, storeSize151, queueOffset145, sysFlag0, bornTimestamp1469175072078, bornHost/192.168.234.98:52034, storeTimestamp1469175060614, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F00000000001274EE, commitLogOffset1209582, bodyCRC98787231, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET211, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]RocketMQ 01416PullResult [pullStatusFOUND, nextBeginOffset214, minOffset0, maxOffset214, msgFoundList3]MessageExt [queueId0, storeSize151, queueOffset146, sysFlag0, bornTimestamp1469175072405, bornHost/192.168.234.98:52034, storeTimestamp1469175060934, storeHost/192.168.232.23:10911, msgIdC0A8E81700002A9F000000000012774A, commitLogOffset1210186, bodyCRC2067809241, reconsumeTimes0, preparedTransactionOffset0, toString()Message [topicTopicTest1, flag0, properties{MIN_OFFSET0, MAX_OFFSET214, KEYSOrderID188, WAITtrue, TAGSTagB}, body14]]
可以看到Consumer 在 broker-a 宕机时间的附近也出现了异常connect to 192.168.232.23:10911 failed。虽然还能保持分区上的顺序性但是已经某种程度上出现了一些紊乱例如将我在实验之前的数据给取了出来Hello MetaQ的消息。可是我在实验前明明做过删除这个 Topic 的动作看来 RocketMQ 所谓的删除并未删除 Topic 的数据。之后broker-a 主机重启之后又恢复正常。
RocketMQ Pull模式消费需要手动管理 offset 和指定分区这个在调用的时候不觉得实际运行的时候才会发现每次总是消费一个分区消费完之后才开始消费下一个分区而下一个分区可能已经堆积了很多消息了手动做消息分配又比较费事。或许Push 顺序模式消费才是更好的选择。
另外还有几个比较异常的情况实验中有几次出现了 CODE: 17 DESC: topic[TopicTest1] not exist, apply first please! 这样的错误实际上这时候我只是关掉了 Producer还有sh mqadmin updateTopic –n 192.168.232.23:9876 –c DefaultCluster –t TopicTest1 明明文档中说可以用来新增 Topic而实际上不行。 补充一下之后我又使用 Push 顺序模式消费重做了上述实验结论差不多。只是因为有多线程的原因日志看起来偶尔有错位这个问题不大可以解决。而且在关闭重启 Broker 的附近往往伴随着多次的消息重发不过RocketMQ 也不保证消息只收到一次就是了。消息重复的问题Kafka 要比 RocketMQ 显得不那么严重一些。Push 顺序模式消费不需要指定 offset不需要指定分区第二次启动可以自动从前一次的 offset 后开始消费。功能上这个与 Kafka 的 Consumer 更类似虽然 RocketMQ 采用的是异步模式。 RocketMQ 最佳实践
实际上RocketMQ 自己就有一份《RocketMQ 最佳实践》的文档里面提到了一些系统设计的问题例如消费者要幂等一个应用对应一个 Topic如此等等。这些经验不仅仅是对 RocketMQ 有用对 Kafka 也颇有借鉴意义。
后记
这里谈谈我对选择 RocketMQ 还是 Kafka 的个人建议。以上已经做了多处 RocketMQ 和 Kafka 的对比我个人觉得Kafka 是一个不断发展中的系统开源社区比 RocketMQ 要大也要更活跃一些另外Kafka 最新版本已经有了同步复制消息可靠性更有保障还有Kafka 的分区机制几乎实现了自动负载均衡这绝对是个杀手级特性RocketMQ 虽然提供了很多易用的功能远超出 Kafka但这些功能并不一定都能用得上而且多数可以绕过。相比之下Kafka 的基本功能更加吸引我再处理故障恢复的时候细节上要胜过 RocketMQ。当然如果是 A 公司内部或者所在公司使用了 A 公司的云产品那么 RocketMQ 的企业级特性更多一些或许我会选择 RocketMQ。 转自http://valleylord.github.io/post/201607-mq-rocketmq/