thumbnail
Reactor-Netty 동작과정 이해하기 (Reactor Netty Deep Dive)
Java / Netty / Reactor / Webflux / Non-blocking
2025.01.17.

들어가며

Reactor-Netty는 Netty과 Reactive Streams의 구현체인 Reactor Core를 기반으로 구축된 Reactive 네트워크 프레임워크다. Netty에 Reactive 계층을 구축하고 Back-Pressure를 도입함으로써 시스템은 반응성(responsive)이 뛰어나고 탄력적(elastic)이며 회복력(resilient)이 있게 될 수 있었다.

이 글은 개인적으로 RPC 기반의 네트워크 라이브러리내 Reactive Streams를 도입함으로써 Reactor-Netty 코드를 살펴보다 이렇게 글까지 작성하게되었다.

글의 초반엔 Reactor-Netty가 어떤 문제를 해결하기위해 탄생하게되었는지 개인적인 의견도 포함하여 작성하였으며, 그 다음엔 Netty과 Reactor Core를 어떻게 연결했는지 중점을 두고 정리했다.

마지막으론 Netty Channel내 Reactive Streams의 조립/구독/실행 타이밍과 과정을 정리하였으며, 그 과정에서의 Back-Pressure는 어떻게 동작하는지를 정리하였다.

1 Reactor-Netty = Netty + Reactive Streams

항상 어떠한 기술을 접할 때 그 기술이 탄생하게된 이유와 어떤 문제를 해결하고자하는지 살펴본다.

이번 글의 첫 부분도 처음 Reactor-Netty를 접했을 때 집중적으로 살펴본 부분을 정리하고 Reactor-Netty의 동작방식을 정리해보려고한다.

바로 Reactor-Netty의 동작 원리를 보려면 여기부터 읽기를 추천한다.

1-1 클라이언트-서버간의 통신 과정


💁‍♂️ Reactor-Netty는 두 엔드포인트간 통신에 사용되는 네트워크 프레임워크다.

공식 repo에선 Reactor-Netty를 아래와 같이 정의하고있다.

Reactor Netty offers non-blocking and backpressure-ready TCP/HTTP/UDP/QUIC clients & servers based on Netty framework.

살펴볼 내용이 많은데 우선 집중하고자하는 부분은 Reactor-Netty가 client & server의 네트워크 프레임워크라는 점이다.

즉, Reactor-Netty는 네트워크 프레임워크로써, 클라이언트와 서버간의 통신할 때 사용된다.


🤔 그렇다면 클라이언트-서버간의 네트워크 통신시 서버와 클라이언트는 각각 어떤 과정을 거치게될까?

client server layer


위 그림은 클라이언트와 서버가 서로 통신하며 데이터를 주고받을 때 수행하는 각 작업의 계층을 나타낸다.

기존 네트워크 7계층이나 TCP/IP 4계층의 내용은 이 글의 중점이 아니므로 생략했다.

클라이언트와 서버는 모두 데이터를 전송 및 수신하기위해 위 계층을 거치게된다.

예를 들어, TCP 서버가 클라이언트의 요청을 받는다면 아래와 같은 작업을 수행하여 비즈니스를 수행하고 응답을 보낸다.

  1. TCP Connection 수립
  2. 서버 애플리케이션은 OS커널 소켓 파일내 Client로부터 전송 받은 데이터를 네트워크 I/O를 통해 OS로부터 read한다. (Network I/O 계층)
  3. 서버 애플리케이션은 전송 받은 데이터에 맞는 비즈니스 처리후 클라이언트에게 다시 응답을 보내기위해 해당 소켓에 네트워크 I/O를 통해 write한다. (Application 계층)

위 과정을 조금 자세하게 나타내면 아래와 같이 OS내 소켓의 버퍼를 통해 데이터를 주고받는다.

1-2 Netty는 Network I/O 부분만을 처리한다

클라이언트와 서버가 데이터를 주고받는 위 과정에서 Netty는 Network I/O 계층의 네트워크 프레임워크라고 볼 수 있다.


💁‍♂️ Netty는 Event Driven Non-Blocking Network Framework다.

위 클라이언트 - 서버간의 데이터 통신중 Netty는 Network I/O 부분을 처리한다.

OS 커널내 소켓 버퍼로부터 애플리케이션(프로세스)이 데이터를 read/write하는 작업을 Netty가 수행한다는 의미이다.

이때 OS 커널에 write, read하기위한 system call은 애플리케이션 프로세스 관점에서 Blocking 모드와 Non-Blocking 모드로 나뉘게된다.

Netty는 select(), epoll()등을 활용하여 Connection별 이벤트 파이프라인을 만들어 Non-Blocking 방식으로 Network I/O를 수행해줄 뿐이다.

Netty는 기본적으로 reactor-based model I/O 전략을 사용한다. 이와 관련해서는 Netty 이해하기를 참고


💁‍♂️ 즉, Netty는 Network I/O를 효율적으로 처리해줄 뿐, 비즈니스 처리에 대한 Thread 전략은 따로 수립해야한다.

Netty를 사용하면 Network I/O를 Event Driven하게 Non-Blocking으로 쉽게 구성하여 높은 성능을 보장해주지만, 비즈니스 처리에 대한 Thread 전략은 따로 수립 및 구현해야한다는 것이다.

실제로 Kafka, Cassandra, Spark, ES, Lettuce등 수많은 프레임워크와 라이브러리가 Netty를 Network I/O로서 사용한다. 이와 관련해서는 Netty Docs에 더 자세히 나와있다.

1-3 Reactive Streams는 Push 방식의 Non-Blocking을 기본으로한다

Network I/O에 대한 것은 Netty가 담당한다면 이제 Application 계층의 비즈니스 처리를 고려해야한다.


🤔 그렇다면 Netty 기반의 비즈니스 처리에 대한 Thread 전략은 무엇이 있을까?

