Developer.

[멋사 백엔드 19기] TIL 40일차 미니프로젝트와 Flux에 대해

📂 목차


📚 본문

Flux

플럭스는 Spring WebFlux 에서 제공하는 Reactive Stream 타입 중 하나이다.

  • Flux<T>: 0-N 개의 데이터를 비동기 스트림으로 처리
  • Mono<T>: 0-1 개의 데이터를 비동기 처리 (Flux 의 특수 형태)

플럭스는 non-blocking 방식의 비동기 처리이기 때문에 요청에 대한 효율적인 처리를 가능하다. 또한 이 때문에 MSA 에서 서비스 간의 통신과 데이터 스트림 처리에 유리하게 된다.

Flux Subscribe

플럭스는 기본적으로 lazy 하게 동작한다. 해당 데이터를 요청하기 전까지는 아무런 일도 일어나지 않으며 구독 시점에서 데이터 처리가 시작되게 된다.

Flux<Integer> numbers = Flux.range(1, 5);
numbers.subscribe(System.out::println); // 1~5 출력

// 콜백
numbers.subscribe(
    value -> System.out.println("onNext: " + value),
    error -> System.err.println("onError: " + error),
    () -> System.out.println("onComplete")
);

Operators

  • 변환: map, filterMap, concatMap
  • 필터링: filter, take, skip
  • 결합: merge(Flux 합치기), zip(각 Flux1, Flux2 의 요소를 묶어 Tuple 생성), combineLatest
  • 오류 처리: onErrorReturn, onErrorResume, retry

Publisher

  • Cold Publisher: 구독할 때마다 데이터를 새로 생성하는 전략(Flux.range())
Flux<Integer> coldFlux = Flux.range(1, 3);
coldFlux.subscribe(System.out::println); // 구독할 때마다 1,2,3
  • Hot Publisher: 발생하는 이벤트를 여러 구독자가 공유하는 전략(Flux.interval() 에 자주 사용)
// 1초마다 Long 값을 방출하는 Flux 를 생성, 기본적으로 ColdPublisher
Flux<Long> hotFlux = Flux.interval(Duration.ofSeconds(1))
                         // Cold Flux 를 Hot Flux 로 변환함,
                         // 여러 구독자가 같은 값을 동시에 공유하도록 만듦
                         // 즉 구독자가 새로 생겨도 데이터 스트림은 이미 진행중일 수 있음
                         .publish()
                         // 최소 한 명의 구독자가 나타나면 자동으로 Flux 가 실행되도록 설정함
                         .autoConnect();

Cold 는 시간 개념이 없고 Hot 은 시간 개념이 있다고 생각하면 된다.

Backpressure

역압력이라고 하며, 소비자가 처리할 수 있는 속도보다 생산자가 데이터를 더 빨리 보내면 어떻게 처리할지 결정하는 전략을 말한다.

Flux 를 통해 Client 가 쿼리를 보내는 구조

WebFlux 를 쓰면 일반적으로 클라이언트에서 HTTP 요청을 Controller, Service, Repository 의 흐름으로 처리를 하게 될텐데, 이를 비동기/논블로킹으로 처리하게 된다.

만약 클라이언트 요청을 Flux 로 받고 싶다면, Controller 에서 Flux 를 직접 반환하는 경우가 많다. 예시를 보면

@Component
public class UserClient {

    private final WebClient webClient;

    public UserClient(WebClient.Builder webClientBuilder) {
        this.webClient = webClientBuilder.baseUrl("http://user-service").build();
    }

    public Flux<UserDto> getUsers() {
        return webClient.get() // HTTP GET 요청 준비
                        .uri("/users") // 요청할 URI 지정
                        .retrieve() // 서버로부터 응답을 가져오기 위한 응답 처리 준비
                        .bodyToFlux(UserDto.class); // 서버 응답 Body 를 Flux<UserDto> 로 변환 - Mono 로 하면 안에 List 가 들어감 이를 Flux 로 저장하면 0-N 개의 데이터 처리 가능
    }
}

외부 MSA 서비스 호출용 Client 이며, 내부에서 WebClient 를 이용해 비동기 호출을 할 때 Flux/Mono 를 반환한다. 핵심은 Flux 를 직접 사용하는 로직은 Service 또는 Client 에서 구현하고, Controller 는 단순히 반환만 담당한다.

  • Client 클래스: MSA API 호출 담당
  • Service 클래스: 비즈니스 로직 수행

Spring JDBC 의 DB Default 기능을 쓰도록 만들기

JDBC 를 실제로 쓸때, 흔히 DB 의 기본값이 자동으로 들어갈테니 해당 값을 넣지 않고 save 를 하면 될 줄 알았지만 아니였다.

DB 의 기본값 defualt 와 엔티티의 null 은 별개이며, 값을 아직 모른다, 아직 세팅되지 않았다는 null 의 의미는 SQL INSERT 시에 NULL 로 명시적으로 넣게 되어 버린다. 즉 값이 없으니 DEFAULT 를 써야지가 아니라 NULL 을 넣으려고 했구나 라고 인식하게 되며, 이럴 때는 null 도 없도록 아예 필드명을 쓰지 말아야 한다.

만약 이를 하고 싶다면 Spring Data JDBC 에서 Auditing 을 활성화하면 되며, 이는 엔티티가 생성 및 수정이 될 때 자동으로 메타데이터 필드들을 관리해준다.

@Configuration
@EnableJdbcAuditing
public class JdbcConfig {}

이렇게 하면 다음 어노테이션을 사용할 수 있게 된다.

@CreatedDate
private LocalDateTime createdAt;

@LastModifiedDate
private LocalDateTime updatedAt;

@CreatedBy
private String createdBy;
  • insert 시점: @CreatedDate, @CreatedBy 값 자동 세팅
  • update 시점: @LastModifiedDate 값 자동 세팅

즉, Audit 필드를 자동으로 채워주는 기능이지 DB 의 default 값을 직접 트리거 하는 기능은 아니지만, 이를 통해 필드를 자동으로 채워지게 할 수 있다.