SpringBatch 主要是一个轻量级的、面向大数据量的并行处理（批处理）框架。其作用和 Hadoop 很相似，不过 Hadoop 基于重量级的分布式环境（处理巨量数据），而 SpringBatch 基于轻量级的应用框架（处理中小规模数据）。 这里用 SpringBatch 做了一个能跑起来的最简单例子，用来说明 SpringBatch 的基本作用。 如果需要深入学习，请详细参考阅读 https://docs.spring.io/spring-batch/4.0.x/reference/html/index.html （英文不好的同学可以和我一样右键“翻译成中文”查看）。 简单的技术栈：SpringBoot + SpringBatch + JPA。 完整 demo 的项目地址：https://github.com/EalenXie/springboot-batch 1 . 新建项目 springboot-batch，基本的 pom.xml 依赖如下： <?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal Maven build for the demo: Spring Boot 2.0.1 parent with the batch starter,
     the JPA starter, and the MySQL JDBC driver (runtime scope only). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>name.ealen</groupId>
    <artifactId>springboot-batch</artifactId>
    <version>1.0</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project> 2 . 你需要在数据库中建立springbatch的相关元数据表所以你需要在数据库中执行如下来自官方元数据模式的脚本。 -- do not edit this file
-- BATCH_JOB_INSTANCE: holds all information relevant to a JobInstance.
-- JOB_INSTANCE_ID: allocated from the BATCH_JOB_SEQ counter table.
-- JOB_NAME: the job's name, matching the name used in the Spring configuration.
-- JOB_KEY: MD5 hash of the job parameters. Because of the unique (JOB_NAME, JOB_KEY)
--   constraint, launching a job that already completed successfully a second time with
--   the same parameters throws JobInstanceAlreadyCompleteException.
CREATE TABLE BATCH_JOB_INSTANCE (
    JOB_INSTANCE_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    CONSTRAINT JOB_INST_UN UNIQUE (JOB_NAME, JOB_KEY)
) ENGINE=InnoDB;
-- BATCH_JOB_EXECUTION: holds all information relevant to a JobExecution.
-- Each execution belongs to exactly one BATCH_JOB_INSTANCE row (JOB_INST_EXEC_FK).
CREATE TABLE BATCH_JOB_EXECUTION (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME DATETIME NOT NULL,
    START_TIME DATETIME DEFAULT NULL,
    END_TIME DATETIME DEFAULT NULL,
    STATUS VARCHAR(10),
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED DATETIME,
    JOB_CONFIGURATION_LOCATION VARCHAR(2500) NULL,
    CONSTRAINT JOB_INST_EXEC_FK FOREIGN KEY (JOB_INSTANCE_ID)
        REFERENCES BATCH_JOB_INSTANCE (JOB_INSTANCE_ID)
) ENGINE=InnoDB;
-- BATCH_JOB_EXECUTION_PARAMS: holds all information relevant to the JobParameters
-- of a JobExecution. Each row is one named parameter (KEY_NAME) with its type code
-- (TYPE_CD) and typed value columns STRING_VAL / DATE_VAL / LONG_VAL / DOUBLE_VAL.
CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
    JOB_EXECUTION_ID BIGINT NOT NULL,
    TYPE_CD VARCHAR(6) NOT NULL,
    KEY_NAME VARCHAR(100) NOT NULL,
    STRING_VAL VARCHAR(250),
    DATE_VAL DATETIME DEFAULT NULL,
    LONG_VAL BIGINT,
    DOUBLE_VAL DOUBLE PRECISION,
    IDENTIFYING CHAR(1) NOT NULL,
    CONSTRAINT JOB_EXEC_PARAMS_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_STEP_EXECUTION: holds all information relevant to a StepExecution,
-- including its read/write/skip/commit/rollback counters. Each step execution
-- belongs to one BATCH_JOB_EXECUTION row (JOB_EXEC_STEP_FK).
CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    START_TIME DATETIME NOT NULL,
    END_TIME DATETIME DEFAULT NULL,
    STATUS VARCHAR(10),
    COMMIT_COUNT BIGINT,
    READ_COUNT BIGINT,
    FILTER_COUNT BIGINT,
    WRITE_COUNT BIGINT,
    READ_SKIP_COUNT BIGINT,
    WRITE_SKIP_COUNT BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT BIGINT,
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED DATETIME,
    CONSTRAINT JOB_EXEC_STEP_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_STEP_EXECUTION_CONTEXT: holds the ExecutionContext of a Step, one row per
