当前位置: 首页 > news >正文

网站转化率是什么意思网站建设哪里最好

网站转化率是什么意思,网站建设哪里最好,百度竞价推广开户费用,餐饮网络营销方式Java生鲜电商平台-SpringCloud微服务架构中分布式事务解决方案 说明#xff1a;Java生鲜电商平台中由于采用了微服务架构进行业务的处理#xff0c;买家#xff0c;卖家#xff0c;配送#xff0c;销售#xff0c;供应商等进行服务化#xff0c;但是不可避免存在分布式事… Java生鲜电商平台-SpringCloud微服务架构中分布式事务解决方案   说明Java生鲜电商平台中由于采用了微服务架构进行业务的处理买家卖家配送销售供应商等进行服务化但是不可避免存在分布式事务的问题 业界有很多的解决方案对此我相信大家都百度一下子就有很多但是我巨人大哥想说的是微服务架构中应当尽量避免分布式事务。   下面就是来讨论下分布式事务中主要聚焦于强一致性和最终一致性的解决方案。 微服务的发展 微服务倡导将复杂的单体应用拆分为若干个功能简单、松耦合的服务这样可以降低开发难度、增强扩展性、便于敏捷开发。当前被越来越多的开发者推崇很多互联网行业巨头、开源社区等都开始了微服务的讨论和实践。 微服务落地存在的问题 虽然微服务现在如火如荼但对其实践其实仍处于探索阶段。很多中小型互联网公司鉴于经验、技术实力等问题微服务落地比较困难。 如著名架构师Chris Richardson所言目前存在的主要困难有如下几方面 单体应用拆分为分布式系统后进程间的通讯机制和故障处理措施变的更加复杂。系统微服务化后一个看似简单的功能内部可能需要调用多个服务并操作多个数据库实现服务调用的分布式事务问题变的非常突出。微服务数量众多其测试、部署、监控等都变的更加困难。随着RPC框架的成熟第一个问题已经逐渐得到解决。例如springcloud可以非常好的支持restful调用dubbo可以支持多种通讯协议。 对于第三个问题随着docker、devops技术的发展以及各公有云paas平台自动化运维工具的推出微服务的测试、部署与运维会变得越来越容易。 而对于第二个问题现在还没有通用方案很好的解决微服务产生的事务问题。分布式事务已经成为微服务落地最大的阻碍也是最具挑战性的一个技术难题。 ACID 原子性Atomicity: 一个事务的所有系列操作步骤被看成是一个动作所有的步骤要么全部完成要么一个也不会完成如果事务过程中任何一点失败将要被改变的数据库记录就不会被真正被改变。 一致性Consistency: 数据库的约束 级联和触发机制Trigger都必须满足事务的一致性。也就是说通过各种途径包括外键约束等任何写入数据库的数据都是有效的不能发生表与表之间存在外键约束但是有数据却违背这种约束性。所有改变数据库数据的动作事务必须完成没有事务会创建一个无效数据状态这是不同于CAP理论的一致性consistency. 隔离性Isolation: 主要用于实现并发控制, 隔离能够确保并发执行的事务能够顺序一个接一个执行通过隔离一个未完成事务不会影响另外一个未完成事务。 持久性Durability: 一旦一个事务被提交它应该持久保存不会因为和其他操作冲突而取消这个事务。很多人认为这意味着事务是持久在磁盘上但是规范没有特别定义这点。 一致性理论 分布式事务的目的是保障分库数据一致性而跨库事务会遇到各种不可控制的问题如个别节点永久性宕机像单机事务一样的 ACID 是无法奢望的。 另外业界著名的 CAP 理论也告诉我们对分布式系统需要将数据一致性和系统可用性、分区容忍性放在天平上一起考虑。 两阶段提交协议(简称2PC)是实现分布式事务较为经典的方案但 2PC 的可扩展性很差在分布式架构下应用代价较大eBay 架构师 Dan Pritchett 提出了 BASE 理论用于解决大规模分布式系统下的数据一致性问题。 BASE 理论告诉我们可以通过放弃系统在每个时刻的强一致性来换取系统的可扩展性。 CAP 理论 在分布式系统中一致性(Consistency)、可用性(Availability)和分区容忍性(Partition Tolerance)3 个要素最多只能同时满足两个不可兼得。其中分区容忍性又是不可或缺的。 一致性分布式环境下多个节点的数据是否强一致。可用性分布式服务能一直保证可用状态。当用户发出一个请求后服务能在有限时间内返回结果。分区容忍性特指对网络分区的容忍性。举例Cassandra、Dynamo 等默认优先选择 AP弱化 C;HBase、MongoDB 等默认优先选择 CP弱化 A。 BASE 理论 核心思想 基本可用(Basically Available)指分布式系统在出现故障时允许损失部分的可用性来保证核心可用软状态(Soft state)指允许分布式系统存在中间状态该中间状态不会影响到系统的整体可用性最终一致性(Eventual consistency)指分布式系统中的所有副本数据经过一定时间后最终能够达到一致的状态原子性A与持久性D必须根本保障为了可用性、性能与降级服务的需要只有降低一致性( C ) 与 隔离性( I ) 的要求酸碱平衡(ACID-BASE Balance)BASE 是对 CAP 中 AP 的一个扩展 一致性模型 数据的一致性模型可以分成以下三类 强一致性数据更新成功后任意时刻所有副本中的数据都是一致的一般采用同步的方式实现。弱一致性数据更新成功后系统不承诺立即可以读到最新写入的值也不承诺具体多久之后可以读到。最终一致性弱一致性的一种形式数据更新成功后系统不承诺立即可以返回最新写入的值但是保证最终会返回上一次更新操作的值。分布式系统数据的强一致性、弱一致性和最终一致性可以通过 Quorum NRW 算法分析。 本地事务 在单个数据库的本地并且限制在单个进程内的事务本地事务不涉及多个数据来源分布式事务典型方案 两阶段提交2PC, Two Phase Commit方案本地消息表 eBay 事件队列方案TCC 补偿模式分类 两阶段型补偿型异步确保型最大努力通知型服务模式 可查询操作幂等操作TCC操作可补偿操作两阶段提交2PC强一致性 基于XA协议的两阶段提交: 第一阶段是表决阶段所有参与者都将本事务能否成功的信息反馈发给协调者第二阶段是执行阶段协调者根据所有参与者的反馈通知所有参与者步调一致地在所有分支上提交或者回滚; 缺点 单点问题事务管理器在整个流程中扮演的角色很关键如果其宕机比如在第一阶段已经完成在第二阶段正准备提交的时候事务管理器宕机资源管理器就会一直阻塞导致数据库无法使用。同步阻塞在准备就绪之后资源管理器中的资源一直处于阻塞直到提交完成释放资源。数据不一致两阶段提交协议虽然为分布式数据强一致性所设计但仍然存在数据不一致性的可能。比如在第二阶段中假设协调者发出了事务 Commit 的通知但是因为网络问题该通知仅被一部分参与者所收到并执行了 Commit 操作其余的参与者则因为没有收到通知一直处于阻塞状态这时候就产生了数据的不一致性。总的来说XA 协议比较简单成本较低但是其单点问题以及不能支持高并发(由于同步阻塞)依然是其最大的弱点。 本地消息表最终一致性 eBay 的架构师 Dan Pritchett曾在一篇解释 BASE 原理的论文《BaseAn Acid Alternative》中提到一个 eBay 分布式系统一致性问题的解决方案。   它的核心思想是将需要分布式处理的任务通过消息或者日志的方式来异步执行消息或日志可以存到本地文件、数据库或消息队列再通过业务规则进行失败重试它要求各服务的接口是幂等的。 本地消息表与业务数据表处于同一个数据库中这样就能利用本地事务来保证在对这两个表的操作满足事务特性并且使用了消息队列来保证最终一致性。 在分布式事务操作的一方完成写业务数据的操作之后向本地消息表发送一个消息本地事务能保证这个消息一定会被写入本地消息表中之后将本地消息表中的消息转发到 Kafka 等消息队列中如果转发成功则将消息从本地消息表中删除否则继续重新转发消息消费方处理这个消息并完成自己的业务逻辑。此时如果本地事务处理成功表明已经处理成功了如果处理失败那么就会重试执行。如果是业务上面的失败可以给生产方发送一个业务补偿消息通知生产方进行回滚等操作优点 一种非常经典的实现避免了分布式事务实现了最终一致性。 缺点 消息表会耦合到业务系统中如果没有封装好的解决方案会有很多杂活需要处理。 这个方案的核心在于第二阶段的重试和幂等执行。失败后重试这是一种补偿机制它是能保证系统最终一致的关键流程。 可靠消息的最终一致性代码示例 表结构 DROP TABLE IF EXISTS rp_transaction_message;CREATE TABLE rp_transaction_message (id VARCHAR (50) NOT NULL DEFAULT COMMENT 主键ID,version INT (11) NOT NULL DEFAULT 0 COMMENT 版本号,editor VARCHAR (100) DEFAULT NULL COMMENT 修改者,creater VARCHAR (100) DEFAULT NULL COMMENT 创建者,edit_time datetime DEFAULT NULL COMMENT 最后修改时间,create_time datetime NOT NULL DEFAULT 0000-00-00 00:00:00 COMMENT 创建时间,message_id VARCHAR (50) NOT NULL DEFAULT COMMENT 消息ID,message_body LONGTEXT NOT NULL COMMENT 消息内容,message_data_type VARCHAR (50) DEFAULT NULL COMMENT 消息数据类型,consumer_queue VARCHAR (100) NOT NULL DEFAULT COMMENT 消费队列,message_send_times SMALLINT (6) NOT NULL DEFAULT 0 COMMENT 消息重发次数,areadly_dead VARCHAR (20) NOT NULL DEFAULT COMMENT 是否死亡,status VARCHAR (20) NOT NULL DEFAULT COMMENT 状态,remark VARCHAR (200) DEFAULT NULL COMMENT 备注,field1 VARCHAR (200) DEFAULT NULL COMMENT 扩展字段1,field2 VARCHAR (200) DEFAULT NULL COMMENT 扩展字段2,field3 VARCHAR (200) DEFAULT NULL COMMENT 扩展字段3,PRIMARY KEY (id),KEY AK_Key_2 (message_id) ) ENGINE INNODB DEFAULT CHARSET utf8;public interface RpTransactionMessageService {/*** 预存储消息.*/public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 确认并发送消息.*/public void confirmAndSendMessage(String messageId) throws MessageBizException;/*** 存储并发送消息.*/public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 直接发送消息.*/public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 重发消息.*/public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 根据messageId重发某条消息.*/public void reSendMessageByMessageId(String messageId) throws MessageBizException;/*** 将消息标记为死亡消息.*/public void setMessageToAreadlyDead(String messageId) throws MessageBizException;/*** 根据消息ID获取消息*/public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;/*** 根据消息ID删除消息*/public void deleteMessageByMessageId(String messageId) throws MessageBizException;/*** 重发某个消息队列中的全部已死亡的消息.*/public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;/*** 获取分页数据*/PageBean listPage(PageParam pageParam, MapString, Object paramMap) throws MessageBizException;} Service(rpTransactionMessageService) public class RpTransactionMessageServiceImpl implements RpTransactionMessageService {private static final Log log LogFactory.getLog(RpTransactionMessageServiceImpl.class);Autowiredprivate RpTransactionMessageDao rpTransactionMessageDao;Autowiredprivate JmsTemplate notifyJmsTemplate;public int saveMessageWaitingConfirm(RpTransactionMessage message) {if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 保存的消息为空);}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, 消息的消费队列不能为空 );}message.setEditTime(new Date());message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());message.setAreadlyDead(PublicEnum.NO.name());message.setMessageSendTimes(0);return rpTransactionMessageDao.insert(message);}public void confirmAndSendMessage(String messageId) {final RpTransactionMessage message getMessageByMessageId(messageId);if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 根据消息id查找的消息为空);}message.setStatus(MessageStatusEnum.SENDING.name());message.setEditTime(new Date());rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public int saveAndSendMessage(final RpTransactionMessage message) {if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 保存的消息为空);}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, 消息的消费队列不能为空 );}message.setStatus(MessageStatusEnum.SENDING.name());message.setAreadlyDead(PublicEnum.NO.name());message.setMessageSendTimes(0);message.setEditTime(new Date());int result rpTransactionMessageDao.insert(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});return result;}public void directSendMessage(final RpTransactionMessage message) {if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 保存的消息为空);}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, 消息的消费队列不能为空 );}notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void reSendMessage(final RpTransactionMessage message) {if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 保存的消息为空);}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, 消息的消费队列不能为空 );}message.addSendTimes();message.setEditTime(new Date());rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void reSendMessageByMessageId(String messageId) {final RpTransactionMessage message getMessageByMessageId(messageId);if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 根据消息id查找的消息为空);}int maxTimes Integer.valueOf(PublicConfigUtil.readConfig(message.max.send.times));if (message.getMessageSendTimes() maxTimes) {message.setAreadlyDead(PublicEnum.YES.name());}message.setEditTime(new Date());message.setMessageSendTimes(message.getMessageSendTimes() 1);rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void setMessageToAreadlyDead(String messageId) {RpTransactionMessage message getMessageByMessageId(messageId);if (message null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, 根据消息id查找的消息为空);}message.setAreadlyDead(PublicEnum.YES.name());message.setEditTime(new Date());rpTransactionMessageDao.update(message);}public RpTransactionMessage getMessageByMessageId(String messageId) {MapString, Object paramMap new HashMapString, Object();paramMap.put(messageId, messageId);return rpTransactionMessageDao.getBy(paramMap);}public void deleteMessageByMessageId(String messageId) {MapString, Object paramMap new HashMapString, Object();paramMap.put(messageId, messageId);rpTransactionMessageDao.delete(paramMap);}SuppressWarnings(unchecked)public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) {log.info(reSendAllDeadMessageByQueueName);int numPerPage 1000;if (batchSize 0 batchSize 100) {numPerPage 100;} else if (batchSize 100 batchSize 5000) {numPerPage batchSize;} else if (batchSize 5000) {numPerPage 5000;} else {numPerPage 1000;}int pageNum 1;MapString, Object paramMap new HashMapString, Object();paramMap.put(consumerQueue, queueName);paramMap.put(areadlyDead, PublicEnum.YES.name());paramMap.put(listPageSortType, ASC);MapString, RpTransactionMessage messageMap new HashMapString, RpTransactionMessage();ListObject recordList new ArrayListObject();int pageCount 1;PageBean pageBean rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);recordList pageBean.getRecordList();if (recordList null || recordList.isEmpty()) {log.info(recordList is empty);return;}pageCount pageBean.getTotalPage();for (final Object obj : recordList) {final RpTransactionMessage message (RpTransactionMessage) obj;messageMap.put(message.getMessageId(), message);}for (pageNum 2; pageNum pageCount; pageNum) {pageBean rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);recordList pageBean.getRecordList();if (recordList null || recordList.isEmpty()) {break;}for (final Object obj : recordList) {final RpTransactionMessage message (RpTransactionMessage) obj;messageMap.put(message.getMessageId(), message);}}recordList null;pageBean null;for (Map.EntryString, RpTransactionMessage entry : messageMap.entrySet()) {final RpTransactionMessage message entry.getValue();message.setEditTime(new Date());message.setMessageSendTimes(message.getMessageSendTimes() 1);rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}}SuppressWarnings(unchecked)public PageBeanRpTransactionMessage listPage(PageParam pageParam, MapString, Object paramMap) {return rpTransactionMessageDao.listPage(pageParam, paramMap);}} Component(messageBiz) public class MessageBiz {private static final Log log LogFactory.getLog(MessageBiz.class);Autowiredprivate RpTradePaymentQueryService rpTradePaymentQueryService;Autowiredprivate RpTransactionMessageService rpTransactionMessageService;/*** 处理[waiting_confirm]状态的消息* param messages*/public void handleWaitingConfirmTimeOutMessages(MapString, RpTransactionMessage messageMap) {log.debug(开始处理[waiting_confirm]状态的消息,总条数[ messageMap.size() ]);// 单条消息处理目前该状态的消息消费队列全部是accounting如果后期有业务扩充需做队列判断做对应的业务处理。for (Map.EntryString, RpTransactionMessage entry : messageMap.entrySet()) {RpTransactionMessage message entry.getValue();try {log.debug(开始处理[waiting_confirm]消息ID为[ message.getMessageId() ]的消息);String bankOrderNo message.getField1();RpTradePaymentRecord record rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);// 如果订单成功把消息改为待处理并发送消息if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) {// 确认并发送消息rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());} else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) {// 订单状态是等到支付可以直接删除数据log.debug(订单没有支付成功,删除[waiting_confirm]消息id[ message.getMessageId() ]的消息);rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());}log.debug(结束处理[waiting_confirm]消息ID为[ message.getMessageId() ]的消息);} catch (Exception e) {log.error(处理[waiting_confirm]消息ID为[ message.getMessageId() ]的消息异常, e);}}}/*** 处理[SENDING]状态的消息* param messages*/public void handleSendingTimeOutMessage(MapString, RpTransactionMessage messageMap) {SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);log.debug(开始处理[SENDING]状态的消息,总条数[ messageMap.size() ]);// 根据配置获取通知间隔时间MapInteger, Integer notifyParam getSendTime();// 单条消息处理for (Map.EntryString, RpTransactionMessage entry : messageMap.entrySet()) {RpTransactionMessage message entry.getValue();try {log.debug(开始处理[SENDING]消息ID为[ message.getMessageId() ]的消息);// 判断发送次数int maxTimes Integer.valueOf(PublicConfigUtil.readConfig(message.max.send.times));log.debug([SENDING]消息ID为[ message.getMessageId() ]的消息,已经重新发送的次数[ message.getMessageSendTimes() ]);// 如果超过最大发送次数直接退出if (maxTimes message.getMessageSendTimes()) {// 标记为死亡rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());continue;}// 判断是否达到发送消息的时间间隔条件int reSendTimes message.getMessageSendTimes();int times notifyParam.get(reSendTimes 0 ? 1 : reSendTimes);long currentTimeInMillis Calendar.getInstance().getTimeInMillis();long needTime currentTimeInMillis - times * 60 * 1000;long hasTime message.getEditTime().getTime();// 判断是否达到了可以再次发送的时间条件if (hasTime needTime) {log.debug(currentTime[ sdf.format(new Date()) ],[SENDING]消息上次发送时间[ sdf.format(message.getEditTime()) ],必须过了[ times ]分钟才可以再发送。);continue;}// 重新发送消息rpTransactionMessageService.reSendMessage(message);log.debug(结束处理[SENDING]消息ID为[ message.getMessageId() ]的消息);} catch (Exception e) {log.error(处理[SENDING]消息ID为[ message.getMessageId() ]的消息异常, e);}}}/*** 根据配置获取通知间隔时间* return*/private MapInteger, Integer getSendTime() {MapInteger, Integer notifyParam new HashMapInteger, Integer();notifyParam.put(1, Integer.valueOf(PublicConfigUtil.readConfig(message.send.1.time)));notifyParam.put(2, Integer.valueOf(PublicConfigUtil.readConfig(message.send.2.time)));notifyParam.put(3, Integer.valueOf(PublicConfigUtil.readConfig(message.send.3.time)));notifyParam.put(4, Integer.valueOf(PublicConfigUtil.readConfig(message.send.4.time)));notifyParam.put(5, Integer.valueOf(PublicConfigUtil.readConfig(message.send.5.time)));return notifyParam;}} public class AccountingMessageListener implements SessionAwareMessageListenerMessage {private static final Log LOG LogFactory.getLog(AccountingMessageListener.class);/*** 会计队列模板(由Spring创建并注入进来)*/Autowiredprivate JmsTemplate notifyJmsTemplate;Autowiredprivate RpAccountingVoucherService rpAccountingVoucherService;Autowiredprivate RpTransactionMessageService rpTransactionMessageService;public synchronized void onMessage(Message message, Session session) {RpAccountingVoucher param null;String strMessage null;try {ActiveMQTextMessage objectMessage (ActiveMQTextMessage) message;strMessage objectMessage.getText();LOG.info(strMessage1 accounting: strMessage);param JSONObject.parseObject(strMessage, RpAccountingVoucher.class);// 这里转换成相应的对象还有问题if (param null) {LOG.info(param参数为空);return;}int entryType param.getEntryType();double payerChangeAmount param.getPayerChangeAmount();String voucherNo param.getVoucherNo();String payerAccountNo param.getPayerAccountNo();int fromSystem param.getFromSystem();int payerAccountType 0;if (param.getPayerAccountType() ! null !param.getPayerAccountType().equals()) {payerAccountType param.getPayerAccountType();}double payerFee param.getPayerFee();String requestNo param.getRequestNo();double bankChangeAmount param.getBankChangeAmount();double receiverChangeAmount param.getReceiverChangeAmount();String receiverAccountNo param.getReceiverAccountNo();String bankAccount param.getBankAccount();String bankChannelCode param.getBankChannelCode();double profit param.getProfit();double income param.getIncome();double cost param.getCost();String bankOrderNo param.getBankOrderNo();int receiverAccountType 0;double payAmount param.getPayAmount();if (param.getReceiverAccountType() ! null !param.getReceiverAccountType().equals()) {receiverAccountType param.getReceiverAccountType();}double receiverFee param.getReceiverFee();String remark param.getRemark();rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, payerChangeAmount, receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo, payerAccountType, payAmount, receiverAccountType, payerFee, receiverFee);//删除消息rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId());} catch (BizException e) {// 业务异常不再写会队列LOG.error(BizException, e);} catch (Exception e) {// 不明异常不再写会队列LOG.error(Exception, e);}}public JmsTemplate getNotifyJmsTemplate() {return notifyJmsTemplate;}public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) {this.notifyJmsTemplate notifyJmsTemplate;}public RpAccountingVoucherService getRpAccountingVoucherService() {return rpAccountingVoucherService;}public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) {this.rpAccountingVoucherService rpAccountingVoucherService;}}与常规MQ的ACK机制对比 常规MQ确认机制 Producer生成消息并发送给MQ同步、异步MQ接收消息并将消息数据持久化到消息存储持久化操作为可选配置MQ向Producer返回消息的接收结果返回值、异常Consumer监听并消费MQ中的消息Consumer获取到消息后执行业务处理Consumer对已成功消费的消息向MQ进行ACK确认确认后的消息将从MQ中删除常规MQ队列消息的处理流程无法实现消息发送一致性因此直接使用现成的MQ中间件产品无法实现可靠消息最终一致性的分布式事务解决方案 消息发送一致性是指产生消息的业务动作与消息发送的一致。也就是说如果业务操作成功那么由这个业务操作所产生的消息一定要成功投递出去(一般是发送到kafka、rocketmq、rabbitmq等消息中间件中)否则就丢消息。 下面用伪代码进行演示消息发送和投递的不可靠性 先进行数据库操作再发送消息 public void test1(){ //1 数据库操作 //2 发送MQ消息 } 这种情况下无法保证数据库操作与发送消息的一致性因为可能数据库操作成功发送消息失败。 先发送消息再操作数据库 public void test1(){ //1 发送MQ消息 //2 数据库操作 } 这种情况下无法保证数据库操作与发送消息的一致性因为可能发送消息成功数据库操作失败。 在数据库事务中先发送消息后操作数据库 Transactional public void test1(){ //1 发送MQ消息 //2 数据库操作 } 这里使用spring 的Transactional注解方法里面的操作都在一个事务中。同样无法保证一致性因为发送消息成功了数据库操作失败的情况下数据库操作是回滚了但是MQ消息没法进行回滚。 在数据库事务中先操作数据库后发送消息 Transactional public void test1(){ //1 数据库操作 //2 发送MQ消息 } 这种情况下貌似没有问题如果发送MQ消息失败抛出异常事务一定会回滚(加上了Transactional注解后spring方法抛出异常后会自动进行回滚)。 这只是一个假象因为发送MQ消息可能事实上已经成功如果是响应超时导致的异常。这个时候数据库操作依然回滚但是MQ消息实际上已经发送成功导致不一致。 与消息发送一致性流程的对比 常规MQ队列消息的处理流程无法实现消息发送一致性投递消息的流程其实就是消息的消费流程可细化TCC (Try-Confirm-Cancel)补偿模式最终一致性 TCC 其实就是采用的补偿机制其核心思想是针对每个操作都要注册一个与其对应的确认和补偿撤销操作。 它分为三个阶段 Try 阶段主要是对业务系统做检测及资源预留Confirm 阶段主要是对业务系统做确认提交Try阶段执行成功并开始执行 Confirm阶段时默认 Confirm阶段是不会出错的。即只要Try成功Confirm一定成功。Cancel 阶段主要是在业务执行错误需要回滚的状态下执行的业务取消预留资源释放。 举例Bob 要向 Smith 转账 首先在 Try 阶段要先调用远程接口把 Smith 和 Bob 的钱给冻结起来。在 Confirm 阶段执行远程调用的转账的操作转账成功进行解冻。如果第2步执行成功那么转账成功如果第二步执行失败则调用远程冻结接口对应的解冻方法 (Cancel)。优点 跟2PC比起来实现以及流程相对简单了一些但数据的一致性比2PC也要差一些 缺点 缺点还是比较明显的在2,3步中都有可能失败。TCC属于应用层的一种补偿方式所以需要程序员在实现的时候多写很多补偿的代码在一些场景中一些业务流程可能用TCC不太好定义及处理。 可靠消息最终一致常用 不要用本地的消息表了直接基于MQ来实现事务。比如阿里的RocketMQ就支持消息事务。 可靠消息最终一致性方案 大概流程 A系统先发送一个prepared消息到mq如果这个prepared消息发送失败那么就直接取消操作别执行了如果这个消息发送成功过了那么接着执行本地事务如果成功就告诉mq发送确认消息如果失败就告诉mq回滚消息如果发送了确认消息那么此时B系统会接收到确认消息然后执行本地的事务mq会自动定时轮询所有prepared消息回调你的接口问你这个消息是不是本地事务处理失败了所有没发送确认消息那是继续重试还是回滚一般来说这里你就可以查下数据库看之前本地事务是否执行如果回滚了那么这里也回滚吧。这个就是避免可能本地事务执行成功了别确认消息发送失败了。这个方案里要是系统B的事务失败了咋办重试咯自动不断重试直到成功如果实在是不行要么就是针对重要的资金类业务进行回滚比如B系统本地回滚后想办法通知系统A也回滚或者是发送报警由人工来手工回滚和补偿 目前国内互联网公司大都是这么玩儿的要不你使用RocketMQ支持的要不你就基于其他MQ中间件自己封装一套类似的逻辑总之思路就是这样的。 最大努力通知 业务发起方将协调服务的消息发送到MQ下游服务接收此消息如果处理失败将进行重试重试N次后依然失败将不进行重试放弃处理这个应用场景要求对事物性要求不高的地方。 最大努力通知方案   最终总结       需要讨论与学习请加QQ群793305035    转载于:https://www.cnblogs.com/jurendage/p/11353968.html
http://wiki.neutronadmin.com/news/41074/

