一个主机怎么做两个网站,网站建设有哪些工作,wordpress使用流程,四川城乡建设网站根据使用者对读取操作的控制情况#xff0c;消费者可分为两种类型。一个是DefaultMQPushConsumer#xff0c;由系统控制读取操作#xff0c;收到消息后自动调用传入的处理方法来处理#xff1b;另一个是DefaultMQPullConsumer#xff0c;读取操作中的大部分功能由使用者自… 根据使用者对读取操作的控制情况消费者可分为两种类型。一个是DefaultMQPushConsumer由系统控制读取操作收到消息后自动调用传入的处理方法来处理另一个是DefaultMQPullConsumer读取操作中的大部分功能由使用者自主控制。
1.DefaultMQPushConsumer的使用
使用DefaultMQPushConsumer主要是设置好各种参数和传入处理消息的函数。系统收到消息后自动调用处理函数来处理消息自动保存Offset而且加入新的DefaultMQPushConsumer后会自动做负载均衡。下面结合org.apache.rocketmq.example.quickstart包中的源码来介绍如代码清单3-1所示。
代码清单3-1 DefaultMQPushConsumer示例 public class QuickStart { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer Consumer new DefaultMQPushConsumer (please_rename_unique_group_name_4); Consumer.setNamesrvAddr(name-server1-ip:9876;name-server2-ip:9876); Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); Consumer.setMessageModel(MessageModel.BROADCASTING); Consumer.subscribe(TopicTest, *); Consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() Receive New Messages: msgs %n); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); Consumer.start(); } } DefaultMQPushConsumer需要设置三个参数一是这个Consumer的GroupName二是NameServer的地址和端口号三是Topic的名称下面将分别进行详细介绍。
1Consumer的GroupName用于把多个Consumer组织到一起提高并发处理能力GroupName需要和消息模式MessageModel配合使用。
RocketMQ支持两种消息模式Clustering和Broadcasting。
·在Clustering模式下同一个ConsumerGroupGroupName相同里的每个Consumer只消费所订阅消息的一部分内容同一个ConsumerGroup里所有的Consumer消费的内容合起来才是所订阅Topic内容的整体从而达到负载均衡的目的。
·在Broadcasting模式下同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息也就是一个消息会被多次分发被多个Consumer消费。
2NameServer的地址和端口号可以填写多个用分号隔开达到消除单点故障的目的比如“ip1portip2portip3port”。
3Topic名称用来标识消息类型需要提前创建。如果不需要消费某个Topic下的所有消息可以通过指定消息的Tag进行消息过滤比如Consumer.subscribeTopicTesttag1||tag2||tag3表示这个Consumer要消费“TopicTest”下带有tag1或tag2或tag3的消息Tag是在发送消息时设置的标签。在填写Tag参数的位置用null或者“*”表示要消费这个Topic的所有消息。
2.DefaultMQPushConsumer的处理流程
本节通过分析源码来说明DefaultMQPushConsumer的处理流程。
DefaultMQPushConsumer主要功能实现在DefaultMQPushConsumerImpl类中消息的处理逻辑是在pullMessage这个函数里的PullCallBack中。在PullCallBack函数里有个switch语句根据从Broker返回的消息类型做相应的处理具体处理逻辑可以查看源码如代码清单3-2所示。
代码清单3-2 DefaultMQPushConsuer的处理逻辑 switch (pullResult.getPullStatus()) { case FOUND: …… break; case NO_NEW_MSG: …… break; case OFFSET_ILLEGAL: …… break; default: break; } DefaultMQPushConsuer的源码中有很多PullRequest语句比如Default-MQPushConsumerImpl.this.executePullRequestImmediatelypullRequest。为什么“PushConsumer”中使用“PullRequest”呢这是通过“长轮询”方式达到Push效果的方法长轮询方式既有Pull的优点又兼具Push方式的实时性。
Push方式是Server端接收到消息后主动把消息推送给Client端实时性高。对于一个提供队列服务的Server来说用Push方式主动推送有很多弊端首先是加大Server端的工作量进而影响Server的性能其次Client的处理能力各不相同Client的状态不受Server控制如果Client不能及时处理Server推送过来的消息会造成各种潜在问题。
Pull方式是Client端循环地从Server端拉取消息主动权在Client手里自己拉取到一定量消息后处理妥当了再接着取。Pull方式的问题是循环拉取消息的间隔不好设定间隔太短就处在一个“忙等”的状态浪费资源每个Pull的时间间隔太长Server端有消息到来时有可能没有被及时处理。
“长轮询”方式通过Client端和Server端的配合达到既拥有Pull的优点又能达到保证实时性的目的。我们结合源码来分析如代码清单3-3和3-4所示。
代码清单3-3 发送Pull消息代码片段 PullMessageRequestHeader requestHeader new PullMessageRequestHeader(); requestHeader.setConsumerGroup(this.ConsumerGroup); requestHeader.setTopic(mq.getTopic()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setQueueOffset(Offset); requestHeader.setMaxMsgNums(maxNums); requestHeader.setSysFlag(sysFlagInner); requestHeader.setCommitOffset(commitOffset); requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis); requestHeader.setSubscription(subExpression); requestHeader.setSubVersion(subVersion); requestHeader.setExpressionType(expressionType); ------ PullResult pullResult this.mQClientFactory.getMQClientAPIImpl().pullMessage( brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback); 源码中有这一行设置语句requestHeader.setSuspendTimeoutMillisbrokerSus-pendMaxTimeMillis作用是设置Broker最长阻塞时间默认设置是15秒注意是Broker在没有新消息的时候才阻塞有消息会立刻返回。
代码清单3-4 “长轮询”服务端代码片段 package org.apache.rocketmq.broker.longpolling ------ if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { this.waitForRunning(5 * 1000); } else { this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills()); } long beginLockTimestamp this.systemClock.now(); this.checkHoldRequest(); long costTime this.systemClock.now() - beginLockTimestamp; if (costTime 5 * 1000) { Log.info([NOTIFYME] check hold request cost {} ms., costTime); } 从Broker的源码中可以看出服务端接到新消息请求后如果队列里没有新消息并不急于返回通过一个循环不断查看状态每次waitForRunning一段时间默认是5秒然后后再Check。默认情况下当Broker一直没有新消息第三次Check的时候等待时间超过Request里面的Broker-SuspendMaxTimeMillis就返回空结果。在等待的过程中Broker收到了新的消息后会直接调用notifyMessageArriving函数返回请求结果。“长轮询”的核心是Broker端HOLD住客户端过来的请求一小段时间在这个时间内有新消息到达就利用现有的连接立刻返回消息给Consumer。“长轮询”的主动权还是掌握在Consumer手中Broker即使有大量消息积压也不会主动推送给Consumer。
长轮询方式的局限性是在HOLD住Consumer请求的时候需要占用资源它适合用在消息队列这种客户端连接数可控的场景中。
3.DefaultMQPushConsumer的流量控制
本节分析PushConsumer的流量控制方法。PushConsumer的核心还是Pull方式所以采用这种方式的客户端能够根据自身的处理速度调整获取消息的操作速度。因为采用多线程处理方式实现流量控制的方面比单线程要复杂得多。
PushConsumer有个线程池消息处理逻辑在各个线程里同时执行这个线程池的定义如代码清单3-5所示。
代码清单3-5 DefaultMQPushConsumer的线程池定义 this.consumeExecutor new ThreadPoolExecutor( this.defaultMQPushConsumer.getConsumeThreadMin(), this.defaultMQPushConsumer.getConsumeThreadMax(), 1000 * 60, TimeUnit.MILLISECONDS, this.consumeRequestQueue, new ThreadFactoryImpl(ConsumeMessageThread_)); Pull获得的消息如果直接提交到线程池里执行很难监控和控制比如如何得知当前消息堆积的数量如何重复处理某些消息如何延迟处理某些消息RocketMQ定义了一个快照类ProcessQueue来解决这些问题在PushConsumer运行的时候每个Message Queue都会有个对应的ProcessQueue对象保存了这个Message Queue消息处理状态的快照。
ProcessQueue对象里主要的内容是一个TreeMap和一个读写锁。TreeMap里以Message Queue的Offset作为Key以消息内容的引用为Value保存了所有从MessageQueue获取到但是还未被处理的消息读写锁控制着多个线程对TreeMap对象的并发访问。
有了ProcessQueue对象流量控制就方便和灵活多了客户端在每次Pull请求前会做下面三个判断来控制流量如代码清单3-6所示。
代码清单3-6 PushConsumer的流量控制逻辑 long cachedMessageCount processQueue.getMsgCount().get(); long cachedMessageSizeInMiB processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes % 1000) 0) { log.warn( the cached message count exceeds the threshold {}, so do flow control, minOffset{}, maxOffset{}, count{}, size{} MiB, pullRequest{}, flowControlTimes{}, this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (cachedMessageSizeInMiB this.defaultMQPushConsumer.getPullThresholdSize-ForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes % 1000) 0) { log.warn( the cached message size exceeds the threshold {} MiB, so do flow control, minOffset{}, maxOffset{}, count{}, size{} MiB, pullRequest{}, flowControlTimes{}, this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControl-Times); } return; } if (!this.consumeOrderly) { if (processQueue.getMaxSpan() this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes % 1000) 0) { log.warn( the queues messages, span too long, so do flow control, minOffset{}, maxOffset{}, maxSpan{}, pullRequest{}, flowControlTimes{}, processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } 从代码中可以看出PushConsumer会判断获取但还未处理的消息个数、消息总大小、Offset的跨度任何一个值超过设定的大小就隔一段时间再拉取消息从而达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。
4.DefaultMQPullConsumer
使用DefaultMQPullConsumer像使用DefaultMQPushConsumer一样需要设置各种参数写处理消息的函数同时还需要做额外的事情。接下来结合org.apache.rocketmq.example.simple包中的例子源码来介绍如代码清单3-7所示。
代码清单3-7 PullConsumer示例 public class PullConsumer { private static final MapMessageQueue, Long OFFSE_TABLE new HashMapMessageQueue, Long(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer Consumer new DefaultMQPullConsumer (please_rename_unique_group_name_5); Consumer.start(); SetMessageQueue mqs Consumer.fetchSubscribeMessageQueues(TopicTest1); for (MessageQueue mq : mqs) { long Offset Consumer.fetchConsumeOffset(mq, true); System.out.printf(Consume from the Queue: mq %n); SINGLE_MQ: while (true) { try { PullResult pullResult Consumer.pullBlockIfNotFound(mq, null, getMessage-QueueOffset(mq), 32); System.out.printf(%s%n, pullResult); putMessageQueueOffset(mq, pullResult.getNextBegin-Offset()); switch (pullResult.getPullStatus()) { case FOUND: break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break SINGLE_MQ; case OFFSET_ILLEGAL: break; default: break; } } catch (Exception e) { e.printStackTrace(); } } } Consumer.shutdown(); } private static long getMessageQueueOffset(MessageQueue mq) { Long Offset OFFSE_TABLE.get(mq); if (Offset ! null) return Offset; return 0; } private static void putMessageQueueOffset(MessageQueue mq, long Offset) { OFFSE_TABLE.put(mq, Offset); } } 示例代码的处理逻辑是逐个读取某Topic下所有Message Queue的内容读完一遍后退出主要处理额外的三件事情
1获取Message Queue并遍历
一个Topic包括多个Message Queue如果这个Consumer需要获取Topic下所有的消息就要遍历多有的Message Queue。如果有特殊情况也可以选择某些特定的Message Queue来读取消息。
2维护Offsetstore
从一个Message Queue里拉取消息的时候要传入Offset参数long类型的值随着不断读取消息Offset会不断增长。这个时候由用户负责把Offset存储下来根据具体情况可以存到内存里、写到磁盘或者数据库里等。
3根据不同的消息状态做不同的处理
拉取消息的请求发出后会返回FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET_ILLEGAL四种状态需要根据每个状态做不同的处理。比较重要的两个状态是FOUNT和NO_NEW_MSG分别表示获取到消息和没有新的消息。
实际情况中可以把whiletrue放到外层达到无限循环的目的。因为PullConsumer需要用户自己处理遍历Message Queue、保存Offset所以PullConsumer有更多的自主性和灵活性。
5.Consumer的启动、关闭流程
消息队列一般是提供一个不间断的持续性服务Consumer在使用过程中如何才能优雅地启动和关闭确保不漏掉或者重复消费消息呢
Consumer分为Push和Pull两种方式对于PullConsumer来说使用者主动权很高可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset的保存要在程序的异常处理部分增加把Offset写入磁盘方面的处理记准了每个Message Queue的Offset才能保证消息消费的准确性。
DefaultMQPushConsumer的退出要调用shutdown函数以便释放资源、保存Offset等。这个调用要加到Consumer所在应用的退出逻辑中。
PushConsumer在启动的时候会做各种配置检查然后连接NameServer获取Topic信息启动时如果遇到异常比如无法连接NameServer程序仍然可以正常启动不报错日志里有WARN信息。在单机环境下可以测试这种情况启动DefaultMQPushConsumer时故意把NameServer地址填错程序仍然可以正常启动但是不会收到消息。
为什么DefaultMQPushConsumer在无法连接NameServer时不直接报错退出呢这和分布式系统的设计有关RocketMQ集群可以有多个NameServer、Broker某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出而是不断尝试重新连接。可以进行这样一个测试在DefaultMQPushConsumer正常运行的时候手动kill掉Broker或NameServer过一会儿再启动。会发现DefaultMQPushConsumer不会出错退出在服务恢复后正常运行在服务不可用的这段时间仅仅会在日志里报异常信息。
如果需要在DefaultMQPushConsumer启动的时候及时暴露配置问题该如何操作呢可以在Consumer.start语句后调用Consumer.fetchSubscribeMe-ssageQueuesTopicName这时如果配置信息写得不准确或者当前服务不可用这个语句会报MQClientException异常。