-- StepExecution (STEP_EXECUTION_ID is both primary key and foreign key).
CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    CONSTRAINT STEP_EXEC_CTX_FK FOREIGN KEY (STEP_EXECUTION_ID)
        REFERENCES BATCH_STEP_EXECUTION (STEP_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_JOB_EXECUTION_CONTEXT: holds the ExecutionContext of a Job, one row per
-- JobExecution (JOB_EXECUTION_ID is both primary key and foreign key).
CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT TEXT,
    CONSTRAINT JOB_EXEC_CTX_FK FOREIGN KEY (JOB_EXECUTION_ID)
        REFERENCES BATCH_JOB_EXECUTION (JOB_EXECUTION_ID)
) ENGINE=InnoDB;
-- BATCH_STEP_EXECUTION_SEQ: counter table; per the Spring Batch meta-data schema it
-- supplies STEP_EXECUTION_ID values. The seed insert below adds the starting row
-- (ID 0) only when the table is empty. UNIQUE_KEY is CHAR(1), so the seed key is the
-- string '0' rather than an integer that relies on implicit conversion.
CREATE TABLE BATCH_STEP_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_STEP_EXECUTION_SEQ (ID, UNIQUE_KEY)
SELECT tmp.ID, tmp.UNIQUE_KEY
FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_STEP_EXECUTION_SEQ);
-- BATCH_JOB_EXECUTION_SEQ: counter table; per the Spring Batch meta-data schema it
-- supplies JOB_EXECUTION_ID values. The seed insert below adds the starting row
-- (ID 0) only when the table is empty. UNIQUE_KEY is CHAR(1), so the seed key is the
-- string '0' rather than an integer that relies on implicit conversion.
CREATE TABLE BATCH_JOB_EXECUTION_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_EXECUTION_SEQ (ID, UNIQUE_KEY)
SELECT tmp.ID, tmp.UNIQUE_KEY
FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_JOB_EXECUTION_SEQ);
-- BATCH_JOB_SEQ: counter table from which JOB_INSTANCE_ID values are allocated.
-- The seed insert below adds the starting row (ID 0) only when the table is empty.
-- UNIQUE_KEY is CHAR(1), so the seed key is the string '0' rather than an integer
-- that relies on implicit conversion.
CREATE TABLE BATCH_JOB_SEQ (
    ID BIGINT NOT NULL,
    UNIQUE_KEY CHAR(1) NOT NULL,
    CONSTRAINT UNIQUE_KEY_UN UNIQUE (UNIQUE_KEY)
) ENGINE=InnoDB;

INSERT INTO BATCH_JOB_SEQ (ID, UNIQUE_KEY)
SELECT tmp.ID, tmp.UNIQUE_KEY
FROM (SELECT 0 AS ID, '0' AS UNIQUE_KEY) AS tmp
WHERE NOT EXISTS (SELECT * FROM BATCH_JOB_SEQ); 3 . 测试数据的实体类 : Access.java package name.ealen.model;import javax.persistence.*;
/**
 * JPA entity for the demo's test data (one row of the {@code access} table).
 * NOTE(review): updateTime/createTime are plain strings here, not temporal types.
 * Created by EalenXie on 2018/9/10 16:17.
 */
@Entity
@Table
public class Access {

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Integer id;
    private String username;
    private String shopName;
    private String categoryName;
    private String brandName;
    private String shopId;
    private String omit;
    private String updateTime;
    private boolean deleteStatus;
    private String createTime;
    private String description;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getShopName() {
        return shopName;
    }

    public void setShopName(String shopName) {
        this.shopName = shopName;
    }

    public String getCategoryName() {
        return categoryName;
    }

    public void setCategoryName(String categoryName) {
        this.categoryName = categoryName;
    }

    public String getBrandName() {
        return brandName;
    }

    public void setBrandName(String brandName) {
        this.brandName = brandName;
    }

    public String getShopId() {
        return shopId;
    }

    public void setShopId(String shopId) {
        this.shopId = shopId;
    }

    public String getOmit() {
        return omit;
    }

    public void setOmit(String omit) {
        this.omit = omit;
    }

