Spring Batch란?
- 대용량 데이터를 처리하기 위한 프레임워크
- 주기적이고 반복적인 작업을 실행하는 데 사용
- 반복, 재시도, skip 처리
- 실패한 배치 작업을 다시 시작하거나 skip 할 수 있음
- 모니터링
- 배치 작업의 진행 상태와 소요 시간 정보 제공
- 트랜잭션 관리
- 트랜잭션 관리를 지원하여 데이터 일관성 유지
실행 순서
- @EnableBatchProcessing 필요
- Job → Step → ItemReader → ItemProcessor → ItemWriter
- Job으로 하나의 배치 작업을 정의
- Step에서 하나씩 수행
- 읽기 → 처리 → 쓰기 과정을 거침
- Step을 등록하기 위한 Bean 등록 필요
Step 단계
- Chunk
- 대량의 데이터를 끊어서 처리할 최소 단위
- 읽기 → 처리 → 쓰기 작업이 청크 단위로 진행
- PlatformTransactionManager
- 청크가 진행되다가 실패했을 때, 롤백 / 재실행 등 다시 처리할 수 있도록 세팅
RepositoryItemReader(읽기)
- 읽어오는 작업 수행
- findAll을 하더라도 chunk 수 만큼 사용
- 자원 낭비를 방지하기 위해 Sort를 진행하고 pageSize() 단위를 설정해 페이지 만큼 읽어올 수 있도록 설정
ItemProcessor(중간 처리)
- 읽어 온 데이터 처리
- ItemReader에서 ItemWriter로 바로 보내도 됨
- 랭킹 데이터를 처리할 때는 읽어온 데이터를 Ranking 엔티티에 맞게 처리하는 것을 여기서 함
RepositoryItemWriter
- 처리한 결과를 저장
DataSourceConfig
- DataSource와 TransactionManager를 지정해 준 이유
- 서버와 DB를 분리하여 사용하였기 때문
- MSA로 진행하기로 했지만, 시간 부족으로 인프라를 구축하지 못하여 서버를 도커 내에서 컨테이너만 분리
- DB를 분리해서 사용하였기 때문에 대용량 데이터를 BatchJob을 통해 갱신하기 위해서는 FeignClient를 사용하기보다는 DB에 직접 접근하는 것이 좋았음
- MSA를 사용하였더라도 이 방법이 대용량 데이터 처리에는 더 효과적
아래는 해당 코드이다.
BatchConfig
package com.apink.rankingservice.config;
import com.apink.rankingservice.dto.PopupRankingDto;
import com.apink.rankingservice.entity.PopupRanking;
import com.apink.rankingservice.service.RankingService;
import jakarta.persistence.EntityManagerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import javax.sql.DataSource;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
@Configuration
@RequiredArgsConstructor
public class BatchConfig {
private static final Logger logger = LoggerFactory.getLogger(BatchConfig.class);
private final RankingService rankingService;
private final EntityManagerFactory entityManagerFactory;
@Bean
public Job updatePopupRankingJob(JobRepository jobRepository, Step updatePopupRankingStep) {
return new JobBuilder("updatePopupRankingJob", jobRepository)
.start(updatePopupRankingStep)
.build();
}
@Bean
public Step updatePopupRankingStep(JobRepository jobRepository,
@Qualifier("mainTransactionManager") PlatformTransactionManager mainTransactionManager,
JdbcCursorItemReader<PopupRankingDto> reader,
ItemWriter<PopupRanking> writer) {
return new StepBuilder("updatePopupRankingStep", jobRepository)
.<PopupRankingDto, PopupRanking>chunk(100, jpaTransactionManager())
.reader(reader)
.processor(this::processPopupRanking)
.writer(writer)
.build();
}
@Bean
public PlatformTransactionManager jpaTransactionManager() {
return new JpaTransactionManager(entityManagerFactory);
}
@Bean
public JdbcCursorItemReader<PopupRankingDto> reader(
@Qualifier("mainServiceDataSource") DataSource dataSource) {
String sql = "SQL문 넣으면 됩니다.";
return new JdbcCursorItemReaderBuilder<PopupRankingDto>()
.dataSource(dataSource)
.name("popupRankingReader")
.sql(sql)
.rowMapper(new PopupRankingRowMapper())
.build();
}
@Bean
public ItemWriter<PopupRanking> writer() {
return new ItemWriter<PopupRanking>() {
@Override
public void write(Chunk<? extends PopupRanking> chunk) throws Exception {
rankingService.deleteAllAndSave(chunk);
}
};
}
private PopupRanking processPopupRanking(PopupRankingDto dto) {
logger.debug("dto : {}", dto);
PopupRanking result = PopupRanking.builder()
.popupId(dto.getPopupId())
.views(dto.getViews())
.likes(dto.getLikes())
.reviewScore(dto.getReviewScore())
.reviewCount(dto.getReviewCount())
.reservations(dto.getReservations())
.creationDate(dto.getCreationDate())
.build();
logger.debug("Processed entity: {}", result);
return result;
}
private static class PopupRankingRowMapper implements RowMapper<PopupRankingDto> {
@Override
public PopupRankingDto mapRow(ResultSet rs, int rowNum) throws SQLException {
return PopupRankingDto.builder()
.popupId(rs.getLong("popupId"))
.creationDate(rs.getObject("creationDate", LocalDate.class))
.views(rs.getInt("views"))
.likes(rs.getInt("likes"))
.reviewScore(rs.getDouble("reviewScore"))
.reviewCount(rs.getInt("reviewCount"))
.reservations(rs.getInt("reservations"))
.build();
}
}
}
DataSourceConfig
package com.apink.rankingservice.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
@Primary
@Bean
@ConfigurationProperties("spring.datasource.ranking-service")
public DataSourceProperties rankingServiceDataSourceProperties() {
return new DataSourceProperties();
}
@Primary
@Bean("rankingServiceDataSource")
public DataSource rankingServiceDataSource() {
return rankingServiceDataSourceProperties()
.initializeDataSourceBuilder()
.build();
}
@Primary
@Bean(name = "transactionManager")
public PlatformTransactionManager rankingTransactionManager(@Qualifier("rankingServiceDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
@ConfigurationProperties("spring.datasource.main-service")
public DataSourceProperties mainServiceDataSourceProperties() {
return new DataSourceProperties();
}
@Bean("mainServiceDataSource")
public DataSource mainServiceDataSource() {
return mainServiceDataSourceProperties()
.initializeDataSourceBuilder()
.build();
}
@Bean(name = "mainTransactionManager")
public PlatformTransactionManager mainTransactionManager(@Qualifier("mainServiceDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
}
'개발' 카테고리의 다른 글
[OAuth2] X(구 트위터) 로그인 API 사용기 (1) | 2024.11.17 |
---|---|
Docker 공부하기 (기초) (1) | 2024.09.01 |
JWT 심화 (1) | 2024.08.11 |
OAuth2.0 + SpringSecurity (0) | 2024.08.04 |
스프링 시큐리티(Spring Security) + JWT (0) | 2024.07.21 |