thumbnail
사례를 통해 이해하는 Reactor Backpressure
Reactive / Non-blocking / Webflux
2024.03.02.

들어가며

Project ReactorReactive Streams의 대표적인 구현체로써, Subscriber의 처리 능력을 존중하여 데이터 스트림을 비동적으로 처리할 수 있게한다.

그리고 당연 Reactive Streams를 사용하는 이유중 가장 큰 부분이 Backpressure다.

Backpressure은 ‘배압’이라는 의미로 실세계에선 가스나 액체등의 흐름을 제어하기위해 역으로 가하는 압력을 의미한다. Reactor에서도 동일한 의미로 publisher로부터 전달 받은 데이터를 처리하는 데 있어 subscriber쪽 과부하가 걸리지 않도록 제어하는데 사용된다.

이번 글은 Reactor가 Backpressure를 어떻게 구현하고 어떤 정책을 가지고있는지 살펴본다.

단순히 Reactor에서 어떤 Backpressure 전략을 제공하는지 살펴보기보단, 직접 Backpressure가 발생할 수 있는 상황을 코드로 시뮬레이션하며 어떻게 동작하는지 정리했다.


Backpressure의 기본적인 동작방식

Reactive Streams는 어떻게 Backpressure를 구현했을까?

Reactive 세계에선 크게 두 가지의 엔티티가 존재한다. Publisher, Subscriber.

그리고 Subscriber가 데이터를 소비하는 속도가 Publisher가 데이터를 생성하는 속도보다 낮은 경우, Subscriber는 Publisher에 signal을 보내 데이터 생산 속도를 제한할 수 있다.

여기서 signal은 Reactive Streams 구성요소중 하나인 Subscription의 request, cancel등이 해당된다.

개념은 어느정도 알겠으니, 이제 Reactor로 직접 Backpressure를 구현해보면 다음과 같다.

Flux.range(1, 6)
    .log()
    .subscribe(new Subscriber<Integer>() {
        private Subscription s;
        int onNextAmount;

        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(2);
        }

        @Override
        public void onNext(Integer integer) {
            onNextAmount++;
            if (onNextAmount % 2 == 0) {
                s.request(2);
            }
        }

        @Override
        public void onError(Throwable t) {}

        @Override
        public void onComplete() {}
    });

