当前位置: 首页 > news >正文

魔站网站开发网站 开发逻辑

魔站网站开发,网站 开发逻辑,怎么样注册企业邮箱,做哪个app软件下载消息发送基本流程 消息发送流程主要的步骤#xff1a;验证消息、查找路由、消息发送#xff08;包含异常处理机制#xff09;。 代码#xff1a;同步消息发送入口 DefaultMQProducer#send public SendResult send(Message msg) throws MQClientException, RemotingExcep…消息发送基本流程 消息发送流程主要的步骤验证消息、查找路由、消息发送包含异常处理机制。 代码同步消息发送入口 DefaultMQProducer#send public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException{return this.defaultMQProducerImpl.send(msg); }DefaultMQProducerImpl#send public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return send(msg, this.defaultMQProducer.getSendMsgTimeout()); } public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }默认消息发送以同步方式发送默认超时时间为3s。 消息长度验证 消息发送之前首先确保生产者处于运行状态然后验证消息是否符合相应的规范具体的规范要求是主题名称、消息体不能为空、消息长度不能等于0且默认不能超过允许发送消息的最大长度4MmaxMessageSize102410244。 第一次发送消息时本地没有缓存topic的路由信息查询NameServer尝试获取如果路由信息未找到再次尝试用默认主题DefaultMQProducerImpl#createTopicKey去查询如果BrokerConfig#autoCreateTopicEnable为true时NameServer将返回路由信息如果autoCreateTopicEnable为false将抛出无法找到topic路由异常。 代码MQClientInstance#updateTopicRouteInfoFromNameServer这个方法的功能是消息生产者更新和维护路由缓存具体代码如下。 TopicRouteData topicRouteData; if (isDefault defaultMQProducer ! null) {topicRouteData this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),1000 * 3);if (topicRouteData ! null) {for (QueueData data : topicRouteData.getQueueDatas()) {int queueNums Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());data.setReadQueueNums(queueNums);data.setWriteQueueNums(queueNums);}} } else {topicRouteData this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3); }Step1如果isDefault为true则使用默认主题去查询如果查询到路由信息则替换路由信息中读写队列个数为消息生产者默认的队列个数defaultTopicQueueNums如果isDefault为false则使用参数topic去查询如果未查询到路由信息则返回false表示路由信息未变化。 代码清单3-11  MQClientInstance#updateTopicRouteInfoFromNameServer TopicRouteData old this.topicRouteTable.get(topic); boolean changed topicRouteDataIsChange(old, topicRouteData); if (!changed) {changed this.isNeedUpdateTopicRouteInfo(topic); } else {log.info(the topic[{}] route info changed, old[{}] ,new[{}], topic, old, topicRouteData); }Step2如果路由信息找到与本地缓存中的路由信息进行对比判断路由信息是否发生了改变如果未发生变化则直接返回false。 Step3更新MQClientInstance Broker地址缓存表。 代码MQClientInstance#updateTopicRouteInfoFromNameServer// {TopicPublishInfo publishInfo topicRouteData2TopicPublishInfo(topic, topicRouteData);publishInfo.setHaveTopicRouterInfo(true);IteratorEntryString, MQProducerInner it this.producerTable.entrySet().iterator();while (it.hasNext()) {EntryString, MQProducerInner entry it.next();MQProducerInner impl entry.getValue();if (impl ! null) {impl.updateTopicPublishInfo(topic, publishInfo);}} }Step4根据topicRouteData中的List转换成topicPublishInfo的List列表。其具体实现在topicRouteData2TopicPublishInfo然后会更新该MQClientInstance所管辖的所有消息发送关于topic的路由信息。 代码 MQClientInstance#updateTopicRouteInfoFromNameServer ListQueueData qds route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) {if (PermName.isWriteable(qd.getPerm())) {BrokerData brokerData null;for (BrokerData bd : route.getBrokerDatas()) {if (bd.getBrokerName().equals(qd.getBrokerName())) {brokerData bd;break;}}if (null brokerData) {continue;} if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {continue;}for (int i 0; i qd.getWriteQueueNums(); i) {MessageQueue mq new MessageQueue(topic, qd.getBrokerName(), i);info.getMessageQueueList().add(mq);}} }循环遍历路由信息的QueueData信息如果队列没有写权限则继续遍历下一个QueueData根据brokerName找到brokerData信息找不到或没有找到Master节点则遍历下一个QueueData根据写队列个数根据topic序号创建MessageQueue填充topicPublishInfo的List。完成消息发送的路由查找。 选择消息队列 根据路由信息选择消息队列返回的消息队列按照broker、序号排序。举例说明如果topicA在broker-a,broker-b上分别创建了4个队列那么返回的消息队列[{“brokerName”:“broker-a”,“queueId”:0},{“brokerName”:“broker-a”,“queueId”:1},{“brokerName”:“broker-a”,“queueId”:2},{“brokerName”:“broker-a”,“queueId”:3},{“brokerName”:“broker-b”,“queueId”:0},{“brokerName”:“broker-b”,“queueId”:1},{“brokerName”:“broker-b”,“queueId”:2},{“brokerName”:“broker-b”,“queueId”:3}]那RocketMQ如何选择消息队列呢 首先消息发送端采用重试机制由retryTimesWhenSendFailed指定同步方式重试次数异步重试机制在收到消息发送结构后执行回调之前进行重试。由retryTimesWhenSendAsyncFailed指定接下来就是循环执行选择消息队列、发送消息发送成功则返回收到异常则重试。选择消息队列有两种方式。 1sendLatencyFaultEnablefalse默认不启用Broker故障 延迟机制。 2sendLatencyFaultEnabletrue启用Broker故障延迟机制。 1.默认机制 sendLatencyFaultEnablefalse调用TopicPublishInfo#selectOneMessageQueue。 代码TopicPublishInfo#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName null) {return selectOneMessageQueue();} else {int index this.sendWhichQueue.getAndIncrement();for (int i 0; i this.messageQueueList.size(); i) {int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;MessageQueue mq this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) {return mq;}}return selectOneMessageQueue();} } public MessageQueue selectOneMessageQueue() {int index this.sendWhichQueue.getAndIncrement();int pos Math.abs(index) % this.messageQueueList.size();if (pos 0)pos 0;return this.messageQueueList.get(pos); }首先在一次消息发送过程中可能会多次执行选择消息队列这个方法lastBrokerName就是上一次选择的执行发送消息失败的Broker。第一次执行消息队列选择时lastBrokerName为null此时直接用sendWhichQueue自增再获取值与当前路由表中消息队列个数取模返回该位置的MessageQueueselectOneMessageQueue()方法如果消息发送再失败的话下次进行消息队列选择时规避上次MesageQueue所在的Broker否则还是很有可能再次失败。 该算法在一次消息发送过程中能成功规避故障的Broker但如果Broker宕机由于路由算法中的消息队列是按Broker排序的如果上一次根据路由算法选择的是宕机的Broker的第一个队列那么随后的下次选择的是宕机Broker的第二个队列消息发送很有可能会失败再次引发重试带来不必要的性能损耗那么有什么方法在一次消息发送失败后暂时将该Broker排除在消息队列选择范围外呢或许有朋友会问Broker不可用后路由信息中为什么还会包含该Broker的路由信息呢其实这不难解释首先NameServer检测Broker是否可用是有延迟的最短为一次心跳检测间隔10s其次NameServer不会检测到Broker宕机后马上推送消息给消息生产者而是消息生产者每隔30s更新一次路由信息所以消息生产者最快感知Broker最新的路由信息也需要30s。如果能引入一种机制在Broker宕机期间如果一次消息发送失败后可以将该Broker暂时排除在消息队列的选择范围中。 2.Broker故障延迟机制 MQFaultStrategy#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {int index tpInfo.getSendWhichQueue().getAndIncrement();for (int i 0; i tpInfo.getMessageQueueList().size(); i) {int pos Math.abs(index) % tpInfo.getMessageQueueList().size();if (pos 0)pos 0;MessageQueue mq tpInfo.getMessageQueueList().get(pos);if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}final String notBestBroker latencyFaultTolerance.pickOneAtLeast();int writeQueueNums tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums 0) {final MessageQueue mq tpInfo.selectOneMessageQueue();if (notBestBroker ! null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() %writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error(Error occurred when selecting message queue, e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName); }1根据对消息队列进行轮询获取一个消息队列。 2验证该消息队列是否可用latencyFaultTolerance.isAvailablemq.getBrokerName()是关键。 3如果返回的MessageQueue可用移除latencyFaultTolerance关于该topic条目表明该Broker故障已经恢复。 3.4.4 消息发送 消息发送API核心入口DefaultMQProducerImpl#sendKernelImpl private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout)消息发送参数详解。 1Message msg待发送消息。 2MessageQueue mq消息将发送到该消息队列上。 3CommunicationMode communicationMode消息发送模式SYNC、ASYNC、ONEWAY。 4SendCallback sendCallback异步消息回调函数。 5TopicPublishInfo topicPublishInfo主题路由信息 6long timeout消息发送超时时间。 代码DefaultMQProducerImpl#sendKernelImpl String brokerAddr this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); }Step1根据MessageQueue获取Broker的网络地址。如果MQClientInstance的brokerAddrTable未缓存该Broker的信息则从NameServer主动更新一下topic的路由信息。如果路由更新后还是找不到Broker信息则抛出MQClientException提示Broker不存在。 代码 DefaultMQProducerImpl#sendKernelImpl //for MessageBatch,ID has been set in the generating process if (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg); } int sysFlag 0; if (this.tryToCompressMessage(msg)) {sysFlag | MessageSysFlag.COMPRESSED_FLAG; } final String tranMsg msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg ! null Boolean.parseBoolean(tranMsg)) {sysFlag | MessageSysFlag.TRANSACTION_PREPARED_TYPE; }Step2为消息分配全局唯一ID如果消息体默认超过4KcompressMsgBodyOverHowmuch会对消息体采用zip压缩并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG。如果是事务Prepared消息则设置消息的系统标记为MessageSysFlag.TRANSACTION_PREPARED_TYPE。 代码 if (this.hasSendMessageHook()) {context new SendMessageContext();context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);String isTrans msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans ! null isTrans.equals(true)) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty(__STARTDELIVERTIME) ! null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) ! null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context); }1.同步发送 MQ客户端发送消息的入口是MQClientAPIImpl#sendMessage。请求命令是RequestCode.SEND_MESSAGE我们可以找到该命令的处理类org.apache.rocketmq.broker.processor.SendMessageProcessor。入口方法在SendMessageProcessor#sendMessage。 代码AbstractSendMessageProcessor#msgCheck protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,finalSendMessageRequestHeader requestHeader, final RemotingCommand response) {if(!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(the broker[ this.brokerController.getBrokerConfig().getBrokerIP1() ] sending message is forbidden);return response;}if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {String errorMsg the topic[ requestHeader.getTopic() ] is conflict with system reserved words.;log.warn(errorMsg);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorMsg);return response;}TopicConfig topicConfig this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null topicConfig) {int topicSysFlag 0;if (requestHeader.isUnitMode()) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicSysFlag TopicSysFlag.buildSysFlag(false, true);} else {topicSysFlag TopicSysFlag.buildSysFlag(true, false);}}log.warn(the topic {} not exist, producer: {}, requestHeader.getTopic(), ctx.channel().remoteAddress());topicConfig this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(requestHeader.getTopic(),requestHeader.getDefaultTopic(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()),requestHeader.getDefaultTopicQueueNums(), topicSysFlag);if (null topicConfig) {if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {topicConfig this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(requestHeader.getTopic(), 1, PermName.PERM_WRITE |PermName.PERM_READ,topicSysFlag);}}if (null topicConfig) {response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark(topic[ requestHeader.getTopic() ] notexist, apply first please! FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));return response;}}int queueIdInt requestHeader.getQueueId();int idValid Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());if (queueIdInt idValid) {String errorInfo String.format(request queueId[%d] is illegal, %sProducer: %s,queueIdInt,topicConfig.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}return response; }Step1检查消息发送是否合理这里完成了以下几件事情。 1检查该Broker是否有写权限。 2检查该Topic是否可以进行消息发送。主要针对默认主题默认主题不能发送消息仅仅供路由查找。 3在NameServer端存储主题的配置信息默认路径${ROCKET_HOME}/store/config/topic.json。下面是主题存储信息。order是否是顺序消息perm权限码read QueueNums读队列数量writeQueueNums写队列数量topicName主题名称topicSysFlagtopic Flag当前版本暂为保留topicFilterType主题过滤方式当前版本仅支持SINGLE_TAG。 4检查队列如果队列不合法返回错误码。 Step2如果消息重试次数超过允许的最大重试次数消息将进入到DLD延迟队列。延迟队列主题%DLQ%消费组名延迟队列在消息消费时将重点讲解。 Step3调用DefaultMessageStore#putMessage进行消息存储。关于消息存储的实现细节将在第4章重点剖析。 2.异步发送 消息异步发送是指消息生产者调用发送的API后无须阻塞等待消息服务器返回本次消息发送结果只需要提供一个回调函数供消息发送客户端在收到响应结果回调。异步方式相比同步方式消息发送端的发送性能会显著提高但为了保护消息服务器的负载压力RocketMQ对消息发送的异步消息进行了并发控制通过参数clientAsyncSemaphoreValue来控制默认为65535。异步消息发送虽然也可以通过DefaultMQProducer#retryTimesWhenSendAsyncFailed属性来控制消息重试次数但是重试的调用入口是在收到服务端响应包时进行的如果出现网络异常、网络超时等将不会重试。 3.单向发送 单向发送是指消息生产者调用消息发送的API后无须等待消息服务器返回本次消息发送结果并且无须提供回调函数表示消息发送压根就不关心本次消息发送是否成功其实现原理与异步消息发送相同只是消息发送客户端在收到响应结果后什么都不做而已并且没有重试机制。
http://wiki.neutronadmin.com/news/104132/