    public String getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(String updateTime) {
        this.updateTime = updateTime;
    }

    public boolean isDeleteStatus() {
        return deleteStatus;
    }

    public void setDeleteStatus(boolean deleteStatus) {
        this.deleteStatus = deleteStatus;
    }

    public String getCreateTime() {
        return createTime;
    }

    public void setCreateTime(String createTime) {
        this.createTime = createTime;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    /** Human-readable dump of every field; string fields are single-quoted. */
    @Override
    public String toString() {
        return "Access{" +
                "id=" + id +
                ", username='" + username + '\'' +
                ", shopName='" + shopName + '\'' +
                ", categoryName='" + categoryName + '\'' +
                ", brandName='" + brandName + '\'' +
                ", shopId='" + shopId + '\'' +
                ", omit='" + omit + '\'' +
                ", updateTime='" + updateTime + '\'' +
                ", deleteStatus=" + deleteStatus +
                ", createTime='" + createTime + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
} 4 . 配置一个最简单的Job 之前准备一些基本配置例如为Job添加一个监听器 : 配置TaskExecutorExecutorConfiguration.java package name.ealen.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;/*** 配置TaskExecutor*/
@Configuration
public class ExecutorConfiguration {

    /**
     * Thread pool bean: 50 core threads, at most 200 threads, a queue of 1000 tasks,
     * and thread names prefixed with "Data-Job".
     * NOTE(review): in the code shown, this pool is only injected into the job listener
     * (which destroys it when a job completes); the batch step itself is not configured
     * to run on it.
     *
     * @return the configured executor
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(50);
        threadPoolTaskExecutor.setMaxPoolSize(200);
        threadPoolTaskExecutor.setQueueCapacity(1000);
        threadPoolTaskExecutor.setThreadNamePrefix("Data-Job");
        return threadPoolTaskExecutor;
    }
} 为Job准备一个简单的监听器 实现JobExecutionListener即可 : package name.ealen.listener;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** Created by EalenXie on 2018/9/10 15:09.* 一个简单的JOB listener*/
@Component
public class JobListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobListener.class);

    @Resource
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    // Wall-clock start of the current job, set in beforeJob.
    // NOTE(review): one field on a singleton listener, so overlapping job executions
    // would overwrite each other's start time.
    private long startTime;

    /**
     * Records the start time and logs the job parameters.
     *
     * @param jobExecution the execution that is about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        log.info("job before " + jobExecution.getJobParameters());
    }

    /**
     * Logs the final status and the elapsed time. When the job COMPLETED it also
     * destroys the injected thread pool.
     *
     * @param jobExecution the execution that has just finished
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        BatchStatus status = jobExecution.getStatus();
        log.info("JOB STATUS : {}", status);
        if (status == BatchStatus.COMPLETED) {
            log.info("JOB FINISHED");
            // NOTE(review): this shuts down a shared singleton bean. If the job is launched
            // again in the same context (e.g. by a scheduler), the pool is no longer usable.
            threadPoolTaskExecutor.destroy();
        } else if (status == BatchStatus.FAILED) {
            log.info("JOB FAILED");
        }
        log.info("Job Cost Time : {}ms", System.currentTimeMillis() - startTime);
    }
} 5 . 配置一个最基本的Job : 一个Job 通常由一个或多个Step组成(基本就像是一个工作流)一个Step通常由三部分组成(读入数据 ItemReader处理数据 ItemProcessor写入数据 ItemWriter) package name.ealen.batch;import name.ealen.listener.JobListener;
import name.ealen.model.Access;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import javax.annotation.Resource;
import javax.persistence.EntityManagerFactory;/*** Created by EalenXie on 2018/9/10 14:50.* :EnableBatchProcessing提供用于构建批处理作业的基本配置*/
@Configuration
@EnableBatchProcessing
public class DataBatchConfiguration {

    private static final Logger log = LoggerFactory.getLogger(DataBatchConfiguration.class);

    @Resource
    private JobBuilderFactory jobBuilderFactory; // builds the Job

    @Resource
    private StepBuilderFactory stepBuilderFactory; // builds the Step

    @Resource
    private EntityManagerFactory emf; // JPA factory the reader uses to access the data

    @Resource
    private JobListener jobListener; // simple job listener (logging and timing)

    /**
     * A simple Job; a Job is normally made of one or more Steps executed like a workflow.
     *
     * @return the "dataHandleJob" job
     */
    @Bean
    public Job dataHandleJob() {
        return jobBuilderFactory.get("dataHandleJob")
                .incrementer(new RunIdIncrementer())
                .start(handleDataStep()) // first step the job runs; chain more with .next(step)
                .listener(jobListener)
                .build();
    }

    /**
     * A simple chunk-oriented Step with three parts:
     * ItemReader (reads data), ItemProcessor (processes data) and ItemWriter (writes data).
     *
     * @return the "getData" step
     */
    @Bean
    public Step handleDataStep() {
        return stepBuilderFactory.get("getData")
                // <input, output> types. Items are read and processed one at a time and handed
                // to the writer (one commit) in chunks of 100, similar to a SQL commit.
                .<Access, Access>chunk(100)
                // Fault tolerance: a failing item is tried up to 3 times in total (retryLimit),
                // and up to 100 failing items may be skipped (skipLimit). Beyond that the step
                // fails and the job is marked FAILED.
                .faultTolerant()
                .retryLimit(3)
                .retry(Exception.class)
                .skipLimit(100)
                .skip(Exception.class)
                .reader(getDataReader())
                .processor(getDataProcessor())
                .writer(getDataWriter())
                .build();
    }

    /**
     * Reads {@link Access} rows page by page through JPA with a native SQL query.
     * JDBC, JMS and other readers could be used instead.
     *
     * @return an initialised paging reader
     * @throws IllegalStateException if the query provider or the reader fails validation
     */
    @Bean
    public ItemReader<? extends Access> getDataReader() {
        JpaNativeQueryProvider<Access> queryProvider = new JpaNativeQueryProvider<>();
        // The paging reader runs one query per page (offset + page size). Without a
        // deterministic ORDER BY, the database may order rows differently for each page,
        // so rows could be skipped or read twice.
        queryProvider.setSqlQuery("SELECT * FROM access ORDER BY id");
        queryProvider.setEntityClass(Access.class);

        JpaPagingItemReader<Access> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(emf);
        reader.setPageSize(3);
        reader.setQueryProvider(queryProvider);
        // Readers and writers store their current state in the ExecutionContext before each
        // commit; pass false here to disable that.
        reader.setSaveState(true);
        try {
            queryProvider.afterPropertiesSet();
            reader.afterPropertiesSet();
        } catch (Exception e) {
            // Fail fast at startup instead of registering a half-configured reader bean.
            throw new IllegalStateException("Failed to initialise the Access item reader", e);
        }
        return reader;
    }

    /**
     * Simulated processing: logs each item and passes it through unchanged.
     *
     * @return the pass-through processor
     */
    @Bean
    public ItemProcessor<Access, Access> getDataProcessor() {
        return access -> {
            log.info("processor data : {}", access);
            return access;
        };
    }

    /**
     * Simulated writer: logs every item of the chunk. Real persistence logic goes here.
     *
     * @return the logging writer
     */
    @Bean
    public ItemWriter<Access> getDataWriter() {
        return items -> {
            for (Access access : items) {
                log.info("write data : {}", access);
            }
        };
    }
} 6 . 配置好基本的Job之后为Access表导入一些基本的数据(git上面有demo数据access.sql)写一个SpringBoot的启动类进行测试。 注意 : Job中的各个组件请使用Bean注解声明这样在元数据中才会有相应的正常操作记录 : package name.ealen;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;/*** Created by EalenXie on 2018/9/10 14:41.*/
@SpringBootApplication
public class SpringBatchApplication {

    /**
     * Starts the Spring Boot application context.
     * NOTE(review): the Job beans are presumably launched at startup by Spring Boot's
     * batch auto-configuration (spring.batch.job.enabled, default true); confirm.
     *
     * @param args command-line arguments passed to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringBatchApplication.class, args);
    }
} 7 . 运行可以看到基本数据处理效果这里是模拟处理和模拟写入 : 8 . 从元数据等表中查看验证JOB的执行情况 : 这里提一下之前写过一篇SpringBootQuartz的整合 大家应该想到些什么了吧。SpringBatch像是一个天然的JobQuartz是完全可以做为它运作的调度器。两者结合效果很不错。 感谢各位提出意见和支持。 转载于:https://www.cnblogs.com/ealenxie/p/9647703.html