相关文章:

  • 贵州建设厅网站厅长表情包制作在线
  • 凡科建的网站怎么样软件开发的本质
  • 中国制药网网站开发技术连城县住房和城乡建设局 网站
  • 建网站租服务器多少钱虚拟主机和服务器
  • 中国建设监理协会网站提高景区网站建设
  • 学前心理学课程建设网站医疗网站建设精英
  • 深圳 网站 传播优秀网站特点
  • 桂林做旅游网站失败的网站网站包括哪些内容吗
  • 青海省建设厅网站备案资料旅游网站建设费用预算
  • wordpress中清理优化大师
  • 今天猪最新价格seo软件定制
  • 聊城做网站乐都网站建设多少钱
  • 哈尔滨有多少家网站建设公司百度官方网站下载
  • 建网页和建网站手机上如何上传wordpress
  • 沙漠网站建设游戏ui设计师网站有哪些
  • 成都响应网站建设创意logo图片大全
  • 常州建设工程交易网站怎样卸载电脑上的wordpress
  • 公司网站服务费计入什么科目网站定制哪家快
  • wordpress小说网站主题哪家做网站的好
  • 网站建设的结论和体会wordpress媒体库显示
  • 天津哪里有做网站的公司网站菜单导航制作教程
  • 廊坊市广阳区建设局网站百度区域代理
  • 卫生系统网站的建设和维护自己做的网站无法访问
  • 三亚网站建设价格成都企业建站
  • 最大的地方门户网站源码wordpress多麦
  • 网站维护外包做婚庆网站图片下载
  • 网站备案进度查询个人简介网页制作
  • 重庆网站设计最佳科技wordpress免费好用主题
  • 永久打开本网站的wordpress 添加外链
  • 社交博客网站开发深圳网站建设及优化