당연히 서버이기에 각 요청에 대한 처리는 비동기이되 이 부분도 Blocking과 Non-Blocking으로 나뉜다.

  • Blocking 방식
    • Network I/O는 Netty를 사용해 Non-Blocking으로 처리하지만, 비즈니스 처리는 Blocking으로 처리
  • Non-Blocking 방식
    • Network I/O도 Non-Blocking, 비즈니스 처리도 Non-Blocking.

💁‍♂️ Network I/O는 Non-Blocking, 비즈니스는 Blocking으로 처리하는 방식

글로만보면 이해가 쉽지 않을 듯 하여, 비즈니스를 Blocking으로 처리하는 Netty 코드를 간단히 작성해보면 아래와 같다.

public class BlockingServerHandler extends ChannelInboundHandlerAdapter {

    private final Executor workerExecutor;
    private final Executor asyncExecutor; // 비즈니스 처리용 스레드풀.

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object e) throws Exception {
        ByteBuf buffer = (ByteBuf) e;

        if (asyncExecutor != null) {
            try {
                asyncExecutor.execute(
                    new NettyExecutor(ctx, buffer));
            } catch (RejectedExecutionException rejectedExecutionException) {
                // ... 스레드 풀 꽉 찬 경우
            }
        } else {
            // ... 혹은 Netty I/O 처리
        }

    }
    
    ...
}

Netty는 기본적으로 Network I/O를 통해 소켓내 수정 사항이 있으면 해당 Connection에 매핑된 파이프라인 Handler들의 channelRead()가 호출된다.

이때 Blocking 비즈니스 처리는 위 코드와 같이 Thread Pool내 특정 스레드에 위임한다.

위 방식을 사용하면 Network I/O는 Netty, 비즈니스 처리는 Thread Per a Request와 같이 처리하게된다.

Netty는 기본적으로 Non-Blocking의 Event-Driven 기반이기때문에 channelRead()내 동일한 스레드에서 Blocking 코드를 실행한다는 것은 성능에 치명적인 문제를 발생시킨다.

그래서 위와 같이 Blocking 비즈니스 처리는 비동기로 다른 스레드에 위임하여 처리한다.

Spring MVC에서 사용하는 Tomcat과 유사하게 동작하는 것이며, 실제로 Tomcat도 8.0부터 Connector를 도입함으로써 Java NIO 기반의 Non-Blocking Network I/O를 지원하고있다. 참고


💁‍♂️ Network I/O도 Non-Blocking, 비즈니스도 Non-Blocking으로 처리하는 방식

그렇다면 비즈니스도 Non-Blocking하게 처리하는 방법은 무엇이있을까?

우선 Non-Blocking 방식에 대한 이해가 필요한데, 여기서 말해는 Non-Blocking은 처리해야 되는 작업을 Pull 방식이 아닌 Push 방식으로 멈춤없이 처리하는 것을 말한다.

즉, 어떤 이벤트나 상황이 발생할 때, 그에 대해 반응(React)해서 적절한 행동을 수행하는 것을 말한다.

이렇게 처리가 필요할 때만 적절히 “반응”하여 처리하는 Non-Blocking 방식중 하나가 Reactive다.

Reactive 처리 방식이란 사실 굉장히 오래전부터 사용되는 매커니즘이라고한다. 멀리 안보고 지금 우리 앞에있는 키보드를 떠올려봐도 키보드를 눌렀을 때 그에 “반응 하여” 동작한다. 컴퓨터의 주변 장치도 CPU의 도움이 필요한 경우에만 인터럽트 (이벤트라고 생각해도 좋다)를 사용해 CPU에게 처리하라고 알려준다. 그럼 그에 맞춰 CPU는 입력한 키에 맞춰 “반응 하여” 동작한다.

자연스레 callback을 떠오를 것이다. 맞다. callback이 사실 Non-Blocking의 대표적인 구현 방식이다.

옵저버 패턴

바로 앞서 살펴보았던 Netty도 네트워크 I/O 이벤트 (Connection 연결, 소켓 데이터 read/write 등등)에 반응하는 이벤트 기반의 Non-Blocking 처리 방식을 채택하고있다.

서버 비즈니스 부분에서 처리를 Non-Blocking하게 처리한다는 것은 각 이벤트에 맞춰 callback을 미리 설정하고 이벤트가 발생했을 때 처리하여 응답하는 것을 의미한다.


다만 옵저버 패턴의 callback만으로 아래와 같은 한계가 존재한다.

  1. 데이터의 끝이라는게 없다. 받는 쪽에서 끝을 알 수가 없다.
  2. Error, Exception이 발생하면 재시도, 콜백을 실행하는 로직이 없다.

위 문제들을 개선한 옵저버 패턴이 발전해 Reactive Streams의 기초가 되었다고한다.

그리고 이 스펙을 구현한 구현체중 하나가 바로 Reactor다.

관련된 더 자세한 내용은 Reactive StreamsReactor 글들을 참고.

1-4 다시 Reactor-Netty의 정의를 살펴보면


💁‍♂️ Reactor-Netty는 Netty와 Reactor의 브릿지 역할을 수행한다.

앞서 말했듯이, 비즈니스 처리 부분을 Non-Blocking하게 처리하는 매커니즘으로 Reactive Streams가 사용될 수 있다.

그리고 그 구현체인 Reactor을 기반으로 Network I/O와 비즈니스 처리 모두 Non-Blocking하게 처리하기위해 이 둘을 연결한 것이 Reactor-Netty라고 볼 수 있다.

다른 의미로 Reactive한 매커니즘을 적용하기위해 이를 적용한 클라이언트-서버는 아래와 같이 Reactive 계층이 추가된다.




💁‍♂️ 다시 살펴보는 Reactor-Netty 정의

이제 다시 공식 repo의 정의를 살펴보면 무슨 말을 하고자하는 이해가된다.

