본문 바로가기
Spring/SpringBoot

[SpringBoot] SpringBatch 사용하기 (2) - Job Parameter 활용 및 동일 Job 반복 실행

by J4J 2023. 8. 26.
300x250
반응형

안녕하세요. J4J입니다.

 

이번 포스팅은 SpringBatch 사용하기 두 번째인 Job Parameter 활용 및 동일 Job 반복 실행하는 방법에 대해 적어보는 시간을 가져보려고 합니다.

 

 

 

이전 글

 

[SpringBoot] SpringBatch 사용하기 (1) - Scheduler를 이용하여 Tasklet, Chunk 배치 만들기

 

 

반응형

 

 

Parameter 사용 (1) - Tasklet

 

이전 글에서 Tasklet을 통해 로그 데이터 적재하는 코드를 작성해 봤습니다.

 

해당 코드를 Parameter 사용하는 소스로 변경해 보겠습니다.

 

 

 

먼저 Job이 수행될 때 활용할 Parameter를 등록하는 곳은 JobLanucher를 이용하여 Job을 등록할 때입니다.

 

예를 들어, Scheduler가 동작될 때 입력해 둔 Parameter값을 다음과 같이 추가해 볼 수 있습니다.

 

package com.batch.schedule;

import com.batch.jobConfig.LogRegisterJobConfig;
import com.batch.jobConfig.LogUpdateJobConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class LogSchedule {

    ...
    
    @Scheduled(cron = "0/10 * * * * ?") // 10초마다 Job 실행, cron 표현식 활용 (초 분 시 일 월 요일)
    public void logRegister() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        jobLauncher.run(
                logRegisterJobConfig.logRegisterJob(),
                new JobParametersBuilder()
                        .addString("contents", "parameterContents") // 파라미터 추가
                        .toJobParameters()
        );
    }
}

 

 

 

위와 같이 넣어둔 Parameter 값은 Tasklet을 활용할 때 다음과 같이 가져와 사용할 수 있습니다.

 

package com.batch.jobConfig;

import com.batch.entity.LogEntity;
import com.batch.repository.LogJpaRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class LogRegisterJobConfig {

    ...
    
    /**
     * contents가 입력된 로그 등록을 위한 Tasklet
     *
     * @return contents 입력 로그 등록 Tasklet
     */
    @Bean
    public Tasklet contentsLogRegisterTasklet() {
        return (contribution, chunkContext) -> {
            // StepContext에서 JobParameter 가져오기
            Map<String, Object> jobParameters = chunkContext.getStepContext().getJobParameters();

            // 로그 데이터 적재
            logJpaRepository.save(
                    LogEntity
                            .builder()
                            .contents(jobParameters.get("contents").toString()) // 파라미터에서 전달받은 값으로 변경
                            .build()
            );

            // 배치 종료
            return RepeatStatus.FINISHED;
        };
    }
}

 

 

 

 

Parameter 사용 (2) - Chunk

 

이번에는 Chunk에서 사용하는 방법에 대해 적어보겠습니다.

 

Tasklet과 유사하게 이전 글에서 Chunk를 활용하여 contents 값을 수정하는 코드를 작성해 봤습니다.

 

Chunk도 Tasklet과 동일하게 JobLanucher에서 Parameter 값을 등록한 뒤 해당 값을 Job이 실행될 때 사용해 보겠습니다.

 

 

 

먼저 Scheduler 동작할 때 다음과 같이 소스 변경을 해주시면 됩니다.

 

package com.batch.schedule;

import com.batch.jobConfig.LogRegisterJobConfig;
import com.batch.jobConfig.LogUpdateJobConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class LogSchedule {

    ...
    
    @Scheduled(fixedRate = 20000) // 메서드가 실행된 시간부터 20초마다 Job 실행
    public void logUpdate() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        jobLauncher.run(
                logUpdateJobConfig.logUpdateJob(),
                new JobParametersBuilder()
                        .addString("contents", "parameterUpdateContents") // 파라미터 추가
                        .toJobParameters()
        );
    }
}

 

 

 

위에 넣어둔 Parameter 값은 @Value 어노테이션을 이용하여 사용할 수도 있습니다.

 

다만 해당 방법을 활용할 땐 사용되는 곳에 @StepScope 어노테이션이 추가되어야 하며 StepBuilder 쪽에는 파라미터 값을 추가되는 만큼 null로 넣어주면 됩니다.

 

package com.batch.jobConfig;

