본문 바로가기

Web/Spring

맨땅에 Spring Batch - 회원 복제

728x90
반응형

Spring Batch를 사용하여 Database Member Table에 데이터 읽어와 이름 앞에 'AI_'를 붙이고 AI Table에 insert 해보겠다.

 

복제 인간을 만드는 것이다.

 

구성

  • JDK 17
  • Gradle
  • Spring Boot 2.7.12
  • H2 Database
  • Spring Batch
  • Spring data JPA
  • Lombok

application.properties

  • spring.batch.jdbc.initialize-schema = always : Spring Batch의 MetaTable 생성
  • spring.batch.job.names=${job.name:NONE} : 실행 할 Job Name을 arguments로 전달

 

 

기초 데이터 (Member, AI)

 

 

이제 할 일은 Member data를 읽어와야 한다.

 

음 어디서부터 설명해야 할지 고민이다...

 

디렉토리 구조

 

Spring Batch가 주제이니 JPA쪽은 간단히만 설명하겠다.

현재 프로젝트에 Member, AI 두개의 Entity를 작성했고 그에 따른 MemberRepository, AiRepository를 만들었다.

 

* Job, Step, Reader 등을 별개 class 파일로 만들어서 관리하는 방법도 있던데 거기까진 아직이라서 하나의 class 안에 모두 정의 했다.

 

MakeAiJobConfig.class

package com.example.demo.job;

import com.example.demo.domain.AI;
import com.example.demo.domain.Member;
import com.example.demo.repository.AiRepository;
import com.example.demo.repository.MemberRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.RepositoryItemWriter;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.batch.item.data.builder.RepositoryItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.Sort;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MakeAiJobConfig {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final MemberRepository memberRepository;
    private final AiRepository aiRepository;
    private static final int chunkSize = 10;

    @Bean
    Job makeAiJob() {
        return jobBuilderFactory
                .get("makeAiJob")
                .start(makeAiStep())
                .build();
    }

    @Bean
    @JobScope
    Step makeAiStep() {
        return stepBuilderFactory
                .get("makeAiStep")
                .<Member, AI>chunk(chunkSize)
                .reader(memberReader())
                .processor(memberAiProcessor())
                .writer(aiWriter())
                .build();
    }

    @Bean
    @StepScope
    public RepositoryItemReader<Member> memberReader() {
        Map<String , Sort.Direction> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Sort.Direction.DESC);
        return new RepositoryItemReaderBuilder<Member>()
                .name("memberReader")
                .repository(memberRepository)
                .methodName("findAll")
                .pageSize(10)
                .sorts(sortKeys)
                .build();
    }

    @Bean
    @StepScope
    public ItemProcessor<Member, AI> memberAiProcessor() {
        return item -> AI.builder()
                .name(generateAiName(item.getName()))
                .loginId(item.getLoginId())
                .password(item.getPassword())
                .build();
    }

    @Bean
    @StepScope
    public RepositoryItemWriter<AI> aiWriter() {
        return new RepositoryItemWriterBuilder<AI>()
                .repository(aiRepository)
                .methodName("save")
                .build();
    }

    private String generateAiName(String name) {
        return String.format("AI_%s", name);
    }
}
  • Config로 등록하기 위한 @Configuration
  • Job 사용을 위해 JobBuilderFactory DI
  • Step 사용을 위해 StepBuilderFactory DI 
  • ChunkSize static으로 선언 (선택사항)

Job

    @Bean
    Job makeAiJob() {
        return jobBuilderFactory
                .get("makeAiJob")
                .start(makeAiStep())
                .build();
    }
  • JobBuilderFactory로 Job 정의
  • .get()으로 Job Name 설정
  • .start()으로 시작할 Step 정의 

Step

    @Bean
    @JobScope
    Step makeAiStep() {
        return stepBuilderFactory
                .get("makeAiStep")
                .<Member, AI>chunk(chunkSize)
                .reader(memberReader())
                .processor(memberAiProcessor())
                .writer(aiWriter())
                .build();
    }
  • @JobScope 사용으로 makeAiStep은 호출되는 시점에 실제 Bean으로 등록 및 사용 (그 전에는 Proxy 객체)
  • @JobScope 사용으로 JobParamter 전달 가능
  • StepBuilderFactory로 Step 정의
  • .get()으로 Step Name 설정
  • <Read Data Type, Write Data Type>chunk(chunkSize) : Member를 read해서 chunkSize 단위로 AI를 write 하겠다.
  • .reader()로 사용할 reader 등록
  • .processor()로 사용할 processor 등록 (선택사항)
  • .writer()로 사용할 writer 등록
  • .build()로 Step build

