中国网站建设公司排行,湘潭做网站 活动磐石网络,wordpress文章数据库表,在线logo设计生成器1.设计
1.1 背景
系统启动后#xff0c;所有任务都在被执行#xff0c;如果这时某个节点宕机#xff0c;那它负责的任务就不能执行了#xff0c;这对有稳定性要求的任务是不能接受的#xff0c;所以系统要实现rebalance的功能。
1.2 设计
下面是Job分配与执行的业务点…1.设计
1.1 背景
系统启动后所有任务都在被执行如果这时某个节点宕机那它负责的任务就不能执行了这对有稳定性要求的任务是不能接受的所以系统要实现rebalance的功能。
1.2 设计
下面是Job分配与执行的业务点重分配就是在 follower下线、controller下线、节点新上线进行重分配。理清楚接下来实现就是水到渠成了 2. 实现
2.1 RebalanceJobType
定义了重平衡job的类型
public enum RebalanceJobType {FOLLOWER_OFFLINE(0), CONTROLLER_OFFLINE(1), NODE_ONLINE(2);private int code;RebalanceJobType(int code) {this.code code;}public boolean isFollowerOffline() {return this.code FOLLOWER_OFFLINE.code;}public boolean isControllerOffline() {return this.code CONTROLLER_OFFLINE.code;}public boolean isNodeOnline() {return this.code NODE_ONLINE.code;}}
2.2 AverageJobAllotStrategy
添加了 rebalanceJob的方法只有Controller才能调用对不同的重平衡情况进行分别处理
private MapLong, ListDttaskJob getDttaskJobMap() {ListDttaskJob allDttaskJob getAllDttaskJob();return average(allDttaskJob);
}Override
public void rebalanceJob(RebalanceJobContext rebalanceJobContext) {if (rebalanceJobContext.getType().isFollowerOffline()|| rebalanceJobContext.getType().isControllerOffline()) {long offlineServerId rebalanceJobContext.getServerId();log.info({}节点{}下线-重平衡job{},rebalanceJobContext.getType().isFollowerOffline() ? follower : controller,offlineServerId,rebalanceJobContext);ListDttaskJob dttaskJobs getByDttaskId(offlineServerId);ListNodeInfo nodeInfoList ServerInfo.getNodeInfoList();MapLong, ListDttaskJob allotMap new HashMap();int i 0;int nodeCount nodeInfoList.size();while (i dttaskJobs.size()) {DttaskJob dttaskJob dttaskJobs.get(i);NodeInfo nodeInfo nodeInfoList.get(i % nodeCount);i;ListDttaskJob dttaskJobList allotMap.getOrDefault(nodeInfo.getServerId(), new ArrayList());dttaskJobList.add(dttaskJob);allotMap.put(nodeInfo.getServerId(), dttaskJobList);}executeDttaskJob(new ExecuteDttaskJobContext(allotMap, true));} else if (rebalanceJobContext.getType().isNodeOnline()) {log.info(节点上线-重平衡job{}, rebalanceJobContext);long onlineServerId rebalanceJobContext.getServerId();MapLong, ListDttaskJob dttaskJobMap BeanUseHelper.entityHelpService().queryDttaskJob();MapLong, ListDttaskJob allotDttaskJobMap getDttaskJobMap();MapLong, ListDttaskJob stopDttaskJobMapOfOldNodes new HashMap();MapLong, ListDttaskJob startDttaskJobMapOfNewNodes new HashMap();ListDttaskJob startDttaskJobs new ArrayList();dttaskJobMap.forEach((serverId, dttaskJobList) - {int size dttaskJobList.size();int newSize allotDttaskJobMap.get(serverId).size();if (size newSize) {ListDttaskJob dttaskJobs dttaskJobList.subList(0, size - newSize);stopDttaskJobMapOfOldNodes.put(serverId, dttaskJobs);startDttaskJobs.addAll(dttaskJobs);}});startDttaskJobMapOfNewNodes.put(onlineServerId, startDttaskJobs);executeDttaskJob(new ExecuteDttaskJobContext(stopDttaskJobMapOfOldNodes, false));executeDttaskJob(new ExecuteDttaskJobContext(startDttaskJobMapOfNewNodes, true));}
}
2.3 ServerClientChannelHandler
对节点下线进行重平衡处理 2.4 NodeOnlineMessageService 3. 测试
启动三个节点节点完成选举每个节点执行2个任务 3号节点下线
1 2 节点各分配了一个任务继续执行 3号节点上线
新上线的3号节点重新得到2个任务1 2节点各停止一个任务 至此节点上下线的任务重平衡完成