thumbnail
예시를 통해 Reactor Sinks 이해하기
Reactive / Non-blocking / Reactor / Webflux
2024.12.16.

들어가며

최근 개발하고있는 네트워크 라이브러리에 Reactive기반의 Non-Blocking 처리를 추가하기위해 Reactive Streams를 살펴보고있다.

구현체는 Reactor를 사용할 예정인데, Reactor의 문서나 코드를 살펴보다보니 Sinks를 자주 접하게 되었다.

개인적으로 Mono/Flux 다음으로 자주보는 느낌인데.. Sinks에 대해서 제대로 이해하지못하는 것 같아 이참에 여러 에시를 통해 내가 이해한 내용을 정리해고자한다.

이번 글은 예시를 통해 Reactor Project의 Sinks를 정리해본다.

Sinks란 무엇이며, 왜 존재하는 것일까?

우선 공식문서를 살펴보고 이해해보자면..

공식 문서를 살펴면 아래와 같이 설명하고있다.

In Reactor a sink is a class that allows safe manual triggering of signals in a standalone fashion, creating a Publisher-like structure capable of dealing with multiple Subscriber (with the exception of unicast() flavors).

해석하면 아래와 같이 설명한다.

여러 구독자를 처리할 수 있는 Publisher와 유사한 구조를 가지며, 수동으로 Reactive Streams의 signal을 트리거할 수 있는 클래스

또 Sinks의 코드레벨에선 아래와 같이 설명하고있다.

중요한 첫 부분만 해석하면 아래와 같이 설명한다.

Sinks는 Flux와 Mono의 의미 체계를 이용하여 Reactor Stream의 Signal들을 프로그래밍적으로 Push할 수 있게 해주는 구성체이다.

위 두 설명과 여러 자료를 바탕으로 봤을 때 Sinks의 개념과 역할은 아래와 같다.

  • Reactive Streams에서 발생하는 signal을 Publisher (Mono, Flux)에서 직접 호출하여 데이터를 Push할 수 있다.
    • Sinks는 데이터를 emit, error, complete등의 signal을 Subscriber에게 전송할 수 있는 진입점이다.
    • 프로그래밍적으로 Publisher 역할 + signal 수동 호출 가능
  • Reactive Streams에서 발생하는 signal을 프로그래밍적으로 Push 할 수 있는 기능을 가지고있는 일종의 Publisher이다.
    • Publisher가 프로그래밍적으로 Push 할 수 있다는 의미는, Subscriber의 구독 없이도 기존 구독자에게 새로운 데이터를 Push 할 수 있다는 의미이다.
  • 절차형으로 작성된 로직이나 외부 이벤트를 Reactive 파이프라인으로 통합할 수 있다.
    • 절차형으로 작성된 로직으로 Reactive 파이프라인에 데이터를 Push하고, signal을 호출함으로써 Reactive 프로그래밍 세계와 일반적인 절차형 프로그래밍 세계의 통합을 할 수 있게한다.
  • Sinks는 Sinks.Many 인터페이스를 통해 멀티 스레드 환경에서도 Thread-Safe하게 signal을 발생시킨다.

정리하면 Sinks가 존재하는 이유는 2가지로 볼 수 있을 듯하다.

  1. Subscriber의 동의 없이도 Publisher 입장에서 데이터를 내보내고 error, complete등등의 signal을 전송하기 위함.
  2. 절차형으로 작성된 일반 프로그래밍 코드를 Reactive 파이프라인에 접목하기위한 진입점 역할. (Non-Reactive Source Integration)

물론 멀티 스레드 환경에서 데이터를 emit하는 환경에서의 Thread-Safe도 존재 이유이다.

Sinks가 없다면 -> Reactive Streams의 기본적인 실행 흐름

공식 문서만을 봤을땐 사실 바로 이해가 되지않았다. 그래서 Sinks가 없다고 가정하고 기존의 Reactive Streams의 실행 흐름을 통해 Sinks가 왜 필요한지 살펴보았다.


💁‍♂️ Reactive Streams는 Subscriber 요청에 의해서만 데이터 흐름이 진행된다.

Reactive Streams는 Reactive 시스템이 어떻게 동작해야하는지를 정의해놓은 표준 스펙이다.

Reactive와 Reactive Streams 이해하기에서 얘기했듯이, Reactive Streams는 아래와 같은 흐름으로 동작한다.

Reactive Streams 실행 흐름
Reactive Streams 실행 흐름

코드로 간단히 예시를 작성해보면 아래와 같다.

