网站开发招聘名称,嘉兴网站建设,上海专业做网站建设方法,廊坊网站制作网页之前对这个的理解有些问题#xff0c;今天用到有仔细梳理了一遍#xff0c;记录一下 首先开启storm tracker机制的前提是#xff0c; 1. 在spout emit tuple的时候#xff0c;要加上第3个参数messageid 2. 在配置中acker数目至少为1 3. 在bolt emit的时候#xff0c;要加… 之前对这个的理解有些问题今天用到有仔细梳理了一遍记录一下 首先开启storm tracker机制的前提是 1. 在spout emit tuple的时候要加上第3个参数messageid 2. 在配置中acker数目至少为1 3. 在bolt emit的时候要加上第二个参数anchor tuple以保持tracker链路 流程 1. 当tuple具有messageid时spout会把该tuple加到pending list里面 并发消息给acker通知acker开始tracker这条tuple 2. 然后再后续的bolt的处理逻辑中你必须显式的ack或fail所有处理的tuple 如果这条tuple在整个DAG图上都成功执行了那么acker会发现该tuple的track异或值为0 于是acker会发ack_message给spout 当然如果在DAG图上任意一个节点bolt上fail那么acker会认为该tuple fail 于是acker会发fail_message给spout 3. 当spout收到ack或fail message如何处理 首先是从pending list里面删掉这条tuple因为无论ack或fail只要得到结果这条tuple就没有继续被cache的必要了 然后做的事是调用spout.ack或spout.fail 所以系统默认是不会做任何事的甚至是fail后的重发你也需要在fail里面自己实现 如何实现后面看 4. 如果一条tuple没有被ack或fail最终是会超时的 Spout会根据system tick去rotate pending list对于每个过时的tuple都调用spout.fail 下面的问题就是如何做fail重发 这个必须用户通过自己处理fail来做系统是不会自己做的 public void fail(Object msgId) 看看系统提供的接口只有msgId这个参数这里的设计不合理其实在系统里是有cache整个msg的只给用户一个messageid用户如何取得原来的msg 貌似需要自己cache然后用这个msgId去查询太坑爹了 阿里自己的Jstorm会提供 public interface IFailValueSpout { void fail(Object msgId, Listobjectvalues); } 这样更合理一些, 可以直接取得系统cache的msg values 本文章摘自博客园原文发布日期 2014-06-24