Reactor Netty offers non-blocking and backpressure-ready TCP/HTTP/UDP/QUIC clients & servers based on Netty framework.

위 정의를 풀이 해석하면 아래와 같다.

  • non-blocking, back-pressure => Reactive Streams
  • TCP/HTTP/UDP/QUIC client & servers network i/o => Netty

그리고 이를 한 줄로 정리하면 아래와 같다.

Reactor-Netty는 Event-Driven Non-Blocking 네트워크 라이브러리인 Netty에 Reactor Streams를 결합하여 Network I/O부터 비즈니스 처리까지 완전 Non-Blocking하게 처리하는 네트워크 프레임워크다.

즉, 완전 Non-Blocking 처리를위해 Netty와 Reactor를 연결시켜 사용자로하여금 비교적 쉽게 Reactive한 처리를 구현할 수 있도록 프레임워크로 구현한 것이다.

프레임워크는 기본적으로 제어권이 프레임워크에있다. 개발자는 그저 핵심 비즈니스만 필요한 부분에 구현해주면 된다.

2 예시 - 대문자 변환 TCP Server

이제 본격적으로 Reactor-Netty의 내부를 살펴보기전에, 전반적인 설명에 사용될 예시 하나를 구현한다.

Uppercase TCP Server

public class UppercaseTcpServer {

    public static void main(String[] args) {
        DisposableServer server = TcpServer.create()
                .host("127.0.0.1")
                .port(8080)
                .handle((inbound, outbound) -> {
                    // TCP inbound, outbound에 대한 서버 비즈니스 처리 (Reactive)
                    return inbound
                            .receive()
                            .asString()
                            .map(String::toUpperCase)
                            .flatMap(response -> outbound.sendString(Mono.just(response)));
                    }
                )
                .bindNow();

        server.onDispose()
                .block();
    }
}

TCP로 문자열 데이터가 들어오면 대문자로 변환하여 반환하는 간단한 서버 예시이다.

위 코드에서 TcpServer는 TCP 서버를 구성하고 시작하기 위해 Reactor-Netty에서 제공하는 클래스이다. 이는 Netty의 ServerBootstrap 구성을 대부분 추상화 (숨기고)있으며, 외부에 TCP 서버에 대한 설정을 인터페이스로 제공하고있다. 기능적 차이는 없지만 내부적으로 Reactive Streams를 구현한다.

이외에도 TcpServer는 ServerSocketChannel을 캡슐화하고 block을 통해 채널 close 이벤트를 구독할 수 있는 바인딩 작업을 수행한 후 DisposableServer를 반환한다.

3 Reactive 계층 아키텍처를 통해 살펴보는 Reactor-Netty 구성요소

Reactor-Netty 반응 계층의 동작과정을 살펴보기 전에, 전반적인 큰 그림을 이해하기위해 Netty와 Reactor를 연결하는 Reactive 계층의 아키텍처를 다이어그램과 같이 먼저 살펴보고자한다.


Reactor-Netty의 데이터 수신 및 전송 과정
Reactor-Netty의 데이터 수신 및 전송 과정

위 그림을 통해 Reactor-Netty의 Reactive 계층은 크게 두 부분으로 나눌 수 있다.


💁‍♂️ Netty Handler 부분

  • Reactor-Netty 편의 Handler
    • Reactor-Netty에서 구현해둔 핸들러로 로그와 메트릭 관련 Netty Handler, 또는 사용자 정의 코덱 Handler. 이 Handler들은 보통 우선순위가 가장 높으며 ChannelPipeline내 다른 Handler보다 앞에 구성된다.
    • 위 다이어그램내 Library/User Defined Codec로 이해하면 된다. (편의상 Application 계층으로 해두었다)
    • ex. reactor.left.accessLogHandler, reactor.left.channelMetricsHandler, Http 코덱 Handler등 등등.
  • Netty와 Reactor의 브릿지 Handler
    • Netty와 Reactive의 브릿지 역할을 수행하는 Reactor-Netty 핵심 Handler. (Netty와 Reactor의 접점)
    • 소켓 데이터 수신시 ChannelOperations에 전달하며, ChannelOperations에서 처리된 데이터를 다시 외부 소켓에 전송하는 역할을 수행한다.
    • ChannelOperationsHandler

💁‍♂️ Reactive Streams를 구성하는 모듈

  • ChannelOprations
    • Netty Channel의 생명주기에 맞춰 소켓 데이터를 NettyInbound, NettyOutbound를 통해 Reactor에 publish하고 또는 subscriber하는 객체.
      • Netty Channel와 생명주기를 같이한다. 즉, Channel마다 생성되고 소멸된다.
    • 소켓 데이터를 수신하면 FluxReceive에게 데이터를 전달한다.
  • FluxReceive
    • FluxReive는 ChannelOperations에서 전달해준 수신된 소켓 데이터를 Queue에 넣고, 다운스트림으로 발행하는 Flux 역할을 수행한다.
      • 여기서 다운스트림은 사용자 정의 Reactive Streams Handler이다.
    • FluxReceive는 Queue에 넣고 loop를 통해 다운스트림으로 데이터를 publish함으로써 Back-Pressure를 구현했다.
    • 수신된 소켓 데이터(Client Request)를 사용자 정의 핸들러에 emit하는 Publisher 겸 Listener
  • MonoSendMany
    • FluxReceive의 다운스트림으로 사용자 정의 Reactive Streams Handler에서 처리후 emit한 데이터를 수신하고 소켓에 전송하는 역할을 수행한다.
    • 즉, 사용자 핸들러에 의해 처리된 데이터를 소켓에 emit하는 Publish 겸 Listener
  • 사용자 정의 Reactive Streams Handler
    • Reactive Streams 스타일로 데이터의 처리와 전송을 구현한다.
    • ex. 위 대문자 변환 예시에서의 handle((inbound, outbound) -> ...)부분