Flux.range(0, 10)
    .log()
    .subscribe();


// 실행 결과
onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
request(unbounded)
onNext(0)
onNext(1)
onNext(2)
onNext(3)
onNext(4)
onNext(5)
onNext(6)
onNext(7)
onNext(8)
onNext(9)
onComplete()

이때 Reactive Streams의 구현체인 Reactor는 기본적으로 "Nothing Happens Until subscribe."를 강조하며, Reactive 체인을 작성한다고해서 바로 데이터를 공급하지 않고 subscribe(구독)을 해야 전체 체인에 데이터 흐름이 트리거된다.

트리거되면 위 예시에서 볼 수 있듯이, onNext, onError, onComplete 등을 스펙에 따라 호출하면서 체인이 진행된다. (Reactive 세계)

여기서 중요한 점은 subscriber가 subscribe (구독)할 때야 비로소 전반적인 흐름이 진행된다는 것이다. 즉, Subscriber의 요구에 따라서만 Publisher가 데이터를 emit한다.

그래서 실제로 아래 코드를 실행하면 아무런 결과도 나오지 않는다.

Flux.range(0, 10)
    .log()

당연히 Subscriber의 요구가 없기 때문에 아무런 일도 일어나지 않는 것이며, Reactive Streams에선 Assembly 과정만 거쳤다고 볼 수 있다.

그리고 기본적인 Reactive Streams의 실행 흐름에선 Publisher (발행처) 쪽에서 next, complete, error등의 signal을 직접 프로그래밍적으로 호출 할 수 없다.

Sinks를 사용한다면 -> Subscriber의 요구없이도 데이터를 emit하고 signal을 보낼 수 있다

기본적인 Reactive Streams 스펙의 처리 과정과 다르게 Sinks를 사용하면 Subscriber의 요구없이도 Publisher에서 데이터를 emit하고 signal을 프로그래밍적으로 보낼 수 있다.

아래부터는 예시를 들면서 Sinks가 필요한 상황을 살펴본다.


💁‍♂️ Sinks가 없다면, Subscriber는 구독이후에 Publisher에서 발생한 데이터의 변화를 감지하지못하고 매번 재 구독을 해줘야한다.

Sinks가 없다면

List<Integer> source = new ArrayList<>();

source.addAll(List.of(1, 2, 3, 4, 5));

Flux.fromIterable(source)
        .collectList()
        .log()
        .subscribe(System.out::println);

Flux.fromIterable(source)
        .collectList()
        .log()
        .subscribe(System.out::println);

// Publisher 데이터 소스에 데이터가 추가되어도 subscriber에게 전송하지 못한다. 매번 데이터는 정해져있고 거기에 구독자를 추가해야한다.
source.addAll(List.of(6, 7, 8, 9));

// 결과
[main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
[main] INFO reactor.Mono.CollectList.1 - | onSubscribe([Fuseable] MonoCollectList.MonoCollectListSubscriber)
main] INFO reactor.Mono.CollectList.1 - | request(unbounded)
[main] INFO reactor.Mono.CollectList.1 - | onNext([1, 2, 3, 4, 5])
[1, 2, 3, 4, 5]
[main] INFO reactor.Mono.CollectList.1 - | onComplete()
[main] INFO reactor.Mono.CollectList.2 - | onSubscribe([Fuseable] MonoCollectList.MonoCollectListSubscriber)
[main] INFO reactor.Mono.CollectList.2 - | request(unbounded)
[main] INFO reactor.Mono.CollectList.2 - | onNext([1, 2, 3, 4, 5])
[1, 2, 3, 4, 5]
[main] INFO reactor.Mono.CollectList.2 - | onComplete()

위와 같이 데이터 소스에 데이터가 추가되어도 subscriber들에게 데이터 소스를 추가로 전송하지 못한다. 심지어 처음 구독한 시점의 데이터가 모두 emit되면 Publisher는 자연스레 complete를 전송해버린다.

즉, Publisher 입장에서 새로운 데이터를 emit하고싶어도 구독한 시점의 정해진 데이터만을 emit 할 수 있다.


💁‍♂️ Sinks가 있다면, Publisher는 자신의 관점에서 변경된 데이터나 signal을 Subscriber들에게 보낼 수 있다

Sinks를 사용한다면

List<String> source = new ArrayList<>();

source.addAll(List.of("1", "2", "3", "4", "5"));

Sinks.Many<List<String>> sink = Sinks.many().multicast().directBestEffort(); // 발행자