import com.batch.entity.LogEntity;
import com.batch.repository.LogJpaRepository;
import jakarta.persistence.EntityManagerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class LogUpdateJobConfig {

    ...
    
    /**
     * 로그 수정을 위한 Step
     *
     * @return 로그 수정 Step
     */
    @Bean
    public Step logUpdateStep() {
        return new StepBuilder("logUpdateStep", jobRepository)
                .<LogEntity, LogEntity>chunk(20, transactionManager) // transaction 처리를 20개 단위씩 묶어 처리, Generic은 <reader에서 넘겨주는 객체, writer로 넘겨줄 객체>
                .reader(logUpdateItemReader())
                .processor(logUpdateItemProcessor(null)) // 파라미터 사용하는 곳에 null 넣기
                .writer(logUpdateItemWriter())
                .build();
    }
    
    ...

    /**
     * 로그 수정을 위한 ItemProcessor (데이터 수정 작업 처리) (ItemProcessor Generic은 <reader에서 넘겨주는 객체, writer로 넘겨줄 객체>)
     *
     * @return 로그 수정 ItemProcessor
     */
    @Bean
    @StepScope // 파라미터 사용을 위해 StepScope 추가
    public ItemProcessor<LogEntity, LogEntity> logUpdateItemProcessor(@Value("#{jobParameters[contents]}") String contents) {
        // reader에서 넘겨준 객체를 1개 단위로 처리 (한 번에 chunkSize 만큼 수행)
        return logEntity -> {
            logEntity.updateContents(contents); // 파라미터 값을 가져와 활용
            return logEntity;
        };
    }
    
    ...
    
}

 

 

 

해당 방법은 Chunk에서 사용할 수 있지만 그렇다고 Tasklet에서 사용할 수 없는 것은 아닙니다.

 

Tasklet도 위와 동일한 방법으로 Parameter 값을 사용해 볼 수 있습니다.

 

 

 

 

동일 Job 반복 실행

 

지금까지 작성한 배치 관련 소스를 동작해 보면 항상 첫 번째 실행은 정상적으로 동작되나 Scheduler에 등록된 주기가 지나 한번 더 실행이 된다면 정상 동작이 되지 않을 것을 확인해볼 수 있습니다.

 

예를 들면, 현재까지 설정한 로그 데이터를 수정하는 Chunk를 동작시켜 보면 처음에는 데이터 수정이 이루어지지만 20초가 지난 뒤 한번 더 Job이 실행되면 다음과 같은 에러를 확인할 수 있습니다.

 

org.springframework.batch.core.repository.JobRestartException: JobInstance already exists and is not restartable
	at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:114) ~[spring-batch-core-5.0.2.jar:5.0.2]
	at org.springframework.batch.core.launch.support.TaskExecutorJobLauncher.run(TaskExecutorJobLauncher.java:59) ~[spring-batch-core-5.0.2.jar:5.0.2]
	at com.batch.schedule.LogSchedule.logUpdate(LogSchedule.java:38) ~[main/:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
	at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-6.0.11.jar:6.0.11]
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-6.0.11.jar:6.0.11]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

 

 

 

해당 이슈가 발생되는 이유는 SpringBatch를 사용하기 위해 생성했던 메타데이터를 위한 테이블에 동일한 설정 값에서 실행된 JobInstance가 존재하기 때문입니다.

 

이런 문제가 발생될 때 해결할 수 있는 방법은 간단합니다.

 

실행되는 Job의 JobParameter 값을 매번 동작할 때마다 항상 다른 값이 들어갈 수 있게 해 주시면 됩니다.

 

예를 들면, 로그 데이터 수정하는 Job을 반복 동작하기 위해 다음과 같이 소스를 변경해볼 수 있습니다.

 

package com.batch.schedule;

import com.batch.jobConfig.LogRegisterJobConfig;
import com.batch.jobConfig.LogUpdateJobConfig;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
@RequiredArgsConstructor
public class LogSchedule {

    ...
    
    @Scheduled(fixedRate = 20000) // 메서드가 실행된 시간부터 20초마다 Job 실행
    public void logUpdate() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        jobLauncher.run(
                logUpdateJobConfig.logUpdateJob(),
                new JobParametersBuilder()
                        .addString("contents", "parameterUpdateContents") // 파라미터 추가
                        .addLong("time", new Date().getTime()) // 파라미터를 매번 달리하여 동일한 job이어도 스케줄 설정에 맞게 반복되어 실행되도록 설정
                        .toJobParameters()
        );
    }
}

 

 

 

위처럼 JobParameter에 동작되는 현재 시간을 넣어주면 매번 동작될 때마다 Parameter의 time값이 달라지기 때문에 동일한 JobInstance로 인지하지 않고 반복적으로 Job이 수행됩니다.

 

일반적으로 배치를 돌린다는 것은 주기적으로 동일한 Job이 수행되게 하실 테니 Job에 사용되는 Parameter 값이 항상 동일하다면 위와 같이 임의 값을 넣어주시면 됩니다.

 

 

 

 

 

 

 

 

이상으로 SpringBatch 사용하기 두 번째인 Job Parameter 활용 및 동일 Job 반복 실행하는 방법에 대해 간단하게 알아보는 시간이었습니다.

 

읽어주셔서 감사합니다.

 

 

 

728x90
반응형

댓글