相关文章:

  • 怎么用ps做网站首页字天堂网
  • 网站由哪儿三部分组成哪里有网站设计公司
  • 朝阳企业网站建设方案wordpress主题首页问题
  • 提升网站打开速度系统app定制开发
  • 做网站哪种编程语言最好网站弹窗怎么做
  • 17岁高清免费观看完整版网站的结构与布局优化设计
  • 如何拿网站后台账号推广小程序拿佣金
  • 网站官网建设的价格种子搜索引擎在线
  • 做网站的app有什么作用wordpress 4.4 优化
  • 织梦网站图片设置多大wordpress新浪图床会挂吗
  • 一般购物网站项目网站建设 宁夏
  • 简述网站开发的基本原则众筹网站平台建设
  • 新建网站百度搜不到备案的网站名称能重复备案吗
  • 上海有名网站建站开发公司网站的seo 如何优化
  • 海口建设工程信息网站网站 数据备份
  • 制作网站的过程是对信息的可以免费进的服务器网站
  • 专注于响应式网站开发哈尔滨网站推广公司
  • 属于教育主管部门建设的专题资源网站是c2c网站管理系统下载
  • 介绍好的免费网站模板下载网站备案ip地址
  • 商务网站模板下载怎么创建一个自己的网站
  • 什么是云速建站服务视频拍摄和剪辑怎么学
  • 网站建设属什么合同企云网站建设
  • gta 买房网站建设中如何制作个人手机网站
  • 安防网站源码软件开发流程流程图
  • 网站框架模板广州冼村房价多少钱
  • 网站建设代码标准佛山有哪几个区
  • 高端网站设计公司排行榜动画制作软件排行榜
  • 建设一个购物网站多少钱wordpress换中文
  • 网站开发专业就业好不好金寨县重点工程建设管理局网站
  • 旅游网站建设技术解决方案男女生做羞羞事情的网站