sink.asFlux().log().subscribe(data -> System.out.println("1번 구독자: " + data)); // 구독자 추가
sink.asFlux().log().subscribe(data -> System.out.println("2번 구독자: " + data)); // 구독자 추가
sink.tryEmitNext(source); // 데이터 발행.

System.out.println("\n--- Publisher는 Subscriber의 동의없이도 추가된 데이터를 emit 한다 ---\n");

source.addAll(List.of("6", "7", "8")); // 데이터 추가
sink.tryEmitNext(source); // 데이터 발행.
sink.tryEmitComplete(); // Complete (signal) 호출

// 결과
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onSubscribe(SinkManyBestEffort.DirectInner)
[main] INFO reactor.Flux.SinkManyBestEffort.1 - request(unbounded)
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onSubscribe(SinkManyBestEffort.DirectInner)
[main] INFO reactor.Flux.SinkManyBestEffort.2 - request(unbounded)
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onNext([1, 2, 3, 4, 5])
1번 구독자: [1, 2, 3, 4, 5]
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onNext([1, 2, 3, 4, 5])
2번 구독자: [1, 2, 3, 4, 5]

--- PublisherSubscriber의 동의없이도 추가된 데이터를 emit 한다 ---

[main] INFO reactor.Flux.SinkManyBestEffort.1 - onNext([1, 2, 3, 4, 5, 6, 7, 8])
1번 구독자: [1, 2, 3, 4, 5, 6, 7, 8]
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onNext([1, 2, 3, 4, 5, 6, 7, 8])
2번 구독자: [1, 2, 3, 4, 5, 6, 7, 8]
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onComplete()
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onComplete()

눈 여겨봐야할 부분은 Sinks를 사용하면 Subscriber가 아닌 Publisher가 Subscriber에게 보낼 데이터와 signal을 직접 호출하여 컨트롤한다는 것이다.

즉, Subscriber의 동의없이 Publisher 입장에서 Reactive Streams의 전반적인 흐름을 결정한다. Publisher의 생명주기에 맞춰 Reactive 세계에 신호를 보낸다는 것.

그렇다보니 구독 이후에도 Publisher에서 데이터가 추가되었거나 변화가 감지되었으면 구독자들에게 데이터를 추가로 전송할 수 있게된다.

이제 공식 문서에서 말한 아래 내용이 이해가 될 것이다.

Subscriber의 동의 없이도 Publisher 입장에서 데이터를 내보내고 error, complete등등의 signal을 전송하기 위함.

그리고 Sinks라는 이름에서도 알 수 있듯이, 비유해서 말하자면 싱크대와 같다.

Created By Chat-GPT
Created By Chat-GPT

여기서 물은 데이터이며, 물을 받는 것은 Subscriber, 물을 공급하는 수도꼭지(Publisher)가 Sinks인 것이다.

현실세계에서의 싱크대 수도꼭지의 on/off에 따라 물 (데이터)가 흐르는 것과 동일하다.


Sinks는 절차형 프로그래밍 세계와 Reactive 세계의 브릿지 역할을 주로 한다

앞서 Sinks를 사용하면 Publisher에서 Reactive Streams의 흐름을 조절한다고 하였는데, Sinks는 그외에도 절차형 프로그래밍 세계와 Reactive 세계의 브릿지 역할을 수행한다.

대표적인 예시가 바로 Spring Webflux의 기본 네트워크 구현체인 Reactor-Netty이다.

Netty는 기본적으로 NIO 기반의 Non-Blocking으로 동작하지만 절차형 프로그래밍으로 Pipeline내 Handler 코드를 작성한다.

이때 Pipeline내 비즈니스 로직을 동일하게 Non-Blocking으로 처리하기위해 Reactor와 연결하는데 이를 위해선 절차형 프로그래밍 (Netty)과 Reactive (Reactor)를 연결해야한다.

두 라이브러리의 브릿지 역할로는 앞서 말한 Reactor-Netty라는 라이브러리가 존재한다.

그리고 Reactor-Netty의 내부 구현을 살펴보면 실제로 Sinks를 사용하여 절차형 프로그래밍과 Reactive 프로그래밍을 연결하고있다.

이제 공식 문서에서 말한 아래 내용이 이해가 될 것이다.

절차형으로 작성된 일반 프로그래밍 코드를 Reactive 파이프라인에 접목하기위한 진입점 역할.

Sinks 종류와 사용 예시

어느정도 Sinks에 대한 역할을 이해했으니, 이제 Reactor의 Sinks는 어떤 종류가 있고 어떻게 사용되지는지 간단히 살펴본다.

