티스토리 뷰
Spring Batch
향로님 Spring Batch 가이드 보고 따라하기
배치 기능 활성화
보통 실시간 처리가 어려운 대용량 데이터나 대규모 데이터일 경우 배치를 활용
- 배치기능 활성화를 위해 프로젝트 Application 클래스에 @EnableBatchProcessing 를 추가해주자.
@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
Job 생성하기
- Spring Batch의 모든 Job은 @Configuration으로 등록해서 사용
spring.batch.job.names: ${job.name:NONE}
: Program arguments로 job.name 값이 넘어오면 해당 값과 일치하는 Job만 실행--job.name=stepNextJob
- 실제 운영 환경에서는 아래와 같이 배치를 실행
java -jar batch-application.jar --job.name=simpleJob
DB 연동
schema~.sql 파일을 활용하여 spring batch에 필요한 테이블 생성
spring:
datasource:
hikari:
jdbc-url: jdbc:h2:tcp://localhost/~/spring_batch;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
username: sa
password:
driver-class-name: org.h2.Driver
BATCH_JOB_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_INSTANCE
BATCH_STEP_EXECUTION
BATCH_STEP_EXECUTION_CONTEXT
BATCH_JOB_INSTANCE
Job Parameter에 따라 생성되는 테이블
- Job Parameter
- Spring Batch가 실행될 때 외부에서 받을 수 있는 파라미터
- Job Parameter를 사용할 경우 Program arguments에 requestDate=20220805 추가
- Job Parameter가 같을 경우 테이블에 기록되지 않음 -> 동일한 Job Parameter는 여러개 존재할 수 없음
- Job Parameter를 사용하기 위해선 Spring Batch 전용 Scope인
@StepScope
와@JobScope
선언으로 Bean을 생성해야 함
JOB_INSTANCE_ID | VERSION | JOB_NAME | JOB_KEY |
---|---|---|---|
1 | 0 | simpleJob | d41d8cd98f00b204e9800998ecf8427e |
2 | 0 | simpleJob | cf8d12b1bf88779ebd68926856b6bbfc |
BATCH_JOB_EXECUTION
JOB_EXECUTION와 JOB_INSTANCE는 부모-자식 관계
- 자신의 부모 JOB_INSTACNE가 성공/실패했던 모든 내역 저장
- 동일한 Job Parameter로 성공한 기록이 있을 때만 재수행 불가
JOB_EXECUTION_ID | VERSION | JOB_INSTANCE_ID | CREATE_TIME | START_TIME | END_TIME | STATUS | EXIT_CODE | EXIT_MESSAGE | LAST_UPDATED | JOB_CONFIGURATION_LOCATION |
---|---|---|---|---|---|---|---|---|---|---|
1 | 2 | 1 | 2022-08-05 20:59:22.693000 | 2022-08-05 20:59:22.726000 | 2022-08-05 20:59:22.795000 | COMPLETED | COMPLETED | 2022-08-05 20:59:22.795000 | NULL | |
2 | 2 | 2 | 2022-08-05 21:04:21.394000 | 2022-08-05 21:04:21.451000 | 2022-08-05 21:04:21.539000 | COMPLETED | COMPLETED | 2022-08-05 21:04:21.539000 | NULL | |
3 | 2 | 3 | 2022-08-05 21:09:25.507000 | 2022-08-05 21:09:25.548000 | 2022-08-05 21:09:25.658000 | FAILED | FAILED | java.lang.IllegalArgumentException: step1에서 실패합니다. at com.example.springbatchtest.job.SimpleJobConfiguration.lambda$simpleStep1$0(SimpleJobConfiguration.java:35) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) at com.sun.proxy.$Proxy71.execute(Unknown Source) at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:152) at org.springframewo |
2022-08-05 21:09:25.658000 | NULL |
4 | 2 | 3 | 2022-08-05 21:11:00.480000 | 2022-08-05 21:11:00.521000 | 2022-08-05 21:11:00.700000 | COMPLETED | COMPLETED | 2022-08-05 21:11:00.701000 | NULL |
BATCH_JOB_EXECUTION_PARAM
BATCH_JOB_EXECUTION 테이블이 생성될 당시에 입력 받은 Job Parameter를 저장
JOB_EXECUTION_ID | TYPE_CD | KEY_NAME | STRING_VAL | DATE_VAL | LONG_VAL | DOUBLE_VAL | IDENTIFYING |
---|---|---|---|---|---|---|---|
2 | STRING | requestDate | 20220805 | 1970-01-01 09:00:00.000000 | 0 | 0 | Y |
3 | STRING | requestDate | 20220806 | 1970-01-01 09:00:00.000000 | 0 | 0 | Y |
4 | STRING | requestDate | 20220806 | 1970-01-01 09:00:00.000000 | 0 | 0 | Y |
Job 구성
Step
실제 Batch 작업을 수행하는 역할
- Batch로 실제 처리하고자 하는 기능과 설정을 모두 포함
@Slf4j //=> log 사용을 위한 lombok 어노테이션
@RequiredArgsConstructor //=> 생성자 DI를 위한 lombok 어노테이션
@Configuration //=> Spring Batch의 모든 Job은 @Configuration을 등록해서 사용
public class SimpleJobConfiguration {
private final JobBuilderFactory jobBuilderFactory; // 생성자 DI 받음
private final StepBuilderFactory stepBuilderFactory; // 생성자 DI 받음
@Bean
public Job simpleJob() {
return jobBuilderFactory.get("simpleJob") //=> simpleJob 이란 이름의 Batch Job 생성
.start(simpleStep1(null))
.next(simpleStep2(null))
.build();
}
@Bean
@JobScope
public Step simpleStep1(@Value("#{jobParameters[requestDate]}") String requestDate) {
return stepBuilderFactory.get("simpleStep1")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is Step1");
log.info(">>>>> requestDate = {}", requestDate);
return RepeatStatus.FINISHED;
// throw new IllegalArgumentException("step1에서 실패합니다.");
})
.build();
}
@Bean
@JobScope
public Step simpleStep2(@Value("#{jobParameters[requestDate]}") String requestDate) {
return stepBuilderFactory.get("simpleStep2")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is Step2");
log.info(">>>>> requestDate = {}", requestDate);
return RepeatStatus.FINISHED;
})
.build();
}
}
Next
Step들간에 순서 혹은 처리 흐름을 제어
@Slf4j
@Configuration
@RequiredArgsConstructor
public class StepNextJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job stepNextJob() {
return jobBuilderFactory.get("stepNextJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is Step1");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is Step2");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step step3() {
return stepBuilderFactory.get("step3")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is Step3");
return RepeatStatus.FINISHED;
})
.build();
}
}
Flow
조건별 흐름 제어
- 상황에 따라 정상일때는 Step B로, 오류가 났을때는 Step C로 수행할 경우 활용
@Slf4j
@Configuration
@RequiredArgsConstructor
public class StepNextConditionalJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
/**
* step1 실패 시나리오: step1 -> step3
* step1 성공 시나리오: step1 -> step2 -> step3
*/
@Bean
public Job stepNextConditionalJob() {
return jobBuilderFactory.get("stepNextConditionalJob")
.start(conditionalJobStep1())
.on("FAILED") // FAILED 일 경우 (캐치할 ExitStatus 지정, * 일 경우 모든 ExitStatus가 지정)
.to(conditionalJobStep3()) // step3으로 이동한다. (다음으로 이동할 Step 지정)
.on("*") // step3의 결과 관계 없이
.end() // step3으로 이동하면 Flow가 종료한다.
.from(conditionalJobStep1()) // step1로부터 (상태값을 보고 일치하는 상태라면 to()에 포함된 step을 호출)
.on("*") // FAILED 외에 모든 경우
.to(conditionalJobStep2()) // step2로 이동한다.
.next(conditionalJobStep3()) // step2가 정상 종료되면 step3으로 이동한다.
.on("*") // step3의 결과 관계 없이
.end() // step3으로 이동하면 Flow가 종료한다. (FlowBuilder를 반환하는 end)
.end() // Job 종료 (FlowBuilder를 종료하는 end)
.build();
}
@Bean
public Step conditionalJobStep1() {
return stepBuilderFactory.get("step1")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is stepNextConditionalJob Step1");
/**
ExitStatus를 FAILED로 지정한다.
해당 status를 보고 flow가 진행된다.
**/
// contribution.setExitStatus(ExitStatus.FAILED);
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step conditionalJobStep2() {
return stepBuilderFactory.get("conditionalJobStep2")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is stepNextConditionalJob Step2");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step conditionalJobStep3() {
return stepBuilderFactory.get("conditionalJobStep3")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> This is stepNextConditionalJob Step3");
return RepeatStatus.FINISHED;
})
.build();
}
}
Decide
분기 처리
- Step과는 명확히 역할과 책임을 분리
- Step들의 Flow속에서 분기만 담당하는 타입 (JobExecutionDecider)
@Slf4j
@Configuration
@RequiredArgsConstructor
public class DeciderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
@Bean
public Job deciderJob() {
return jobBuilderFactory.get("deciderJob")
.start(startStep())// Job Flow의 첫번째 Step을 시작
.next(decider()) // 홀수 | 짝수 구분 (startStep 이후에 decider를 실행)
.from(decider()) // decider의 상태가 (decider의 상태값을 보고 일치하는 상태라면 to()에 포함된 step 호출)
.on("ODD") // ODD라면
.to(oddStep()) // oddStep로 간다.
.from(decider()) // decider의 상태가
.on("EVEN") // ODD라면
.to(evenStep()) // evenStep로 간다.
.end() // builder 종료
.build();
}
@Bean
public Step startStep() {
return stepBuilderFactory.get("startStep")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> Start!");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step evenStep() {
return stepBuilderFactory.get("evenStep")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> 짝수입니다.");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step oddStep() {
return stepBuilderFactory.get("oddStep")
.tasklet((contribution, chunkContext) -> {
log.info(">>>>> 홀수입니다.");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public JobExecutionDecider decider() {
return new OddDecider();
}
public static class OddDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
Random rand = new Random();
int randomNumber = rand.nextInt(50) + 1;
log.info("랜덤숫자: {}", randomNumber);
if(randomNumber % 2 == 0) {
return new FlowExecutionStatus("EVEN");
} else {
return new FlowExecutionStatus("ODD");
}
}
}
}
Chunk
데이터 덩어리로 작업 할 때 각 커밋 사이에 처리되는 row 수
- Chunk 지향 처리: 한 번에 하나씩 데이터를 읽어 Chunk라는 덩어리를 만든 뒤, Chunk 단위로 트랜잭션을 다루는 것
- 실패할 경우엔 해당 Chunk 만큼만 롤백
Chunk Size VS Page Size
- Chunk Size는 한번에 처리될 트랜잭션 단위를 얘기하며, Page Size는 한번에 조회(Page 단위로 끊어서 조회)할 Item의 양
- 성능 이슈와 영속성 컨텍스트의 깨지는 문제를 막기 위해 두 개 값을 일치시키는 것이 좋음
동작 ChunkOrientedTasklet
ItemReader, ItemWriter는 ChunkOrientedTasklet에서 필수 요소
ItemReader
: Reader에서 데이터를 읽고, chunk size만큼 데이터를 누적, 처리가 필요할 경우 ItemProcessor에 전달- 가장 큰 장점은 데이터를 Streaming이 가능하다는 것
- read() 메소드는 데이터를 하나씩 가져와 ItemWriter로 데이터를 전달하고, 다음 데이터를 다시 가져 온다.
- 이를 통해 reader & processor & writer가 Chunk 단위로 수행되고 주기적으로 Commit
ItemProcessor
: 청크의 Item 개수 만큼 처리 될 때까지 가공ItemWriter
: 가공된 데이터들을 별도의 공간에 모은 뒤, Chunk 단위만큼 쌓이게 되면 Writer에 전달하고 Writer는 일괄 처리
ItemReader
- JpaRepository를 ListItemReader, QueueItemReader에 사용하고 싶을 경우, RepositoryItemReader 사용하기
- 영속성 컨텍스트가 필요한 Reader 사용시 fetchSize와 ChunkSize는 같은 값을 유지
- PagingItemReader 사용 시 정렬 포함 필수
CursorItemReader
- Database와 SocketTimeout을 충분히 큰 값으로 설정 필요
- Cursor는 하나의 Connection으로 Batch가 끝날때까지 사용되므로 중간에 끊어질 수 있음
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcCursorItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcCursorItemReaderJob() {
return jobBuilderFactory.get("jdbcCursorItemReaderJob")
.start(jdbcCursorItemReaderStep())
.build();
}
@Bean
public Step jdbcCursorItemReaderStep() {
return stepBuilderFactory.get("jdbcCursorItemReaderStep")
.<Pay, Pay>chunk(chunkSize) // <Reader에서 반환할 타입, Writer에 파라미터로 넘어올 타입> ()
.reader(jdbcCursorItemReader())
.writer(jdbcCursorItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Pay> jdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Pay>()
.fetchSize(chunkSize) // Database에서 한번에 가져올 데이터 양
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class)) // 쿼리 결과를 Java 인스턴스로 매핑하기 위한 Mapper
.sql("SELECT id, amount, tx_name, tx_date_time FROM pay") // Reader로 사용할 쿼리문
.name("jdbcCursorItemReader") // reader의 이름
.build();
}
private ItemWriter<Pay> jdbcCursorItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
}
PagingItemReader
- Batch 수행 시간이 오래 걸리는 경우 CursorItemReader 대신 사용
- 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 많은 데이터라도 타임아웃과 부하 없이 수행 가능
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcPagingItemReaderJob() throws Exception {
return jobBuilderFactory.get("jdbcPagingItemReaderJob")
.start(jdbcPagingItemReaderStep())
.build();
}
@Bean
public Step jdbcPagingItemReaderStep() throws Exception {
return stepBuilderFactory.get("jdbcPagingItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jdbcPagingItemReader())
.writer(jdbcPagingItemWriter())
.build();
}
@Bean
public JdbcPagingItemReader<Pay> jdbcPagingItemReader() throws Exception {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("amount", 2000);
return new JdbcPagingItemReaderBuilder<Pay>()
.pageSize(chunkSize)
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.queryProvider(createQueryProvider()) // JdbcCursorItemReader와 다른 부분
.parameterValues(parameterValues) // 쿼리에 대한 매개 변수 값의 Map을 지정 (where 절에서 선언된 파라미터 변수)
.name("jdbcPagingItemReader")
.build();
}
private ItemWriter<Pay> jdbcPagingItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
@Bean
public PagingQueryProvider createQueryProvider() throws Exception {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource); // Database에 맞는 PagingQueryProvider를 선택하기 위해
queryProvider.setSelectClause("id, amount, tx_name, tx_date_time");
queryProvider.setFromClause("from pay");
queryProvider.setWhereClause("where amount >= :amount");
Map<String, Order> sortKeys = new HashMap<>(1);
sortKeys.put("id", Order.ASCENDING);
queryProvider.setSortKeys(sortKeys);
return queryProvider.getObject();
}
}
JpaPagingItemReader
- Querydsl, Jooq 등을 통한 ItemReader 구현체는 공식 지원되지 않으므로 CustomItemReader 구현 필요
- HibernatePagingItemReader 에서는 Cursor 지원이 되지만 JpaPagingItemReader에서는 Cursor 기반 Database 접근을 지원하지 않음
- Creating Custom ItemReaders and ItemWriters
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaPagingItemReaderJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private int chunkSize = 10;
@Bean
public Job jpaPagingItemReaderJob() {
return jobBuilderFactory.get("jpaPagingItemReaderJob")
.start(jpaPagingItemReaderStep())
.build();
}
@Bean
public Step jpaPagingItemReaderStep() {
return stepBuilderFactory.get("jpaPagingItemReaderStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jpaPagingItemReader())
.writer(jpaPagingItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> jpaPagingItemReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("jpaPagingItemReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p WHERE amount >= 2000")
.build();
}
private ItemWriter<Pay> jpaPagingItemWriter() {
return list -> {
for (Pay pay: list) {
log.info("Current Pay={}", pay);
}
};
}
}
ItemWriter
Reader, Prcessor와 함께 ChunkOrientedTasklet을 구성하는 3 요소
JdbcBatchItemWriter
- ORM을 사용하지 않는 경우 대부분 사용
- 성능 향상을 위해 ChunkSize만큼 쿼리를 모으고, 모아놓은 쿼리를 DB에 전송하여 한 번에 실행
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JdbcBatchItemWriterJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final DataSource dataSource; // DataSource DI
private static final int chunkSize = 10;
@Bean
public Job jdbcBatchItemWriterJob() {
return jobBuilderFactory.get("jdbcBatchItemWriterJob")
.start(jdbcBatchItemWriterStep())
.build();
}
@Bean
public Step jdbcBatchItemWriterStep() {
return stepBuilderFactory.get("jdbcBatchItemWriterStep")
.<Pay, Pay>chunk(chunkSize)
.reader(jdbcBatchItemWriterReader())
.writer(jdbcBatchItemWriter())
.build();
}
@Bean
public JdbcCursorItemReader<Pay> jdbcBatchItemWriterReader() {
return new JdbcCursorItemReaderBuilder<Pay>()
.fetchSize(chunkSize)
.dataSource(dataSource)
.rowMapper(new BeanPropertyRowMapper<>(Pay.class))
.sql("SELECT id, amount, tx_name, tx_date_time FROM pay")
.name("jdbcBatchItemWriter")
.build();
}
/**
* reader에서 넘어온 데이터를 하나씩 출력하는 writer
*/
@Bean // beanMapped()을 사용할때는 필수
public JdbcBatchItemWriter<Pay> jdbcBatchItemWriter() {
return new JdbcBatchItemWriterBuilder<Pay>()
.dataSource(dataSource)
.sql("insert into pay2(amount, tx_name, tx_date_time) values (:amount, :txName, :txDateTime)")
.beanMapped() // Pojo 기반으로 Insert SQL의 Values를 매핑
.build();
}
}
JpaItemWriter
- Writer에 전달하는 데이터가 Entity 클래스일 경우 사용
- JPA를 사용하기 때문에 영속성 관리를 위해 EntityManager를 할당
@Slf4j
@RequiredArgsConstructor
@Configuration
public class JpaItemWriterJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private static final int chunkSize = 10;
@Bean
public Job jpaItemWriterJob() {
return jobBuilderFactory.get("jpaItemWriterJob")
.start(jpaItemWriterStep())
.build();
}
@Bean
public Step jpaItemWriterStep() {
return stepBuilderFactory.get("jpaItemWriterStep")
.<Pay, Pay2>chunk(chunkSize)
.reader(jpaItemWriterReader())
.processor(jpaItemProcessor())
.writer(jpaItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> jpaItemWriterReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("jpaItemWriterReader")
.entityManagerFactory(entityManagerFactory) // 영속성 관리를 위한 할당
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p")
.build();
}
@Bean
public ItemProcessor<Pay, Pay2> jpaItemProcessor() {
return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
}
@Bean
public JpaItemWriter<Pay2> jpaItemWriter() {
JpaItemWriter<Pay2> jpaItemWriter = new JpaItemWriter<>();
jpaItemWriter.setEntityManagerFactory(entityManagerFactory);
return jpaItemWriter;
}
}
Custom ItemWriter
ItemWriter Interface 구현
- Querydsl/Jooq 기반의 ItemReader, Custom 하게 구현이 필요한 경우 사용
- Reader에서 읽어온 데이터를 RestTemplate으로 외부 API로 전달해야할 경우
- 임시저장을 하고 비교하기 위해 싱글톤 객체에 값을 넣어야할 경우
- 여러 Entity를 동시에 save 해야할 경우
- ...
Spring Batch ItemWriter에 List 전달하기
@Slf4j
@RequiredArgsConstructor
@Configuration
public class CustomItemWriterJobConfiguration {
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory entityManagerFactory;
private static final int chunkSize = 10;
@Bean
public Job customItemWriterJob() {
return jobBuilderFactory.get("customItemWriterJob")
.start(customItemWriterStep())
.build();
}
@Bean
public Step customItemWriterStep() {
return stepBuilderFactory.get("customItemWriterStep")
.<Pay, Pay2>chunk(chunkSize)
.reader(customItemWriterReader())
.processor(customItemWriterProcessor())
.writer(customItemWriter())
.build();
}
@Bean
public JpaPagingItemReader<Pay> customItemWriterReader() {
return new JpaPagingItemReaderBuilder<Pay>()
.name("customItemWriterReader")
.entityManagerFactory(entityManagerFactory)
.pageSize(chunkSize)
.queryString("SELECT p FROM Pay p")
.build();
}
@Bean
public ItemProcessor<Pay, Pay2> customItemWriterProcessor() {
return pay -> new Pay2(pay.getAmount(), pay.getTxName(), pay.getTxDateTime());
}
@Bean
public ItemWriter<Pay2> customItemWriter() {
return items -> { // write() @Override 하면 구현체 생성
for (Pay2 item : items) {
System.out.println(item);
}
};
}
}
ItemProcessor
Reader에서 넘겨준 데이터 개별건을 가공/처리하는 역할
public interface ItemProcessor<I, O>
- ItemProcessor는 선택사항 (Writer 부분에서도 충분히 구현 가능)
- 각 계층(읽기/처리/쓰기)을 분리할 수 있는 좋은 대안
- 보통 비즈니스 로직을 담당
- 변환
- Reader에서 읽은 데이터를 원하는 타입으로 변환해서 Writer에 넘기기
- 필터
- Reader에서 넘겨준 데이터를 Writer로 넘겨줄 것인지를 결정
- null 반환 시 Writer에 전달하지 않음
- 변환
변환
Reader에서 읽은 타입을 변환하여 Writer에 전달
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorConvertJobConfiguration {
public static final String JOB_NAME = "ProcessorConvertBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(step())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step step() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Teacher, String>chunk(chunkSize) // ChunkSize 앞에 선언될 타입 역시 Reader와 Writer 타입과 동일하게
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
public JpaPagingItemReader<Teacher> reader() {
return new JpaPagingItemReaderBuilder<Teacher>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT t FROM Teacher t")
.build();
}
/**
* public interface ItemProcessor<I, O>
* I : Reader에서 읽어올 타입이
* O : Writer에서 넘겨줄 타입
*/
@Bean
public ItemProcessor<Teacher, String> processor() {
return teacher -> {
return teacher.getName();
};
}
private ItemWriter<String> writer() {
return items -> {
for (String item : items) {
log.info("Teacher Name= {}", item);
}
};
}
}
필터
Writer에 값을 넘길지 말지를 Processor에서 판단
@Slf4j
@RequiredArgsConstructor
@Configuration
public class ProcessorNullJobConfiguration {
public static final String JOB_NAME = "processorNullBatch";
public static final String BEAN_PREFIX = JOB_NAME + "_";
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final EntityManagerFactory emf;
@Value("${chunkSize:1000}")
private int chunkSize;
@Bean(JOB_NAME)
public Job job() {
return jobBuilderFactory.get(JOB_NAME)
.preventRestart()
.start(step())
.build();
}
@Bean(BEAN_PREFIX + "step")
@JobScope
public Step step() {
return stepBuilderFactory.get(BEAN_PREFIX + "step")
.<Teacher, Teacher>chunk(chunkSize)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
@Bean
public JpaPagingItemReader<Teacher> reader() {
return new JpaPagingItemReaderBuilder<Teacher>()
.name(BEAN_PREFIX+"reader")
.entityManagerFactory(emf)
.pageSize(chunkSize)
.queryString("SELECT t FROM Teacher t")
.build();
}
@Bean
public ItemProcessor<Teacher, Teacher> processor() {
return teacher -> {
boolean isIgnoreTarget = teacher.getId() % 2 == 0L;
if(isIgnoreTarget){
log.info(">>>>>>>>> ID={}, Teacher name={}, isIgnoreTarget={}", teacher.getId(), teacher.getName(), isIgnoreTarget);
return null; // null 반환 시 Writer에 전달하지 않음
}
return teacher;
};
}
private ItemWriter<Teacher> writer() {
return items -> {
for (Teacher item : items) {
log.info("DI= {}, Teacher Name={}", item.getId(), item.getName());
}
};
}
}
CompositeItemProcessor
ItemProcessor간의 체이닝을 지원하는 Processor
- 변환이 2번 이상으로 필요할 경우
트랜잭션 범위
트랜잭션 범위는 Chunk단위
- Reader에서 Entity를 반환해 주었다면 Entity간의 Lazy Loading 가능
Processor
- Processor는 트랜잭션 범위 안이며, Entity의 Lazy Loading 가능
Writer
- Writer도 마찬가지로 트랜잭션 범위 안이며, Entity의 Lazy Loading 가능
Reference
blog
1. Spring Batch 가이드 - 배치 어플리케이션이란?
2. Spring Batch 가이드 - Batch Job 실행해보기
3. Spring Batch 가이드 - 메타테이블엿보기
4. Spring Batch 가이드 - Spring Batch Job Flow
5. Spring Batch 가이드 - Spring Batch Scope & Job Parameter
6. Spring Batch 가이드 - Chunk 지향 처리
7. Spring Batch 가이드 - ItemReader
8. Spring Batch 가이드 - ItemWriter
9. Spring Batch 가이드 - ItemProcessor
10. Spring Batch 가이드 - Spring Batch 테스트 코드
10.1. Spring Batch 단위 테스트 코드 - Reader 편
documentation
'Web > Spring' 카테고리의 다른 글
[ThreadLocal] 동시성 문제와 스레드 로컬 (0) | 2022.11.24 |
---|---|
[Spring Batch] 통합 테스트 (1) | 2022.11.10 |
[Spring] @Transactional 잘 사용해보기 (0) | 2022.05.07 |
[JPA] 양방향 관계 Entity 저장하기 (0) | 2022.03.14 |
Spring Boot, Gradle 환경에 Querydsl 설정 방법 (2) | 2022.03.09 |