ItemReader

 - MemberRepository에 findAll Method 사용으로 모든 회원 읽어오기

    @Bean
    @StepScope
    public RepositoryItemReader<Member> memberReader() {
        Map<String , Sort.Direction> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Sort.Direction.DESC);
        return new RepositoryItemReaderBuilder<Member>()
                .name("memberReader")
                .repository(memberRepository)
                .methodName("findAll")
                .pageSize(10)
                .sorts(sortKeys)
                .build();
    }
  • @StepScope 사용으로 memberReader은 호출되는 시점에 실제 Bean으로 등록 및 사용 (그 전에는 Proxy 객체)
  • @ StepScope 사용으로 JobParamter 전달 가능
  • ItemReader 중에 RepositoryItemReader를 사용 
  • RepositoryItemReader<Read Date Type>
  • .name()으로 Reader Name 설정
  • .repository()로 사용할 Repository 설정
  • .methodName()으로 Repository에서 사용할 Method 설정 (ex : memberRepository.findAll())
  • .pageSize()로 한번에 읽어올 데이터 row 수 설정
  • .sorts()로 sort 설정
  • .build()로 build

ItemProcessor

 - ItemReader로 읽어온 Member를 AI로 가공

    @Bean
    @StepScope
    public ItemProcessor<Member, AI> memberAiProcessor() {
        return item -> AI.builder()
                .name(generateAiName(item.getName()))
                .loginId(item.getLoginId())
                .password(item.getPassword())
                .build();
    }
  • @StepScope 사용으로 memberAiProcessor은 호출되는 시점에 실제 Bean으로 등록 및 사용 (그 전에는 Proxy 객체)
  • @ StepScope 사용으로 JobParamter 전달 가능

ItemWriter

 - RepositoryItemWriter로 전달 받은 AI save

    @Bean
    @StepScope
    public RepositoryItemWriter<AI> aiWriter() {
        return new RepositoryItemWriterBuilder<AI>()
                .repository(aiRepository)
                .methodName("save")
                .build();
    }
  • @StepScope 사용으로 aiwriter는 호출되는 시점에 실제 Bean으로 등록 및 사용 (그 전에는 Proxy 객체)
  • @StepScope 사용으로 JobParamter 전달 가능
  • ItemWriter 중에 RepositoryItemWriter를 사용 
  • .repository()로 사용할 Repository 설정 
  • .methodName()으로 사용할 method 설정  (ex : aiRepository.save())

사용한 기타 Method

    private String generateAiName(String name) {
        return String.format("AI_%s", name);
    }

 

 

Job 실행

  • Run Configurations에 Arguments로 실행 할 Job Name 등록 후 실행 (여기서 Job Name은 위에 Job에서 .get()으로 설정한 Job Name)

Log 확인

로그를 잘 살펴보면 Job 실행 시점에 SimpleJobLauncher로 name=makeAiJob이 실행됐고

pageSize만큼 Member를 Select 한다.

Select 한 데이터가 Processor를 거쳐서 chunkSize만큼 데이터가 쌓였다면 insert 문 실행

 

위에서 chunkSize를 10으로 설정했기 때문에 reader -> processor를 거친 data가 10개가 되었을 때 insert가 10번 나가는 걸 볼 수 있다.

 

1. Reader가 Member 10개의 Data를 Select

2. Data가 Processor를 하나씩 거쳐 Member -> AI로 가공

3. Processor를 거친 data가 chunkSize만큼 쌓이면

4. Writer가 10개 data를 insert

5. 다음 10개 data를 reader가 select

6. 반복

7. reader가 select를 했는데 data null이면 종료

 

* Chunk 방식을 사용할 땐 Rollback도 Transaction 단위로 실행되기 때문에 Transaction에 주의해야 한다.

 

결과

 - Table AI에 NAME은 AI_Member.name으로 모든 Member가 AI로 INSERT 됐음 성공

728x90
반응형