青岛网站设计品牌企业,外链代发,seo培训班 有用吗,杭州 兼职 网站建设#x1f680; 优质资源分享 #x1f680;
学习路线指引#xff08;点击解锁#xff09;知识定位人群定位#x1f9e1; Python实战微信订餐小程序 #x1f9e1;进阶级本课程是python flask微信小程序的完美结合#xff0c;从项目搭建到腾讯云部署上线#xff0c;打造一… 优质资源分享
学习路线指引点击解锁知识定位人群定位 Python实战微信订餐小程序 进阶级本课程是python flask微信小程序的完美结合从项目搭建到腾讯云部署上线打造一个全栈订餐系统。Python量化交易实战入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统
消息发送
首先来看一个RcoketMQ发送消息的例子
Service
public class MQService {AutowiredDefaultMQProducer defaultMQProducer;public void sendMsg() {String msg 我是一条消息;// 创建消息指定TOPIC、TAG和消息内容Message sendMsg new Message(TestTopic, TestTag, msg.getBytes());SendResult sendResult null;try {// 同步发送消息sendResult defaultMQProducer.send(sendMsg);System.out.println(消息发送响应 sendResult.toString());} catch (MQClientException e) {e.printStackTrace();} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}
}
RocketMQ是通过DefaultMQProducer进行消息发送的它实现了MQProducer接口MQProducer接口中定义了消息发送的方法方法主要分为三大类
同步进行消息发送向Broker发送消息之后等待响应结果异步进行消息发送向Broker发送消息之后立刻返回当消息发送完毕之后触发回调函数sendOneway单向发送也是异步消息发送向Broker发送消息之后立刻返回但是没有回调函数
public interface MQProducer extends MQAdmin {// 同步发送消息SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,InterruptedException;// 异步发送消息SendCallback为回调函数void send(final Message msg, final SendCallback sendCallback) throws MQClientException,RemotingException, InterruptedException;// 异步发送消息没有回调函数void sendOneway(final Message msg) throws MQClientException, RemotingException,InterruptedException;// 省略其他方法
}
接下来以将以同步消息发送为例来分析消息发送的流程。
DefaultMQProducer里面有一个DefaultMQProducerImpl类型的成员变量defaultMQProducerImpl从默认的无参构造函数中可以看出在构造函数中对defaultMQProducerImpl进行了实例化在send方法中就是调用defaultMQProducerImpl的方法进行消息发送的
public class DefaultMQProducer extends ClientConfig implements MQProducer {/*** 默认消息生产者实现类*/protected final transient DefaultMQProducerImpl defaultMQProducerImpl;/*** 默认的构造函数*/public DefaultMQProducer() {this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);}/*** 构造函数*/public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {this.namespace namespace;this.producerGroup producerGroup;// 实例化defaultMQProducerImpl new DefaultMQProducerImpl(this, rpcHook);}/*** 同步发送消息*/Overridepublic SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 设置主题msg.setTopic(withNamespace(msg.getTopic()));// 发送消息return this.defaultMQProducerImpl.send(msg);}
}
DefaultMQProducerImpl中消息的发送在sendDefaultImpl方法中实现处理逻辑如下
根据设置的主题查找对应的路由信息TopicPublishInfo获取失败重试次数在消息发送失败时进行重试获取上一次选择的消息队列所在的Broker如果上次选择的Broker为空则为NULL然后调用selectOneMessageQueue方法选择一个消息队列并记录本次选择的消息队列在下一次发送消息时选择队列时使用计算选择消息队列的耗时如果大于超时时间终止本次发送调用sendKernelImpl方法进行消息发送调用updateFaultItem记录向Broker发送消息的耗时在开启故障延迟处理机制时使用
public class DefaultMQProducerImpl implements MQProducerInner {/*** DEFAULT SYNC -------------------------------------------------------*/public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 发送消息return send(msg, this.defaultMQProducer.getSendMsgTimeout());}public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// 发送消息return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}/*** 发送消息* param msg 发送的消息* param communicationMode* param sendCallback 回调函数* param timeout 超时时间*/private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID random.nextLong();// 开始时间long beginTimestampFirst System.currentTimeMillis();long beginTimestampPrev beginTimestampFirst;long endTimestamp beginTimestampFirst;// 查找主题路由信息TopicPublishInfo topicPublishInfo this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo ! null topicPublishInfo.ok()) {boolean callTimeout false;// 消息队列MessageQueue mq null;Exception exception null;SendResult sendResult null;// 获取失败重试次数int timesTotal communicationMode CommunicationMode.SYNC ? 1 this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times 0;String[] brokersSent new String[timesTotal];for (; times timesTotal; times) {// 获取BrokerNameString lastBrokerName null mq ? null : mq.getBrokerName();// 根据BrokerName选择一个消息队列MessageQueue mqSelected this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected ! null) {// 记录本次选择的消息队列mq mqSelected;brokersSent[times] mq.getBrokerName();try {// 记录时间beginTimestampPrev System.currentTimeMillis();if (times 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}// 计算选择消息队列的耗时时间long costTime beginTimestampPrev - beginTimestampFirst;// 如果已经超时终止发送if (timeout costTime) {callTimeout true;break;}// 发送消息sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);// 结束时间endTimestamp System.currentTimeMillis();// 记录向Broker发送消息的请求耗时,消息发送结束时间 - 开始时间this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:// 如果发送失败if (sendResult.getSendStatus() ! SendStatus.SEND_OK) {// 是否重试if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}// 返回结果return sendResult;default:break;}} catch (RemotingException e) {endTimestamp System.currentTimeMillis();// 如果抛出异常记录请求耗时this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;}// ... 省略其他异常处理} else {break;}}if (sendResult ! null) {return sendResult;}// ...}validateNameServerSetting();throw new MQClientException(No route info of this topic: msg.getTopic() FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}}
获取路由信息
DefaultMQProducerImpl中有一个路由信息表topicPublishInfoTable记录了主题对应的路由信息其中KEY为topic, value为对应的路由信息对象TopicPublishInfo
public class DefaultMQProducerImpl implements MQProducerInner {// 路由信息表KEY为topic, value为对应的路由信息对象TopicPublishInfoprivate final ConcurrentMap topicPublishInfoTable new ConcurrentHashMap();
}
主题路由信息
TopicPublishInfo中记录了主题所在的消息队列信息、所在Broker等信息
messageQueueList一个MessageQueue类型的消息队列列表MessageQueue中记录了主题名称、主题所属的Broker名称和队列ID
sendWhichQueue计数器选择消息队列的时候增1以此达到轮询的目的
topicRouteData从NameServer查询到的主题对应的路由数据包含了队列和Broker的相关数据
public class TopicPublishInfo {// 消息队列列表private List messageQueueList new ArrayList(); // 一个计数器每次选择消息队列的时候增1以此达到轮询的目的private volatile ThreadLocalIndex sendWhichQueue new ThreadLocalIndex(); // 主题路由数据private TopicRouteData topicRouteData;// ...
}// 消息队列
public class MessageQueue implements Comparable, Serializable {private static final long serialVersionUID 6191200464116433425L;private String topic; // 主题private String brokerName; // 所属Broker名称private int queueId; // 队列ID// ...
}// 主题路由数据
public class TopicRouteData extends RemotingSerializable {private List queueDatas; // 队列数据列表private List brokerDatas; // Broker信息列表// ...
}// 队列数据
public class QueueData implements Comparable {private String brokerName; // Broker名称private int readQueueNums; // 可读队列数量private int writeQueueNums; // 可写队列数量private int perm;private int topicSysFlag;
}// Broker数据
public class BrokerData implements Comparable {private String cluster; // 集群名称private String brokerName; // Broker名称private HashMap brokerAddrs; // Broker地址集合KEY为Broker ID, value为Broker 地址// ...
}
查找路由信息
在查找主题路由信息的时候首先从DefaultMQProducerImpl缓存的路由表topicPublishInfoTable中根据主题查找路由信息如果查询成功返回即可如果未查询到需要从NameServer中获取路由信息如果获取失败则使用默认的主题路由信息
public class DefaultMQProducerImpl implements MQProducerInner {// 路由信息表KEY为topic, value为对应的路由信息对象TopicPublishInfoprivate final ConcurrentMap topicPublishInfoTable new ConcurrentHashMap();/*** 根据主题查找路由信息* param topic 主题* return*/private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {// 根据主题获取对应的主题路由信息TopicPublishInfo topicPublishInfo this.topicPublishInfoTable.get(topic);// 如果未获取到if (null topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 从NameServer中查询路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo this.topicPublishInfoTable.get(topic);}// 如果路由信息获取成功if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {// 返回路由信息return topicPublishInfo;} else {// 如果路由信息未获取成功使用默认主题查询路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo this.topicPublishInfoTable.get(topic);// 返回路由信息return topicPublishInfo;}}
}
从NameServer获取主题路由信息
从NameServer获取主题路由信息数据是在MQClientInstance中的updateTopicRouteInfoFromNameServer方法中实现的
判断是否使用默认的主题路由信息如果是则获取默认的路由信息如果不使用默认的路由信息则从NameServer根据Topic查询取路由信息获取到的主题路由信息被封装为TopicRouteData类型的对象返回从topicRouteTable主题路由表中根据主题获取旧的路由信息与新的对比判断信息是否发生了变化如果发送了变化需要更新brokerAddrTable中记录的数据将新的路由信息对象加入到路由表topicRouteTable中替换掉旧的信息
public class MQClientInstance {public boolean updateTopicRouteInfoFromNameServer(final String topic) {// 从NameServer更新路由信息return updateTopicRouteInfoFromNameServer(topic, false, null);}/*** 从NameServer更新路由信息* param topic 主题* param isDefault 是否使用默认的主题* param defaultMQProducer 默认消息生产者* return*/public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,DefaultMQProducer defaultMQProducer) {try {if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {try {TopicRouteData topicRouteData;// 是否使用默认的路由信息if (isDefault defaultMQProducer ! null) {// 使用默认的主题路由信息topicRouteData this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),clientConfig.getMqClientApiTimeout());if (topicRouteData ! null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums); // 设置可读队列数量data.setWriteQueueNums(queueNums); // 设置可写队列数量}}} else {// 从NameServer获取路由信息topicRouteData this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());}// 如果路由信息不为空if (topicRouteData ! null) {// 从路由表中获取旧的路由信息TopicRouteData old this.topicRouteTable.get(topic);// 判断路由信息是否发生变化boolean changed topicRouteDataIsChange(old, topicRouteData);if (!changed) {// 是否需要更新路由信息changed this.isNeedUpdateTopicRouteInfo(topic);} else {log.info(the topic[{}] route info changed, old[{}] ,new[{}], topic, old, topicRouteData);}// 如果数据发生变化if (changed) {// 克隆一份新的路由信息TopicRouteData cloneTopicRouteData topicRouteData.cloneTopicRouteData();// 处理brokerAddrTable中的数据for (BrokerData bd : topicRouteData.getBrokerDatas()) {// 更新brokerAddrTable中的数据this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());}// ...log.info(topicRouteTable.put. Topic {}, TopicRouteData[{}], topic, cloneTopicRouteData);// 将新的路由信息加入到路由表this.topicRouteTable.put(topic, cloneTopicRouteData);return true;}} else {log.warn(updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}], topic, this.clientId);}} catch (MQClientException e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {log.warn(updateTopicRouteInfoFromNameServer Exception, e);}} catch (RemotingException e) {log.error(updateTopicRouteInfoFromNameServer Exception, e);throw new IllegalStateException(e);} finally {this.lockNamesrv.unlock();}} else {log.warn(updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}], LOCK_TIMEOUT_MILLIS, this.clientId);}} catch (InterruptedException e) {log.warn(updateTopicRouteInfoFromNameServer Exception, e);}return false;}
}
发送请求
向NameServer发送请求的代码实现在MQClientAPIImpl的getTopicRouteInfoFromNameServer方法中可以看到构建了请求命令RemotingCommand并设置请求类型为RequestCode.GET_ROUTEINFO_BY_TOPIC表示从NameServer获取路由信息之后通过Netty向NameServer发送请求并解析返回结果
public class MQClientAPIImpl {public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)throws RemotingException, MQClientException, InterruptedException {// 从NameServer获取路由信息return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);}/*** 从NameServer获取路由信息* param topic* param timeoutMillis* param allowTopicNotExist* return* throws MQClientException* throws InterruptedException* throws RemotingTimeoutException* throws RemotingSendRequestException* throws RemotingConnectException*/public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {GetRouteInfoRequestHeader requestHeader new GetRouteInfoRequestHeader();requestHeader.setTopic(topic);// 创建请求命令请求类型为获取主题路由信息GET\_ROUTEINFO\_BY\_TOPICRemotingCommand request RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);// 发送请求RemotingCommand response this.remotingClient.invokeSync(null, request, timeoutMillis);assert response ! null;switch (response.getCode()) {// 如果主题不存在case ResponseCode.TOPIC_NOT_EXIST: {if (allowTopicNotExist) {log.warn(get Topic [{}] RouteInfoFromNameServer is not exist value, topic);}break;}// 如果请求发送成功case ResponseCode.SUCCESS: {byte[] body response.getBody();// 返回获取的路由信息if (body ! null) {return TopicRouteData.decode(body, TopicRouteData.class);}}default:break;}throw new MQClientException(response.getCode(), response.getRemark());}
}
选择消息队列
主题路由信息数据TopicPublishInfo获取到之后需要从中选取一个消息队列是通过调用MQFaultStrategy的selectOneMessageQueue方法触发的之后会进入MQFaultStrategy的selectOneMessageQueue方法从主题路由信息中选择消息队列
public class DefaultMQProducerImpl implements MQProducerInner {private MQFaultStrategy mqFaultStrategy new MQFaultStrategy();public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 选择消息队列return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}
}
MQFaultStrategy的selectOneMessageQueue方法主要是通过调用TopicPublishInfo中的相关方法进行消息队列选择的。
启用故障延迟机制
如果启用了故障延迟机制会遍历TopicPublishInfo中存储的消息队列列表对计数器增1轮询选择一个消息队列接着会判断消息队列所属的Broker是否可用如果Broker可用返回消息队列即可。
如果选出的队列所属Broker不可用会调用latencyFaultTolerance的pickOneAtLeast方法下面会讲到选择一个Broker从tpInfo中获取此Broker可写的队列数量如果数量大于0调用selectOneMessageQueue()方法选择一个队列。
如果故障延迟机制未选出消息队列依旧会调用selectOneMessageQueue()选择出一个消息队列。
未启用故障延迟机制
直接调用的selectOneMessageQueue(String lastBrokerName)方法并传入上一次使用的Broker名称进行选择。
public class MQFaultStrategy {/*** 选择消息队列* param tpInfo 主题路由信息* param lastBrokerName 上一次使用的Broker名称* return*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果启用故障延迟机制if (this.sendLatencyFaultEnable) {try {// 计数器增1int index tpInfo.getSendWhichQueue().incrementAndGet();// 遍历TopicPublishInfo中存储的消息队列列表for (int i 0; i tpInfo.getMessageQueueList().size(); i) {// 轮询选择一个消息队列int pos Math.abs(index) % tpInfo.getMessageQueueList().size();// 如果下标小于0则使用0if (pos 0)pos 0;// 根据下标获取消息队列MessageQueue mq tpInfo.getMessageQueueList().get(pos);// 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}// 如果未获取到可用的Broker// 调用pickOneAtLeast选择一个final String notBestBroker latencyFaultTolerance.pickOneAtLeast();// 从tpInfo中获取Broker可写的队列数量int writeQueueNums tpInfo.getQueueIdByBroker(notBestBroker);// 如果可写的队列数量大于0if (writeQueueNums 0) {// 选择一个消息队列final MessageQueue mq tpInfo.selectOneMessageQueue();if (notBestBroker ! null) {// 设置消息队列所属的Brokermq.setBrokerName(notBestBroker);// 设置队列IDmq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}// 返回消息队列return mq;} else {// 移除BrokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error(Error occurred when selecting message queue, e);}// 如果故障延迟机制未选出消息队列调用selectOneMessageQueue选择消息队列return tpInfo.selectOneMessageQueue();}// 根据上一次使用的BrokerName获取消息队列return tpInfo.selectOneMessageQueue(lastBrokerName);}
}
selectOneMessageQueue方法的实现
selectOneMessageQueue方法中如果上一次选择的BrokerName为空则调用无参的selectOneMessageQueue方法选择消息队列也是默认的选择方式首先对计数器增一然后用计数器的值对messageQueueList列表的长度取余得到下标值pos再从messageQueueList中获取pos位置的元素以此达到轮询从messageQueueList列表中选择消息队列的目的。
如果传入的BrokerName不为空遍历messageQueueList列表同样对计数器增一并对messageQueueList列表的长度取余选取一个消息队列不同的地方是选择消息队列之后会判断消息队列所属的Broker是否与上一次选择的Broker名称一致如果一致则继续循环轮询选择下一个消息队列也就是说如果上一次选择了某个Broker发送消息本次将不会再选择这个Broker当然如果最后仍未找到满足要求的消息队列则仍旧使用默认的选择方式也就是调用无参的selectOneMessageQueue方法进行选择。
public class TopicPublishInfo {private boolean orderTopic false;private boolean haveTopicRouterInfo false;private List messageQueueList new ArrayList(); // 消息队列列表private volatile ThreadLocalIndex sendWhichQueue new ThreadLocalIndex(); // 一个计数器每次选择消息队列的时候增1以此达到轮询的目的private TopicRouteData topicRouteData;// ...public MessageQueue selectOneMessageQueue(final String lastBrokerName) {// 如果上一次选择的BrokerName为空if (lastBrokerName null) {// 选择消息队列return selectOneMessageQueue();} else {// 遍历消息队列列表for (int i 0; i this.messageQueueList.size(); i) {// 计数器增1int index this.sendWhichQueue.incrementAndGet();// 对长度取余int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;// 获取消息队列也就是使用使用轮询的方式选择消息队列MessageQueue mq this.messageQueueList.get(pos);// 如果队列所属的Broker与上一次选择的不同返回消息队列if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}// 使用默认方式选择return selectOneMessageQueue();}}// 选择消息队列public MessageQueue selectOneMessageQueue() {// 自增int index this.sendWhichQueue.incrementAndGet();// 对长度取余int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;// 选择消息队列return this.messageQueueList.get(pos);}
}
故障延迟机制
回到发送消息的代码中可以看到消息发送无论成功与否都会调用updateFaultItem方法更新失败条目
public class DefaultMQProducerImpl implements MQProducerInner {private MQFaultStrategy mqFaultStrategy new MQFaultStrategy();// 发送消息private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {// ...for (; times timesTotal; times) {try {// 开始时间beginTimestampPrev System.currentTimeMillis();// ...// 发送消息sendResult this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);// 结束时间endTimestamp System.currentTimeMillis();// 更新失败条目this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);// ...} catch (RemotingException e) {endTimestamp System.currentTimeMillis();// 更新失败条目this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format(sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception e;continue;}// 省略其他catch// ...catch (InterruptedException e) {endTimestamp System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format(sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s, invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());log.warn(sendKernelImpl exception, e);log.warn(msg.toString());throw e;}} else {break;}}// ...}// 更新FaultItempublic void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {// 调用MQFaultStrategy的updateFaultItem方法this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);}}
MQFaultStrategy中有一个类型的成员变量最终是通过调用latencyFaultTolerance的updateFaultItem方法进行更新的并传入了三个参数
brokerNameBroker名称
currentLatency当前延迟时间由上面的调用可知传入的值为发送消息的耗时时间即消息发送结束时间 - 开始时间
duration持续时间根据isolation的值决定如果为trueduration的值为30000ms也就是30s否则与currentLatency的值一致
public class MQFaultStrategy {// 故障延迟机制private final LatencyFaultTolerance latencyFaultTolerance new LatencyFaultToleranceImpl();/*** 更新失败条目* param brokerName Broker名称* param currentLatency 发送消息耗时请求结束时间 - 开始时间* param isolation*/public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {// 计算durationisolation为true时使用30000否则使用发送消息的耗时时间currentLatencylong duration computeNotAvailableDuration(isolation ? 30000 : currentLatency);// 更新到latencyFaultTolerance中this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);}}
}
LatencyFaultToleranceImpl
LatencyFaultToleranceImpl中有一个faultItemTable记录了每个Broker对应的FaultItem在updateFaultItem方法中首先根据Broker名称从faultItemTable获取FaultItem
如果获取为空说明需要新增FaultItem新建FaultItem对象设置传入的currentLatency延迟时间消息发送结束时间 - 开始时间和开始时间即当前时间 notAvailableDurationnotAvailableDuration值有两种情况值为30000毫秒或者与currentLatency的值一致如果获取不为空说明之前已经创建过对应的FaultItem更新FaultItem中的currentLatency延迟时间和StartTimestamp开始时间
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {// FaultItem集合Key为BrokerNamevalue为对应的FaultItem对象private final ConcurrentHashMap faultItemTable new ConcurrentHashMap(16);/*** 更新FaultItem* param name Broker名称* param currentLatency 延迟时间也就是发送消息耗时请求结束时间 - 开始时间* param notAvailableDuration 不可用的持续时间也就是上一步中的duration*/Overridepublic void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {// 获取FaultItemFaultItem old this.faultItemTable.get(name);// 如果不存在if (null old) {// 新建FaultItemfinal FaultItem faultItem new FaultItem(name);// 设置currentLatency延迟时间faultItem.setCurrentLatency(currentLatency);// 设置规避故障开始时间当前时间 不可用的持续时间不可用的持续时间有两种情况值为30000或者与currentLatency一致faultItem.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);// 添加到faultItemTableold this.faultItemTable.putIfAbsent(name, faultItem);if (old ! null) {old.setCurrentLatency(currentLatency);old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);}} else {// 更新时间old.setCurrentLatency(currentLatency);// 更新开始时间old.setStartTimestamp(System.currentTimeMillis() notAvailableDuration);}}
}
失败条目
FaultItem是LatencyFaultToleranceImpl的一个内部类里面有三个变量
nameBroker名称。currentLatency延迟时间等于发送消息耗时时间发送消息结束时间 - 开始时间。startTimestamp规避故障开始时间新建/更新FaultItem的时间 不可用的时间notAvailableDurationnotAvailableDuration值有两种情况值为30000毫秒或者与currentLatency的值一致。
isAvailable方法
isAvailable方法用于开启故障延迟机制时判断Broker是否可用可用判断方式为当前时间 - startTimestamp的值大于等于 0如果小于0则认为不可用。
上面分析可知startTimestamp的值为新建/更新FaultItem的时间 不可用的时间如果当前时间减去规避故障开始时间的值大于等于0说明此Broker已经超过了设置的规避时间可以重新被选择用于发送消息。
compareTo方法
FaultItem还实现了Comparable重写了compareTo方法在排序的时候使用对比大小的规则如下 调用isAvailable方法判断当前对象和other的值是否相等如果相等继续第2步如果不相等说明两个对象一个返回true一个返回false此时优先判断当前对象的isAvailable方法返回值是否为true true表示当前对象比other小返回-1对应当前对象为trueother对象为false的情况。false调用other的isAvailable方法判断是否为true如果为true返回1表示other比较大对应当前对象为falseother对象为true的情况否则继续第2步根据其他条件判断。 对比currentLatency的值如果currentLatency值小于other的返回-1表示当前对象比other小。 对比startTimestamp的值如果startTimestamp值小于other的返回-1同样表示当前对象比other小。
总结
isAvailable方法返回true的时候表示FaultItem对象的值越小因为true代表Broker已经过了规避故障的时间可以重新被选择。
currentLatency的值越小表示FaultItem的值越小。currentLatency的值与Broker发送消息的耗时有关耗时越低值就越小。
startTimestamp值越小同样表示整个FaultItem的值也越小。startTimestamp的值与currentLatency有关值不为默认的30000毫秒情况下currentLatency值越小startTimestamp的值也越小。
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance { class FaultItem implements Comparable {private final String name; // Broker名称private volatile long currentLatency; // 发送消息耗时时间请求结束时间 - 开始时间private volatile long startTimestamp; // 规避开始时间新建/更新FaultItem的时间 不可用的时间notAvailableDurationOverridepublic int compareTo(final FaultItem other) {// 如果isAvailable不相等说明一个为true一个为falseif (this.isAvailable() ! other.isAvailable()) {if (this.isAvailable()) // 如果当前对象为truereturn -1; // 当前对象小if (other.isAvailable())// 如果other对象为truereturn 1; // other对象大}// 对比发送消息耗时时间if (this.currentLatency other.currentLatency)return -1;// 当前对象小else if (this.currentLatency other.currentLatency) {return 1; // other对象大}// 对比故障规避开始时间if (this.startTimestamp other.startTimestamp)return -1;else if (this.startTimestamp other.startTimestamp) {return 1;}return 0;}// 用于判断Broker是否可用public boolean isAvailable() {// 当前时间减去startTimestamp的值是否大于等于0大于等于0表示可用return (System.currentTimeMillis() - startTimestamp) 0;}}
}
在选择消息队列时如果开启故障延迟机制并且未找到合适的消息队列会调用pickOneAtLeast方法选择一个Broker那么是如何选择Broker的呢 首先遍历faultItemTableMap集合将每一个Broker对应的FaultItem加入到LinkedList链表中 调用sort方法对链表进行排序默认是正序从小到大排序FaultItem还实现Comparable就是为了在这里进行排序值小的排在链表前面 计算中间值half 如果half值小于等于0取链表中的第一个元素如果half值大于0从前half个元素中轮询选择元素
由FaultItem的compareTo方法可知currentLatency和startTimestamp的值越小整个FaultItem的值也就越小正序排序时越靠前靠前表示向Broker发送消息的延迟越低在选择Broker时优先级越高所以如果half值小于等于0的时候取链表中的第一个元素half值大于0的时候处于链表前half个的Brokerddd延迟都是相对较低的此时轮询从前haft个Broker中选择一个Broker。
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {// FaultItem集合Key为BrokerNamevalue为对应的FaultItem对象private final ConcurrentHashMap faultItemTable new ConcurrentHashMap(16);Overridepublic String pickOneAtLeast() {final Enumeration elements this.faultItemTable.elements();List tmpList new LinkedList();// 遍历faultItemTablewhile (elements.hasMoreElements()) {final FaultItem faultItem elements.nextElement();// 将FaultItem添加到列表中tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.shuffle(tmpList);// 排序Collections.sort(tmpList);// 计算中间数final int half tmpList.size() / 2;// 如果中位数小于等于0if (half 0) {// 获取第一个元素return tmpList.get(0).getName();} else {// 对中间数取余final int i this.whichItemWorst.incrementAndGet() % half;return tmpList.get(i).getName();}}return null;}
}
故障规避
再回到MQFaultStrategy中选择消息队列的地方在开启故障延迟机制的时候选择队列后会调用LatencyFaultToleranceImpl的isAvailable方法来判断Broker是否可用而LatencyFaultToleranceImpl的isAvailable方法又是调用Broker对应 FaultItem的isAvailable方法来判断的。
由上面的分析可知isAvailable返回true表示Broker已经过了规避时间可以用于发送消息返回false表示还在规避时间内需要避免选择此Broker所以故障延迟机制指的是在发送消息时记录每个Broker的耗时时间如果某个Broker发生故障但是生产者还未感知NameServer 30s检测一次心跳有可能Broker已经发生故障但未到检测时间所以会有一定的延迟用耗时时间做为一个故障规避时间也可以是30000ms此时消息会发送失败在重试或者下次选择消息队列的时候如果在规避时间内可以在短时间内避免再次选择到此Broker以此达到故障规避的目的。
如果某个主题所在的所有Broker都处于不可用状态此时调用pickOneAtLeast方法尽量选择延迟时间最短、规避时间最短排序后的失败条目中靠前的元素的Broker作为此次发生消息的Broker。
public class MQFaultStrategy {private final LatencyFaultTolerance latencyFaultTolerance new LatencyFaultToleranceImpl();/*** 选择消息队列* param tpInfo 主题路由信息* param lastBrokerName 上一次使用的Broker名称* return*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {// 如果启用故障延迟机制if (this.sendLatencyFaultEnable) {try {// 计数器增1int index tpInfo.getSendWhichQueue().incrementAndGet();// 遍历TopicPublishInfo中存储的消息队列列表for (int i 0; i tpInfo.getMessageQueueList().size(); i) {// 轮询选择一个消息队列int pos Math.abs(index) % tpInfo.getMessageQueueList().size();// 如果下标小于0则使用0if (pos 0)pos 0;// 根据下标获取消息队列MessageQueue mq tpInfo.getMessageQueueList().get(pos);// 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}// 如果未获取到可用的Broker// 调用pickOneAtLeast选择一个final String notBestBroker latencyFaultTolerance.pickOneAtLeast();// 从tpInfo中获取Broker可写的队列数量int writeQueueNums tpInfo.getQueueIdByBroker(notBestBroker);// 如果可写的队列数量大于0if (writeQueueNums 0) {// 选择一个消息队列final MessageQueue mq tpInfo.selectOneMessageQueue();if (notBestBroker ! null) {// 设置消息队列所属的Brokermq.setBrokerName(notBestBroker);// 设置队列IDmq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);}// 返回消息队列return mq;} else {// 移除BrokerlatencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error(Error occurred when selecting message queue, e);}// 如果故障延迟机制未选出消息队列调用selectOneMessageQueue选择消息队列return tpInfo.selectOneMessageQueue();}// 根据上一次使用的BrokerName获取消息队列return tpInfo.selectOneMessageQueue(lastBrokerName);}
}public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {private final ConcurrentHashMap faultItemTable new ConcurrentHashMap(16);Overridepublic boolean isAvailable(final String name) {final FaultItem faultItem this.faultItemTable.get(name);if (faultItem ! null) {// 调用FaultItem的isAvailable方法判断是否可用return faultItem.isAvailable();}return true;}
}
参考 丁威、周继锋《RocketMQ技术内幕》
RocketMQ版本4.9.3