이제 위 다이어그램내 Reactor-Netty 처리 과정의 세부 사항을 알아보기전 꼭 알아야하는 부분과 모듈들을 하나하나 설명한다.

3-1 Reactor-Netty내 Data Stream의 Upstream과 Downstream

Reactor-Netty에서 중요한 부분은 당연히 Reactive Streams를 구성하는 모듈들이다.

그러므로 여러 객체들중 Data Stream을 구성하는 객체들의 상호작용만 이해해도 Reactor-Netty의 전반적인 동작 과정을 쉽게 이해할 수 있다.

전체적인 아키텍처 다이어그램에서 Data Stream을 구성하는 모듈들의 Upstream과 Downstream이 어떻게되는지만 간단히 정리해보면 아래와 같다.


Reactor-Netty내 Reactive Streams
Reactor-Netty내 Reactive Streams

위 다이어그램은 실제 Connection으로부터 데이터를 수신하고 전송하는 과정에서의 Reactor-Netty내 Data Stream의 Upstream-Downstream 구성과 상호작용을 묘사했다.

실제 개발자는 파란색 상자인 User Defined 부분만 로직을 넣어주면된다. (프레임워크의 장점이기도하다) 그리고 해당 부분이 위에서 살펴본 대문자 변환 예시의 handle()부분이다.

대문자 변환 서버 예시

public class UppercaseTcpServer {

    public static void main(String[] args) {
        DisposableServer server = TcpServer.create()
                .host("127.0.0.1")
                .port(8080)
                .handle((inbound, outbound) -> { // User Defined Reactive Streams Handler
                    // Connection이 수립될 때마다 아래 코드를 실행하여 Reactor의 Stream을 구성한다.
                    // TCP inbound, outbound에 대한 서버 비즈니스 처리 (Reactive)
                    return inbound
                            .receive()
                            .asString()
                            .map(String::toUpperCase)
                            .flatMap(response -> outbound.sendString(Mono.just(response)));
                    }
                )
                .bindNow();

        server.onDispose()
                .block();
    }
}

위 다이어그램내 여러 모듈들은 모두 Netty내 Connection이 수립될 때 Connection마다 생성되며 수립때마다 조립 및 구독된다. 그리고 실제 요청이 들어오면 실행된다. 관련 자세한 내용은 아래에서 더 자세히 다룰 예정이다.

위 동작 흐름만 이해하면 사실 Netty-Reactor 동작 과정의 반은 이해했다고봐도 무방하다.


3-2 Reactor내 Stream 처리 구성요소

전반적인 처리 과정의 큰 그림을 살펴보았으니, 이제 Reactor-Netty내 처리 과정중 꼭 알아야하는 구성요소들만 간단히 정리해본다.


3-2-1 ChannelOprationsHandler

Reactive Bridge Handler

ChannelOprationsHandler는 Reactive Bridge Handler의 역할로서, Netty와 Reactor 사이의 연결 레이어 역할을 수행한다.

이는 전체 Data Stream의 입구 역할을 하며 비즈니스 처리를 위한 Reactive Streams의 조립 및 구독을 수행한다.

ChannelOprationsHandler은 다양한 기능을 구현하는데, 아래 세 가지가 핵심적인 부분이라고 볼 수 있을듯하다.

  • Connection이 수립되면 ChannelOperations를 생성하고, 이를 Connection내 Channel에 바인딩한다.
    • 이때 ChannelOperations내에선 이후 Connection내 데이터 수신시 비즈니스 처리를 위한 Reactive Streams의 조립과 구독을 수행한다.
  • Channel의 데이터를 읽고 처리를 위해 ChannelOperations에 위임한다. (onInboundNext(msg))
  • Connection이 닫힐 때 Reactive Streams의 구독을 취소한다.

3-2-2 Connection

Connection은 네트워크 연결 (TCP Connection)의 추상 인터페이스로서, Channel Context 정보와 데이터 읽기 및 쓰기 관련 작업을 수행한다.

Connection이 수립되면 Connection은 Channel내 속성에 저장되고, Channel은 Connection 읽기/쓰기 관련 작업 수행시 다른 구성 요소에 Connection을 제공한다.


3-2-3 NettyInbound와 NettyOutbound

NettyInbound와 NettyOutbound는 개발자에게 공개된 Socket 관련 데이터 읽기 및 쓰기 인터페이스다.

실제 앞서 살펴본 대문자 변환 서버 TcpServer.handler()의 매개변수이기도하다.

즉, 개발자는 위 두 개의 인터페이스를 통해 읽기/쓰기에 관한 Reactive 작업을 추가하여 Reactor Netty로부터 데이터를 받기도하고 보내기도한다.

public TcpServer handle(BiFunction<? super NettyInbound, 
                                   ? super NettyOutbound, 
                                   ? extends Publisher<Void>> handler)

쉽게 말해, Reactor-Netty와 개발자간의 데이터를 주고받기위해 Reactive Streams를 구성하는 인터페이스 역할이다.

  • NettyInbound
    • NettyInbound는 세 가지 인터페이스를 포함하여 채널 데이터를 수신하기 위한 역할을 수행한다.
  • NettyOutbound
    • NettyOutbound는 채널 데이터 전송을 위한 역할을 수행한다.

3-2-3 ChannelOperations

ChannelOperations는 NettyInbound와 NettyOutbound를 모두 구현함으로써 Channel에 대한 Reactive 관련 작업을 통합하였다.

또한, Connection 인터페이스에 대한 구현도 포함하고있으며, Connection이 수립되었을 때 ChannelOperationsHandler에 의해 생성되어 Channel에 바인딩된다.

그리고 실제 개발자 비즈니스 로직을 작성하는 TcpServer.handle()의 매개변수인 NettyInbound, NettyOutbound 인터페이스들의 구현체이기도하다.


또한 ChannelOperations는 Connection이 수립되고 Channel에 바인되어 Channel내 전체 Reactive Streams의 조립과 구독을 수행한다.

