들어가며
Reactive Streams 표준 스펙을 구현한 Reactor는 Reactive 시스템이 가지고있어야하는 특징인 Non-Blocking, Backpressure, Functional API을 모두 구현하고있다.
기본적으로 선언형을 지향하는 특징때문에 손쉽게 위 특징들을 실행할 수 있다. 다만 사용하다보면 점차 이게 정확히 어떻게 동작하는지 이해 안될 때가 많다. 그러다보니 테스트 코드를 만들거나 각 단계별 hook (doOnSubscribe, doOnNext등등)을 만들어 로그를 찍어 이해하려고 노력을 많이한다. 적어도 나는 지금도 그러고있다. 그와중에 Spring blog에 생명주기 관련된 글이 있어 공부하다 내가 이해한 내용을 정리하고자 이 글을 작성하게되었다.
바로 본론으로 넘어가자면 Reactor에선 Reactive Streams를 아래와 같이 3단계의 생명 주기로 구현하고있다.
-
Assembly-time - 조립
-
Subscription-time - 구독
-
Runtime - 실행
첫번째. Assembly-time
💁♂️ Nothing Happens Until subscribe.
Reactor를 사용하면 아래와 같이 쉽게 Reactive Streams 흐름을 구성할 수 있다.
Flux.range(1, 10) // 1부터 10까지 숫자 emit
.filter(data -> data % 2 == 0) // 짝수만 필터링
.map(data -> data * 2) // 2배로 곱하기
그런데 위 코드를 실제 실행해보면 아무 일도 일어나지않는다. 그리고 이를 Reactor 공식 문서에선 "Nothing Happens Until subscribe."
라고 명시하고있다.
쉽게 말해 publisher의 구현체인 Flux와 Mono를 사용하면 메서드 체인으로 다양한 operator와 함께 간단히 sequence를 구성할 수 있지만, 이 체인을 작성한다고해서 바로 데이터를 공급하진 않는다는 의미다. 대신 subscribe (구독)을 해야 publisher와 subscriber가 연결되어 전체 체인에 데이터 흐름이 트리거된다.
💁♂️ 각 Operator API는 모두 Publisher이면서 Subscriber다
위에서 언급했듯이, Reactor에선 subscribe()
메서드를 호출해야 실제 데이터 emit이 트리거된다.
그리고 구독하기전까지의 단계를 Assembly-time이라 부르며, Reactor에서 제공하는 선언형 체인 API를 사용하여 Flux와 Mono를 만들 수 있다.
Flux.range(1, 10)
.filter(data -> data % 2 == 0)
.map(data -> data * 2)
위 코드를 보면 빌더 패턴을 떠올릴 수 있는데, 사실 위 API들은 모두 immutable한 Flux나 Mono로 Assemble (조립)된다. 즉 위 코드는 아래와 같이 동작한다고 볼 수 있다.
Flux<Integer> fluxRange = Flux.range(1, 10);
Flux<Integer> fluxFilter = fluxRange.filter(data -> data % 2 == 0);
Flux<Integer> fluxMap = fluxFilter.map(data -> data * 2);
그리고 각 API들은 아래 filter 코드에서 볼 수 있듯이, 각 타입의 Flux 객체를 생성하고 다음 체인 Flux 객체의 생성자에 이전 Flux를 참조할 수 있도록 파라미터로 전달한다.
조금 더 쉽게 이해하자면 아래와 같이 구성된다고 볼 수 있다.
FluxMap(
FluxFilter(
FluxRange(1, 10)
)
)
위 코드를 보면 하나 생각나는게있다. 바로 프록시의 데코레이터 패턴과 유사하다는 것이다. 즉, 동작만 보면 FluxRange -> FluxFilter -> FluxMap
이지만, 조금 더 자세히보면 아래와 같이 전파되면서 동작한다.
FluxRange (publisher) <-> FluxFilter (subscriber, publisher) <-> FluxMap (subscriber, publisher)
- FluxRange
- Downstream FluxFilter의 Publisher
- FluxFilter
- Upstream FluxRange의 Subscriber
- Downstream FluxMap의 Publisher
- FluxMap
- Upstream FluxFilter의 Subscriber
- (추후에 구독될) Downstream Subscriber의 Publisher
정리하면 각 operator API 모두 Upstream의 Subscriber이면서, Downstream의 Publisher인 것이다. 그리고 이렇게 데코레이터처럼 연결되는 구조는 조립 단계에서 설계된다.
코드를 살펴보면 더욱 쉽게 이해할 수 있다.
FluxFilter.onNext 코드를 보면 굉장히 명시적으로 Reactive Streams의 onNext와 request signal 보내는 것을 확인할 수 있다. 그리고 아래와 같이 직접 로깅으로 실행해보면 예상한대로 동작한다는 것을 확인할 수 있다.
Flux.range(1, 10)
.doOnNext(data -> log.info("range -> filter onNext: {}", data))
.filter(data -> data % 2 == 0)
.doOnNext(data -> log.info("filter -> map onNext: {}", data))
.map(data -> data * 2)
.doOnNext(data -> log.info("map -> subscriber onNext: {}", data))
.subscribe(data -> log.info("# onNext: {}", data));
// log
range -> filter onNext: 1
range -> filter onNext: 2
filter -> map onNext: 2
map -> subscriber onNext: 4
# onNext: 4
range -> filter onNext: 3
range -> filter onNext: 4
filter -> map onNext: 4
map -> subscriber onNext: 8
# onNext: 8
range -> filter onNext: 5
range -> filter onNext: 6
filter -> map onNext: 6
map -> subscriber onNext: 12
# onNext: 12
range -> filter onNext: 7
range -> filter onNext: 8
filter -> map onNext: 8
map -> subscriber onNext: 16
# onNext: 16
range -> filter onNext: 9
range -> filter onNext: 10
filter -> map onNext: 10
map -> subscriber onNext: 20
# onNext: 20
🤔 조립 단계의 동작 방식이 중요한 이유
위 구조가 중요한 이유는 각 stream의 단계별로 Reactive Streams의 특징을 적용할 수 있다는 것이다.
예를 들어, 특정 operator만 다른 스레드에서 동작 시킬 수 있으며 (Scheduler), concatWith
처럼 각각 FluxConcatArray를 만들어 체이닝하지 않고, 하나의 FluxConcatArray를 만들어서 전체적인 성능을 향상시킬 수도 있다.
두번째. Subscription-time
조립 단계에서 비동기 pipeline을 Flux 혹은 Mono로 만들었지만, 앞서 말했듯이 subscribe(구독)하기전엔 data의 흐름은 트리거되지않는다.
또 한번 강조: “Nothing Happens Until You Subscribe”
subscribe한다는 것은 “이 파이프라인은 데이터 변환을 나타내며, 이제 해당 데이터의 최종 형태에 관심있다”라고 말하는 행위와 같으며, 조금 다르게 해석하면 “설계한 파이프라인 체인을 따라 subscribe() 메서드를 downstream에서 upstream으로 전파해라”와 같다.
즉, 위에서 살펴본 예시 코드를 아래와 같이 subscribe 한다면
Flux.range(1, 10)
.filter(data -> data % 2 == 0)
.map(data -> data * 2)
.subscribe(data -> log.info("# onNext: {}", data));
가장 Downstream의 Subscriber입장에서 아래와 같이 호출한다고 볼 수 있다.
fluxMap.subscribe(Subscriber) {
fluxFilter.subscribe(new MapSubscriber(Subscriber)) {
fluxRange.subscribe(new FilterSubscriber(MapSubscriber(Subscriber))) {
}
}
}
다시 말해 publisher 체인을 따라 FluxMap부터 source인 FluxRange까지 subscribe()가 전파된다. 그리고 전파되면서 다음 체인의 Subscriber 생성자로 자기 자신을 파라미터로 전달하게 됨으로써 아래와 같이 publisher 체인과는 다시 반대 순서의 체인이 형성된다.
RangeSubscriber (
FilterSubscriber (
MapSubscriber (
Subscriber
)
)
)
테스트를 위해 아래와 같이 hook으로 로깅을 해보면 의도한 대로 동작하는 것을 확인할 수 있다.
Flux.range(1, 10)
.doOnSubscribe(sub -> log.info("FluxRange onSubscribe"))
.filter(data -> data % 2 == 0)
.doOnSubscribe(sub -> log.info("FluxFilter onSubscribe"))
.map(data -> data * 2)
.doOnSubscribe(sub -> log.info("FluxMap onSubscribe"))
.subscribe(data -> log.info("# onNext: {}", data));
// log
# FluxRange onSubscribe
# FluxFilter onSubscribe
# FluxMap onSubscribe
# onNext: 4
# onNext: 8
# onNext: 12
# onNext: 16
# onNext: 20
그리고 위 내용은 Reactor 공식 문서에서 보여주는 Cold와 Hot Sequence 마블 다이어그램에서도 드러나있다.
Cold Sequence
Hot Sequence
세번째. Runtime
Reactor의 Reactive Streams 생명주기의 마지막인 런타임은 말그대로 실행 단계다.
publisher와 subscriber가 onSubscribe()
signal과 request()
, onNext()
등의 signal을 교환하면서 stream을 실행한다.
한가지 중요한 점으론 앞서 말했듯이 각각의 operator 체인들이 모두 각각의 Flux 데코레이터로써 서로서로 publisher와 subscriber가되어 위 signal이 전파된다는 점이다.
마치며
처음 Reactive Streams에 대해서 어느정도 공부하고나서 바로 Reactor를 사용하다보니 어느순간부터 이게 어떻게 동작하는 것이지?란 생각이 들 때가 있었다.
예를 들어, Backpressure에서의 buffer나 Scheduler의 다양한 전략등을 이해할 때 마법처럼 느껴지는데, 위와 같이 Reactor Reactive Streams의 생명주기를 이해하면 관련된 개념들을 공부할 때 꽤나 도움이 되는 듯 하다. 실제로 위 구조를 이해하고나니 Reactor Core 코드를 볼 때 많은 도움이 되었다.
참고
- https://projectreactor.io
- https://spring.io/blog/2019/03/06/flight-of-the-flux-1-assembly-vs-subscription
- 실전! 스프링5를 활용한 리액티브 프로그래밍, 2019