thumbnail
Reactive와 Reactive Streams 이해하기
Reactive / Non-blocking / Webflux
2024.02.09.

들어가며

많이 사용되는 스레드 풀 기반의 멀티 스레드 모델은 동기 블로킹 매커니즘을 가지기때문에 고성능을 내기위해선 스레드의 개수를 높이는 방법외엔 딱히 다른 방법이 없다.

작은 수의 스레드로 고성능을 내기위해선 비동기 논블로킹 방식의 처리 방식이 필요한데, 이와 관련하여 서치하다보면 자연스레 Reactor, RxJava등 Reactive Programming을 접하게 된다.

이러한 비동기 논블로킹 방식의 Reactive Programming은 기존 동기 블로킹 방식과는 패러다임 자체가 다르기에, 핵심적인 개념을 정확히 이해하는게 중요하다고 생각든다.

그래서 이번 글을 시작으로 Reactive Programming을 하기 위한 여러 기초가 되는 개념들을 정리해보고자한다.

그 시작으로 이번 글은 Reactive란 무엇이며, Reactive 방식의 가장 기초가 되는 개념인 Reactive Streams에 대해서 내가 이해한대로 정리해본다.


Reactive와 Reactive 프로그래밍

💁‍♂️ Reactive

보통 Reactive한 프로그래밍 관련하여 검색하다보면 “Non-blocking”과 “Functional”에 대한 내용이 많다.

하지만 나는 Reactive란 의미를 이해하는게 더 우선이라고 생각한다.

Reactive란 단어는 사전적으로 “반응 하는”이라는 의미를 가진다. 그렇다면 어디에 반응하는 것일까?

바로 어떤 이벤트나 상황이 발생할 때, 그에 대한 반응을 해서 적절한 행동하는 것을 의미한다. 예를 들어보자면, 네트워크 I/O 이벤트(커넥션 연결, 데이터 read/write등)에 반응하는 이벤트 기반의 처리방식이 있으며, UI 관점에선 마우스 클릭 이벤트에 반응하는 처리 방식 모두 Reactive하다고 볼 수 있다.

그리고 이러한 데이터의 변화가 발생하여 이벤트 알림이 발생되고 미리 지정한 로직으로 “반응 하는” 동작 매커니즘은 보통 Non-blocking으로 구현된다. (정확히는 EventLoop 방식처럼 동작한다.)

네트워크 I/O 관점에서 블로킹 방식과 “반응 하는” 논블로킹 방식의 처리 구조 더욱 알고싶다면 이전 글인 사례를 통해 이해하는 네트워크 논블로킹 I/O와 Java NIO을 먼저 보길 추천한다.

이러한 Reactive한 처리 방식은 사실 굉장히 오래전부터 사용되는 매커니즘이다. 멀리 안보고 지금 우리 앞에있는 키보드를 떠올려봐도 키보드를 눌렀을 때 그에 “반응 하여” 동작한다. 조금 더 자세히 얘기하자면, 컴퓨터의 주변 장치는 CPU의 도움이 필요한 경우에만 인터럽트 (이벤트라고 생각해도 좋다)를 사용해 CPU에게 처리하라고 알려준다. 그럼 그에 맞춰 CPU는 입력한 키에 맞춰 “반응 하여” 동작한다.

정리하면 어떤 이벤트나 상황이 발생했을 때, 그에 반응해서 Non-blocking으로 작업을 처리하는 동작 매커니즘을 “Reactive”한 시스템이라고한다.

그리고 서버 관점에선 클라이언트의 요청에 머뭇거리지 않고 제때 잘 반응해서 응답을 빠르게 주는 “반응을 잘하는 시스템”을 만드는게 목표라고 할 수 있다.

보통 서버 애플리케이션은 CPU intensive한 일보다는 DB 및 외부서버에 요청/응답하는 I/O intensive한 일을 더 많이 수행하게된다.

그러므로 I/O 부분을 Blocking하지 않고 이벤트에 “반응”하는 Non-Blocking 모델을 적용함으로써 비교적 큰 성능 향상을 가져올 수 있다.