이후 데이터 수신시 Connection시 수립된 Reactive Streams에 흘려보내 전체 Stream의 완료 및 오류 이벤트를 모니터링하고 처리한다.

Channel내 전체 Reactive Streams을 감독하는 역할..?


3-2-4 ConnectionObserver

ConnectionObserver는 각 Connection 상태에 대한 처리를 위임받아 처리하는 이벤트 리스너다.

Connection내 상태가 변경되면 상태에 따라 처리하는 리스너.


ConnectionObserver내 가장 중요한 인터페이스는 onStateChange다. Connection내 상태 변경시 ChannelOprationsHandler가 리스너의 onStateChange를 호출한다.

ConnectionObserver

/**
 * React on connection state change (e.g. http request or response).
 *
 * @param connection the connection reference
 * @param newState the new State
 */
void onStateChange(Connection connection, State newState);

실제 Connection에 대한 발생할 수 있는 상태 목록은 아래와 같다.

interface State {

		/**
		 * Propagated when a connection has been established and is available
		 */
		State CONNECTED = ReactorNetty.CONNECTED;

		/**
		 * Propagated when a connection is bound to a channelOperation and ready for
		 * user interaction
		 */
		State CONFIGURED = ReactorNetty.CONFIGURED;

		/**
		 * Propagated when a connection has been reused / acquired
		 * (keep-alive or pooling)
		 */
		State ACQUIRED = ReactorNetty.ACQUIRED;

		/**
		 * Propagated when a connection has been released but not fully closed
		 * (keep-alive or pooling)
		 */
		State RELEASED = ReactorNetty.RELEASED;

		/**
		 * Propagated when a connection is being fully closed
		 */
		State DISCONNECTING = ReactorNetty.DISCONNECTING;
	}

4 Reactor-Netty Reactive Streams 조립 및 구독 타이밍

앞서 Reactor-Netty의 전반적인 동작과정과 그 과정에서 상호작용하는 모듈들을 살펴보았다.

이제 본격적으로 앞서 살펴본 Reactor-Netty내 Connection에 대한 Reactive Streams가 언제 조립되고 구독되는 타이밍을 살펴본다.

4-1 Connection별로 Data Streams가 존재한다


💁‍♂️ Reactive Streams의 조립 모두 Connection이 수립될 때 수행된다.


reactor netty channelRead

위 그림은 앞서 살펴본 Reactor-Netty내 여러 Reactive Streams가 어떻게 상호작용하며 데이터를 수신/처리/전송하는지를 나타낸다.

이때 각각의 Reactive Streams 모듈들은 모두 Netty Connection과 생명 주기를 같이한다.

즉, 새로 연결된 각각의 Connection에는 그에 상응하는 Reactive Streams가 존재하며, Reactive Streams의 조립 및 구독은 모두 Connection이 수립될 때 수행된다.

쉽게 말해 Connection 별로 Data Streams이 각각 존재한다.

4-2 Reactive Streams 조립과 구독 호출 타이밍


💁‍♂️ ChannelOperationsHandler.channelActive()

Connection별 Reactive Streams의 실제 조립 및 구독 과정은 Connection이 수립되고 매핑된 Netty ChannelPipeline의 channelActive()를 호출하면서 수행된다.

즉, 앞서말한 Netty와 Reactor의 브릿지 역할 Handler인 ChannelOperationsHandler.channelActive()에서 수행된다.


ChannelOperationsHandlers.channelActive()
ChannelOperationsHandlers.channelActive()

코드를 살펴보면 알 수 있듯이 가장 먼저 channel을 랩핑하여 Connection을 얻는다.

여기서 반환되는 것은 간단한 SimpleConnection이다.

그리고 Connection 상태에 맞게 호출되는 콜백인 listener의 onStateChange를 호출한다.

이때 해당 메서드는 실제로 ServerTransport.childObserver가 호출되고 ServerTransport는 ServerTransportDoOnConnection.onStateChange에 처리를 위임한다.

다만 CONNECTED 상태일 때는 channelGroup이 따로 있지않는한 별다른 작업을 수행하진 않는다.


💁‍♂️ ChannelOperations 생성

그 다음 실제로 Reactive Streams과의 브릿지 역할인 ChannelOperations를 생성한다. 이 부분이 channelActive()에서 가장 중요하다.

디버깅
디버깅

디버깅해보니 위와 같이 Connection, ChannelObserver(listener)와 함께 ChannelOperations의 생성자를 호출하여 생성하는 것을 볼 수 있었다.

ChannelOperations 생성자
ChannelOperations 생성자

그리고 위와 같이 ChannelOperations 생성자내에서 FluxReceive, MonoSandMany등이 인스턴스화되면서 개발자가 작성한 handle()외의 Reactive Streams는 조립된다.


💁‍♂️ ChannelOperations 바인딩 및 Reactive Streams 조립 및 구독

ChannelOperations가 생성되었으니, 다시 channelActive() 코드로 돌아오면..

ChannelOperationsHandler.channelActive()
ChannelOperationsHandler.channelActive()

Connection에 ChannelOperations를 바인딩한다. 그리고 이제 Connection의 상태가 CONFIGURED가 되었다고 listener 콜백을 호출한다.

이때도 ServerTransport.childObserver가 호출되고 ServerTransport는 ServerTransportDoOnConnection.onStateChange에 처리를 위임한다.

ServerTransportDoOnConnection.onStateChange()
ServerTransportDoOnConnection.onStateChange()

그리고 위와 같이 doOnConnection.accept(connection);를 호출한다.

TcpServer.OnConnectionHandle
TcpServer.OnConnectionHandle

이를 디버깅해보면 아래와 같이 이전에 구현한 TcpServer내 OnConnectionHandle을 호출한다.

