Spring Batch: A Batch Processing Framework
2025-04-05
Overview
Spring Batch is a lightweight batch-processing framework designed for enterprise-scale bulk data processing. It provides reusable components for reading, transforming, and writing large volumes of data, and supports enterprise features such as transaction management, job restart, skip, and retry. Typical use cases include ETL, report generation, and data migration.
Core Architecture
| Component | Description |
|---|---|
| Job | A batch job, composed of one or more Steps |
| Step | An independent execution step within a Job |
| ItemReader | Reads items from a data source |
| ItemProcessor | Transforms/filters the items read |
| ItemWriter | Writes processed items to a target |
| Tasklet | A simple single-step task (non-chunk mode) |
| JobRepository | Stores Job execution metadata (status, parameters, etc.) |
| JobLauncher | Entry point for launching Jobs |
The Chunk Processing Model
In chunk-oriented processing, a Step reads items one at a time through the ItemReader, passes each item to the ItemProcessor, and accumulates the results until the chunk size (the commit interval) is reached; the whole chunk is then handed to the ItemWriter and committed in a single transaction.
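The commit-interval loop can be sketched in plain, framework-free Java (the class name `ChunkLoopDemo` and the string-based items are illustrative, not Spring Batch API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class ChunkLoopDemo {
    // Simulates one Step execution: read -> process -> accumulate -> write per chunk.
    // Returns the chunks that were "written"; in Spring Batch each write is one transaction.
    static List<List<String>> run(Iterator<String> reader, int chunkSize) {
        List<List<String>> written = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            String item = reader.next();                // ItemReader.read()
            String processed = item.isBlank() ? null    // ItemProcessor: null filters the item out
                                              : item.trim().toUpperCase();
            if (processed != null) {
                chunk.add(processed);
            }
            if (chunk.size() == chunkSize) {            // commit interval reached
                written.add(new ArrayList<>(chunk));    // ItemWriter.write(chunk) + commit
                chunk.clear();
            }
        }
        if (!chunk.isEmpty()) {                         // final partial chunk
            written.add(new ArrayList<>(chunk));
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a", " b ", "", "c", "d", "e");
        System.out.println(run(input.iterator(), 2));   // [[A, B], [C, D], [E]]
    }
}
```

Note how the blank item never reaches a chunk: returning null from the processor drops the record, which is exactly the contract `EmployeeProcessor` below relies on.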
Basic Example: Importing a CSV into a Database
Define the Entity and Processor
// Data model
public class Employee {
    private Long id;
    private String name;
    private String email;
    private String department;
    private BigDecimal salary;
    // constructors, getters, setters...
}

// ItemProcessor: transforms and filters items
public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {
    @Override
    public Employee process(Employee employee) throws Exception {
        // Returning null filters this record out
        if (employee.getSalary() == null || employee.getSalary().compareTo(BigDecimal.ZERO) <= 0) {
            return null;
        }
        // Normalize the data
        employee.setName(employee.getName().trim());
        employee.setEmail(employee.getEmail().toLowerCase().trim());
        employee.setDepartment(employee.getDepartment().toUpperCase());
        return employee;
    }
}
Configure the Job
@Configuration
public class EmployeeImportJobConfig {

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job employeeImportJob(Step importStep, Step reportStep) {
        return new JobBuilder("employeeImportJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .validator(new DefaultJobParametersValidator(
                        new String[]{"inputFile"}, // required parameters
                        new String[]{"date"}       // optional parameters
                ))
                .start(importStep)
                .next(reportStep)
                .listener(new JobCompletionListener())
                .build();
    }

    @Bean
    public Step importStep(ItemReader<Employee> reader,
                           ItemProcessor<Employee, Employee> processor,
                           ItemWriter<Employee> writer) {
        return new StepBuilder("importStep", jobRepository)
                .<Employee, Employee>chunk(100, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skipLimit(10)
                .skip(FlatFileParseException.class)
                .retryLimit(3)
                .retry(DeadlockLoserDataAccessException.class)
                .listener(new StepExecutionListener() {
                    @Override
                    public ExitStatus afterStep(StepExecution stepExecution) {
                        long readCount = stepExecution.getReadCount();
                        long writeCount = stepExecution.getWriteCount();
                        long skipCount = stepExecution.getSkipCount();
                        System.out.printf("Read: %d, Written: %d, Skipped: %d%n",
                                readCount, writeCount, skipCount);
                        return ExitStatus.COMPLETED;
                    }
                })
                .build();
    }

    // ItemReader: reads the CSV
    @Bean
    @StepScope
    public FlatFileItemReader<Employee> reader(
            @Value("#{jobParameters['inputFile']}") String inputFile) {
        return new FlatFileItemReaderBuilder<Employee>()
                .name("employeeReader")
                .resource(new FileSystemResource(inputFile))
                .delimited()
                .names("name", "email", "department", "salary")
                .fieldSetMapper(fieldSet -> {
                    Employee emp = new Employee();
                    emp.setName(fieldSet.readString("name"));
                    emp.setEmail(fieldSet.readString("email"));
                    emp.setDepartment(fieldSet.readString("department"));
                    emp.setSalary(fieldSet.readBigDecimal("salary"));
                    return emp;
                })
                .linesToSkip(1) // skip the header row
                .build();
    }

    // ItemProcessor
    @Bean
    public EmployeeProcessor processor() {
        return new EmployeeProcessor();
    }

    // ItemWriter: writes to the database
    @Bean
    public JdbcBatchItemWriter<Employee> writer() {
        return new JdbcBatchItemWriterBuilder<Employee>()
                .dataSource(dataSource)
                .sql("INSERT INTO employees (name, email, department, salary) " +
                     "VALUES (:name, :email, :department, :salary)")
                .beanMapped()
                .build();
    }
}
Tasklet Mode
Suited to simple single-step operations (e.g. cleaning up temporary files or sending a summary email):
@Bean
public Step reportStep() {
    return new StepBuilder("reportStep", jobRepository)
            .tasklet((contribution, chunkContext) -> {
                StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
                JobExecution jobExecution = stepExecution.getJobExecution();
                long totalRead = jobExecution.getStepExecutions().stream()
                        .mapToLong(StepExecution::getReadCount)
                        .sum();
                System.out.println("Import completed. Total records: " + totalRead);
                // e.g. send a notification email here
                return RepeatStatus.FINISHED;
            }, transactionManager)
            .build();
}
Job Flow Control
@Bean
public Job complexJob(Step importStep, Step validateStep,
                      Step transformStep, Step errorLogStep,
                      Step alertStep, Step reportStep) {
    return new JobBuilder("complexJob", jobRepository)
            .start(importStep)
                .on("COMPLETED").to(validateStep)
            .from(importStep).on("FAILED").to(alertStep)
            .from(validateStep).on("COMPLETED").to(transformStep)
            .from(validateStep).on("FAILED").to(errorLogStep)
            .from(errorLogStep).on("*").to(alertStep)
            .from(transformStep).on("*").to(reportStep)
            .end()
            .build();
}
Restart and Skip
Configuring a Skip Policy
@Bean
public Step faultTolerantStep() {
    return new StepBuilder("faultTolerantStep", jobRepository)
            .<Employee, Employee>chunk(100, transactionManager)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .faultTolerant()
            // Skip policy
            .skipLimit(50)
            .skip(ValidationException.class)
            .skip(FlatFileParseException.class)
            .noSkip(DatabaseException.class)
            // Retry policy
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
            .retry(OptimisticLockingFailureException.class)
            // Skip listener (log assumes an SLF4J logger, e.g. via Lombok @Slf4j)
            .listener(new SkipListener<Employee, Employee>() {
                @Override
                public void onSkipInRead(Throwable t) {
                    log.warn("Skipped during read: {}", t.getMessage());
                }

                @Override
                public void onSkipInProcess(Employee item, Throwable t) {
                    log.warn("Skipped during process: {}, error: {}",
                            item.getEmail(), t.getMessage());
                }

                @Override
                public void onSkipInWrite(Employee item, Throwable t) {
                    log.warn("Skipped during write: {}", item.getEmail());
                }
            })
            .build();
}
Partitioned Parallel Processing (Partitioning)
@Bean
public Step partitionedStep(Step workerStep) {
    return new StepBuilder("partitionedStep", jobRepository)
            .partitioner("workerStep", rangePartitioner())
            .step(workerStep)
            .gridSize(4) // 4 partitions
            .taskExecutor(new SimpleAsyncTaskExecutor())
            .build();
}

@Bean
public Partitioner rangePartitioner() {
    return gridSize -> {
        Map<String, ExecutionContext> partitions = new HashMap<>();
        long totalRecords = getTotalRecordCount();
        long chunkSize = totalRecords / gridSize;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext ctx = new ExecutionContext();
            ctx.putLong("minId", i * chunkSize + 1);
            // The last partition absorbs the division remainder so no records are dropped
            ctx.putLong("maxId", i == gridSize - 1 ? totalRecords : (i + 1) * chunkSize);
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    };
}
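The range math above is easy to get wrong when the record count does not divide evenly by the grid size. A framework-free sketch of the same split (the class and method names here are illustrative, not Spring Batch API):

```java
import java.util.ArrayList;
import java.util.List;

public class RangeSplitDemo {
    // Splits ids 1..totalRecords into gridSize [minId, maxId] ranges;
    // the last range absorbs the remainder.
    static List<long[]> splitRange(long totalRecords, int gridSize) {
        List<long[]> ranges = new ArrayList<>();
        long chunkSize = totalRecords / gridSize;
        for (int i = 0; i < gridSize; i++) {
            long minId = i * chunkSize + 1;
            long maxId = (i == gridSize - 1) ? totalRecords : (i + 1) * chunkSize;
            ranges.add(new long[]{minId, maxId});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 records over 4 partitions: 1-2, 3-4, 5-6, 7-10
        for (long[] r : splitRange(10, 4)) {
            System.out.println(r[0] + "-" + r[1]);
        }
    }
}
```

With 10 records and 4 partitions, a naive `(i + 1) * chunkSize` upper bound for every partition would stop at id 8 and silently drop ids 9 and 10; pinning the last partition's maxId to totalRecords avoids that.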
@Bean
@StepScope
public JdbcPagingItemReader<Employee> partitionedReader(
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {
    Map<String, Order> sortKeys = new HashMap<>();
    sortKeys.put("id", Order.ASCENDING);
    return new JdbcPagingItemReaderBuilder<Employee>()
            .dataSource(dataSource)
            .name("partitionedReader")
            .selectClause("SELECT *")
            .fromClause("FROM employees")
            .whereClause("WHERE id >= :minId AND id <= :maxId")
            .sortKeys(sortKeys)
            .parameterValues(Map.of("minId", minId, "maxId", maxId))
            .pageSize(100)
            .rowMapper(new BeanPropertyRowMapper<>(Employee.class))
            .build();
}
Scheduled Execution
// Integrating with @Scheduled
@Component
public class JobScheduler {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job employeeImportJob;

    @Scheduled(cron = "0 0 2 * * ?") // runs daily at 2 AM
    public void runDailyImport() throws Exception {
        JobParameters params = new JobParametersBuilder()
                .addString("inputFile", "/data/employees_" +
                        LocalDate.now().format(DateTimeFormatter.ISO_DATE) + ".csv")
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
        JobExecution execution = jobLauncher.run(employeeImportJob, params);
        log.info("Job finished with status: {}", execution.getStatus());
    }
}
Summary
Spring Batch provides a complete batch-processing framework: the Job/Step/Chunk model defines job structure, the ItemReader/Processor/Writer interfaces componentize data processing, and the JobRepository tracks execution state to support restarts. Skip/retry policies provide fault tolerance, and partitioning parallelizes work to raise throughput. It fits enterprise batch scenarios such as data import/export, ETL, report generation, and scheduled reconciliation.