Reactive에 대한 용어 의미와 시스템 구축시의 설계 원칙에 대해서 더 자세히 알고싶다면 Reactive 선언문 한번 읽는 것을 추천한다.

여담으로 비동기 Non-Blocking의 장점으로 C10K 문제 해결을 많이 얘기한다. 즉 “만개의 클라이언트를 동시에 처리할 수 있는가?”에 대한 문제를 해결했다는 것이다.

그 이유는 소켓은 동시에 수백만개를 열 수 있는 반면, Blocking 프로그램에서는 많아야 수천개만 동시에 처리할 수 있기에, 이 불일치를 해결하기 위한 방법으로 비동기 Non-Blocking 프로그래밍이 제시된 것.


💁‍♂️ Reactive Programming 특징

Reactive는 “반응 하는” 동작 매커니즘이라는 것인데, 이제 이를 프로그래밍적으로 구축하는 프로그래밍 모델을 “Reactive Programming”이라 칭한다. 쉽게 말해 Reactive 설계 원칙에 부합하는 비동기 Non-blocking 처리를 위한 프로그래밍 모델을 의미한다.

그리고 위키피디아에선 Reactive Programming를 아래와 같이 정의하며, 이를 통해 어떠한 특징을 가지고있는지 알 수 있다.

In computing, reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change.
  • declartive programming (선언형 프로그래밍)
    • 보통 C나 C++처럼 명령형 보단 실행할 동작을 구체적으로 명시하지 않고 목표만 선언하는 선언형을 추구한다.
  • data streams and propagation of change (데이터 스트림즈와 변화에 대한 전파)
    • 데이터가 지속적으로 발생하고 흐른다는 의미에서의 데이터 스트림즈, 그리고 이러한 데이터의 변환하는 이벤트를 발생시키고 계속해서 전달되면서 처리한다는 의미에서의 propagation.
  • 그리고 메시지 기반 비동기 통신
    • 위키피디아 정의엔 없지만, Reactive Programming은 메시지 기반으로 비동기 통신을 추구한다.

이외에도 Back Pressure라는 중요한 특징도 있다. 이와 관련해서는 실제 코드를 살펴보면서 이해하는게 더 좋을 듯해서 아래에서 더 자세히 다룬다.

Reactive 구현 스펙 - Reactive Streams

앞서 Reactive한 시스템이 무엇이고 어떤 것을 추구하는지 간단히 살펴보았는데, 사실 개발자들의 관심사는 그래서 이걸 어떻게 구현해야하는가? 일 것이다.

이러한 Reactive 시스템을 어떻게 구현할지 정의해 놓은 별도의 표준 사양이 바로 Reactive Streams다.

서버관점에서의 대부분 Reactive 구현체들은 다르게 말하면 모두 Reactive Streams 스펙을 구현한 시스템이다.

Reactive Streams에 대해서 다양한 견해를 토비님이 유튜브에서 얘기하시는데 꼭 한번 시청하길 추천한다.

Iterable과 옵저버 패턴으로 이해하는 Reactive Streams

이번 글은 바로 Reactive Streams란 어떤 것이며 어떻게 동작하는지를 설명하기보다는, Iterable과 옵저버 패턴의 비교를 통해 왜 Reactive가 만들어졌는가를 코드를 통해 직접 정리해보았다.


💁‍♂️ Duality - Iterable vs Observable

Reactive Streams의 처리 방식 아이디어를 이야기할 때 에릭마이어는 항상 Duality에 대해서 말한다고한다.

쌍대성을 의미하는 Duality는 수학과 물리학에서 자주 등장하는 표현으로, 하나의 문제를 서로 반대되는 두 가지 관점에서 볼 수 있다는 원칙이다.

이를 프로그래밍적으로 말하면, 궁극적인 기능은 같으나 서로 반대되는 방향으로 구현을 표현하는 것을 말한다.