그리고 위와 같이 TcpServer 구성시 개발자가 작성한 handlerapply를 호출하면서 바로 이전에 생성한 ChannelOperations를 넘긴다. 실제로 넘기는 값은 inbound로는 FluxReceive, outbound로 는 MonoSendMany이며, 아래와 같이 개발자가 작성한 로직과 같이 Reactive Streams를 조립하게된다.

handle() 구현부
handle() 구현부

마지막으로 아래와 같이 구성된 Reactive Streams를 subscribe (구독)한다. 실제 구독자는 c.disposeSubscriber()가 된다. (TCP 서버의 경우 ChannelOperations이다.)

TcpServer.OnConnectionHandle
TcpServer.OnConnectionHandle

구독 과정은 당연히 다운스트림에서 업스트림으로 흐르며 아래와 같이 동작하게된다.


💁‍♂️ 정리하면 아래와 같다고 볼 수 있다.

  • Reactive Streams 조립
    • ChannelOperations내 FluxReceive, MonoSendMand는 ChannelOperations가 생성되면서 각자 Streams을 조립.
    • Connection에 대한 전체 Reactive Streams는 개발자가 정의한 TcpServer.handle(inbound,outbound)가 호출되면서 조립됨.
  • Reactive Streams 구독
    • 전체 Reactive Streams를 조립하고 바로 구독.

Netty 입장에서 위 과정은 모두 Connection이 수립되고 ChannelPipeline에의해 ChannelOperationsHandler.channelActive()가 호출될 때 진행된다.

그리고 실제 연결된 Socket Connection내 데이터 수신되면 Netty는 FluxReceive에 데이터를 emit하면서 전반적인 Reactive Streams가 실행된다.

5 Reactive Streams 조립 과정

앞서 조립과 구독이 어디에서 언제 호출되는지 살펴보았다.

이 과정에서 전체 Reactive Streams의 구성은 handler.apply(c.inbound(), c.outbound())에서 수행된다는 것을 디버깅을 통해 살펴봤다.

이제 조립, 구독, 생성 과정을 조금 더 자세히 살펴본다. 우선 조립 과정이다.


여기서 c.inboundc.outbound는 각각 NettyInbound, NettyOutbound를 반환한다. 실제론 Connection내 inbound와 outbound를 모두 구현한 ChannelOperations를 반환한다.

ChannelOperations내 inbound(), outbound()
ChannelOperations내 inbound(), outbound()


💁‍♂️ inbound => FluxReceive

개발자가 정의한 비즈니스 처리 부분
개발자가 정의한 비즈니스 처리 부분

위에서 간단히 구현한 아래 예시로 디버깅을 해보면 위와 같이 .receive()를 호출함으로써 데이터를 받아온다.

그리고 이 메서드를 따라가면 아래와 같이 FluxReceive를 반환하는 것을 볼 수 있다.

FluxReceive
FluxReceive


💁‍♂️ Outbound => MonoSendMany

그리고 outbound.send()가 수행하는 작업과 ChannelOperations 구현을 살펴보면 아래와 같다.

MonoSendMany
MonoSendMany

위 구현에서 알 수 있듯이, 데이터 소스가 MonoSendMany를 통해 데이터를 패키징한다. MonoSendMany는 개발자의 데이터를 연결된 소켓 상대에 데이터를 전송하는 역할을 수행하며, 실제로 다운스트림에 데이터를 생성하진 않는다.

즉, inbound인 FluxReceive에의해 수신된 데이터를 handle()내 연결된 다운스트림들을 거쳐 내려온 데이터를 Mono형태로 포장하기만한다.

MonoSendMany는 더이상 다운스트림이 없고, 포장후 바로 Channel에 writeAndFlush()를 호출하여 데이터를 전송한다.


6 Reactive Streams 구독 과정

조립 과정을 살펴보았으니 이제 subscribe를 호출했을 때 어떤 일이 발생하는지 살펴본다.

OnConnectionHandle
OnConnectionHandle

앞서 살펴본대로 ChannelOperationsHandler.channelActive()에서 Streams을 조립하고 위와 같이 구독을 호출한다.


💁‍♂️ subscribe 호출순서

subsribe가 호출되는 순서도 앞서 살펴본대로 아래와 같다.

subscribe 과정
subscribe 과정

MonoSendMany에서 subscirbe()의 호출을 시작으로 가장 업스트림인 FluxReceive까지 호출되었다가, FluxReceive에서 onSubscribe()를 호출하면서 다시 다운스트림인 MonoSendMany까지 onSubscribe()가 호출된다.


💁‍♂️ MonoSendMany.subscribe()

가장 먼저 실행되는 다운스트림인 MonoSendMany은 아래와 같이 ChannelOperations의 원본인 CoreSubscriber를 SendManyInner로 캡슐화하고 원본 데이터 소스는 SendManyInner 모드에서 구독된다.

@Override
public void subscribe(CoreSubscriber<? super Void> destination) {
    source.subscribe(new SendManyInner<>(this, destination));
}

SendManyInner는 MonoSendMany내에서 데이터 전송을 구현한 subscriber라고보면된다.


💁‍♂️ FluxReceive.subscribe()

FluxReceive.subscribe()
FluxReceive.subscribe()

이번 구독이 처음인지 확인후 s.onSubscribe()를 호출한다. 그리고 이때부터 전체 Stream에 대한 onSubscribe()가 가장 다운스트림까지 전달되어 호출된다.


💁‍♂️ 사용자 Reactive Streams onSubscribe()

FluxReceive에서 호출된 onSubscribe()는 바로 다운스트림인 사용자가 정의한 Streams을 호출한다.

이때 실제로 아래와 같이 doOnSubscribe()를 넣어주면 Connection 연결의 Stream 구독 과정에서 실행된다.

handle내 doOnSubscribe() 정의한 경우
handle내 doOnSubscribe() 정의한 경우

그리고 그 다음 다운스트림인 SendManyInner (MonoSendMany)의 onSubscriber를 호출한다.


