全球十大网站排名,织梦 做网站 知乎,哈尔滨房管局官网查询,wordpress网络报名系统hazelcast 提供了3中方法调用startCleanup:第一种是在ConcuurentMapManager的构造函数中#xff0c;通过调用node的executorManager中的ScheduledExecutorService来创建每秒运行一次cleanup操作的线程#xff08;代码例如以下#xff09;。因为这是ConcuurentMapManager构造…hazelcast 提供了3中方法调用startCleanup: 第一种是在ConcuurentMapManager的构造函数中通过调用node的executorManager中的ScheduledExecutorService来创建每秒运行一次cleanup操作的线程代码例如以下。因为这是ConcuurentMapManager构造函数的代码所以这样的调用startCleanup的操作是默认就会有的。node.executorManager.getScheduledExecutorService().scheduleAtFixedRate(new Runnable() {publicvoid run() {for (CMap cMap : maps.values()) {cMap.startCleanup(false);}}}, 1, 1, TimeUnit.SECONDS); 另外一种是通过配置文件来触发startCleanup的运行。配置 PutOperationhandlerif overcapacity policy。我们系统的配置文件没有配置这方面的policy全部这样的方式在我们系统中没有使用。 第三种是自己直接写代码去调用startCleanup函数public方法。线程安全的. 这个没有实如今我们的系统中。 所以我的调查方向放在了第一种调用的情况hazelcast里面的ScheduledExecutorService是通过java.util.ScheduledThreadPoolExecutor 来实现的. esScheduled new ScheduledThreadPoolExecutor(5, new ExecutorThreadFactory(node.threadGroup,node.getThreadPoolNamePrefix(scheduled), classLoader), new RejectionHandler()) {protected void beforeExecute(Thread t, Runnable r) {threadPoolBeforeExecute(t, r);}}查看ScheduledThreadPoolExecutor的实现它把线程实现分成了3个部分 runnable tasks可运行任务, workers to execute the tasks运行任务的详细线程 以及 ScheduledThreadPoolExecutor 调度workers依照要求运行runnable tasks。我们通过scheduleAtFixdRate提交了taskscheduleAtFixedRate先把它打包成反复运行的ScheduleFutureTask pre namecode classjava public ScheduledFuture? scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command null || unit null)throw new NullPointerException();if (period 0)throw new IllegalArgumentException();RunnableScheduledFuture? t decorateTask(command, new strongScheduledFutureTas/strongkObject(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } ScheduleFutureTask的run方法实现又一次schedule public void run() {boolean periodic isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();strong reExecutePeriodic(outerTask);/strong}
}delayedExecute里面假设当前worker的数目小于初始化定义的CorePool的数目就创建新的worker线程然后把task放到queue里面 private void delayedExecute(Runnable command) {if (isShutdown()) {reject(command);return;}// Prestart a thread if necessary. We cannot prestart it// running the task because the task (probably) shouldnt be// run yet, so thread will just idle until delay elapses.if (getPoolSize() getCorePoolSize())prestartCoreThread();strong super.getQueue().add(command);/strong
}
public boolean prestartCoreThread() {return addIfUnderCorePoolSize(null);}private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t null;final ReentrantLock mainLock this.mainLock;mainLock.lock();try {if (poolSize corePoolSize runState RUNNING)t addThread(firstTask);} finally {mainLock.unlock();}return t ! null;}
private Thread addThread(Runnable firstTask) {Worker w new Worker(firstTask);Thread t threadFactory.newThread(w);boolean workerStarted false;if (t ! null) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();w.thread t;workers.add(w);int nt poolSize;if (nt largestPoolSize)largestPoolSize nt;try {t.start();workerStarted true;}finally {if (!workerStarted)workers.remove(w);}}return t;
}全部启动的worker就做一件事情从queue中取task运行 try {hasRun true;Runnable task firstTask;firstTask null;while (task ! null || (task stronggetTask/strong()) ! null) {strongrunTask(task);/strongtask null;}} finally {workerDone(this);}}}Runnable getTask() {strong for (;;) {/strongtry {int state runState;if (state SHUTDOWN)return null;Runnable r;if (state SHUTDOWN) // Help drain queuer workQueue.poll();else if (poolSize corePoolSize || allowCoreThreadTimeOut)r workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elsestrong r workQueue.take();/strongif (r ! null)return r;if (workerCanExit()) {if (runState SHUTDOWN) // Wake up othersinterruptIdleWorkers();return null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}}
}
private void runTask(Runnable task) {final ReentrantLock runLock this.runLock;runLock.lock();try {if ((runState STOP ||(Thread.interrupted() runState STOP)) hasRun)thread.interrupt();boolean ran false;beforeExecute(thread, task);strong try {task.run();ran true;afterExecute(task, null);completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}/strong} finally {runLock.unlock();}}了解了java threadpool的工作原理之后。我们能够知道。startCleanup是代码pass给ScheduledThreadPoolExecutor的runnable task它不被运行可能的原因有 1. ScheduledThreadPoolExecutor初始化时候出错task全然没有提交成功。因为lastCleanup并非系统应用的启动时间已经过了几个月了所以。非常明显在系统初始化的时候esScheduled(ScheduledThreadPoolExecutor)还是正常工作的仅仅是突然在2月4号停止了工作所以这样的可能性能够排除。 2. Worker 没有正常工作。不在从ScheduledThreadPoolExecutor的queue里面取数据这个非常快就被我排除了 首先heap dump中有5个pending workers in esScheduled (0/2/3/5/9) 其次从thread dump中能够看出这五个线程都是在等着从queue里面取数据 ……strong at java/util/concurrent/locks/AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)[optimiz/stronged]at java/util/concurrent/DelayQueue.take(DelayQueue.java:164)[optimized]at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)[inlined]at java/util/concurrent/ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)[optimized]at java/util/concurrent/ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)[optimized]at java/util/concurrent/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)at java/lang/Thread.run(Thread.java:662)at jrockit/vm/RNI.c2java(JJJJJ)V(Native Method)-- end of trace
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-2 id51 idx0xd8 tid32639 prio5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-3 id52 idx0xdc tid32640 prio5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-4 id53 idx0xe0 tid32641 prio5 alive, parked, native_blocked
hz._hzInstance_1_com.ericsson.ngin.session.ra.hazelcast.scheduled.thread-5 id75590 idx0x3cc tid3308 prio5 alive, parked, native_blocked
所以worker不正常也被排除了。3. 我们提交给系统的runner task自己主动从queue里面消失了从memory dump中确实发现queue没有tasks了 而没有task的原因非常明显是由于当前task运行完之后没有又一次reschedule至于原因由于scheduledFutrueTask已经不存在无法从memory dump和thread dump中分析出结果成为了一个谜。。。。。。 public void run() {boolean periodic isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();strong reExecutePeriodic(outerTask);/strong}
} 转载于:https://www.cnblogs.com/liguangsunls/p/6898371.html