아마 개념으로만 들으면 이해하기 쉽지않다. 적어도 나는 그랬다. 그래서 예시를 통해 이를 이해해본다.


예를 하나 들어보자면, 아래와같이 5개의 요소를 가진 배열이있다고 가정해본다.

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

5개의 요소를 모두 하나하나 출력하는 기능을 구현한다고 가정하면 크게 두 가지의 쌍대되는 방식으로 처리할 수 있다.

  1. Iterable (for-each)
  2. 옵저버 패턴

첫번째 방법. Iterable

아마 5개의 요소를 처리해야한다고 할 때 먼저 떠오르는 방법은 for-each일 것이다.

for (int data : list) {
    System.out.println(data); // data 사용
}

그렇다면 어떻게 List는 for-each를 돌면서 요소를 가져와서 사용할 수 있는 것일까?

바로 ListIterable 인터페이스를 구현하고 있는 덕분이다.

public interface Iterable<T> {
    Iterator<T> iterator();
}

Iterable이 반환하는 Iterator를 이용하여 for-each를 구현할 수 있다.

Iterable<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

for (Iterator<Integer> it = list.iterator(); it.hasNext(); ) {
    Integer data = it.next();
    System.out.println(data);
}

이 방식은 가장 흔히 사용되는 방식이며, Iterable의 구현체가 반환하는 IteratorhasNext(), next() 메서드를 통해 데이터에 하나하나 접근할 수 있다.

여기서 중요한건 Iterable의 동작 방식이 Pull (끌어온다)이라는 것이다.

데이터를 사용하는 클라이언트 입장에선 Iteratornext() 메서드를 통해 데이터를 Pull해서 데이터를 처리한다.

두번째 방법. 옵저버 패턴

Iterable이 Pull 방식이라면 5개의 요소를 가진 리스트를 반대 관점에서 사용하는 쌍대성 방식은 무엇일까?

바로 Push 방식이다. 그리고 이 Push 방식의 가장 대표적인 패턴이 옵저버 패턴이다.

Pull은 데이터가 필요한 쪽에서 직접 데이터를 가져와서 사용하는 반면에, Push는 데이터를 가진쪽에서 데이터가 필요한 대상에게 밀어넣어주는 방식이다.

옵저버 패턴을 간단히 아래와 같이 구현해 볼 수 있다.

/**
 * Observable: source, producer, publisher. 이벤트를 공급하는 역할.
 * Observer: target, consumer, subscriber. 이벤트를 소비하는 역할.
 */
@SuppressWarnings("deprecation")
public class ObservableTestMain {

    public static void main(String[] args) {
        // Observer (subscriber)
        Observer ob = new Observer() {
            @Override
            public void update(Observable o, Object arg) {
                System.out.println(Thread.currentThread().getName() + " " + arg);
            }
        };

        // Observable (Publisher)
        IntObservable io = new IntObservable();
        io.addObserver(ob);

        ExecutorService es = Executors.newSingleThreadExecutor();

        es.execute(io);

        System.out.println(Thread.currentThread().getName() + " EXIT");
        es.shutdown();
    }

    static class IntObservable extends Observable implements Runnable {
        @Override
        public void run() {
            for (int i = 1; i <= 5; i++) {
                setChanged(); // 변경된 것을 알려줌
                // push
                notifyObservers(i); // Observer에게 이벤트 알림. (데이터 전송)
            }
        }
    }
}

// 실행결과
main EXIT
pool-1-thread-1 1
pool-1-thread-1 2
pool-1-thread-1 3
pool-1-thread-1 4
pool-1-thread-1 5

위 코드를 실행하면 for-each과 동일하게 리스트를 순회하며 출력한다.

다만 for-each과 다른점은 Observable에 등록된 Observer들에게 notifyObservers()를 통해 데이터를 Push (전송)했다는 것이다.

Observable -> Event/Data -> Observer

Iterable과 옵저버 패턴 비교

Iterable와 Observable 모두 동일한 기능을 수행하지만 서로 다른 관점에서 구현하였다. 이를 쌍대성 관계라고 이야기한다.