Reactor에서의 Sinks는 크게 세 가지가 존재한다.

  • Sink.Many (Continuous Flow)
    • 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세.
    • 종류
      • unicast -> n message : 1 subscriber
      • multicast -> n message : n subscribers (Hot Publisher -> 구독 이후 발행한 메세지부터 받는다)
      • multicast replay = n message : n subscribers (Cold Publisher -> 구독 이전 발행 한 메세지도 받는다)
  • Sink.One (Single Pour)
    • 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세.
    • emit 된 데이터 중에서 단 하나의 데이터만 Subscriber에게 전달한다. 나머지 데이터는 Drop 된다.
    • 1 message : n subscribers
  • Sink.Empty (No Water): It is similar to a faucet which cannot fill anything at all but either goes dry or encounters some problems (lack of water flowing through).

이제 Reactor의 Sinks는 어떤 종류가 있고 어떻게 사용되지는지 간단히 살펴본다.

Sinks One

Sinks One은 한 건의 데이터를 전송하는 방법을 정의해 둔 기능 명세이다.

즉, 1 message : n subscribers 형태일 때 사용된다. (아래 예시를 보면 바로 이해가 될 것이다)

Sinks.One<String> sink = Sinks.one();
Mono<String> mono = sink.asMono();

mono.log().subscribe(data -> System.out.println("1번 구독자: " + data)); // 구독자 추가
mono.log().subscribe(data -> System.out.println("2번 구독자: " + data)); // 구독자 추가

sink.tryEmitValue("1");
sink.tryEmitValue("2");

// 결과
[main] INFO reactor.Mono.SinkOneMulticast.1 - onSubscribe(SinkOneMulticast.NextInner)
[main] INFO reactor.Mono.SinkOneMulticast.1 - request(unbounded)
[main] INFO reactor.Mono.SinkOneMulticast.2 - onSubscribe(SinkOneMulticast.NextInner)
[main] INFO reactor.Mono.SinkOneMulticast.2 - request(unbounded)
[main] INFO reactor.Mono.SinkOneMulticast.1 - onNext(1)
1번 구독자: 1
[main] INFO reactor.Mono.SinkOneMulticast.1 - onComplete()
[main] INFO reactor.Mono.SinkOneMulticast.2 - onNext(1)
2번 구독자: 2
[main] INFO reactor.Mono.SinkOneMulticast.2 - onComplete()

당연히 1 message 만을 다루기때문에 Mono를 Publisher로 사용하며, 각 구독자별로 1개의 메시지를 받으면 complete가 호출된다.

emitValue, FAIL_FAST, asMono 등의 내용은 API 문서를 확인해보면 바로 어떤 의미인지 나오기때문에 여기선 다루지 않는다.

Sinks Many

Sinks Many는 여러 건의 데이터를 여러 가지 방식으로 전송하는 기능을 정의해 둔 기능 명세이다.

3가지의 스펙이 존재하는데 하나하나 코드를 통해 살펴본다.

Unicast -> n message : 1 subscriber

Sinks.Many<String> sinks = Sinks.many().unicast().onBackpressureBuffer();

Flux<String> flux = sinks.asFlux();

flux.log().subscribe(data -> System.out.println("1번 구독자: " + data)); // 구독자 추가

sinks.tryEmitNext("1");
sinks.tryEmitNext("2");
sinks.tryEmitNext("3");

// 결과
[main] INFO reactor.Flux.SinkManyUnicast.1 - | onSubscribe([Fuseable] SinkManyUnicast)
[main] INFO reactor.Flux.SinkManyUnicast.1 - | request(unbounded)
[main] INFO reactor.Flux.SinkManyUnicast.1 - | onNext(1)
1번 구독자: 1
[main] INFO reactor.Flux.SinkManyUnicast.1 - | onNext(2)
1번 구독자: 2
[main] INFO reactor.Flux.SinkManyUnicast.1 - | onNext(3)
1번 구독자: 3

Sinks.one과 다르게 n개의 메시지이므로, next로 메시지를 보내도 complete를 호출하지 않는다. 따로 trytryEmitComplete()를 호출해주어야한다.

그리고 갑자기 많은 데이터를 emit하면 FAIL_NON_SERIALIZED 발생할 수 있다고한다. 이때는 sink.next 사용시 재시도 로직을 EmitFailureHandler에 정의해주는 것이 좋다.

Multicast -> n message : n subscribers