💁‍♂️ SendManyInner.onSubscriber()와 request

FluxReceive.onSubscribe()
FluxReceive.onSubscribe()

Queue 관련하여 중복되어 생성되는 것을 방지하기위한 작업을 진행한다.

그리고 코드에서 볼 수 있듯이 s.request(MAX_SIZE)를 호출한다. 여기서 MAX_SIZE의 기본값을 128이다.

request는 다운스트림의 요구사항을 업스트림으로 전달하는 Reactive Streams의 기본 스펙이다.

다운스트림 (SendManyInner)의 요청을 받은 FluxReceive는 아래와 같이 작업을 수행한다.

FluxReceive.request()
FluxReceive.request()

위 코드에서 알 수 있듯이 FluxReceive는 drainReceiver()을 호출한다.

여기서 drainReceiver()는 요구에 따라 Channel내 데이터를 다운스트림으로 발행하는 loop이다.

즉, 해당 Channel에 요청 데이터가 들어오기전까지 Streams는 이제 대기한다.

여기서 이 함수 또한 Netty EventLoop 스레드에서 실행된다.


💁‍♂️ 위 구독 과정은 모두 Netty I/O 스레드에서 실행된다.

위 subscribe 과정은 모두 Netty I/O 스레드에서 수행된다. 보통 Netty Handler에 Blocking 코드를 포함하면 문제가 발생하는데, 위 과정은 따로 I/O Blocking처럼 오랫동안 블로킹하는게 없으며 매우 빨라 I/O 일반적으로 스레드를 차단하진 않는다.

물론 개발자가 비즈니스를 기입하는 Reactive 프로그래밍 부분에서는 Blocking 호출을하면 안된다. 만약 위 구독 과정중 doOnSubscribe()에 오랫동안 Blocking하는 코드를 넣으면 해당 Netty I/O가 차단되기때문에 주의가 필요하다.


7 Reactive Streams 실행 과정

조립/구독을 살펴보았으니 이제 마지막으로 단일 Connection내 Reactive Streams가 어떻게 동작하는지 자세히 살펴본다.

그리고 그 과정에서 Reactive Streams가 어떻게 배압조절을 하는지도 살펴본다.


💁‍♂️ ChannelOperationsHandler.channelRead()

조립/구독까지 완료되었다는 것은 Connection이 이미 수립되어 데이터 요청 받을 준비가 되었다는 의미다.

그리고 실제 Connection과 연결된 소켓에 데이터가 수신되면 ChannelOperationsHandler.channelRead()가 실행된다.

ChannelOperationsHandler.channelRead()
ChannelOperationsHandler.channelRead()

위 코드에서 알 수 있듯이, 이전 channelActive()과정에서 생성한 ChannelOperations를 Channel로부터 가져온다.

그리고 ChannelOperations.onInboundNext를 호출한다. 그리고 이는 FluxReceive.onInboundNext()에 위임한다.

FluxReceive는 이는 구독한 Reactive Streams에 데이터를 emit한다.


💁‍♂️ FluxReceive 데이터 emit 과정을 통해 살펴보는 배압조절

FluxReceive.onInboundNext()
FluxReceive.onInboundNext()

FluxReceive는 수신된 데이터를 자체 큐 (receiverQueue)에 넣고, drainReceive() 루프를 호출한다.

다음은 핵심 구현 부분이다. 직접 코드를 넣기엔 너무 길어.. 직접 코드를 복붙한다.


FluxReceiver.drainReceive()

final void drainReceiver() {
    // general protect against stackoverflow onNext -> request -> onNext
    if (wip++ != 0) {
        return;
    }
    int missed = 1;
    for(;;) {
        final Queue<Object> q = receiverQueue;
        final CoreSubscriber<? super Object> a = receiver;
        boolean d = inboundDone;

        if (a == null) {
            ......
        }

        long r = receiverDemand;
        long e = 0L;

        while (e != r) {
            ......

            try {
                ...
                a.onNext(v); // 데이터를 emit한다.
            }
            finally {
                try {
                    ReferenceCountUtil.release(v);
                }
                catch(Throwable t) {
                    inboundError = t;
                    cleanQueue(q);
                    terminateReceiver(q, a);
                }
            }

            e++;
        }

        ......

        if (r == Long.MAX_VALUE) {
            ......
        }

        // Back-Pressure
        if ((receiverDemand -= e) > 0L || (e > 0L && q.size() < QUEUE_LOW_LIMIT)) {
            if (needRead) {
                needRead = false;
                channel.config()
                        .setAutoRead(true);
            }
        }
        else if (!needRead) {
            needRead = true;
            channel.config()
                    .setAutoRead(false);
        }

        missed = (wip -= missed);
        if(missed == 0){
            break;
        }
    }
}

drainReceiver 루프는 다운스트림의 request 수에 따라 데이터를 발행하고, onNext 함수를 통해 데이터를 emit하는 역할을 수행한다.

데이터 emit하는 것 외에 루프의 가장 중요한 부분은 Connection 수준에서 동작하는 Back-Pressure이다.

다음 두 가지 조건이 충족되면 채널의 autoRead를 닫는다.

  1. 다운스트림의 request <= 0
  2. 대기열 >= 특정 임계치. 임계치의 기본 값은 32다.

즉, 너무 많은 요청이 한번에 들어오는 상태에서 다운스트림이 처리를 빠르게하지 못함으로써 Overflow가 발생하는 것을 방지한다. 이로인해 메모리에 너무 많은 데이터를 읽어들이는 것을 미리 방지할 수 있다.

Netty Channel은 autoRead를 false로하면 데이터를 자동으로 읽어들이지않는다.


💁‍♂️ Back-Pressure가 발생할 수 있는 두 가지 상황