처리 과정을 정리하면 아래와 같은 차이가 존재한다.

event Iterable (pull) Observable (push)
retrieve data T next(void) void notifyObservers(T)
complete check hasNext() setChanged()

가장 주목할 점은 Pull 방식인 next()는 직접적으로 데이터(T)를 반환 받지만, Push 방식인 notifyObservers()는 반환 값이 없으며 데이터(T)를 전송한다.


🤔 그렇다면 Pull 방식보다 Push 방식이 더 좋은 점은 무엇일까?

Push 방식을 이용하면 별개의 스레드에게 작업을 손쉽게 할당할 수 있다.

에를 들어, Iterable은 아이템을 수신할 대상이 한 순간에 한 개의 핸들러에서만 다뤄야한다면, Observable은 미리 여러 개의 Observer를 등록하여 아이템을 여러 개의 핸들러에 보낼 수 있다. (1 : N 관계)

동기, 비동기 관점에서 바라봐도, Push 방식은 리턴 값이 없으므로 호출한 쪽에서 리턴 값을 신경쓸 필요가 없어 Non-Blocking을 구현하기 쉽다.

그리고 메모리 방면에서도 Push는 받는 측에서 수신한 데이터를 메모리에 잘게잘게 올려 처리하고 없앨 수 있기때문에 더 효율적으로 처리할 수 있다.

또한, 데이터가 변경될 때마다 Observable에 등록된 Observer는 해당 변화를 쉽게 감지할 수도 있다.

정리하면 Push방식을 이용하면 Reactive에서 말한 이벤트 기반, 비동기 Non-blocking등을 모두 구현할 수 있다.


💁‍♂️ 옵저버 패턴의 한계

Push 방식의 옵저버 패턴은 Pull 방식보다 여러 장점이 있지만, 여전히 2가지의 문제점을 가진다.

  1. 데이터의 끝이라는게 없다. 받는 쪽에서 끝을 알 수가 없다.
  2. Error, Exception이 발생하면 재시도, 콜백을 실행하는 로직이 없다.

이 문제를 개선한 옵저버 패턴이 발전해 Reactive Streams의 기초가 되었다고한다.

Reactive Streams 표준 - 4개 인터페이스

옵저버 패턴의 한계를 해결해주고, Reactive 시스템을 만들기위해 앞서 말했듯 Reactive Streams라는 표준 API가 만들어졌다.

Reactive Streams에서 정의한 4가지의 인터페이스는 다음과 같다.

public interface Publisher<T> {
   public void subscribe(Subscriber<? super T> s);
}
 
public interface Subscriber<T> {
   public void onSubscribe(Subscription s);
   public void onNext(T t);
   public void onError(Throwable t);
   public void onComplete();
}
 