Multicast는 n message : n subscribers 형태인데, 주의할 점은 Hot Publisher라는 것이다.

즉, 구독 이후 발행한 메세지부터 받는다

Sinks.Many<String> sinks = Sinks.many().multicast().directBestEffort();

sinks.tryEmitNext("1"); // 구독자 없을 때 방출 한번.

Flux<String> flux = sinks.asFlux();
// 구독자 추가
flux.log().subscribe(data -> System.out.println("1번 구독자: " + data));

// 구독자 추가후 방출 두 번.
sinks.tryEmitNext("2");
sinks.tryEmitNext("3");

// 구독자 추가
flux.log().subscribe(data -> System.out.println("2번 구독자: " + data));

// 구독자 추가후 방출 두 번.
sinks.tryEmitNext("4");
sinks.tryEmitNext("5");

// 결과
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onSubscribe(SinkManyBestEffort.DirectInner)
[main] INFO reactor.Flux.SinkManyBestEffort.1 - request(unbounded)
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onNext(2)
1번 구독자: 2
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onNext(3)
1번 구독자: 3
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onSubscribe(SinkManyBestEffort.DirectInner)
[main] INFO reactor.Flux.SinkManyBestEffort.2 - request(unbounded)
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onNext(4)
1번 구독자: 4
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onNext(4)
2번 구독자: 4
[main] INFO reactor.Flux.SinkManyBestEffort.1 - onNext(5)
1번 구독자: 5
[main] INFO reactor.Flux.SinkManyBestEffort.2 - onNext(5)
2번 구독자: 5

예시에서 알 수 있듯이, Hot Publisher이다. 또한 Many이기 때문에 마찬가지로 complete를 프로그래밍적으로 호출해주지않으면 자동으로 호출되지 않는다.

Multicast Replay -> n message : n subscribers

Multicast Replay는 동일하게 n message : n subscribers 형태인데, 이름에서 알 수 있듯이 Cold Publisher라는 것이다.

즉, 구독 이전 발행 한 메세지도 받는다.

Sinks.Many<String> sinks = Sinks.many().replay().all();

sinks.tryEmitNext("1"); // 구독자 없을 때 방출 한번.

Flux<String> flux = sinks.asFlux();
// 구독자 추가
flux.log().subscribe(data -> System.out.println("1번 구독자: " + data));

// 구독자 추가후 방출 두 번.
sinks.tryEmitNext("2");
sinks.tryEmitNext("3");

// 구독자 추가
flux.log().subscribe(data -> System.out.println("2번 구독자: " + data));

// 구독자 추가후 방출 두 번.
sinks.tryEmitNext("4");
sinks.tryEmitNext("5");

// 결과
INFO reactor.Flux.SinkManyReplayProcessor.1 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
INFO reactor.Flux.SinkManyReplayProcessor.1 - | request(unbounded)
INFO reactor.Flux.SinkManyReplayProcessor.1 - | onNext(1)
1번 구독자: 1
INFO reactor.Flux.SinkManyReplayProcessor.1 - | onNext(2)
1번 구독자: 2
INFO reactor.Flux.SinkManyReplayProcessor.1 - | onNext(3)
1번 구독자: 3
INFO reactor.Flux.SinkManyReplayProcessor.2 - | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
INFO reactor.Flux.SinkManyReplayProcessor.2 - | request(unbounded)
INFO reactor.Flux.SinkManyReplayProcessor.2 - | onNext(1)
2번 구독자: 1
INFO reactor.Flux.SinkManyReplayProcessor.2 - | onNext(2)
2번 구독자: 2
INFO reactor.Flux.SinkManyReplayProcessor.2 - | onNext(3)
2번 구독자: 3
INFO reactor.Flux.SinkManyReplayProcessor.1 - | onNext(4)
1번 구독자: 4
INFO reactor.Flux.SinkManyReplayProcessor.2 - | onNext(4)
2번 구독자: 4
INFO reactor.Flux.SinkManyReplayProcessor.1 - | onNext(5)
1번 구독자: 5
INFO reactor.Flux.SinkManyReplayProcessor.2 - | onNext(5)
2번 구독자: 5

예시에서 알 수 있듯이, Cold Publisher이다.

마치며

이번 글은 공부하고 실제 네트워크 라이브러리를 만들면서 사용했던 Sinks에 대해서 내가 이해한대로 정리해보았다.

다음 글은 Reactor-Netty에 대해서 다룰 예정이다.


참고

  • Sinks Docs

© mark-kim.blog Built with Gatsby