搭建网站开发网站环境,怎么自己做一个网页链接,四川建设人才官方网站,手机网站 案例一、普通消息 1 消息发送分类 Producer对于消息的发送方式也有多种选择#xff0c;不同的方式会产生不同的系统效果。 同步发送消息 同步发送消息是指#xff0c;Producer发出⼀条消息后#xff0c;会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高#xff…一、普通消息 1 消息发送分类 Producer对于消息的发送方式也有多种选择不同的方式会产生不同的系统效果。 同步发送消息 同步发送消息是指Producer发出⼀条消息后会在收到MQ返回的ACK之后才发下⼀条消息。该方式的消息可靠性最高但消息发送效率太低。 异步发送消息 异步发送消息是指Producer发出消息后无需等待MQ返回ACK直接发送下⼀条消息。该方式的消息可靠性可以得到保障消息发送效率也可以。 单向发送消息 单向发送消息是指Producer仅负责发送消息不等待、不处理MQ的ACK。该发送方式时MQ也不返回ACK。该方式的消息发送效率最高但消息可靠性较差。 2 代码举例 创建工程 创建一个Maven的Java工程rocketmq-test。 导入依赖 导入rocketmq的client依赖。
properties
project.build.sourceEncodingUTF-
8/project.build.sourceEncoding
maven.compiler.source1.8/maven.compiler.source
maven.compiler.target1.8/maven.compiler.target
/properties
dependencies
dependency
groupIdorg.apache.rocketmq/groupId
artifactIdrocketmq-client/artifactId
version4.8.0/version
/dependency
/dependencies定义同步消息发送生产者
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建一个producer参数为Producer Group名称
DefaultMQProducer producer new DefaultMQProducer(pg);
// 指定nameServer地址
producer.setNamesrvAddr(rocketmqOS:9876);
// 设置当发送失败时重试发送的次数默认为2次
producer.setRetryTimesWhenSendFailed(3);
// 设置发送超时时限为5s默认3s
producer.setSendMsgTimeout(5000);
// 开启生产者
producer.start();
// 生产并发送100条消息
for (int i 0; i 100; i) {
byte[] body (Hi, i).getBytes();
Message msg new Message(someTopic, someTag, body);
// 为消息指定key
msg.setKeys(key- i);
// 发送消息
SendResult sendResult producer.send(msg);
System.out.println(sendResult);
}
// 关闭producer
producer.shutdown();
}
}// 消息发送的状态
public enum SendStatus {
SEND_OK, // 发送成功
FLUSH_DISK_TIMEOUT, // 刷盘超时。当Broker设置的刷盘策略为同步刷盘时才可能出
现这种异常状态。异步刷盘不会出现
FLUSH_SLAVE_TIMEOUT, // Slave同步超时。当Broker集群设置的Master-Slave的复
制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
SLAVE_NOT_AVAILABLE, // 没有可用的Slave。当Broker集群设置为Master-Slave的
复制方式为同步复制时才可能出现这种异常状态。异步复制不会出现
}定义异步消息发送生产者
public class AsyncProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer new DefaultMQProducer(pg);
producer.setNamesrvAddr(rocketmqOS:9876);
// 指定异步发送失败后不进行重试发送
producer.setRetryTimesWhenSendAsyncFailed(0);
// 指定新创建的Topic的Queue数量为2默认为4
producer.setDefaultTopicQueueNums(2);
producer.start();
for (int i 0; i 100; i) {
byte[] body (Hi, i).getBytes();
try {
Message msg new Message(myTopicA, myTag, body);
// 异步发送。指定回调
producer.send(msg, new SendCallback() {
// 当producer接收到MQ发送来的ACK后就会触发该回调方法的执行
Override
public void onSuccess(SendResult sendResult) {
System.out.println(sendResult);
}
Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
} // end-for
// sleep一会儿
// 由于采用的是异步发送所以若这里不sleep
// 则消息还未发送就会将producer给关闭报错
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
}
}定义单向消息发送生产者
public class OnewayProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer new DefaultMQProducer(pg);
producer.setNamesrvAddr(rocketmqOS:9876);
producer.start();
for (int i 0; i 10; i) {
byte[] body (Hi, i).getBytes();
Message msg new Message(single, someTag, body);
// 单向发送
producer.sendOneway(msg);
}
producer.shutdown();
System.out.println(producer shutdown);
}
}定义消息消费者
public class SomeConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个pull消费者
// DefaultLitePullConsumer consumer new
DefaultLitePullConsumer(cg);
// 定义一个push消费者
DefaultMQPushConsumer consumer new
DefaultMQPushConsumer(cg);
// 指定nameServer
consumer.setNamesrvAddr(rocketmqOS:9876);
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
// 指定消费topic与tag
consumer.subscribe(someTopic, *);
// 指定采用“广播模式”进行消费默认为“集群模式”
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
consumer.registerMessageListener(new
MessageListenerConcurrently() {
// 一旦broker中有了其订阅的消息就会触发该方法的执行
// 其返回值为当前consumer消费的状态
Override
public ConsumeConcurrentlyStatus
consumeMessage(ListMessageExt msgs,
ConsumeConcurrentlyContext context) {
// 逐条消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println(Consumer Started);
}
}二、顺序消息 1 什么是顺序消息 顺序消息指的是严格按照消息的发送顺序进行消费的消息(FIFO)。 默认情况下生产者会把消息以Round Robin轮询方式发送到不同的Queue分区队列而消费消息时会从 多个Queue上拉取消息这种情况下的发送和消费是不能保证顺序的。如果将消息仅发送到同一个 Queue中消费时也只从这个Queue上拉取消息就严格保证了消息的顺序性。 2 为什么需要顺序消息 例如现在有TOPIC ORDER_STATUS (订单状态)其下有4个Queue队列该Topic中的不同消息用于 描述当前订单的不同状态。假设订单有状态未支付、已支付、发货中、发货成功、发货失败。 根据以上订单状态生产者从时序上可以生成如下几个消息 订单T0000001:未支付 -- 订单T0000001:已支付 -- 订单T0000001:发货中 -- 订单 T0000001:发货失败 消息发送到MQ中之后Queue的选择如果采用轮询策略消息在MQ的存储可能如下 这种情况下我们希望Consumer消费消息的顺序和我们发送是一致的然而上述MQ的投递和消费方 式我们无法保证顺序是正确的。对于顺序异常的消息Consumer即使设置有一定的状态容错也不 能完全处理好这么多种随机出现组合情况。 基于上述的情况可以设计如下方案对于相同订单号的消息通过一定的策略将其放置在一个 Queue中然后消费者再采用一定的策略例如一个线程独立处理一个queue保证处理消息的顺序 性能够保证消费的顺序性。
3 有序性分类 根据有序范围的不同RocketMQ可以严格地保证两种消息的有序性分区有序与全局有序。
全局有序 当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序 称为全局有序。 在创建Topic时指定Queue的数量。有三种指定方式 1在代码中创建Producer时可以指定其自动创建的Topic的Queue数量 2在RocketMQ可视化控制台中手动创建Topic时指定Queue数量 3使用mqadmin命令手动创建Topic时指定Queue数量
分区有序 如果有多个Queue参与其仅可保证在该Queue分区队列上的消息顺序则称为分区有序。 如何实现Queue的选择在定义Producer时我们可以指定消息队列选择器而这个选择器是我们 自己实现了MessageQueueSelector接口定义的。 在定义选择器的选择算法时一般需要使用选择key。这个选择key可以是消息key也可以是其它 数据。但无论谁做选择key都不能重复都是唯一的。 一般性的选择算法是让选择key或其hash值与该Topic所包含的Queue的数量取模其结果 即为选择出的Queue的QueueId。 取模算法存在一个问题不同选择key与Queue数量取模结果可能会是相同的即不同选择key的 消息可能会出现在相同的Queue即同一个Consuemr可能会消费到不同选择key的消息。这个问 题如何解决一般性的作法是从消息中获取到选择key对其进行判断。若是当前Consumer需 要消费的消息则直接消费否则什么也不做。这种做法要求选择key要能够随着消息一起被 Consumer获取到。此时使用消息key作为选择key是比较好的做法。 以上做法会不会出现如下新的问题呢不属于那个Consumer的消息被拉取走了那么应该消费 该消息的Consumer是否还能再消费该消息呢同一个Queue中的消息不可能被同一个Group中的 不同Consumer同时消费。所以消费现一个Queue的不同选择key的消息的Consumer一定属于不 同的Group。而不同的Group中的Consumer间的消费是相互隔离的互不影响的。
public class OrderedProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer new DefaultMQProducer(pg);
producer.setNamesrvAddr(rocketmqOS:9876);
producer.start();
for (int i 0; i 100; i) {
Integer orderId i;
byte[] body (Hi, i).getBytes();
Message msg new Message(TopicA, TagA, body);
SendResult sendResult producer.send(msg, new
MessageQueueSelector() {
Override
public MessageQueue select(ListMessageQueue mqs,
Message msg, Object arg) {
Integer id (Integer) arg;
int index id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.println(sendResult);
}
producer.shutdown();
}
}三、延时消息 1 什么是延时消息 当消息写入到Broker后在指定的时长后才可被消费处理的消息称为延时消息。 采用RocketMQ的延时消息可以实现定时任务的功能而无需使用定时器。典型的应用场景是电商交 易中超时未支付关闭订单的场景12306平台订票超时未支付取消订票的场景。 在电商平台中订单创建时会发送一条延迟消息。这条消息将会在30分钟后投递给后台业务系 统Consumer后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如果未完 成则取消订单将商品再次放回到库存如果完成支付则忽略。 在12306平台中车票预订成功后就会发送一条延迟消息。这条消息将会在45分钟后投递给后台 业务系统Consumer后台业务系统收到该消息后会判断对应的订单是否已经完成支付。如 果未完成则取消预订将车票再次放回到票池如果完成支付则忽略。 2 延时等级 延时消息的延迟时长不支持随意时长的延迟是通过特定的延迟等级来指定的。延时等级定义在 RocketMQ服务端的MessageStoreConfig类中的如下变量中 即若指定的延时等级为3则表示延迟时长为10s即延迟等级是从1开始计数的。 当然如果需要自定义的延时等级可以通过在broker加载的配置中新增如下配置例如下面增加了1 天这个等级1d。配置文件在RocketMQ安装目录下的conf目录中。
messageDelayLevel 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d3 延时消息实现原理 具体实现方案是 修改消息 Producer将消息发送到Broker后Broker会首先将消息写入到commitlog文件然后需要将其分发到相 应的consumequeue。不过在分发之前系统会先判断消息中是否带有延时等级。若没有则直接正 常分发若有则需要经历一个复杂的过程 修改消息的Topic为SCHEDULE_TOPIC_XXXX 根据延时等级在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId 目录与consumequeue文件如果没有这些目录与文件的话。 延迟等级delayLevel与queueId的对应关系为queueId delayLevel -1 需要注意在创建queueId目录时并不是一次性地将所有延迟等级对应的目录全部创建完毕 而是用到哪个延迟等级创建哪个目录 修改消息索引单元内容。索引单元中的Message Tag HashCode部分原本存放的是消息的Tag的 Hash值。现修改为消息的投递时间。投递时间是指该消息被重新修改为原Topic后再次被写入到 commitlog中的时间。投递时间 消息存储时间 延时等级时间。消息存储时间指的是消息 被发送到Broker时的时间戳。 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中 SCHEDULE_TOPIC_XXXX目录中各个延时等级Queue中的消息是如何排序的 是按照消息投递时间排序的。一个Broker中同一等级的所有延时消息会被写入到consumequeue 目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等 级时间是相同的。那么投递时间就取决于于消息存储时间了。即按照消息被发送到Broker的时 间进行排序的。 投递延时消息 Broker内部有⼀个延迟消息服务类ScheuleMessageService其会消费SCHEDULE_TOPIC_XXXX中的消 息即按照每条消息的投递时间将延时消息投递到⽬标Topic中。不过在投递之前会从commitlog 中将原来写入的消息再次读出并将其原来的延时等级设置为0即原消息变为了一条不延迟的普通消 息。然后再次将消息投递到目标Topic中。 ScheuleMessageService在Broker启动时会创建并启动一个定时器TImer用于执行相应的定时 任务。系统会根据延时等级的个数定义相应数量的TimerTask每个TimerTask负责一个延迟 等级消息的消费与投递。每个TimerTask都会检测相应Queue队列的第一条消息是否到期。若第 一条消息未到期则后面的所有消息更不会到期消息是按照投递时间排序的若第一条消 息到期了则将该消息投递到目标Topic即消费该消息。 将消息重新写入commitlog 延迟消息服务类ScheuleMessageService将延迟消息再次发送给了commitlog并再次形成新的消息索 引条目分发到相应Queue。 这其实就是一次普通消息发送。只不过这次的消息Producer是延迟消息服务类 ScheuleMessageService。
4 代码举例 定义DelayProducer类
public class DelayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer new DefaultMQProducer(pg);
producer.setNamesrvAddr(rocketmqOS:9876);
producer.start();
for (int i 0; i 10; i) {
byte[] body (Hi, i).getBytes();
Message msg new Message(TopicB, someTag, body);
// 指定消息延迟等级为3级即延迟10s
// msg.setDelayTimeLevel(3);
SendResult sendResult producer.send(msg);
// 输出消息被发送的时间
System.out.print(new SimpleDateFormat(mm:ss).format(new
Date()));
System.out.println( , sendResult);
}
producer.shutdown();
}
}定义OtherConsumer类
public class OtherConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer new
DefaultMQPushConsumer(cg);
consumer.setNamesrvAddr(rocketmqOS:9876);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
consumer.subscribe(TopicB, *);
consumer.registerMessageListener(new
MessageListenerConcurrently() {
Override
public ConsumeConcurrentlyStatus
consumeMessage(ListMessageExt msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 输出消息被消费的时间
SimpleDateFormat(mm:ss).format(new Date()));
System.out.println( , msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println(Consumer Started);
}
}四、事务消息 1 问题引入 这里的一个需求场景是工行用户A向建行用户B转账1万元。 我们可以使用同步消息来处理该需求场景
工行系统发送一个给B增款1万元的同步消息M给Broker消息被Broker成功接收后向工行系统发送成功ACK工行系统收到成功ACK后从用户A中扣款1万元建行系统从Broker中获取到消息M建行系统消费消息M即向用户B中增加1万元 这其中是有问题的若第3步中的扣款操作失败但消息已经成功发送到了Broker。对于MQ来 说只要消息写入成功那么这个消息就可以被消费。此时建行系统中用户B增加了1万元。出 现了数据不一致问题。
2 解决思路 解决思路是让第1、2、3步具有原子性要么全部成功要么全部失败。即消息发送成功后必须要 保证扣款成功。如果扣款失败则回滚发送成功的消息。而该思路即使用事务消息。这里要使用分布 式事务解决方案。 使用事务消息来处理该需求场景
事务管理器TM向事务协调器TC发起指令开启全局事务工行系统发一个给B增款1万元的事务消息M给TCTC会向Broker发送半事务消息prepareHalf将消息M预提交到Broker。此时的建行系统是看 不到Broker中的消息M的Broker会将预提交执行结果Report给TC。如果预提交失败则TC会向TM上报预提交失败的响应全局事务结束如果预提交成功TC会 调用工行系统的回调操作去完成工行用户A的预扣款1万元的操作工行系统会向TC发送预扣款执行结果即本地事务的执行状态TC收到预扣款执行结果后会将结果上报给TM。
预扣款执行结果存在三种可能性
// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定表示需要进行回查以确定本地事务的执行结果
}TM会根据上报结果向TC发出不同的确认指令 若预扣款成功本地事务状态为COMMIT_MESSAGE则TM向TC发送Global Commit指令 若预扣款失败本地事务状态为ROLLBACK_MESSAGE则TM向TC发送Global Rollback指令 若现未知状态本地事务状态为UNKNOW则会触发工行系统的本地事务状态回查操作。回 查操作会将回查结果即COMMIT_MESSAGE或ROLLBACK_MESSAGE Report给TC。TC将结果上 报给TMTM会再向TC发送最终确认指令Global Commit或Global RollbackTC在接收到指令后会向Broker与工行系统发出确认指令 TC接收的若是Global Commit指令则向Broker与工行系统发送Branch Commit指令。此时 Broker中的消息M才可被建行系统看到此时的工行用户A中的扣款操作才真正被确认 TC接收到的若是Global Rollback指令则向Broker与工行系统发送Branch Rollback指令。此时 Broker中的消息M将被撤销工行用户A中的扣款操作将被回滚 以上方案就是为了确保消息投递与扣款操作能够在一个事务中要成功都成功有一个失败 则全部回滚。 以上方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的而事务消息方案中的 消息预提交与预扣款操作间是同步的。
3 基础 分布式事务 对于分布式事务通俗地说就是一次操作由若干分支操作组成这些分支操作分属不同应用分布在 不同服务器上。分布式事务需要保证这些分支操作要么全部成功要么全部失败。分布式事务与普通事 务一样就是为了保证操作结果的一致性。 事务消息 RocketMQ提供了类似X/Open XA的分布式事务功能通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案一种分布式事务处理模式。 半事务消息 暂不能投递的消息发送方已经成功地将消息发送到了Broker但是Broker未收到最终确认指令此时 该消息被标记成“暂不能投递”状态即不能被消费者看到。处于该种状态下的消息即半事务消息。 本地事务状态 Producer回调操作执行的结果为本地事务状态其会发送给TC而TC会再发送给TM。TM会根据TC发 送来的本地事务状态来决定全局事务确认指令。
// 描述本地事务执行状态
public enum LocalTransactionState {
COMMIT_MESSAGE, // 本地事务执行成功
ROLLBACK_MESSAGE, // 本地事务执行失败
UNKNOW, // 不确定表示需要进行回查以确定本地事务的执行结果
}消息回查 消息回查即重新查询本地事务的执行状态。本例就是重新到DB中查看预扣款操作是否执行成功。 注意消息回查不是重新执行回调操作。回调操作是进行预扣款操作而消息回查则是查看预扣款操作执行的结果。 引发消息回查的原因最常见的有两个 1)回调操作返回UNKNWON 2)TC没有接收到TM的最终全局事务确认指令
RocketMQ中的消息回查设置 关于消息回查有三个常见的属性设置。它们都在broker加载的配置文件中设置例如
transactionTimeout20指定TM在20秒内应将最终确认状态发送给TC否则引发消息回查。默 认为60秒 transactionCheckMax5指定最多回查5次超过后将丢弃消息并记录错误日志。默认15次。 transactionCheckInterval10指定设置的多次消息回查的时间间隔为10秒。默认为60秒。
4 XA模式三剑客 XA协议 XAUnix Transaction是一种分布式事务解决方案一种分布式事务处理模式是基于XA协议的。 XA协议由TuxedoTransaction for Unix has been Extended for Distributed Operation分布式操作扩 展之后的Unix事务系统首先提出的并交给X/Open组织作为资源管理器与事务管理器的接口标 准。 XA模式中有三个重要组件TC、TM、RM。 TC Transaction Coordinator事务协调者。维护全局和分支事务的状态驱动全局事务提交或回滚。 RocketMQ中Broker充当着TC。 TM Transaction Manager事务管理器。定义全局事务的范围开始全局事务、提交或回滚全局事务。它 实际是全局事务的发起者。 RocketMQ中事务消息的Producer充当着TM。 RM Resource Manager资源管理器。管理分支事务处理的资源与TC交谈以注册分支事务和报告分支事 务的状态并驱动分支事务提交或回滚。 RocketMQ中事务消息的Producer及Broker均是RM。
5 XA模式架构 XA模式是一个典型的2PC其执行原理如下
TM向TC发起指令开启一个全局事务。根据业务要求各个RM会逐个向TC注册分支事务然后TC会逐个向RM发出预执行指令。各个RM在接收到指令后会在进行本地事务预执行。RM将预执行结果Report给TC。当然这个结果可能是成功也可能是失败。TC在接收到各个RM的Report后会将汇总结果上报给TM根据汇总结果TM会向TC发出确认指 令。 若所有结果都是成功响应则向TC发送Global Commit指令。 只要有结果是失败响应则向TC发送Global Rollback指令。TC在接收到指令后再次向RM发送确认指令。 事务消息方案并不是一个典型的XA模式。因为XA模式中的分支事务是异步的而事务消息方案 中的消息预提交与预扣款操作间是同步的。
6 注意 事务消息不支持延时消息 对于事务消息要做好幂等性检查因为事务消息可能不止一次被消费因为存在回滚后再提交的 情况 7 代码举例
定义工行事务监听器
public class ICBCTransactionListener implements TransactionListener {
// 回调操作方法
// 消息预提交成功就会触发该方法的执行用于完成本地事务
Override
public LocalTransactionState executeLocalTransaction(Message msg,
Object arg) {
System.out.println(预提交消息成功 msg);
// 假设接收到TAGA的消息就表示扣款操作成功TAGB的消息表示扣款失败
// TAGC表示扣款结果不清楚需要执行消息回查
if (StringUtils.equals(TAGA, msg.getTags())) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals(TAGB, msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (StringUtils.equals(TAGC, msg.getTags())) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
// 消息回查方法
// 引发消息回查的原因最常见的有两个
// 1)回调操作返回UNKNWON
// 2)TC没有接收到TM的最终全局事务确认指令
Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println(执行消息回查 msg.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
}定义事物消息生产者
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer new
TransactionMQProducer(tpg);
producer.setNamesrvAddr(rocketmqOS:9876);
/**
* 定义一个线程池
* param corePoolSize 线程池中核心线程数量
* param maximumPoolSize 线程池中最多线程数
* param keepAliveTime 这是一个时间。当线程池中线程数量大于核心线程数量
是
* 多余空闲线程的存活时长
* param unit 时间单位
* param workQueue 临时存放任务的队列其参数就是队列的长度
* param threadFactory 线程工厂
*/
ExecutorService executorService new ThreadPoolExecutor(2, 5,
100, TimeUnit.SECONDS,
new ArrayBlockingQueueRunnable(2000), new
ThreadFactory() {
Override
public Thread newThread(Runnable r) {
Thread thread new Thread(r);
thread.setName(client-transaction-msg-check-thread);
return thread;
}
});
// 为生产者指定一个线程池
producer.setExecutorService(executorService);
// 为生产者添加事务监听器
producer.setTransactionListener(new ICBCTransactionListener());
producer.start();
String[] tags {TAGA,TAGB,TAGC};
for (int i 0; i 3; i) {
byte[] body (Hi, i).getBytes();
Message msg new Message(TTopic, tags[i], body);
// 发送事务消息
// 第二个参数用于指定在执行本地事务时要使用的业务参数
SendResult sendResult
producer.sendMessageInTransaction(msg,null);
System.out.println(发送结果为
sendResult.getSendStatus());
}
}
}定义消费者 直接使用普通消息的SomeConsumer作为消费者即可。
public class SomeConsumer {
public static void main(String[] args) throws MQClientException {
// 定义一个pull消费者
faultLitePullConsumer(cg);
// 定义一个push消费者
DefaultMQPushConsumer consumer new
DefaultMQPushConsumer(cg);
// 指定nameServer
consumer.setNamesrvAddr(rocketmqOS:9876);
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
// 指定消费topic与tag
consumer.subscribe(TTopic, *);
// 指定采用“广播模式”进行消费默认为“集群模式”
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
consumer.registerMessageListener(new
MessageListenerConcurrently() {
// 一旦broker中有了其订阅的消息就会触发该方法的执行
// 其返回值为当前consumer消费的状态
Override
public ConsumeConcurrentlyStatus
consumeMessage(ListMessageExt msgs,
ConsumeConcurrentlyContext context) {
// 逐条消费消息
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 返回消费状态消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 开启消费者消费
consumer.start();
System.out.println(Consumer Started);
}
}五、批量消息 1 批量发送消息 发送限制 生产者进行消息发送时可以一次发送多条消息这可以大大提升Producer的发送效率。不过需要注意以 下几点 批量发送的消息必须具有相同的Topic 批量发送的消息必须具有相同的刷盘策略 批量发送的消息不能是延时消息与事务消息 批量发送大小 默认情况下一批发送的消息总大小不能超过4MB字节。如果想超出该值有两种解决方案 方案一将批量消息进行拆分拆分为若干不大于4M的消息集合分多次批量发送 方案二在Producer端与Broker端修改属性 ** Producer端需要在发送之前设置Producer的maxMessageSize属性 ** Broker端需要修改其加载的配置文件中的maxMessageSize属性 生产者发送的消息大小 生产者通过send()方法发送的Message并不是直接将Message序列化后发送到网络上的而是通过这 个Message生成了一个字符串发送出去的。这个字符串由四部分构成Topic、消息Body、消息日志 占20字节及用于描述消息的一堆属性key-value。这些属性中包含例如生产者地址、生产时间、 要发送的QueueId等。最终写入到Broker中消息单元中的数据都是来自于这些属性。
2 批量消费消息 修改批量属性 Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列 表但默认情况下每次只能消费一条消息。若要使其一次可以消费多条消息则可以通过修改 Consumer的consumeMessageBatchMaxSize属性来指定。不过该值不能超过32。因为默认情况下消 费者每次可以拉取的消息最多是32条。若要修改一次拉取的最大值则可通过修改Consumer的 pullBatchSize属性来指定。 存在的问题 Consumer的pullBatchSize属性与consumeMessageBatchMaxSize属性是否设置的越大越好当然不 是。 pullBatchSize值设置的越大Consumer每拉取一次需要的时间就会越长且在网络上传输出现 问题的可能性就越高。若在拉取过程中若出现了问题那么本批次所有消息都需要全部重新拉 取。 consumeMessageBatchMaxSize值设置的越大Consumer的消息并发消费能力越低且这批被消 费的消息具有相同的消费结果。因为consumeMessageBatchMaxSize指定的一批消息只会使用一 个线程进行处理且在处理过程中只要有一个消息处理异常则这批消息需要全部重新再次消费 处理。 3 代码举例 该批量发送的需求是不修改最大发送4M的默认值但要防止发送的批量消息超出4M的限制。 定义消息列表分割器
// 消息列表分割器其只会处理每条消息的大小不超4M的情况。
// 若存在某条消息其本身大小大于4M这个分割器无法处理
// 其直接将这条消息构成一个子列表返回。并没有再进行分割
public class MessageListSplitter implements IteratorListMessage {
// 指定极限值为4M
private final int SIZE_LIMIT 4 *1024 * 1024;
// 存放所有要发送的消息
private final ListMessage messages;
// 要进行批量发送消息的小集合起始索引
private int currIndex;
public MessageListSplitter(ListMessage messages) {
this.messages messages;
}
Override
public boolean hasNext() {
// 判断当前开始遍历的消息索引要小于消息总数
return currIndex messages.size();
}
Override
public ListMessage next() {
int nextIndex currIndex;
// 记录当前要发送的这一小批次消息列表的大小
int totalSize 0;
for (; nextIndex messages.size(); nextIndex) {
// 获取当前遍历的消息
Message message messages.get(nextIndex);
// 统计当前遍历的message的大小
int tmpSize message.getTopic().length()
message.getBody().length;
MapString, String properties message.getProperties();
for (Map.EntryString, String entry :
properties.entrySet()) {
tmpSize entry.getKey().length()
entry.getValue().length();
}
tmpSize tmpSize 20;
// 判断当前消息本身是否大于4M
if (tmpSize SIZE_LIMIT) {
if (nextIndex - currIndex 0) {
nextIndex;
}
break;
}
if (tmpSize totalSize SIZE_LIMIT) {
break;
} else {
totalSize tmpSize;
}
} // end-for
// 获取当前messages列表的子集合[currIndex, nextIndex)
ListMessage subList messages.subList(currIndex, nextIndex);
// 下次遍历的开始索引
currIndex nextIndex;
return subList;
}
}定义批量消息生产者
public class BatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer new DefaultMQProducer(pg);
producer.setNamesrvAddr(rocketmqOS:9876);
// 指定要发送的消息的最大大小默认是4M
// 不过仅修改该属性是不行的还需要同时修改broker加载的配置文件中的
// maxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
producer.start();
// 定义要发送的消息集合
ListMessage messages new ArrayList();
for (int i 0; i 100; i) {
byte[] body (Hi, i).getBytes();
Message msg new Message(someTopic, someTag, body);
messages.add(msg);
}
// 定义消息列表分割器将消息列表分割为多个不超出4M大小的小列表
MessageListSplitter splitter new
MessageListSplitter(messages);
while (splitter.hasNext()) {
try {
ListMessage listItem splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}定义批量消息消费者
public class BatchConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer new
DefaultMQPushConsumer(cg);
consumer.setNamesrvAddr(rocketmqOS:9876);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
consumer.subscribe(someTopicA, *);
// 指定每次可以消费10条消息默认为1
consumer.setConsumeMessageBatchMaxSize(10);
// 指定每次可以从Broker拉取40条消息默认为32
consumer.setPullBatchSize(40);
consumer.registerMessageListener(new
MessageListenerConcurrently() {
Override
public ConsumeConcurrentlyStatus
consumeMessage(ListMessageExt msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(msg);
}
// 消费成功的返回结果
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
// 消费异常时的返回结果
// return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.println(Consumer Started);
}
}