vpn网站模板,浙江耀华建设集团网站,抖音代运营服务方案,页面设计升级一、前言 最近做了电子发票的需求#xff0c;分省开票接口和发票下载接口都有一定的延迟。为了完成开票后自动将发票插入用户微信卡包#xff0c;目前的解决方案是利用线程池#xff0c;将开票后插入卡包的任务#xff08;轮询分省发票接口#xff0c;直到获取到发票相关信… 一、前言 最近做了电子发票的需求分省开票接口和发票下载接口都有一定的延迟。为了完成开票后自动将发票插入用户微信卡包目前的解决方案是利用线程池将开票后插入卡包的任务轮询分省发票接口直到获取到发票相关信息或者轮询次数用完如果获取到发票信息执行发票插入微信卡包结束任务放入线程池异步执行。仔细想一想这种实现方案存在一个问题线程池没有充分的利用。为什么没有充分的利用下面详细的分析。 二、异步线程池和异步任务包装 AsyncConfigurerSupport可以帮我们指定异步任务注有Async注解对应的线程池。 Configuration
public class MyAsyncConfigurer extends AsyncConfigurerSupport {private static Logger LOGGER LoggerFactory.getLogger(MyAsyncConfigurer.class);Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor taskExecutor new ThreadPoolTaskExecutor();taskExecutor.setCorePoolSize(2);taskExecutor.setMaxPoolSize(4);taskExecutor.setQueueCapacity(10);taskExecutor.setRejectedExecutionHandler((runnable, executor) - LOGGER.error(异步线程池拒绝任务... runnable));taskExecutor.setThreadFactory(new MyAsyncThreadFactory());taskExecutor.initialize();return taskExecutor;}static class MyAsyncThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber new AtomicInteger(1);private final String namePrefix;MyAsyncThreadFactory() {SecurityManager s System.getSecurityManager();group (s ! null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix myasync-pool- poolNumber.getAndIncrement() -thread-;}Overridepublic Thread newThread(Runnable r) {Thread t new Thread(group, r,namePrefix threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() ! Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}
} 异步任务包装除了异步还加入了retry功能实现指定次数的接口轮询。 Component
public class AsyncWrapped {protected static Logger LOGGER LoggerFactory.getLogger(AsyncWrapped.class);Asyncpublic void asyncProcess(Runnable runnable, Callback callback, Retry retry) {try {if (retry null) {retry new Retry(1);}retry.execute(ctx - {runnable.run();return null;}, ctx - {if (callback ! null) {callback.call();}return null;});} catch (Exception e) {LOGGER.error(异步调用异常..., e);}}
} 业务代码大致逻辑如下。 asyncWrapped.asyncProcess(() - {//调用分省接口获取发票信息//如果发票信息异常抛出异常进入下次重试//否则插入用户微信卡包}, () - {//轮询次数用尽用户插入卡包失败}, new Retry(2, 1000)
); 这里说一下为什么线程池没有充分的利用。异步任务中包含轮询操作轮询有一定的时间间隔导致在这段时间间隔内线程一直处于被闲置的状态。所以为了能更好的利用线程池资源我们得想办法解决时间间隔的问题。假如有个延迟队列队列里放着我们的异步任务不包含重试机制然后延迟轮询的时间间隔一定时间之后将任务放入线程池中执行任务执行完毕之后根据是否需要再次执行决定是否再次放入到延迟队列去这样每个线程池中的线程都不会闲着达到了充分利用的目的。 三、定时任务线程池和实现轮询机制 EnableScheduling 帮助开启Scheduled注解解析。注册一个名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定时任务线程池。 Configuration
EnableScheduling
Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {Bean(name ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledExecutorService scheduledAnnotationProcessor() {return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());}private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s System.getSecurityManager();group (s ! null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix pool- poolNumber.getAndIncrement() -schedule-;}Overridepublic Thread newThread(Runnable r) {Thread t new Thread(group, r,namePrefix threadNumber.getAndIncrement(),0);if (t.isDaemon()) {t.setDaemon(false);}if (t.getPriority() ! Thread.NORM_PRIORITY) {t.setPriority(Thread.NORM_PRIORITY);}return t;}}
} 实现轮询任务实现接口SchedulingConfigurer获取ScheduledTaskRegistrar 并指定定时任务线程池。 Override
public void configureTasks(ScheduledTaskRegistrar registrar) {this.registrar registrar;this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));scheduledTaskRegistrarHelper new ScheduledTaskRegistrarHelper();
} scheduledFutures提交定时任务时返回结果集periodTasks 定时任务结果集。 private static final ConcurrentHashMapString, ScheduledFuture? scheduledFutures new ConcurrentHashMap();
private static final ConcurrentHashMapString, TimingTask periodTasks new ConcurrentHashMap(); 定时任务包装类包含任务的执行次数重试次数、重试间隔、具体任务、重试次数用尽之后的回调等以及自动结束定时任务、重试计数重置功能。 private static class TimingTask {//重试次数private Integer retry;//任务标识private String taskId;//重试间隔private Long period;//具体任务private ScheduledRunnable task;//结束回调private ScheduledCallback callback;//重试计数private AtomicInteger count new AtomicInteger(0);//父线程MDCprivate MapString, String curContext;public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {this.retry retry;this.taskId taskId;this.period period;this.task task;this.callback callback;this.curContext MDC.getCopyOfContextMap();}public Long getPeriod() {return period;}public void setPeriod(Long period) {this.period period;}public String getTaskId() {return taskId;}public void setTaskId(String taskId) {this.taskId taskId;}public Integer getRetry() {return retry;}public void setRetry(Integer retry) {this.retry retry;}public AtomicInteger getCount() {return count;}public boolean reset() {for (int cnt this.count.intValue(); cnt this.retry; cnt this.count.intValue()) {if (this.count.compareAndSet(cnt, 0)) {return true;}}return false;}public void process() {MapString, String preContext MDC.getCopyOfContextMap();try {if (this.curContext null) {MDC.clear();} else {// 将父线程的MDC内容传给子线程MDC.setContextMap(this.curContext);}this.task.run();exitTask(false);} catch (Exception e) {LOGGER.error(定时任务异常... this, e);if (count.incrementAndGet() this.retry) {exitTask(true);}} finally {if (preContext null) {MDC.clear();} else {MDC.setContextMap(preContext);}}}//定时任务退出private void exitTask(boolean execCallback) {scheduledFutures.get(this.taskId).cancel(false);scheduledFutures.remove(this.getTaskId());periodTasks.remove(this.getTaskId());LOGGER.info(结束定时任务: this);if (execCallback callback ! null) {callback.call();}}Overridepublic String toString() {return ReflectionToStringBuilder.toString(this, ToStringStyle.JSON_STYLE, false, false, TimingTask.class);}
} 注意上面定时任务是如何退出的是在某一次任务执行成功之后没有异常抛出或者定时任务执行次数用尽才退出的。直接调用ScheduledFuture的cancel方法可以退出定时任务。还有就是定时任务中的日志需要父线程中的日志变量所以需要对MDC进行一下处理。 Scope(prototype)
Bean
public AspectTimingTask aspectTimingTask() {return new AspectTimingTask();
}Aspect
Component
public static class ScheduledAspect {Around(target(AspectTimingTask))public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) {MethodInvocationProceedingJoinPoint methodJoinPoint (MethodInvocationProceedingJoinPoint) proceedingJoinPoint;Method method ((MethodSignature) methodJoinPoint.getSignature()).getMethod();if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) {LOGGER.info(电子发票定时任务日志同步...);//其他处理}}return proceedingJoinPoint.proceed();}
}public static class AspectTimingTask implements Runnable {private TimingTask timingTask;OverrideScheduledTaskpublic void run() {timingTask.process();}public void setTimingTask(TimingTask timingTask) {this.timingTask timingTask;}
} AspectTimingTask 是对TimingTask 的包装类实现了Runnable接口。主要是为了对run接口做一层切面获取ProceedingJoinPoint 实例公司中的日志调用链系统需要这个参数。AspectTimingTask 的bean实例的scope是prototype这个注意下。 public static void register(Integer retry, Long period, String taskId, ScheduledRunnable task, ScheduledCallback callback) {scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback);
}private class ScheduledTaskRegistrarHelper {public void register(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) {//是否可以重置定时任务TimingTask preTask periodTasks.get(taskId);if (null ! preTask preTask.reset() existTask(taskId)) {return;}TimingTask curTask new TimingTask(retry, taskId, period, task, callback);AspectTimingTask aspectTimingTask applicationContext.getBean(AspectTimingTask.class);aspectTimingTask.setTimingTask(curTask);ScheduledFuture? scheduledFuture registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period);scheduledFutures.put(taskId, scheduledFuture);periodTasks.put(taskId, curTask);LOGGER.info(注册定时任务: curTask);}private boolean existTask(String taskId) {return scheduledFutures.containsKey(taskId) periodTasks.containsKey(taskId);}
} 如果taskId的定时任务已经存在则重置定时任务否则注册新的定时任务。AspectTimingTask 实例通过ApplicationContext获取每次获取都是一个新的实例。 由 异步轮询任务 优化成 定时任务充分利用了线程池。修改之后的业务代码如下。 ScheduledTaskRegistrarHelper.register(10, 5*1000L, taskId, () - {//调用分省接口获取发票信息//如果发票信息异常抛出异常进入下次重试//否则插入用户微信卡包}() - {//轮询次数用尽用户插入卡包失败}
); 针对电子发票插入微信卡包定时任务重试执行次数10次每隔5秒执行一次。任务完成之后结束定时任务执行次数用尽之后触发插入卡包失败动作。 四、参考 Spring异步调用原理及SpringAop拦截器链原理 Springboot定时任务原理及如何动态创建定时任务 转载于:https://www.cnblogs.com/hujunzheng/p/10660479.html