보통 다운스트림의 소비 속도가 업스트림의 생산 속도를 따라잡지 못하는 경우는 크게 두 가지가 있다.

  1. 개발자가 정의한 Reactive Streams 작업의 시간이 오래 소요되는 경우
  2. 비즈니스 처리후 SendManyInner가 응답을 소켓에 전송하였으나 네트워크의 속도가 따라오지 못하는 경우.

첫 번째 개발자가 정의한 로직의 문제인 경우는 개발자가 디버깅을 통해 원인을 파악하고 수정해야한다. 대부분은 외부 (DB, 외부 서버)의 속도가 느리거나 로직내 Blocking 로직을 넣음으로써 스레드가 차단되어 발생하는 경우일 것이다.

두 번째 문제는 실제 네트워크에 데이터를 전송하는 가장 다운스트림인 SendManyInneronNext를 수신했을 때 아래와 같이 처리함으로써 Back-Pressure 역할을 수행한다.

SendManyInner

@Override
public void onNext(I t) {
    ...
    trySchedule();
}

void trySchedule() {
    ...

    try {
        if (eventLoop.inEventLoop()) {
            run(); // 데이터 쓰기 작업 호출
            return;
        }
        eventLoop.execute(this);
    }
    catch (Throwable t) {
        ...
    }
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void run() {
    Queue<I> queue = this.queue;
    try {
        int missed = 1;
        for (; ; ) {
            int r = requested;

            while (Integer.MAX_VALUE == r || r-- > 0) {
                I sourceMessage = queue.poll();

                ...

                int readableBytes = parent.sizeOf.applyAsInt(encodedMessage);

                ...

                pending++;

                // 채널 내부 버퍼에 쓰기
                ctx.write(encodedMessage, this);

                // 아래 조건중 하나라도 만족시 flush 실행
                // 1. flush 해야할 대상이 지정된 경우
                // 2. channel의 기본 버퍼가 꽉차서 더이상 write을 할 수 없는 경우
                // 3. 쓰기 해야할 데이터의 양 > 남은 버퍼의 용량
                if (parent.predicate.test(sourceMessage) 
                    || !ctx.channel().isWritable() 
                    || readableBytes > ctx.channel().bytesBeforeUnwritable()) {

                    needFlush = false;
                    ctx.flush();
                }
                else {
                    needFlush = true;
                }
            }

            // 그외 비동기 flush
            if (needFlush && pending != 0) {
                needFlush = false;
                eventLoop.execute(asyncFlush);
            }

            ...

            // 다음 필요한 업스트림 요청의 양을 결정하고 요청한다.
            int nextRequest = this.nextRequest;
            if (terminalSignal == null && nextRequest != 0) {
                this.nextRequest = 0;
                s.request(nextRequest);
            }

            ...
        }
    }
    ...
}

위 코드에서 알 수 있듯이, SendManyInner에서는 소켓에 flush 작업을 수행할지 여부를 결정하기위해 기본 쓰기 버퍼가 가득찼는지 여부등을 판단한다.

기본적으로 다음 세 가지 조건중 하나라도 만족하면 flush를 수행한다.

  1. flush 해야할 대상이 지정된 경우
  2. channel의 기본 버퍼가 꽉차서 더이상 write을 할 수 없는 경우
  3. 쓰기 해야할 데이터의 양 > 남은 버퍼의 용량

그 외의 경우엔 비동기로 flush를 요청한다. 그리고 다음 필요한 데이터의 양만큼 업스트림에 request로 요청한다.

8 Netty의 autoRead와 isWriteable

앞서 배압조절부분에서 Channel의 autoRead와 isWriteable을 활용한다는 것을 살펴봤다.


💁‍♂️ autoRead

Netty의 autoRead 속성은 Channel의 데이터 수신 속도를 제어한다.

그 배경엔 TCP의 Sliding Window 기반 흐름 제어 매커니즘이 사용된다. OS 소켓은 보통 커널에서 수신 버퍼와 전송 버퍼를 구성한다. 수신자의 경우 애플리케이션 계층이 소비하기에 너무 늦고 수신 버퍼가 가득차면 TCP는 상태 디바이스에게 TCP 전송 Window 크기를 줄이거나 전송을 중지하라고 알려준다. 마찬가지로 전송의 경우 전송 버퍼가 가득차면 비동기 시스템 호출이 애플리케이션 계층에 전송이 실패했음을 알려준다. (Netty에선 isWritable)

위와 같이 Netty는 autoRead 속성을 사용하여 Channel의 읽기 이벤트의 등록 및 제거를 제어함으로써 배압 조절을 구현한다. autoRead가 켜져있으면 Channel 데이터가 자동으로 애플리케이션 계층으로 읽혀지며, autoRead가 꺼지면 읽기를 멈춘다. 이때 TCP에서 수신한 데이터는 OS 수신 버퍼에 적재되며, TCP 흐름 제어 매커니즘이 동작하게된다고한다.


💁‍♂️ isWritable

수신 관련해서 autoRead가 사용된다면 전송 과정에서는 isWritable가 사용된다.

isWritable 속성은 기본 소켓의 전송 버퍼가 채워졌는지 여부를 결정한다. 전송 버퍼가 가득찼다면 false를 가지게되며 애플리케이션 계층은 더 이상 새 데이터를 쓰지 않아야한다.

Reactor-Netty는 이러한 점을 활용하여 배압 조절을 구현하였다.

정리

이번 글은 Reactor-Netty의 전반적인 동작 과정과 Reactive Streams와 Netty을 어떻게 연결했는지 살펴보았다.

그리고 그 과정에서 Connection 수준의 Back-Pressure 특성을 어떻게 구현했는지 살펴보았다.

이러한 Back-Pressure 특성은 서버를 더욱 유연하고 내결함성있게 만들어 외부 트래픽 변화에 대응하여 대규모 트래픽에 직면하면 붕괴되는 대신 시스템이 더욱 유연하게 처리할 수 있도록 보장해준다.


참고

  • Reactor-Netty Docs

© mark-kim.blog Built with Gatsby