public interface Subscription {
   public void request(long n);
   public void cancel();
}

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
  • Publisher
    • 옵저버 패턴의 Observable과 같은 역할이며, Subscriber의 구독을 받기 위한 subscribe API 하나만 가지고있다. ex. Database Driver
    • 데이터를 생성하고 발행하는 역할.
    • API
      • subscribe (구독하다) - Publisher에 Subscriber를 등록할 때 사용되는 메서드.
  • Subscriber
    • 옵저버 패턴의 Observer과 같은 역할이며, Observer보다 더 많은 기능을 제공함과 동시에 핵심적인 역할을 수행한다.
    • 구독한 Publisher로부터 통지받은 데이터를 전달받아 처리하는 역할.
    • onSubscribe onNext* (onError | onComplete)?
      • onSubscribe 한번, onNext 여러번, onError or onComplet 한번
    • API
      • onSubscribe - 최초 호출시 무조건 호출되는 API이며, 이를 통해 매개변수로 Subscription (구독 정보)를 주고 받는다.
      • onNext - Publisher로부터 데이터를 받아 처리할 때 사용된다. (여러 번 호출될 수 있다.)
      • onError - 에러가 난다면 처리하는 API. (스펙상 onError가 호출되면 더이상 Publisher가 데이터를 보내지 않는다.)
      • onComplete - 작업 완료시 사용되는 API. (스펙상 onComplete가 호출되면 더이상 Publisher가 데이터를 보내지 않는다.)
  • Subscription
    • 옵저버 패턴에는 없는 개념으로, Reactive Streams에서 Publisher와 Subscriber 사이의 데이터 흐름을 제어하는 용도로 사용된다.
    • Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할.
    • 구독 정보를 가진 객체라고 보면 되며, Subscriber가 Publisher에게 Back Pressure 방식으로 흐름을 제어할 때 사용된다.
    • API
      • request - Subscriber가 Publisher에게 n개의 데이터를 요청할 때 사용되는 API.
        • 만약 데이터가 3개가 필요하면 n에 3을 던져주면, Subscription은 3개를 던져주게 된다.
        • 만약 데이터가 총 10개라면, 3개 → 3개 → 3개 → 1개순서로 보내게된다. 그리고 이때 request는 subscribe의 onNext 메서드를 통해 3개의 데이터를 전달한다.
      • cancel - 메서드 명 그대로 구독을 취소할 때 사용되는 API이다.
  • Processor
    • Publisher와 Subscribe의 기능을 모두 가진 인터페이스.
    • Subscriber로서 다른 Publisher를 구독할 수 있으며, Publisher로서 다른 Subscriber가 구독할 수 있다.

이러한 표준 API를 정의하기위해 넷플릭스, 피보탈, 레드헷, 오라클, 트위터 등등 많은 회사들이 참여했다고한다. 더 자세한 내용은 reactive-streams에서 확인할 수 있으며, 여기서 명시한 API를 통해 ReactiveXProject Reactor 그룹들이 구현체를 만들어 제공하고있다. Webflux는 Project Reactor 기반으로 구현되었다. (물론 Webflux 사용시 내부적으로 ReactiveX를 사용하도록 할 수도 있다고한다.)

Reactive Streams 표준 - 동작 흐름

4개의 표준 인터페이스가 어떤 역할인지 살펴보았는데, 각 인터페이스들이 어떻게 유기적으로 동작하는지 살펴보자면 다음과 같다.

reactive workflow

  1. Subscriber가 Publisher에게 subsribe(Subscriber)를 통해 이벤트 구독을 요청한다.
  2. Publisher는 onSubscribe(Subscription)를 통해 Subsriber에게 Subscription (구독 정보)를 전달한다.
    • Subscription을 통해서 Subscriber는 Publisher에게 직접적으로 통신하지않고도 데이터 흐름을 제어할 수 있다.
  3. Subscription을 받은 Subscriber는 Publisher에게 직접적으로 데이터 요청을 하지않고, Subscription의 request(n) 함수를 통해 Publisher에게 원하는 개수의 데이터를 전달한다.
    • Subscription이 Subscriber와 Publisher 간 통신의 매개체가 된 것.
    • Back Pressure (역압) 특징을 Subscription을 통해 구현한다.
      • Back Pressure는 발행-구독 프로토콜에서 이벤트 스트림의 구독자가 발행자의 이벤트 제공 속도보다 느린 속도로 이벤트를 소비하면서 문제가 발생하지 않도록 보장하는 장치이다.
      • 이를 통해 부하가 발생한 컴포넌트가 완전 불능이 되거나 예기치 않는 방식으로 이벤트를 잃어버리는 등의 문제가 발생하는 것을 예방할 수 있다. 부하가 발생한 컴포넌트는 이벤트 발생 속도를 늦추라고 알리거나, 얼마나 많은 이벤트를 수신할 수 있는지 알리거나, 다른 데이터를 받기 전에 기존의 데이터를 처리하는 데 얼마나 시간이 걸리는지를 업스트림 발행자에게 알린다. 이때 사용되는 것이 Subscription.
  4. Publisher는 Subscriber로부터 호출받은 Subscription의 request(n)을 통해 Subscriber의 onNext(T t)에 데이터를 전달하며, 작업이 완료되면 onComplete, 에러가 발생하면 onError 시그널을 전달한다.
  5. Subscriber와 Publisher 그리고 Subscription이 서로 유기적으로 연결되어 통신을 주고받으면서 subscribe로부터 onComplete까지 연결되고, 이를 통해 Back Pressure가 완성된다.