[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(2)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | request(2)
[main] INFO reactor.Flux.Range.1 - | onNext(3)
[main] INFO reactor.Flux.Range.1 - | onNext(4)
[main] INFO reactor.Flux.Range.1 - | request(2)
[main] INFO reactor.Flux.Range.1 - | onNext(5)
[main] INFO reactor.Flux.Range.1 - | onNext(6)
[main] INFO reactor.Flux.Range.1 - | request(2)
[main] INFO reactor.Flux.Range.1 - | onComplete()

로그에서 알 수 있듯이 onSubscriberequest(2)로 publisher에게 2개만 emit하길 요청하고, 2개 emit마다 request를 보내 subscriber 상황에 맞춰 데이터 흐름을 컨트롤한다.

다시 말해 publisher는 더 많은 데이터를 emit할 준비가 되었음에도 subscriber 처리 속도에 맞춰 데이터의 흐름을 컨트롤하는 것이다. 그리고 이게 바로 Backpressure의 순기능이다.

Publisher와 Subscriber 구현체

Backpressure 테스트를 위해선 Publisher와 Subscriber를 직접 구현해줘야한다. 본격적으로 테스트하기전에 Reactor에선 어떤 Publisher와 Subscriber 구현체가 존재하는지 살펴본다.

Publisher

💁‍♂️ Flux와 Mono

Reactive 세계에서 가장 먼저 고려해야 할 부분은 Data Stream을 생성하는 것이다.

Reactor에선 Publisher의 구현체로 두 가지 데이터 유형을 제공한다. Flux, Mono

  • Flux[0|n]
    • Flux sequence를 생성, 변환, 조정하는데 사용할 수 있는 Reactive Streams의 publisher다.
    • [0|n]이라고 표현한 이유는 Flux가 0개 혹은 n개의 item을 emit할 수 있기 때문이다. 즉 0개 혹은 n개의 무한한 item을 onNext를 통해 전달할 수 있다.
    • publisher 구현체이므로, onComplete나 onError에 의해 종료된다. 만약 둘 중 하나라고 발생하지않으면 무한한 sequence가 된다.
  • Mono[0|1]
    • Flux와 동일하게 sequence를 생성, 변환, 조장하는데 사용할 수 있는 Reactive Streams의 publisher다. 다만, Flux와 다르게 최대 1개의 item만을 emit할 수 있으며, onComplete 혹은 onError signal에 의해 종료된다. ([0|1])
    • Flux가 지원하는 모든 연산자를 다 제공하진 않지만, 일부 연산자를 활용하여 Flux로 전환할 수도 있다. (특히 Mono와 다른 Publisher를 합칠 때 많이 사용된다.)
    • 이외에도 Mono는 값은 필요없고, 완료 개념만 있으면 되는 비동기 처리를 표현할 수도 있다. (Runnable과 유사)

Flux와 Mono 모두 publisher의 구현체다. 그렇다면 둘의 차이점은 무엇일까?

여러 자료를 살펴보니 큰 차이점은 없고, 그저 기대되는 값에 대한 카디널리티를 표현하기 위함인듯하다.

즉, 데이터베이스나 외부 호출시 기대하는 값이 0이나 1개인 Java에서의 Optional 같은 역할이 필요하면 Mono. 반대로 기대하는 값이 0이나 여러개인 Java에서의 List 같은 역할이면 Flux를 사용한다.


💁‍♂️ Fast Publish Flux

Backpressure의 예시를 구현하기위해 이번 글에선 일련의 숫자를 생성하는 간단한 예시를 사용한다.

여러 학습 테스트에 활용하기위해 Flux.generate를 활용하여 1부터 특정 값까지 emit하는 Flux를 구성한다. 대신 여러 테스트를 위해 중간에 sleep 시간을 설정할 수 있도록 코드를 구성했다.

public Flux<Long> produce(long delayBetweenEmits, long upto) {
    return Flux.generate(
            () -> 1L,
            (state, sink) -> {
                sleep(delayBetweenEmits);
                long nextState = state + 1;
                if (state > upto) {
                    sink.complete();
                    return nextState;
                } else {
                    log.info("Emitted {}", state);
                    sink.next(state);
                    return nextState;
                }
            }
    );
}

private void sleep(Long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

첫 번째 인자는 각 데이터 emit 중간에 sleep할 밀리세컨드 값이며, 두 번째 인자는 1부터 몇까지 데이터를 emit 할 것인지를 나타내는 MAX 값이다.

Subscriber

Flux와 Mono를 통해 Data Stream을 생성하였다면, 이제 해당 Data를 구독해서 Stream 처리를 해야한다.

Reactor에서 subsribe는 굉장히 간단하다. 바로 subscribe() 메서드를 호출해주기만 하면 된다.

이번 글에선 Backpressure를 테스트하기위해 아래와 같이 정보를 읽기 직전에 지연을 발생시키는 형태로 subscribe()를 구현한다.

produce(0L, 100)
    .subscribe(data -> {
        sleep(1000L);
        log.info("Consumed {}", data); // onNext()
    });

예시를 통해 이해하는 Backpressure

이제 본격적으로 여러 시나리오 예시를 통해 Reactor에서의 Backpressure는 어떻게 동작하는지 정리해본다.

동일 스레드 - 빠른 생산, 느린 소비

앞서 생산 속도와 소비 속도를 제어할 수 있는 데이터 스트림 코드를 구성해보았으므로, 해당 코드를 그대로 활용하여 Backpressure를 테스트해본다.

첫 번째 테스트는 동일한 스레드내에서 생산자는 1부터 100까지 초당 1개를 emit하고, 소비자는 지연없이 데이터를 소비하는 시나리오로 구성한다.

produce(1000L, 100)
    .doOnRequest(request -> log.info("# request: {}", request))
    .subscribe(data -> {
        log.info("Consumed {}", data); // onNext()
    });

로그로 결과를 보면 아래와 같다.

[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - # request: 9223372036854775807
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 1
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 1
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 2
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 2
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 3
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 3
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 4
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 4
...
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 97
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 97
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 98
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 98
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 99
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 99
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Emitted 100
[main] INFO com.binghe.backpressure.FP_SC_WITHOUT_THREAD - Consumed 100

처음에 데이터를 생산하는 속도가 소비하는 속도보다 빠르게 설정했기에, 당연히 Publisher가 먼저 emit하고, 그 후에 Subscriber가 consume 할 것이라고 기대했다.

하지만 실제 로그 결과를 보면 request는 무한임에도, 데이터를 하나하나 emit하고 consume하기를 반복한다.

그 이유는 사실 간단한데, Publisher와 Subscriber가 모두 전체 Stream을 동기식으로 처리하기 때문이다. 이런 경우 당연히 Backpressure는 동작하지 않아도 되므로 기존에 예상한 것과 다르게 동작하는 것이다.

정리하면 동일한 스레드에서 생산과 소비를 모두 수행하는 Stream은 동기식으로 동작하므로, Backpressure가 필요없어 위와 같이 하나하나 처리하게 된다.

서로 다른 스레드 - 빠른 생산, 느린 소비

두번째 테스트는 Publisher와 Subscriber를 서로 다른 스레드에서 독립적으로 동작하도록하는 것이다.

즉, 데이터를 생산하는 스레드와 소비하는 스레드를 다르게 하는 것인데, Reactor에서는 Scheduler라는 이름으로 publish와 subsribe뿐 아니라 각 operator마다 어떤 스레드에서 실행할 것인지 쉽게 설정할 수 있는 API들을 제공한다.

  • subscribeOn() - 이름 그대로 subscribe (구독)이 발생한 직후 실행될 스레드를 지정하는 Operator다.
    • subscribeOn() Operator는 subscribe (구독)이 발생한 직후에, 실행될 스레드를 지정하는 Operator다.
    • Upstream에 대한 스케줄링.
  • publishOn() - Downstream으로 signal을 전송할 때 실행되는 스레드를 지정하는 Operator다.
    • publishOn() Operator는 Downstream으로 signal을 전송할 때 실행되는 스레드를 제어하는 Operator다.
    • Downstream에 대한 스케줄링.

당연히 Backpressure를 테스트하기위함이기에, 생산은 빠르게, 소비는 느리게 테스트를 구성해본다.

produce(0, 1000)
    .doOnRequest(request -> log.info("# request: {}", request))
    .subscribeOn(Schedulers.parallel())
    .publishOn(Schedulers.parallel())
    .subscribe(data -> {
        sleep(500L);
        log.info("Consumed {}", data);
    });

1부터 1000까지의 숫자 데이터를 간격 없이 publish하고, subscribe마다 0.5초 delay하는 예시이며, publishr와 subscribe를 각각 서로 다른 스레드에서 실행되도록 구성했다.

로그로 실행 결과를 보면 아래와 같다.

[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - # request: 256
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 1
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 2
... Emitted 로그
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 255
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 256
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 1
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 2
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 3
... Consumed 로그
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 191
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 192
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - # request: 192
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 257
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 258
... Emitted 로그
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 447
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 448
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 193
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 194
... Consumed 로그
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 383
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 384
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - # request: 192
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 449
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 450
... Emitted 로그
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 499
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Emitted 500
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 385
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 386
... Consumed 로그
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 499
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 500

실행된 순서를 보면 아래와 같다.

  1. publisher에게 request 256개.
  2. publisher는 요청받은 256개만큼 데이터 emit.
    • publisher는 요청을 256개만큼 받았으므로, 256개까지 emit후 다음 request까지 대기.
    • 이때 subscriber도 subscribe중이지만 sleep으로인해 느리게 소비중이므로 로그엔 느리게 출력된다.
  3. consumer가 192개 데이터까지 subscribe후 192개 다시 request.
    • 192개까지 소비후 다시 192개를 요청하는 이유는 prefetch가 동작하기 때문이다.
  4. publisher는 요청받은 192개만큼인 256 + 192 = 448개까지 emit후 다음 request까지 다시 대기.
  5. 데이터 500까지 3번-4번 반복.

위 예시의 처리 과정이 조금 복잡해보이는데, 핵심적인 부분만 보면 아래와 같다.

  • publisher는 요청받은 개수까지만 데이터를 emit하며, 요청받은 데이터를 모두 emit후 다음 요청까지 대기한다.
  • subscriber는 publisher가 emit한 데이터를 특정 버퍼 역할의 Queue에 쌓고 데이터를 consume한다. 그리고 Queue의 데이터가 75%만큼 소비되었다면 publisher에게 이전 요청한 데이터 개수의 75%만큼 다시 request한다.

prefetch와 Queue

위 예시의 동작 과정을 글로 설명하려니 조금 복잡하다. 그림을 통해 처리 과정을 살펴보면 다음과 같다.

backpressure prefetch

한가지 눈에 띄는 부분은 publisher와 subscriber사이에 존재하는 buffer 같은 Queue다.


💁‍♂️ prefetch

나도 학습 테스트와 코드를 살펴보다 왠 Queue가 있지해서 공식 문서를 살펴보니 Reactor는 기본적으로 특정 Operator에 한해서 **보충 최적화 (replenishing optimization)**을 구현한다고한다.

즉, 특정 연산자는 prefetch라는 int 값을 입력 파라미터로 받아, Subscriber가 prefetch 요청의 75%를 완료하고 나면 다시 Upstream에 75%를 요청해나가면서 앞으로의 요청을 미리 예측하는 매커니즘이다.

공식 문서의 설명이 조금 부족하다 생각들어 내가 개인적으로 테스트하면서 이해한 prefetch 도입함으로써 얻는 두 가지의 이점은 다음과 같다.


💁‍♂️ 첫번째는 비동기 처리다

데이터를 생산하는 Upstream은 Queue에 요청받은 데이터만큼 지속적으로 데이터를 삽입하고, 소비하는 Downstream은 Queue에서 데이터를 poll하여 처리하면된다.

이때 Upstream과 Downstream 사이에 Queue가 존재함으로써 두 요소가 처리의 작업 완료 여부를 전혀 체크하지 않아도되기때문에 서로 비동기로 동작할 수 있다.

즉, Upstream과 Downstream이 서로 다른 스레드에서 비동기적으로 동작할 수 있는 것이다.

서버 아키텍처에서 서버간의 메시키 브로커는 두는 이유도 비동기 처리를 위함이다. 여기서도 동일하다고 보면 된다.

이외에도 Queue를 사용함으로써 Queue에 데이터가 많이 쌓였을 때 어떻게 할지에 대한 Backpressure 전략 처리에도 활용할 수 있다.


💁‍♂️ 두번째는 Backpressure다.

Upstream과 Downstream 사이에 buffer를 추가함으로써 미리 emit되는 데이터 양을 예측하고 조절한다는 의미는 Backpressure의 핵심 개념과도 부합한다.

조금 풀어말하자면.. buffer 단위로 Upstream과 Downstream간의 데이터 요청-처리 흐름을 가져갈 수 있다는 것이다.

예를 들어보면, 1000개를 처리해야하는 상황에서 Upstream에 매번 request(1)로 하나하나 요청하는 것이 아니라, reqeust(256)과 같이 여러 데이터를 한번에 emit 해달라고 요청하고 처리함으로써 1개가 아닌 다량의 데이터 수 단위로 Backpressure를 적용할 수 있다.


🤔 Reactor는 무조건 위 Queue를 만들어 Upstream와 Downstream이 데이터를 주고 받는가?

내가 처음 Reactor에 대해서 학습 테스트하고 코드 살펴보면서 헷갈린 부분인데, 답은 아니다다.

즉, 일부 연산자에서만 prefetch 매커니즘을 위해 Queue를 사용한다.

그리고 그 대표적인 예시가 바로 publishOn()flatMap()이다.

실제로 publishOn 메서드를 보면 아래와 같이 prefetch를 매개변수로 받는다. (디폴트는 256이다)

publisheOn

그리고 publishOn의 Subsrciber 구현체를보면 아래와 같이 prefetch 값과 Queue를 가진다.

publishOnSubscriber

당연히 publishOn은 Scheduler 역할이 핵심이므로, Task Schedule을 위한 다양한 속성들도 가지고있다.

참고로 Reactor 생명주기에서도 다뤘듯이, Reactor는 데코레이터패턴과 유사하게 여러 Operator를 연결한다.

서로 다른 스레드 - 빠른 생산, 멀티 스레드 소비

앞서 살펴보았던 예시를 잘 살펴보면 Thread-1Thread-2만 동작하는 것을 볼 수 있다.

즉, Publisher 스레드 1개, Subscriber 스레드 1개인 것인데, 사실 빠른 처리를 위해선 Subscriber를 더 늘려 처리 속도를 더 빠르게 할 수 있다.

앞서 살펴보았던 publishOn()subscribeOn()은 모두 다른 스레드에 처리를 위임하고 스케줄링할 뿐, 병렬적으로 처리를 하진 않는다.

Reactor를 사용할 때 이렇게 병렬적인 처리를 위해 보통 2가지 방법을 사용한다.

  1. parallel Operator 사용
  2. flatMapsubscribeOn함께 사용.

💁‍♂️ parallel Operator

produce(0, 500)
    .doOnRequest(request -> log.info("# request: {}", request))
    .subscribeOn(Schedulers.parallel())
//                .publishOn(Schedulers.parallel())
    .parallel(5)
    .runOn(Schedulers.parallel())
    .subscribe(data -> {
        sleep(100L);
        log.info("Consumed {}", data); // onNext()
    });

위와 같이 parallel Operator로 병렬 처리한다는 API를 사용하면 내부에서 publishOn과 동일하게 Queue를 하나 만들어 Upstream의 데이터를 Queue에 받는다.

그리고 runOn Operator로 해당 Queue 데이터를 어떤 스레드 풀내 스레드가 처리할 것인지 설정할 수 있다.

실제 처리 결과를 보면 아래와 같이 consume의 로그가 서로 다른 스레드로 찍힌다.

...
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 472
[parallel-4] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 474
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 471
[parallel-5] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 475
[parallel-2] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 477
[parallel-3] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 478
[parallel-4] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 479
[parallel-1] INFO com.binghe.backpressure.FP_SC_WITH_THREAD - Consumed 476
...

parallel() Operator도 코드를 살펴보면 publishOn과 동일하게 prefetch 매커니즘을 사용한다.


💁‍♂️ flatMap Operator

produce(0, 500)
    .doOnRequest(request -> log.info("# request: {}", request))
    .subscribeOn(Schedulers.parallel())
    .publishOn(Schedulers.parallel())
    .flatMap(value -> Mono.fromSupplier(() -> {
        sleep(100L);
        log.info("Consumed {}", value); // onNext()
        return null;
    }).subscribeOn(Schedulers.parallel()), 10)
    .subscribe();

위와 같이 flatMap을 이용해도 생산된 숫자를 소비하는 작업이 병렬적으로 동작한다.

이번 글은 flatMap에 대해서 알아보는 것이 목적이 아니므로, 여기선 flatMap이 그저 병렬로 동작한다라고만 이해하면 좋을 듯 하다.

Backpressure 전략

Backpressure를 직접 테스트하면서 어떻게 동작하는지 이해하다보면 한가지 의문이 생긴다. 바로 256인 request의 개수가 너무 작다는 것이다. 이로인해 Queue에 공간이 많이 남았음에도 publisher가 wait 하는 경우가 많을 수도 있다는 것이다.

backpressure strategy before

데이터 생산의 비율을 높이는 방법은 당연히 prefetch의 수를 높이는 것이다. 즉, Integer.MAX로해서 publisher가 끊임없이 데이터를 생산하도록 하는 것이다.

문제는 이 경우 소비자의 소비 속도가 생산 속도를 따라가지못하면 Queue에 메모리가 지속적으로 쌓여 OOM (Out Of Memory)가 발생할 수 있다는 것이다.

그리하여 Reactor에서는 이런 경우는 대비해 여러 Backpressure 전략을 제공한다.

  • IGNORE
    • Backpressure 전략을 사용하지않는 것으로 아무 설정 안하면 적용되는 전략이다.
    • 이 전략은 위에서 언급한대로, 생산이 너무 빨라 OOM이 발생할 수 있다.
  • ERROR - onBackpressureError()
    • 에러 전략으로 Downstream의 처리 속도가 느려 Upstream의 emit 속도를 따라가지못하는 경우 IllegalStateException을 발생시키는 전략이다.
    • 이때 에러가 발생한 경우 Publisher는 error signal을 Subscriber에게 전송후 데이터를 폐기한다.
  • DROP - onBackpressrueDrop()
    • DROP 전략으로 Downstream으로 전달된 데이터의 버퍼가 가득찬 경우, 버퍼 밖에서 대기중인 데이터중 먼저 emit되었던 데이터부터 DROP 시키는 전략이다.
  • LATEST - onBackpressureLatest()
    • LATEST 전략으로 Downstream으로 전달된 데이터의 버퍼가 가득찬 경우, 버퍼 밖에서 대기중인 데이터중 데이터가 들어올 때마다 이전에 유지하고있던 데이터를 폐기하고, 최근에 emit된 데이터부터 버퍼에 채우는 전략이다.
  • BUFFER - onBackpressureDrop()
    • DROP, LATEST 전략은 모두 버퍼가 가득찼을 때 버퍼 밖의 데이터를 폐기하는 전략이었다면, BUFFER 전략은 버퍼가 가득찼을 때 버퍼 안의 데이터를 폐기하는 전략이다.
    • 두 가지 전략을 제공한다.
      • DROP_LATEST - Downstream으로 전달된 데이터가 버퍼에 가득 찰 경우 이후에 emit된 데이터를 DROP하는 전략이다. 즉, 최근에 emit되어 버퍼에 들어온 최근 데이터를 드랍한다.
      • DROP_OLDEST - Downstream으로 전달된 데이터가 버퍼에 가득 찰 경우 이후에 emit된 데이터중 가장 오래된 데이터부터 DROP하는 전략이다. 즉, 버퍼 데이터중 가장 오래된 데이터부터 드랍한다.

마치며

이번 글은 직접 다양한 사례를 통해 Reactor가 어떻게 Backpressure를 구현하고 있는지 살펴보았다. 그중에서도 prefetch를 통해 비동기 및 Backpressure 구현하는 것이 가장 인상깊다.

그리고 Reactive Streams의 처리는 trade-off가 있다는 것도 깨닫게 된 것 같다.

개인적으로 Reactor는 기존의 MVC 처리보다 학습 곡선이 훨씬 높다는 것도 다시 한번 느낀 것 같다.


참고


© mark-kim.blog Built with Gatsby