网站建设专题会议,网站的盈利方法,如何删除hao123主页,长沙岳麓区广告公司前面我们在数据库初始化时额外创建了一张任务表#xff0c;用来模拟处理任务#xff1a;
key模拟业务sendMail模拟用户注册后给用户发送邮件任务#xff0c;多线程异步任务处理analysisLog模拟每晚定时分析日志业务#xff0c;定时任务处理
异步任务
异步任务通过方法上…前面我们在数据库初始化时额外创建了一张任务表用来模拟处理任务
key模拟业务sendMail模拟用户注册后给用户发送邮件任务多线程异步任务处理analysisLog模拟每晚定时分析日志业务定时任务处理
异步任务
异步任务通过方法上的Async(taskExecutor)和启动类的EnableAsync注解实现Async中的参数指定了异步任务使用的的线程池。
首先实现AsyncUserService服务接口
接口
package com.example.demospringboot.service;public interface AsyncUserService {void sendMailTask();
}
对应实现
package com.example.demospringboot.service.impl;import com.example.demospringboot.dao.JobMapper;
import com.example.demospringboot.task.AsyncTasks;
import com.example.demospringboot.service.AsyncUserService;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Service
public class AsyncUserServiceImpl implements AsyncUserService {Autowiredprivate AsyncTasks asyncTasks;AutowiredJobMapper jobMapper;Overridepublic void sendMailTask() {if (jobMapper.getSendmail() 0) {asyncTasks.doAsyncTask(doSendMailTask);jobMapper.setSendmail(0); // 发送结束标记}};
}
用到的task类如下
package com.example.demospringboot.task;import com.example.demospringboot.utils.ThreadUtils;import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;import java.util.Random;
import java.util.concurrent.CompletableFuture;Slf4j
Component
public class AsyncTasks {public static Random random new Random();// Async注解中的参数指向异步任务的线程池Async(taskExecutor)public CompletableFutureString doAsyncTask(String taskNo){log.info(start AsyncTask: {}, taskNo);long start System.currentTimeMillis();ThreadUtils.sleepUtil(random.nextInt(10000));long end System.currentTimeMillis();log.info(end:{}, task:{} ms, taskNo, end - start);return CompletableFuture.completedFuture(finished);}}
1异步任务通过方法上的Async(taskExecutor)和启动类的EnableAsync注解实现Async中的参数指定了异步任务使用的的线程池。调用异步方法时不会等待方法执行完调用即过被调用方法在自己的线程池中奔跑。 2多线程执行的返回值是Future类型或void。Future是非序列化的微服务架构中有可能传递失败。spring boot推荐使用的CompletableFuture来返回异步调用的结果。
用到的thread工具类如下
package com.example.demospringboot.utils;import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Repository;import java.util.concurrent.ThreadPoolExecutor;Repository
Slf4j
public class ThreadUtils {public static final int MAX_POOL_SIZE 2;public static final String SCHED_EXECUTOR_POOL_PREFIX sched-exe- MAX_POOL_SIZE -;public static final String ASYNC_EXECUTOR_POOL_PREFIX async-exe- MAX_POOL_SIZE -;public static final String ASYNC_TASK_POOL_PREFIX async-task- MAX_POOL_SIZE -;// 自定义AsyncTask线程池Beanpublic ThreadPoolTaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(MAX_POOL_SIZE);executor.setMaxPoolSize(MAX_POOL_SIZE);executor.setQueueCapacity(MAX_POOL_SIZE);executor.setKeepAliveSeconds(0);executor.setThreadNamePrefix(ASYNC_TASK_POOL_PREFIX);// 如果添加到线程池失败那么主线程会自己去执行该任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}// 启动Executor的线程池public static ThreadPoolTaskExecutor getThreadPool(String threadNamePrefix) {ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setCorePoolSize(MAX_POOL_SIZE);executor.setMaxPoolSize(MAX_POOL_SIZE);executor.setQueueCapacity(MAX_POOL_SIZE);executor.setKeepAliveSeconds(0);executor.setThreadNamePrefix(threadNamePrefix);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}public static void sleepUtil(long millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {log.error({}, e);}}
}
线程池用的是ThreadPoolTaskExecutor 。Executor 顾名思义是专门用来处理多线程相关的一个接口所有线程相关的类都实现了这个接口里面有一个execute()方法用来执行线程线程池主要提供一个线程队列队列中保存着所有等待状态的线程。避免了创建与销毁的额外开销提高了响应的速度。
ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装处理是spring core包中提供的而ThreadPoolExecutor是JDK中的JUC。
参数说明
corePoolSize核心线程数queueCapacity任务队列容量阻塞队列maxPoolSize最大线程数keepAliveTime线程空闲时间rejectedExecutionHandler任务拒绝处理器 异步任务会先占用核心线程核心线程满了其他任务进入队列等待在缓冲队列也满了之后才会申请超过核心线程数的线程来进行处理。当线程数已经达到maxPoolSize且队列已满线程池可以调用这四个策略处理 AbortPolicy策略默认策略如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。DiscardPolicy策略如果线程池队列满了会直接丢掉这个任务并且不会有任何异常。DiscardOldestPolicy策略如果队列满了会将最早进入队列的任务删掉腾出空间再尝试加入队列。CallerRunsPolicy策略如果添加到线程池失败那么主线程会自己去执行该任务不会等待线程池中的线程去执行。也可以自己实现RejectedExecutionHandler接口可自定义处理器
为了控制异步任务的并发不影响到应用的正常运作我们必须要对线程池做好相应的配置防止资源的过渡使用。需考虑好默认线程池的配置和多任务情况下的线程池隔离。
上述服务我们就用不同线程池的JobManager进行管理
package com.example.demospringboot.job;import com.example.demospringboot.service.AsyncUserService;
import com.example.demospringboot.utils.ThreadUtils;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;Slf4j
Component
public class AsyncJobManager {private static final ThreadPoolTaskExecutor ASYNC_EXECUTOR_POOL ThreadUtils.getThreadPool(ThreadUtils.ASYNC_EXECUTOR_POOL_PREFIX);Autowiredprivate AsyncUserService asyncUserService;public void startSyncExecutor() {ASYNC_EXECUTOR_POOL.execute(new AsyncExecutor(asyncUserService));}static class AsyncExecutor implements Runnable {private AsyncUserService asyncUserService;public AsyncExecutor(AsyncUserService asyncUserService) {this.asyncUserService asyncUserService;}Overridepublic void run() {while (true) {asyncUserService.sendMailTask();// sleep 1sThreadUtils.sleepUtil(1000L);}}}
}
定时任务
定时任务通过在主类家EnableScheduling注解在任务类加Scheduled 实现。
同样地先实现SchedUserService服务接口
接口
package com.example.demospringboot.service;public interface SchedUserService {void AnalysisLogTask();
}
对应实现
package com.example.demospringboot.service.impl;import com.example.demospringboot.ScheduledTasks;
import com.example.demospringboot.dao.JobMapper;
import com.example.demospringboot.service.SchedUserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;Slf4j
Service
public class SchedUserServiceImpl implements SchedUserService {Autowiredprivate ScheduledTasks scheduledTasks;AutowiredJobMapper jobMapper;Overridepublic void AnalysisLogTask() {if (jobMapper.getAnalysisLog() 0) {scheduledTasks.AnalysisLogTask();}};
}对应的task如下
package com.example.demospringboot;import com.example.demospringboot.utils.ThreadUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;Slf4j
Component
AllArgsConstructor
public class ScheduledTasks {private static final SimpleDateFormat dateFormat new SimpleDateFormat(HH:mm:ss);/*** 秒 分 时 日 月 周几* 0 * * * * MON-FRI*/Scheduled(cron 10 0 0 * * ?)public void AnalysisLogTask() {log.info(AnalysisLogTask start dateFormat.format(new Date()));ThreadUtils.sleepUtil(10000);log.info(AnalysisLogTask end dateFormat.format(new Date()));}
}
我们来详细了解下cron表达式
10/2 * * * * ? 表示每2秒 执行任务
10 0/2 * * * ? 表示每2分钟 执行任务
10 0 2 1 * ? 表示在每月的1日的凌晨2点调整任务
20 15 10 ? * MON-FRI 表示周一到周五每天上午10:15执行作业
30 15 10 ? 6L 2002-2006 表示2002-2006年的每个月的最后一个星期五上午10:15执行作业
40 0 10,14,16 * * ? 每天上午10点下午2点4点
50 0/30 9-17 * * ? 朝九晚五工作时间内每半小时
60 0 12 ? * WED 表示每个星期三中午12点
70 0 12 * * ? 每天中午12点触发
80 15 10 ? * * 每天上午10:15触发
90 15 10 * * ? 每天上午10:15触发
100 15 10 * * ? 每天上午10:15触发
110 15 10 * * ? 2005 2005年的每天上午10:15触发
120 * 14 * * ? 在每天下午2点到下午2:59期间的每1分钟触发
130 0/5 14 * * ? 在每天下午2点到下午2:55期间的每5分钟触发
140 0/5 14,18 * * ? 在每天下午2点到2:55期间和下午6点到6:55期间的每5分钟触发
150 0-5 14 * * ? 在每天下午2点到下午2:05期间的每1分钟触发
160 10,44 14 ? 3 WED 每年三月的星期三的下午2:10和2:44触发
170 15 10 ? * MON-FRI 周一至周五的上午10:15触发
180 15 10 15 * ? 每月15日上午10:15触发
190 15 10 L * ? 每月最后一日的上午10:15触发
200 15 10 ? * 6L 每月的最后一个星期五上午10:15触发
210 15 10 ? * 6L 2002-2005 2002年至2005年的每月的最后一个星期五上午10:15触发
220 15 10 ? * 6#3 每月的第三个星期五上午10:15触发
SchedJobManager进行任务管理:
package com.example.demospringboot.job;import com.example.demospringboot.service.SchedUserService;
import com.example.demospringboot.utils.ThreadUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;Slf4j
Component
public class SchedJobManager {private static final ThreadPoolTaskExecutor SCHED_EXECUTOR_POOL ThreadUtils.getThreadPool(ThreadUtils.SCHED_EXECUTOR_POOL_PREFIX);Autowiredprivate SchedUserService schedUserService;public void startSchedExecutor() {SCHED_EXECUTOR_POOL.execute(new Executor(schedUserService));}static class Executor implements Runnable {private SchedUserService schedUserService;public Executor(SchedUserService schedUserService) {this.schedUserService schedUserService;}Overridepublic void run() {while (true) {schedUserService.AnalysisLogTask();// sleep 1sThreadUtils.sleepUtil(1000L);}}}
}
主启动类
主类如下同时开启任务
package com.example.demospringboot;import com.example.demospringboot.job.SchedJobManager;
import com.example.demospringboot.job.AsyncJobManager;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.ScriptUtils;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.cache.annotation.EnableCaching;import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import com.spring4all.swagger.EnableSwagger2Doc;Slf4j
EnableCaching
EnableAsync
SpringBootApplication
EnableSwagger2Doc
// 需要指定扫描的类并在配置文件指定mybatis.mapper-locations为对应的xml路径
MapperScan(value {com.example.demospringboot.dao})
public class DemospringbootApplication implements CommandLineRunner {Autowiredprivate SchedJobManager schedJobManager;Autowiredprivate AsyncJobManager asyncJobManager;Autowiredprivate DataSource dataSource;public static void main(String[] args) {SpringApplication.run(DemospringbootApplication.class, args);}Overridepublic void run(String... strings) throws SQLException {initDatabase();schedJobManager.startSchedExecutor();asyncJobManager.startSyncExecutor();}private void initDatabase() throws SQLException {log.info( 自动初始化数据库开始 );Resource initData new ClassPathResource(schema.sql);Connection connection null;try {connection dataSource.getConnection();ScriptUtils.executeSqlScript(connection, initData);} catch (SQLException e) {throw new RuntimeException(e);} finally {if (connection ! null) {connection.close();}}log.info( 自动初始化数据库结束 );}
}
主启动类实现了CommandLineRunner 接口会直接执行run方法。 我们在其中调用了对应JobManager的startExecutor方法用线程池execute方法启动了对应线程类的run方法。