조금 다른 그림을 통해 보면 아래와 같이 이해할 수도 있다.

reactive workflow 3

onNext()마다 request(n)를 보낼 수도 있다.

직접 Reactive Streams 구현해보기 - Hello World

마지막으로 Reactive Streams 스펙을 직접 구현해보면 아래와 같다.

/**
 * Reactive Streams Hello World
 */
public class PubSub {

    public static void main(String[] args) {
        Publisher<Integer> pub = new Publisher<Integer>() {
            @Override
            public void subscribe(Subscriber<? super Integer> subscriber) {
                Iterable<Integer> data = Stream.iterate(1, a -> a + 1).limit(5).collect(toList());

                // 2. Publisher에서 Subscriber에게 Subscription (구독 정보)를 전달한다.
                subscriber.onSubscribe(new Subscription() {
                    @Override
                    public void request(long n) {
                        // 4. 요청에 따라 Subscriber에게 n개 만큼의 데이터를 전송.
                        // 여기선 우선 모든 데이터 전부 보내는 것으로 예시 작성함.
                        System.out.println("Pub request. n : " + n);
                        try {
                            data.forEach(it -> subscriber.onNext(it));
                            subscriber.onComplete();
                        } catch (Throwable e) {
                            subscriber.onError(e);
                        }
                    }

                    @Override
                    public void cancel() {
                    }
                });
            }
        };

        Subscriber<Integer> sub = new Subscriber<Integer>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                // 3. Publisher로부터 전달받은 Subscription (구독 정보)을 통해 데이터를 요청한다.
                System.out.println("Sub onSubscribe.");
                this.subscription = subscription;
                // Subscription을 통해 데이터 request
                this.subscription.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(Integer item) {
                // 5. Publisher에서 전송한 데이터를 받아서 Subscriber에서 처리.
                System.out.println("Sub onNext. item : " + item);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Sub onError.");
            }

            @Override
            public void onComplete() {
                System.out.println("Sub onComplete");
            }
        };

        // 1. publisher에 subscriber 구독 요청.
        pub.subscribe(sub);
    }
}


// 결과
Sub onSubscribe.
Pub request. n : 2147483647
Sub onNext. item : 1
Sub onNext. item : 2
Sub onNext. item : 3
Sub onNext. item : 4
Sub onNext. item : 5
Sub onComplete

위 예시는 request(1)로 요청하여 매 onNext()마다 request(1)를 날리도록 한 예시이다.

n을 조금 높게하고 버퍼를둬서 request의 횟수를 줄일 수도 있다.

Reactive Streams 구현체

Reactive Streams가 대략 어떻게 동작하는지 살펴보고 직접 구현해보았는데, 정말 간단한 핵심적인 부분만 구현한 것이며, 실제로 Github에 가보면 각 인터페이스가 지켜야할 Rule 들이 훨씬 더 많이 명시되어있다.

Rule을 직접 보면 알겠지만 굉장히 많고 까다롭다.. 토이 프로젝트로 공부겸 만들어보는 것은 좋지만, 스펙에 맞춰 구현하고 실제 사용하기엔 이미 잘 만들어진 검증된 구현체를 사용하는 것이 좋다고 생각한다. 검증된 구현체는 비교적 많다. 참고

  • 순수 스트림 연산 처리 - RxJava, Reactor, Akka Streams 등등
  • 저장소 데이터 Reactive Streams 방식으로 조회시 - ReactiveMongo, R2DBC, mongodb-driver-reactivestreams 등등
  • Reactive Streams 기반의 웹 애플리케이션 처리 - Spring Webflux, Armeria, Vert.x, Play Framework 등등

참고


© mark-kim.blog Built with Gatsby