thumbnail
네티 이해하기 (Netty Deep Dive)
Java / Non-blocking / NIO / Network / Event-loop / Netty
2024.05.06.

들어가며

이번 글은 가장 인기있는 비동기 이벤트 기반 네트워크 애플리케이션 프레임워크 Netty를 내가 공부하면서 꼭 이해해야하는 모든 측면을 정리하고자한다.

Netty에 대해서 Deep Dive 하기전에 Netty를 이해하기 위한 배경 지식부터 정리할 예정이며, 이 내용을 바탕으로 Netty의 동작 방식, 핵심 개념 및 구성 요소등을 Deep Dive한다.


1 Before Dive In - Netty를 이해하기위한 사전 지식

본격적으로 Netty에 대해서 Deep In 하기전에 Netty에 대한 개념을 이해하는데 꼭 알아야하는 지식들을 먼저 정리해본다.

이전에 작성한 사례를 통해 이해하는 네트워크 논블로킹 I/O와 Java NIO사례를 통해 이해하는 Event Loop 내용을 다시 다듬어서 정리하였으며, 더 자세한 내용은 두 글을 참고하길 추천한다.

Netty를 이해하는데 꼭 필요한 사전 지식은 Non-blocking 기반의 Multiplexing Network I/O와 ByteBuffer 그리고 EventLoop에 대한 내용이다.

만약 위 내용에 대해서 충분히 이해하고있다면 바로 Netty Deep Dive 부분으로 넘어가길 추천한다.


1-1 I/O 모델

애플리케이션 프로세스에서 원격에 있는 디바이스와 데이터를 전송(write) 및 수신(read)하기위해선 OS 커널 system call이 불가피하다.

socket read write

  • write system call의 경우, OS 커널은 전송하고자하는 데이터를 소켓 전송 버퍼(send buffer)에 복사한다.
  • read system call의 경우, OS 커널은 수신 받은 데이터를 소켓 수신 버퍼 (receive buffer)에 복사한다.

보통 이러한 OS 커널 작업은 대부분 DMA가 네트워크 카드로부터 OS 커널에 데이터를 버퍼에 복사하고, 인터럽트를 발생시켜 애플리케이션 프로세스가 해당 데이터를 읽어가거나, 외부 디바이스에 데이터를 전송하게된다.

다시말해 서로 다른 공간에 위치한 애플리케이션간의 통신은 아래와 같이 kernel내 소켓과 소켓내 buffer등을 활용하여 통신하게된다.

network systemcall


여기서 중요한 점은 통신을 위한 커널 소켓내 버퍼에 데이터를 어떻게 읽고 쓰기할지에 대한 I/O 모델은 Blocking I/O와 Non-blocking I/O로 나뉜다.


1-2 Blocking I/O와 Non-blocking I/O


💁‍♂️ Network I/O

OS 커널내 소켓 버퍼로부터 애플리케이션(프로세스)이 데이터를 read/write하는 작업을 Network I/O라 부른다.

이때 네트워크 커널에 write, read하기위한 system call은 애플리케이션 프로세스 관점에서 Blocking 모드와 Non-Blocking 모드로 나뉘게된다.

  • Blocking 모드는 프로세스내 Thread가 커널에 system call (read/write)하고 응답이 올 때까지 다른 일을 하지못하고 Blocking 되는 것을 말한다.
    • JDK 1.3에서 사용되는 기존 Java I/O가 Blocking 모드다.
  • Non-Blocking 모드는 프로세스내 Thread가 커널에 system call (read/write)하고 응답과 상관없이 바로 return되어 다른 일 수행하는 것을 말한다.
    • JDK 1.4에서 나온 Java New I/O (NIO)가 Non-Blocking 모드를 지원하기 시작했다.

Blocking/Non-Blocking은 제어권 관점에서 호출되는 함수의 리턴여부가 관심사이다.


💁‍♂️ Blocking 방식보다 Non-blocking이 효율적이다

Blocking 방식의 네트워크 I/O는 이해하기 쉽고 구현하기 용이하다는 장점이 있지만, 컴퓨터 리소스를 제대로 활용하지 못하는 문제가 있다.

출처: https://stackoverflow.com/questions/17615272/java-selector-is-asynchronous-or-non-blocking-architecture
출처: https://stackoverflow.com/questions/17615272/java-selector-is-asynchronous-or-non-blocking-architecture

컴퓨터 리소스를 제대로 활용하지 못하는 이유는 TCP 네트워킹에서 accept, read, write등의 I/O 작업을 Blocking하게 처리하기때문에 소켓을 연결 및 처리하는 Thread가 실제 일을 하지않고있음에도 Thread를 점유해 다른 작업 요청을 처리하지 못하게한다.

쉽게 말해 애플리케이션 Thread가 네트워크 I/O 작업을 위해 Blocking 될 때의 해당 Thread의 기회 비용이 너무 크다는 의미이다.


blocking i o thread


그리고 자연스레 위와같이 I/O 작업마다 Thread를 할당해서 사용해야하므로, 많은 수의 Thread가 소비된다.

실제 많은 서버가 현재 TCP connection과 HTTP 요청부터 비즈니스 처리후 응답까지 모두 Blocking하게 처리한다.

많은 서버 프레임워크들은 이러한 비효율성을 Connection 풀을 활용하여 비용을 줄였지만, 여전히 I/O는 모두 Blocking하게 처리하기때문에 HTTP 요청부터 비즈니스 처리후 응답까지 모두 Blocking하게 처리한다.

이 문제를 해결하는 방법은 I/O 작업을 Non-Blocking하게 변경함으로써 적은 수의 Thread로 여러 개의 커넥션을 처리하도록 하는 것인데, 쉬운 이해를위해 음식점을 비유 들자면 아래와 같다.

  • Blocking
    • 손님이 줄서서 자기 순번이오면 음식을 시키고, 그 앞에서 다른 일을 하지 않고 음식 나올 때까지 기다린다.
    • 이때 식당의 카운터 직원도 다른 손님의 주문을 받지 못하고 바로 앞 손님의 주문 요청과 음식이 나올 때까지 대기한다. 그리고 음식(요청에 대한 결과)까지 나와서야 다음 손님의 요청을 받고 처리하기를 반복한다.
  • Non-Blocking
    • 손님이 음식을 시키고 전화번호를 입력해두고나서 자기 할 일을 한다. 음식이 나오면 음식점에서 메시지로 알림을 준다. 알림이오면 그제서야 손님은 음식을 받는다.
    • 이때 식당의 카운터 직원도 손님의 주문을 받아 요리사에게 처리를 위임하고 음식이 완료되면 요리사로부터 알려달라고 전달한다. 그리고 음식이 나오면 이제서야 카운터 직원은 손님에게 음식 나왔다가 말하기를 반복한다.
    • 이때 식당의 카운터 직원은 계속해서 다른 손님의 주문을 받는다.

위 예시에서 손님은 Client, 카운터 직원은 Server 역할이며, 비유를 통해 알 수 있듯이, Non-Blocking이 음식점 입장에선 훨씬 많은 수의 고객 대상으로 빠르고 효율적으로 장사할 수 있다.

그렇다면 Non-Blocking하게 처리하는 방식은 무엇일까?


1-3 이벤트 기반 프로그래밍 Non-Blocking Multiplexing I/O

앞서 Blocking보다 Non-blocking이 더 효율적이라고했는데, 어떻게 동작하기에 더 효율적이라고 하는 것일까? 그리고 어떻게 적은 수의 Thread로 여러 개의 커넥션을 처리하는 것일까?

우선 커널 Network I/O를 Non-blocking하게 처리한다는 의미는 Thread가 connect(), accept(), read(), write() 등의 I/O를 호출과 동시에 return 받아 막힘없이 계속 처리하는 것을 의미한다.

그리고 모든 I/O를 막힘없이 바로 return 받는 Non-Blocking의 처리 방식은 크게 두 가지로 나뉜다.

  • Polling 방식
  • Push 방식 -> Multiplexing I/O (Event)

💁‍♂️ Polling 방식의 Non-Blocking

먼저 Bad-Practice에 해당하는 Polling 방식의 Non-Blocking을 살펴보면 아래와 같다.

출처: https://stackoverflow.com/questions/17615272/java-selector-is-asynchronous-or-non-blocking-architecture
출처: https://stackoverflow.com/questions/17615272/java-selector-is-asynchronous-or-non-blocking-architecture

이 방식은 I/O 작업 요청은 막힘없이 바로 return 받는 Non-Blocking으로 동작하지만, 특정 Thread가 지속적으로 모든 소켓을 반복하여 이전에 요청한 작업이 완료되었는지? 그리고 작업할게 있는지 체크하고 작업할게 있다면 작업을 수행하는 방식이다.

이 방식도 Non-Blocking 방식이라고 볼 수 있긴 한데, 특정 소켓에서 어떤 일이 발생했는지 계속 확인해야하기때문에 kernel에 I/O system call (read/write)을 지속적으로 호출하면서 필요이상의 CPU의 리소스가 소요된다.

가장 큰 문제는 Channel내 어떠한 I/O 이벤트가 발생하지 않았음에도 지속적으로 system call을 호출해야한다는 것이다.

또한, 구현에 따라 확인하는 과정이 Blocking 되어 처리가 계속 지연되어 더 큰 문제가 발생할 수도 있게된다.

가장 큰 문제는 연결된 커넥션 수가 늘어나면 확인해야할 Channel이 많아지므로 성능이 선형적으로 나빠지게되므로, 당연히 Bad Practice 방식이라고 볼 수 있다.

그럼 Best Practice는 무엇일까? 바로 이벤트 기반의 Multiplexing Non-Blocking 방식이다.


💁‍♂️ Multiplexing I/O

전통적으로 사용자 인터페이스가 포함된 프로그램에선 이벤트 기반 프로그래밍이 많이 사용된다.

대표적으로 마우스 클릭이나 키보드 입력에 반응하는 콜백이 이에 해당한다. 각 이벤트를 먼저 정의해두고 발생한 이벤트에 따라서 콜백 코드가 실행되도록 프로그램을 작성하는 것이다.

앞서 살펴본 Polling 방식은 직접 Channel들을 순회하며 읽을 데이터가 있는지 체크하였는데, 이보단 특정 Channel이 변경되면 변경되었다고 이벤트를 만들어 알림을 주도록하면 실제 사건(Event)이 일어났을때만 컴퓨터가 일을 하면되므로 리소스를 효율적으로 사용할 수 있게된다.


non blocking i o thread


이렇게되면 위와 같이 이벤트 발생했을 때만 컴퓨터가 일을 하면 되므로, 하나의 Thread로 여러 개의 Channel을 처리 및 관리할 수 있게된다.

그래서 이러한 I/O 방식을 Multiplexing 이라 부른다.

Multiplexing은 단어 뜻대로하면 다중화로 하나의 통로를 여러 개가 활용한다는 의미인데, 이를 이벤트 기반의 소켓 통신에 적용하면 셀렉터(Selector)라는 하나의 객체가 여러 소켓 채널의 변화를 감지하여 이벤트가 발생했을 때만 처리한다는 의미이다.

출처: https://stackoverflow.com/questions/17615272/java-selector-is-asynchronous-or-non-blocking-architecture
출처: https://stackoverflow.com/questions/17615272/java-selector-is-asynchronous-or-non-blocking-architecture

위 다이어그램에서도 알 수 있듯이, Application이 계속해서 OS 커널에 system call 요청을 하지않고, 소켓에 변화가 있을때마다 OS 커널이 Application에 알려주는 방식이다.

조금 다른 그림으로 polling 방식과 push 방식을 비교하면 아래와 같다.


non blocking polling push


이렇게 OS 커널에서 Socket Channel내 변화가 있을 때마다 이벤트 형식처럼 애플리케이션에 알려준다는 의미에서 개인적으로 이벤트 기반 프로그래밍과 유사하다고 생각든다.

그리고 Netty도 이러한 멀티플렉싱을 십분 활용한 EventLoop 기반으로 동작함으로써 높은 성능을 유지한다.

어떻게 소켓에 변화가있을때 OS 커널은 Application에 알림을 주는 것인가?라는 의문은 Linux의 select(), poll(), epoll()등의 개념을 구글링해서 살펴보길 추천한다.


1-4 Java NIO

자바는 I/O의 효율적인 처리를 위해 JDK 1.4 버전에 NIO (New I/O)라는 이름으로 Multiplexing I/O를 지원하기 시작했다.

앞서 살펴본바와 같이 Java NIO의 Selector에 SocketChannel을 등록하면 해당 Socket에서 변화(read/write 할 준비가 됨)가 발생했을 때 이벤트를 통해 처리를 할 수 있도록 지원한다.

쉬운 이해를 위해 Java NIO의 전반적인 처리 방식을 도식화하면 아래와 같다.

Selector는 시스템 이벤트 통지 API를 사용하여 하나의 스레드로 동시에 많은 I/O를 처리 할 수 있다
Selector는 시스템 이벤트 통지 API를 사용하여 하나의 스레드로 동시에 많은 I/O를 처리 할 수 있다

위 그림에서 Selector는 이벤트를 감지하는 역할을하며, Non-blocking Channel에 Selector를 등록해놓으면 클라이언트의 커넥션 요청이 오거나 데이터 읽기/쓰기 작업이 필요한경우 Channel이 Selector에 이벤트를 통보한다.

그럼 Selector는 미리 등록해둔 Key의 상태를 변경하여 특정 Channel에 대한 작업을 수행하도록 미리 등록된 콜백 메서드를 실행하면서 Thread에 비즈니스 로직 처리를 위임한다.

위 내용을 조금 더 자세히 이해하고싶다면 실제 코드 예시로 하나하나 구현하여 설명한 이전글 - 사례를 통해 이해하는 네트워크 논블로킹 I/O와 Java NIO 보는 것을 추천한다.


1-5 ByteBuffer


💁‍♂️ NIO 이전의 기존 Java I/O가 느린 또 다른 이유

Java Network I/O가 느린 이유로 앞서 말했듯이 Blocking 방식으로 처리되는 문제도 있었지만, 또 다른 큰 이유는 JVM 애플리케이션이 커널 버퍼 (Direct Buffer)를 직접 핸들링 할 수 없기때문이다.

Socket이나 file로부터 Stream이 들어오면 OS 커널은 데이터를 커널 버퍼에 쓰게되는데, Java에서 이 커널 버퍼에 접근할 수 있는 방법이 없다.

따라서 처음엔 JVM 내부 메모리에 커널 버퍼 데이터를 복사해서 접근할 수 있도록했으며, 이로인해 “커널에서 JVM 내부 메모리에 복사”하는 오버헤드가 존재한다.

여기서 말하는 JVM 내부 메모리는 프로세스별로 할당되는 스택이나 힙 메모리를 의미한다.

조금 더 구체적으로 살펴보자면 복사과정은 아래와 같이 동작한다.

  • JVM (프로세스)이 file이나 socket으로부터 데이터를 읽기 위해 kernal에 명령을 전달한다.
  • kernel은 시스템 콜 (read())를 사용함으로써 디스크 컨트롤러가 물리적 디스크나 소켓으로부터 데이터를 읽어온다.
  • OS는 DMA를 이용하여 kernel 버퍼에 해당 데이터를 복사한다.
  • 그리고 JVM (프로세스) 내부 버퍼로 복사하고, JVM은 그제서야 해당 데이터를 사용한다.

위 과정은 단순해보이지만 아래와 같은 문제를 야기한다.

  • kernel 버퍼에서 JVM 내부 버퍼로 복사할 때 CPU 소비.
  • 복사한 JVM 버퍼내 데이터 사용후 GC가 수거해야함으로써 CPU 소비.
  • 복사가 진행되는 동안 I/O 요청한 JVM Thread는 Blocking된 상태이므로 기회 비용 발생.
    • 물론 Non-Blocking하게 처리할 순 있을듯하다.

💁‍♂️ 커널에 직접 접근할 수 있는 ByteBuffer - zero copy

JDK 1.4부터 JVM 내부 버퍼에 복사하는 문제를 해결하기위해 kernel 버퍼에 직접 접근할 수 있는 기능을 제공하기 시작했다.

바로 ByteBuffer (Direct Buffer)다. 그리고 Netty도 이를 적극 활용함으로써 메모리 복사에 대한 비효율성을 없앴다.

Direct Buffer는 직접 kernel 버퍼를 참조하고 있으므로, 위에서 발생한 복사문제로인해 CPU 자원의 비효율성 문제점을 해결할 수 있다.

다시말해 ByteBuffer를 사용하면 OS 레벨내에선 zero copy를 실현할 수 있다. 다만, GC의 영향권을 벗어나 자칫 메모리 누수가 발생할 수도 있어 상황에 따라 메모리 해제 관련해서 잘 챙겨줘야한다.

그리고 Java NIO의 ByteBuffer는 아래와 같은 문제점들이 존재한다.

  • 데이터 write/read 인덱스가 분리되어있지않고 하나의 값을 공유하여 사용한다.
    • write/read 모드가 따로 존재하지않고 하나의 pos 값을 flip하여 사용한다.
  • 버퍼의 사이즈가 고정적이다.
    • limit이나 capacity를 넘어서면 버퍼의 사이즈가 늘어나는 것이 아닌 BufferOverflowException이 발생한다.
  • 버퍼풀이 존재하지 않다.
    • 버퍼풀이 따로 존재하지않으며, 버퍼의 생성 및 메모리 해제 작업이 빈번하여 GC가 자주 유발된다. (따로 객체 풀링 서드파티를 이용하면 해결할 순 있다.)

Netty는 위와 같은 기존 NIO ByteBuffer의 문제점을 해결하기위해 ByteBuf를 사용한다.

이와 관련해서는 다른 글에서 조금 더 자세히 다뤄 볼 예정이다.

ByteBuffer는 JVM 내부 메모리에 기존대로 복사하는 방식과 커널에 직접 접근하는 DirectBuffer 방식이 있다.

즉, 무조건 커널내 메모리를 직접 접근하는 것이 아닌 구현 방식에 따라 다르므로 상황에 따라 알맞는 방식을 사용하는 것이 중요하다.


1-6 EventLoop

지금까지 이야기한 Multiplexing 기반의 Non-blocking I/O와 ByteBuf를 활용하면 Reactor 패턴을 구현할 수 있다.

그리고 Reactor 패턴은 우리가 보통 많이 이야기하는 EventLoop라고 볼 수 있다.

물론 개념의 범위가 조금 다르지만 이 글에선 편의를위해 이하 EventLoop라고 칭할 예정이다.

EventLoop의 동작 방식을 도식화하면 아래와 같다.


event loop


  • Event Loop
    • 무한 반복문을 실행하며 Selector로부터 I/O 이벤트가 발생할 때까지 대기하다가 이벤트가 발생하면 해당 이벤트를 처리할 수 있는 Handler에게 디스패치한다.
    • 보통 특정 Channel에 대한 이벤트를 큐에 삽입할 때, 해당 이벤트를 처리할 수 있는 Handler도 같이 첨부해준다.
  • Handler
    • 이벤트를 받아 비즈니스 로직을 수행한다. (수행완료하고 결과에 맞는 이벤트를 다시 발행하기도한다.)

정리하면 Client와의 Connection을 통해 생성된 Socket Channel을 EventLoop에 등록하면 아래와 같이 해당 Channel의 I/O부터 close까지의 생명주기는 EventLoop가 관리하게된다.

즉, Channel을 EventLoop에 등록함으로써 Channel의 제어 흐름, 멀티 스레딩, 동시성 제어등을 EventLoop가 처리하게된다는 의미다.

Netty는 실제로 위 EventLoop 개념을 적극 활용하여 구현되어 있으며, Netty에서의 EventLoop 구현체중 하나인 NioEventLoop는 1 Selector + 1 thread + 1 taskQueue로 되어있다.

마지막으로 EventLoop는 기본적으로 하나의 thread에서 동작하기에 비즈니스 로직에 Blocking한 로직을 넣으면 애플리케이션에 성능이 더욱 악화될 수 있어 주의가 필요하다.

그래서 Netty는 잘 동작 방식을 이해하고 사용하는 것이 무엇보다 중요하다.

정리하면 EventLoop는 Netty에서 제어 흐름, 멀티스레딩, 동시성 제어등의 역할을 수행한다.


2 Netty 동작 흐름 - EventLoopGroup 그리고 boss와 child

앞서 Netty를 이해하는데 필요한 사전 지식을 살펴보았으니, 이제 본격적으로 Netty에 대해서 정리해본다.

우선 코드 레벨로 Deep Dive 하기전에 Netty가 동작하는 전체 큰 그림을 먼저 살펴본다.

Netty Server를 이해하면 Client는 쉽게 이해되므로 Server 기준으로 글을 작성했다.


💁‍♂️ Netty의 기본적인 동작 방식 - EventLoop

앞서 언급했듯이 Netty는 기본적으로 EventLoop 기반으로 동작한다.

더 자세히 얘기하자면 Netty는 NIO Selector에 등록된 Channel에서 발생하는 이벤트들을 Channel에 매핑된 핸들러가 처리하는 구조다.

EventLoop의 처리 방식을 그림으로 도식화하면 아래와 같다.


netty eventloop


그림에서 알 수 있듯이, Selector에 등록된 Socket Channel Key중 이벤트가 발생한 SelectedKey들을 Task Queue에 넣고 등록된 Channel에 Handler Pipeline에 위임하여 네트워크 read/write와 비즈니스 로직을 처리한다.

뒤에서 자세히 얘기하겠지만 어떠한 Operation 처리느냐에따라 바로 실행할 수도, Task Queue에 넣고나서 실행될 수도 있다.

Pipeline에 대한 자세한 내용은 아래에서 더 자세히 다룰 예정이며, 여기선 그저 이벤트를 Stream으로 처리하는 Pipeline이라고만 이해해도 충분하다.

Pipeline = I/O 이벤트에 대한 Handler 모음

많은 개발자들이 익숙한 Node.js와 Redis의 EventLoop와 유사한 매커니즘을 가진다고 보면된다.


💁‍♂️ Netty EventLoopGroup - BossGroup와 ChildGroup

Netty는 설정을 통해 여러 EventLoop를 Group으로 묶을 수 있으며 Group을 조합해서 애플리케이션을 구성한다. (Netty의 가장 핵심적인 부분이기도 하다.)

쉽게 말해 같은 역할을 수행하는 복수의 EventLoop를 EventLoop Group으로 묶을 수 있으며, EventLoopGroup은 크게 BossGroup와 ChildGroup으로 구분된다.

아래는 Netty의 EventLoop를 구성할 때의 가장 기본이 되는 구조를 도식화한 그림이다.


netty eventloop group


조금 복잡해보일 수 있는데, 간단히보면 두 개의 EventLoop Group으로 구성되어있다고 보면된다.

  • BossGroup
    • Acceptor EventLoop Group이라고 볼 수 있으며, 새로운 클라이언트의 연결 요청을 처리한다. 즉, 새로운 클라이언트와의 SocketChannel을 생성하며, accept 이벤트를 처리한다.
    • BossGroup내 EventLoop는 클라이언트와의 연결 부분만 처리하며, accept 완료된 SocketChannel은 ChildGroup내 EventLoop Selector에 위임등록하여 데이터 read/write부터는 ChildGroup내 EventLoop가 처리하도록한다.
  • ChildGroup
    • Worker EventLoop Group으로도 많이 알려져있으며, BossGroup으로부터 전달받아 Selector에 등록된 SocketChannel에서 발생하는 read/write 이벤트를 처리한다.
    • read/write에 대한 이벤트를 모두 처리하기때문에 사용자와의 요청/응답에 대한 비즈니스 처리는 모두 ChildGroup내 EventLoop에서 처리한다.

정리하면 Netty는 다수의 EventLoop를 두 개의 Group으로 묶어서 사용하며, 아래와 같이 BossGroup은 새로운 커넥션 처리, ChildGroup은 연결된 커넥션에 대한 non-blocking하게 read/write를 처리한다고 보면된다.


boss child event group


이렇게 구성된 구조상 SocketChannel은 Worker EventLoop Group내 하나의 EventLoop에만 등록되어 처리된다. Boss와 Child를 구분한 이유는 새로운 연결에 대한 처리와 연결된 커넥션에 대한 처리를 분리함으로써 더 많은 트래픽을 동시에 소화할 수 있도록하기 위함인 것으로 생각든다.

당연히 새로운 연결(connect)보단 채널에 대한 read/write에 대한 이벤트가 훨씬 많이 발생하기때문에, 보통 Worker EventLoop의 수를 더 많이 설정한다. 그래서 Netty는 따로 설정안하면 디폴트로 Worker Group의 EventLoop의 수를 2 * 사용가능한 프로세서 수로 설정한다.

물론 위 구조가 절대적인 것은 아니다. 예를 들어, Netty를 Client로 사용할 경우, 새로운 연결을 한번만하면 되기때문에 보통 하나의 EventLoopGroup으로 I/O (connect, read, write) 관련 이벤트를 모두 처리한다.


3 Netty 구성요소 Deep Dive

Netty를 이해하는데 필요한 사전 지식과 EventLoop 기반의 동작 방식의 대해서 살펴보았으니, 이제 Netty가 구동하는데 서로 유기적으로 협업하며 동작하는 여러 Component 들을 살펴본다.

Netty는 실제 클라이언트-서버간의 네트워크 통신과정에서 여러 컴포넌트들이 각자의 역할을 수행하며 협력한다.

각 컴포넌트별 관계는 아래와 같다.

비교적 많은 컴포넌트들이 서로 각자의 역할을 수행하며, 다른 컴포넌트와 상호 협력한다.

아래부터는 이제 각 컴포넌트별로 어떤 역할을 수행하며, 어떻게 상호 협력하는지 그리고 각 컴포넌트를 이해하는데 필요한 사전 지식들을 정리한다.


3-1 Channel

SocketChannel


💁‍♂️ Channel - 네트워크 소켓 I/O에 대한 실질적인 처리를 수행한다

Channel은 TCP 커넥션에 대한 I/O 작업을 처리하는 역할을 수행한다.

구체적으로 Channel은 사용자에게 아래와 같은 기능을 제공한다.

  • Channel의 현재 상태 (ex. open? connected?)
  • Channel에 대한 설정 (ex. send/receive buffer size)
  • I/O 명령 (ex. read, write, connect, bind)
  • Channel에 대한 I/O 이벤트를 처리할 ChannelPipeline 관련 처리.
  • 연결된 EventLoop 조회 (Channel이 등록된 EventLoop를 알 수 있다)

그리고 필요에 따라 ChannelPipeline에 ChannelHandler를 추가 및 제거하여 Channel I/O 이벤트에 대한 처리를 수행할 수 있다.

ChannelPipeline 이나 ChannelConfig 외에도 Channel에 정의된 다양한 역할을 수행한다.

메서드 설명
eventLoop() channel이 등록된 EventLoop를 반환한다.
pipeline() channel에 등록된 ChannelPipeline을 반환한다.
isActive() Channel이 active한 상태인지 boolean으로 반환한다. Datagram의 경우는 Datagram의 active 상태 여부를 반환한다.
localAddress() 바인딩된 Local의 SocketAddress를 반환한다.
remoteAddress() 원격의 SocketAddress를 반환한다.
write() 원격에 데이터를 write한다. 데이터는 ChannelPipeline을 통과하여 새로 고쳐질 때까지 대기열에 보관된다. (Schedules a write operation.)
flush() write을 통해 예약된 모든 write 작업을 flush한다. 즉, 이전에 write된 데이터를 모두 전송 계층 소켓에 flush한다.
writeAndFlush() write을 먼저 호출하고, flush를 호출한다.

아래 예시는 wrietAndFlush를 통해 데이터를 원격지에 전송하는 코드 예시다.

Channel channel = ...; // 연결된 Channel
ByteBuf buf = Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8); // Channel을 통해 원격지에 데이터를 보낼 때 사용될 ByteBuf
ChannelFuture ch = channel.writeAndFlush(buf); // channel에 ByteBuf를 넘기면서 write과 flush한다.
cf.addListener(new ChannelFutureListener() { // Channel에 I/O 명령인 writeAndFlush 명령에 대한 결과 콜백 추가.
    @Override
    public void operationComplete(ChannelFurue future) {
        if (future.isSuccess()) {
            log.info("Write successful");
            return;
        }
        log.error("Write Error");
        future.cause().printStacktrace();
    } 
})

예시에서 볼 수 있듯이, 실질적인 Network Socket (TCP 연결)에 대한 I/O 요청과 처리는 모두 Channel을 통해서 수행한다.

Channel은 이러한 I/O 요청을 받으면 미리 설정된 Pipeline내 Inbound/Outbound Handler를 수행하고 I/O 작업을 수행한다.


3-2 Unsafe interface

위 컴포넌트간의 협력 다이어그램엔 나오지 않지만 Channel과 더불어 꼭 알아야하는 인터페이스가 있어 따로 소개하고자한다.


💁‍♂️ Unsafe 인터페이스의 의미

Channel 인터페이스를보면 내부 클래스로 Unsafe 인터페이스가 정의되어있다.

이 Unsafe란 용어는 Netty 뿐만 아니라 자바내에서도 많이 사용되는 용어이며, 보통 C나 커널 명령에 부합하는 low-level 명령을 호출할 때의 인터페이스 역할을한다.

Netty에서의 Unsafe도 커널내 Socket I/O 작업을 모두 수행하며, Channel내 I/O 작업이 필요하면 모두 이 Unsafe의 구현체에게 위임한다.

즉, Netty의 Channel Pipeline내 모든 I/O 작업은 결국 Unsafe 인터페이스 구현체에의해 실행된다. 이와 관련해서는 아래 Deep Dive에서 코드를 통해 살펴본다.

보통 유저 레벨의 코드에선 unsafe한 명령에 속하는 read, write의 I/O 명령을 직접 호출할 일이 거의 없다. 공식 문서에도 가능한 이 Unsafe 인터페이스의 구현체를 사용하길 추천하고있다.


💁‍♂️ Unsafe 구현체 - Byte, Message

실제 Unsafe 인터페이스의 구현체는 Channel의 구현체별로 다 있다. 즉, Nio, epoll, kqueue등의 Channel 구현체마다 모두 Unsafe 구현체를 구현 및 가지고있다.

Nio 기준으로 살펴보면 다음과 같은 계층구조를 가진다.

출처: https://medium.com/geekculture/a-tour-of-netty-5020ecee5494
출처: https://medium.com/geekculture/a-tour-of-netty-5020ecee5494

Unsafe의 Nio 버전 하위 인터페이스로 NioUnsafe가 정의되어있다. 그리고 NioUnsafe 인터페이스는 크게 Byte와 Message 이름이 붙은 구현체로 나뉜다.

  • NioByteUnsafe와 NioSocketChannelUnsafe (클라이언트 NioSocketChannel의 구현체)는 데이터 write와 read등의 I/O 작업을 수행한다.
  • NioMessageUnsafe의 구현체들은 새로운 커넥션(connect)을 수립하는 작업을 수행한다.

정리하면, Unsafe 인터페이스의 구현체는 NioByteUnsafe와 NioMessageUnsafe라는 두 가지 유형으로 나뉘며, Byte는 읽기-쓰기 작업과 관련이 있고, Message는 새 연결 설정 작업과 관련이 있다.

아래 Deep Dive하는 과정중 코드에서 unsafe가 생각보다 많이 보일 것이다. 이때 Socket I/O 작업을 수행하는것이라고 생각하고보면 더욱 이해하기 쉽다.


3-3 ChannelFuture과 ChannelPromise

비동기 결과 알림


💁‍♂️ Channel I/O에 대한 비동기 처리 pending completion 결과 - ChannelFuture

Netty에서의 모든 I/O 작업은 모두 비동기 (asynchronous)로 동작한다. 즉, I/O 작업 호출시 호출되자마자 바로 return된다.

이때 모든 I/O 작업 요청의 결과로 void를 return하는 것이 아닌, ChannelFuture를 반환하게된다.

ChannelFuture은 이름에서 알 수 있듯이, Java의 비동기 API인 Future과 유사한 역할을 수행한다. 물론 Future보단 Callback이 추가된 CompletableFuture과 더 유사하다.

실제 Netty 공식 문서에도 ChannelFuture를 The result of an asynchronous Channel I/O operation.라고 정의하고있다.

쉽게 말해 CompletableFuture처럼 비동기 작업을 요청하고 콜백을 넣어 비동기 + Non-Blocking하게 동작할 수 있게하는 것이며, ChannelFuture은 Channel I/O 작업에 더 특화되어 비동기 + Non-blocking으로 동작할 수 있게 도와준다.

ChannelFuture를 통하면 비동기 요청한 처리가 완료되었는지 I/O 상태를 확인할 수 있다.

비동기 처리기때문에 요청후 바로 결과가 반환되지만, 실제 I/O 작업은 위와 같이 uncompleted, completed(success, fail, cancel)로 나뉘어 상태가 관리된다.

그리고 CompletableFuture과 동일하게 작업이 성공했던 실패했던 완료된다면 Callback Listener를 붙여 I/O 완료에 대한 처리를 할 수 있다.

즉, Non-blocking하게 I/O 작업을 할 수 있다는 의미이다.

ChannelFuture 공식 문서에서도 Blocking 되는 명령어인 await() 대신 Non-Blocking 형식인 addListener()를 추천한다.

await()이 아닌 addListener를 통해 Non-blocking을 추천하는 문서 내용
await()이 아닌 addListener를 통해 Non-blocking을 추천하는 문서 내용


💁‍♂️ write 작업에 더 특화된 ChannelFuture - ChannelPromise

Channel write에 대한 처리를 수행하는 Handler인 ChannelOutboundHandler의 대부분의 메서드는 작업 완료시 알림을 받기위해 ChannelPromise를 인자로 받는다.

ChannelPromise는 ChannelFuture의 하위 인터페이스로써 setSuccess() 또는 setFailure() 와 같이 write 가능한 메서드를 정의하여 ChannelFuture을 불변으로 만들어준다.


3-4 ChannelHandler와 ChannelPipeline

Netty 컴포넌트중 가장 핵심이 되는 것은 개발자의 비즈니스가 담기는 ChannelHandler다. 아래 Deep Dive에서 이와 관련해서 더 자세히 다루겠지만, 여기선 우선 어떤 역할을 수행하는지만 간단히 정리해본다.


💁‍♂️ ChannelPipeline = ChannelHandler의 조합

Netty의 가장 핵심이자 기본이 되는 개념인 Channel이 TCP 연결후 생성되고나면 ChannelPipeline을 Channel에 구성하게된다. Netty는 기본적으로 데이터를 받기 위한 Input Stream을 Inbound, 데이터를 내보내기위한 Output Stream을 Outbound라고 표현한다.

그리고 ChannelPipeline은 여러 ChannelHandler의 조합을 의미하며, ChannelHandler는 Channel에 대한 실질적인 read/write을 호출하고, 비즈니스를 수행하는 컴포넌트이다.

EventLoop (정확히는 EventLoop내 Selector)에 등록된 Channel에서 I/O가 이벤트 발생시 ChannelHandler가 이에대한 처리를 수행한다.


💁‍♂️ Netty내 Channel은 각각 별도로 고유한 ChannelPipeline이 존재한다.

ChannelHandler은 Core J2EE에 소개된 Interceptng Filter 패턴을 구현하고 있어서, 개발자가 Handler를 자유롭게 추가 및 삭제할 수 있다.

이는 여러 Handler를 서로 상호작용할 수 있도록 제어할 수 있다는 의미이며, 각 Channel엔 여러 Handler의 조합을 나타내는 고유한 ChannelPipeline을 가지고있다.

중요한 점은 새로운 Channel이 TCP 연결후 생성되고나면 Netty는 자동으로 ChannelPipeline도 생성하여 Channel에 매핑한다는 점이다.

출처: Netty In Action.
출처: Netty In Action.

ChannelPipeline은 위 그림과 같이 Inbound 이벤트는 Linked-List Head에서 Tail까지의 InboundHandler로 전달되며, 반대로 Outbound 이벤트는 Linked-List의 Tail에서 Head까지 OutboundHandler로 전달된다.

위 내용은 아래에서 더 자세히 알아본다. 여기선 우선은 Channel에서 발생한 이벤트에 대한 처리를 수행하는 리스너 역할로 이해하면 될 듯 하다.


3-4-1 ChannelHandler엔 Blocking 코드 사용을 금지시키는 것이 좋다


💁‍♂️ EventLoop내 스레드를 Blocking 하게되면 성능에 부정적인 영향을 끼친다.

ChannelPipeline낸 각 ChannelHandler는 EventLoop에서 발생한 I/O 이벤트를 전달받아 처리하게된다.

이때 EventLoop는 Single Thread 기반으로 동작하기때문에, 해당 스레드를 Blocking하지 않는게 굉장히 중요하다.

만약 Blocking하게 된다면 해당 EventLoop에 등록된 Channel 이벤트 처리에 아주 큰 부정적인 영향을 끼치게된다.

이로인해 ChannelHandler내 비즈니스 로직은 무조건 Non-blocking하게 처리하는게 좋다.

더 정확히 말하면 Selector Thread를 Blocking하면 안된다. DB 접근이나 비즈니스 처리는 다른 스레드를 활용하고 처리하고 Selector Thread는 정말 비즈니스가 완료된 내용에 대한 I/O 작업만 처리하도록 하는 것이 좋다.

실제로 이미 I/O에 해당하는 read나 write은 모두 ChannelFuture를 반환하는등 Non-blocking하게 동작할 수 있는 기반이 마련되어있다.


💁‍♂️ 그렇다면 Non-blocking하게 처리하기위해선 어떻게 해야할까?

바로 비동기와 Callback을 활용하는 것이다.

즉, 비즈니스 처리를 다른 Thread로 비동기 요청하고, ChannelFuture, CompletableFuture등을 반환받아 처리가 완료되면 callback으로 Channel에 처리 완료된 내용을 I/O 부분만 Netty EventLoop에 위임하는 것이다.


3-5 ChannelHandler


💁‍♂️ ChannelHandler

앞서 다뤘듯이 ChannelHandler는 ChannelPipeline 안에서 조합을 통해 실행된다. ChannelHandler는 매핑된 Channel이 등록된 EventLoop에서 발생한 I/O Event와 I/O Operation을 처리하거나 파이프라인 내에서 다음 핸들러를 실행하는 역할을 한다.

그리고 Inbound와 Outbound로 구분되어 사용된다.

출처: Netty In Action.
출처: Netty In Action.

  • ChannelInboundHandler: Inbound I/O Event를 handle하는 Handler
  • ChannelOutboundHandler: Outbound I/O Operation을 handle하는 Handler

💁‍♂️ ChannelInboundHandler

ChannelInboundHandler는 Inbound I/O Event를 handle하는 Handler이며, 쉽게 생각하면 원격 장치에서 요청한 CONNECT와 READ에 대한 이벤트에 대한 콜백을 수행하는 ChannelHandler다.

ChannelInboundHandler에서 정의한 Channel I/O 생명주기에 따른 콜백 메서드는 아래와 같다.

메서드 설명
channelRegistered() Channel이 EventLoop에 등록되어 I/O 작업을 수행할 준비가 되었을 때 호출된다.
channelUnregistered() Channel이 EventLoop로부터 해제되어 어떠한 I/O 작업도 수행할 수 없을 때 호출된다.
channelActive() Channel이 EventLoop에 등록되고나서 active 상태일 때 호출된다. active는 Connected되거나 소켓에 Bind이 완료되었을 때를 말한다.
channelInactive() Channel이 deactive된 상태를 의미하며, 원격 장치과 connection이 끊겼을 때 호출된다.
channelRead() Channel로 부터 데이터를 read할 때 호출된다.
channelReadComplete() Channel로 부터 데이터를 read operation이 완료되었을 때 호출된다.
channelWritabilityChanged() Channel의 writablity state (쓰기 가능 여부 상태값)이 변경되면 호출된다. write이 너무 빨리 수행되어 OOM이 발생하는 것을 방지하기위해 채널이 만약 다시 쓰기 가능해질 때 이 콜백을 활용할 수 있다. 채널 메서드 isWritable()을 호출하여 채널의 쓰기 가능 여부를 확인 가능하며, 쓰기 가능성에 대한 임계치는 Channel.config().setWriteHighWaterMark()를 통해 설정할 수 있다.
userEventTriggered() POJO가 ChannelPipeline을 통해 전달되었을 때 호출된다.
exceptionCaught() 처리중 Throwable가 던져지면 호출된다.

생명 주기에 따라 다양한 콜백을 지원하는 것을 볼 수 있으며, 실제 Netty 작업시 흔히 접하게 될 메서드들이다.

이중에서 channelRead()에 보통 개발자의 코드가 많이 들어가게되는데, Pooling된 BytyeBuf 인스턴스의 경우 명시적으로 메모리를 해제해줘야한다.

이때 Netty에서 제공하는 ReferenceCountUtil.releas()를 사용하면 쉽게 해제할 수 있다.

위 생명 주기 관련된 더 자세한 내용은 아래 Deep Dive에서 코드레벨로 확인해 볼 예정이다.


💁‍♂️ ChannelOutboundHandler

ChannelOutboundHandler는 Outbound I/O Event를 handle하는 Handler이며, 쉽게 생각하면 원격 장치에 보낼 연결 요청 (connect), write 이벤트에 대한 콜백을 수행하는 ChannelHandler다.

ChannelOutboundHandler에서 정의한 Channel I/O 생명주기에 따른 콜백 메서드는 아래와 같다.

메서드 설명
bind() Channel을 Local에 bind 요청할 때 호출된다.
connect() 원격 장치에 connect 요청시 호출된다.
disconnect() 원격 장치에 disconnect 요청시 호출된다.
close() Channel에 close 요청시 호출된다.
deregister() EventLoop로부터 Channel을 해제할 때 호출된다.
read() Channel로 부터 더 많은 데이터를 read 요청할 때 호출된다.
flush() Channel을 통해 원격 장치에 보낼 데이터를 쌓아둔 Queue를 flush할 때 호출된다.
write() Channel을 통해 원격 장치에 보내기위해 데이터를 write할 때 호출된다.

Netty는 기본적으로 Non-blocking으로 동작하기 때문에, ChannelOutboundHandler의 대부분 메서드는 호출시 작업 완료 알림을 받기위한 ChannelPromise를 인수로 사용한다.

위 생명주기 관련해서도 아래 코드레벨로 더 자세히 살펴 볼 예정이다.


💁‍♂️ ChannelHandlerAdaptor

ChannelInboundHandler와 ChannelOutboundHandler 모두 순수 인터페이스라 모든 메서드를 구현해줘야하는 불편함이 존재하는데, Netty는 편의를 위해 ChannelHandlerAdaptor 추상 클래스를 구현한 어탭터 클래스를 제공한다.

  • ChannelInboundHandlerAdaptor: Inbound I/O Event 어댑터 구현체.
  • ChannelOutboundHandlerAdaptor: Outbount I/O Operation 어댑터 구현체.
  • ChannelDuplexHandler: Inbound, Outbound Event 처리용 어댑터 구현체.

3-6 ChannelHandlerContext


💁‍♂️ ChannelHandlerContext - Handler간의 상호작용을 도와주는 객체

ChannelPipeline내 ChannelHandler는 서로 상호작용할 때 ChannelHandlerContext(이하 Context)객체를 활용한다.

ChannelHandlerContext는 보통 ChannelPipeline이 생성되면서 같이 생성된다.

Context 객체를 통해 Handler들은 upstream과 downstream으로 이벤트를 전달할 수도, Pipeline을 동적으로 변경시킬 수도 있으며, key:value형태의 정보를 저장할 수도 있다.


💁‍♂️ ChannelHandlerContext - 이벤트 전파

ChannelHandlerContext의 기능중 하나가 바로 Handler간의 이벤트 전파 (event propagation)이다.

아래 그림은 실제 ChannelPipeline, ChannelHandler, ChannelHandlerContext가 어떤 관계를 가지는지 보여준다.

그리고 실제 이벤트 전파를 위한 실질적인 실행 (invoke)는 ChannelHandlerContext가 수행한다. 아래는 세 Component가 어떻게 상호작용하는지 보여준다.

ChannelHandlerContext는 네이밍에서 알 수 있듯이, 파이프라인내 현재 Handler 실행 컨텍스트를 저장하고 처리하는 역할을 수행한다. 쉽게 말해, 다음 Handler를 찾아 Handler에 실행을 위임하고, 현재 스레드가 어느것이냐에따라 바로 실행하거나 TaskQueue에 넣는등등의 처리를 수행한다.

이벤트 전파를 위해 ChannelHandlerContext는 아래와 같이 InboundHandler와 OutboundHandler에 대한 전파 메서드를 제공한다.

// inbound event propagation
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

// outbound event propagation
ChannelOutboundInvoker.bind(SocketAddress, ChannelPromise)
ChannelOutboundInvoker.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelOutboundInvoker.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelOutboundInvoker.disconnect(ChannelPromise)
ChannelOutboundInvoker.close(ChannelPromise)
ChannelOutboundInvoker.deregister(ChannelPromise)

즉, 위 메서드들을 실행하면 해당 Event를 다음 ChannelHandler로 메시지를 전파한다.


💁‍♂️ ChannelHandlerContext - 상태 관리

보통 Channel당 하나의 ChannelPipeline을 생성하여 매핑후 이벤트를 처리한다. 다만, 로그인과 같이 특정 ChannelHandler가 여러 Channel에서 공유되어야하며, ChannelHandler내 간단한게 멤버 변수로 로그인 여부등의 상태를 관리해야하는 경우 ChannelHandlerContext를 이용할 수 있다.

관련된 코드는 AttributeKey를 살펴보면 단번에 이해된다.

자칫 로컬 캐시처럼 메모리를 많이 잡아먹는 데이터를 관리해야하는경우, 사용시 주의해야한다.


3-7 ChannelInitializer

앞서 매 새로운 연결이 수립될 때마다 SocketChannel이 생성되며, Channel마다 ChannelPipeline이 생성 및 매핑되어 이벤트를 처리한다고했다.

이렇게 Channel이 생성되었을 때 ChannelPipeline을 생성, 초기화 및 매핑해주는 역할을 수행하는 컴포넌트가 바로 ChannelInitializer다.

public class ChannelInitializerExample extends ChannelInitializer<SocketChannel> {

    // 1
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // 2
        ChannelPipeline pipeline = ch.pipeline();

        // 3
        pipeline.addLast(new EchoServerFirstChildHandler());
    }
}

주석을 해석하면 아래와 같다.

  1. 새로운 연결이 accept되어 채널이 생성될 때마다 호출된다.
  2. 채널 파이프라인을 가져온다.
  3. 채널에서 read/write 이벤트 발생시마다 처리할 Handler들을 ChannelPipeline에 초기화해준다.
    • 비즈니스 로직은 따로 구현후 pipeline에 추가해주면 된다.

3-8 Bootstrap

앞에서 설명한 것처럼 Selector를 사용하면 여러 리소스를 모니터링 할 수 있으며, 이벤트가 준비되면 적절한 Handler에 위임하고 다시 모니터링을 계속한다. 이를 EventLoop라고한다.

그리고 이러한 EventLoop는 Group으로 묶여, boss와 child로 구분된 후 각자의 역할을 수행한다.

Bootstrap은 Channel, EventLoopGroup등 Netty로 작성한 네트워크 애플리케이션의 동작 방식과 환경을 설정하는 도우미 클래스다. 이를 통해 Netty 애플리케이션을 시동할 수 있으며, 각종 설정도 할 수 있다.

아래는 bootstrap으로 설정할 수 있는 요소이다.

  • 전송 계층 (소켓 모드와 I/O 종류)
  • 이벤트 루프
  • 채널 파이프라인 설정
  • 소켓 주소와 포트
  • 소켓 옵션

설명보다 아래 코드를보면 이해가 쉽다.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup) // 1
            .channel(NioServerSocketChannel.class) // 2
            .handler(new ChannelInitializer<NioServerSocketChannel>() { // 3
                @Override
                protected void initChannel(NioServerSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new EchoServerInboundHandler());
                }
            })
            .childHandler(new ChannelInitializer<SocketChannel>() { // 4
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new EchoServerChildInboundHandler());
                }
            });

    // 서버 시작
    log.info("start server...");
    ChannelFuture f = b.bind(8080).sync(); // 5
    f.channel().closeFuture().sync();
} finally {
    log.info("close server...");
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

주석을 해석하면 아래와 같다.

  1. 이벤트 루프 그룹 설정.
  2. 비동기 네트워킹에 사용될 서버 채널 설정. (NioServerSocketChannel, EpollServerSocketChannel)
  3. boss 이벤트 루프 채널 초기화 클래스 설정. (ServerSocketChannel에 대한 핸들러 설정)
  4. worker 이벤트 루프 채널 초기화 클래스 설정. (SocketChannel에 대한 핸들러 설정.)
  5. Netty 서버를 특정 port에 bind한다.
    • 이때 bind 메서드는 비동기이나, 바로 뒤에 sync를 호출함으로써 bind될 때까지 blocking된다.

4 Deep Dive - EventLoop 생성과 EventLoopGroup 구성

지금까지 Netty에서 꼭 알아야하는 Component를 알아보았는데, 사실 각 Component의 역할만 들어선 어떤 역할을 수행하는지 크게 와닿지않는다. 적어도 나는 그랬다..

그래서 지금부터는 직접 코드를 통해 Deep Dive하면서 각 Componenet가 어떤 역할을 수행하고 어떻게 다른 Component와 협력하는지 살펴본다.

가장 먼저 Server Netty 애플리케이션을 설정하고 실행하는데 필요한 아래 Bootstrap 설정과 바인딩 관련하여 Deep Dive한다.

public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new NettyServerInitializer());

        // 서버 시작
        log.info("start server...");
        ChannelFuture f = b.bind(8080).sync();
        f.addListener((ChannelFuture future) -> {
            if (future.isSuccess()) {
                log.info("Server bound");
            } else {
                log.error("Bind attempt failed", future.cause());
            }
        });

        f.channel().closeFuture().sync();
    } finally {
        log.info("close server...");
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

Bootstrap 설정을 보면 가장 먼저하는 것이 아래와 같이 EventLoop를 생성하는 것이다.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

이번 글에서 Nio 기준으로 살펴보기때문에 위 코드는 NioEventLoopGroup를 인스턴스화한 코드이며, EventLoopGroup은 여러 개의 EventLoop를 조합한다.

bossGroup은 새로운 연결을 수립하는 역할의 EventLoop를 모아둔 그룹이고, workerGroup은 연결된 커넥션내 데이터를 read/write하는 역할의 EventLoop를 모아둔 그룹이다.


4-1 NioEventLoopGroup

우선 NioEventLoopGroup 생성부터 살펴보면 아래와 같다.

NioEventLoopGroup 생성자
NioEventLoopGroup 생성자

딱봐도 다양한 생성자를 가지고있다. 그리고 많은 생성자를 통해 초기화 및 할당하는 속성은 아래와 같다.

  • nThreads: 사용할 Thread의 개수. 더 자세히 말하자면 생성할 EventLoop의 개수이다.
    • Thread 1 : 1 EventLoop
    • 기본값은 사용 가능한 커널 수의 두 배이다.
  • executor: EventLoopGroup내 EventLoop들이 사용할 Thread를 할당해줄 Executor 설정.
    • EventLoop가 사용할 스레드를 할당해주는 스레드 풀 역할. (Thread Provider)
  • selectorProvider: Selector Provider (Nio, epoll 등등)
  • selectorStrategyFactory: selector 전략을 생성하는 Factory.
    • SelectStrategy: select하기 위해 동작하는 loop를 제어하는 정책.
    • 예를 들어, 즉시 처리할 이벤트가 있는 경우 select를 지연시키거나 skip 하는 등의 정책을 정할 수 있다.
  • rejectedExecutionHandler: Executor에 의해 실행할 수 없는 작업에 대한 Handler.

NioEventLoopGroup는 위 속성들을 생성할 때, 인자로 받아서 EventLoop를 생성할 때 사용한다.

그리고 NioEventLoop의 많은 생성자를 결국 super()를 호출함으로써 상위 클래스인 MultithreadEventLoopGroup내 생성자를 호출한다.


4-2 MultithreadEventLoopGroup

NioEventLoopGroup를 인스턴스화하면 상위 클래스인 MultithreadEventLoopGroup의 생성자를 호출하게되는데, 아래 코드를 보면 위에서 언급한 디폴트 Thread 개수 설정을 볼 수 있다.

MultithreadEventLoopGroup 생성자
MultithreadEventLoopGroup 생성자

2 * 사용가능한 프로세스 수로 되어있는 것을 코드를 통해 알 수 있으며, 그대로 다시 상위 클래스인 MultithreadEventExecutorGroup 생성자를 호출한다.

MultithreadEventExecutorGroup
MultithreadEventExecutorGroup

MultithreadEventExecutorGroup 생성자를 보면 우선 executor 설정을 따로 하지 않으면 DefaultThreadFactory를 사용하는데 내부 로직을 보니 매번 새로운 Thread를 만들어 할당해준다.

그리곤 newChild() 메서드를 통해 제공된 thread 수에 따라 EventLoop 인스턴스를 생성하고 배열에 저장한다.

newChild() 는 추상 메서드로 아래와 같이 Selector 형식에 따라 구현체를 제공한다.

이 글에선 Nio 기준으로 작성하므로, NioEventLoopGroup 구현체의 newChild()를 보면 아래와 같다.

NioEventLoopGroup.newChild()
NioEventLoopGroup.newChild()

앞서 EventLoopGroup내 생성자 인수로 넘어온 executor, selectorProvider, selectorStrategyFactory등을 활용하여 EventLoop 인스턴스를 생성한다.


4-3 NioEventLoop 생성 및 초기화

NioEventLoopGroup의 생성자로 받은 여러 속성들을 결국 EventLoop를 생성하기 위함이다. 그리고 NioEventLoop의 생성자를 보면 아래와 같다.

NioEventLoop 생성자
NioEventLoop 생성자

생성자를 보면 NioEventLoopGroup에서 인스턴스화할 때 넘어오는 여러 속성들을통해 EventLoop를 초기화하고 selector는 openSelector()를 호출함으로써 할당한다.

NioEventLoop.openSelector()
NioEventLoop.openSelector()

openSelector()는 호출하면 위와 같이 provider 구현체에 따라 새로운 selector를 반환한다. (Nio, Epoll 등등..)

기존 Nio에선 I/O 처리가 필요한 Channel을 직접 Selector에 등록했는데, Netty에선 EventLoop가 Selector를 가지고있음으로써 Channel을 EventLoop에 등록해주면된다.


4-4 SingleThreadEventLoop

위에서 NioEventLoop의 생성자를 보면 역시나 상위 클래스인 SingleThreadEventLoop의 생성자를 호출한다.

SingleThreadEventLoop 생성자
SingleThreadEventLoop 생성자

코드에서 볼 수 있듯이, 생성을 상위 클래스인 SingleThreadEventExecutor에 위임한다.

SingleThreadEventExecutor내 생성자에선 아래와 같이 Executor로부터 Thread를 할당하고, TaskQueue를 인스턴스화한다.

SingleThreadEventExecutor 생성자
SingleThreadEventExecutor 생성자

여기서도 알 수 있듯이 EventLoop은 1 Thread + 1 Selector + 1 TaskQueue로 구성되어있다.

여기까지의 아래 코드에서의 NioEventLoopGroup를 생성하고 여러 속성을 할당하는 과정을 살펴보았다.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

중요한 점은 각 NioEventLoop는 자체적인 Selector와 thread 그리고 TaskQueue를 가지고있으며, 동일한 EventLoop을 EventLoopGroup에 포함함으로써 리소스를 분산한다는 것이다.

이제 위 EventLoopGroup을 Bootstrap을 사용해서 Boss와 Child로 역할을 분담해서 설정한다.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(...);

그리고 이때 사용될 Channel과 여러 EventLoop의 Handler를 각각 handler와 childHandler로 설정한다.


5 Deep Dive - ServerSocketChannel 생성 및 초기화 그리고 Socket 바인딩

서버 네티 애플리케이션 기준으로 Deep Dive하기 때문에 ServerSocketChannel 기준으로 정리한다. 클라이언트의 경우 SocketChannel을 생성하고 초기화한다.

앞서 EventLoopGroup을 생성하고 Bootstrap을 통해 Boss와 Child Group을 설정하였다. (handler 포함)

이제 Bootstrap내 EventLoop를 Socket에 바인딩하는 과정을 Deep Dive 해본다.

ChannelFuture f = b.bind(8080).sync();

bind() 코드를 보면 새로운 Channel을 생성하고 바인딩 (doBind())하는 과정을 보여준다.

AbstraactBootstrap.bind()
AbstraactBootstrap.bind()

코드에서 알 수 있듯이, 우선 validate()를 통해 기본적인 유효성 검사를 진행하고 실제 바인딩 작업은 doBind()에 위임한다. 그리고 그 결과값으로 ChannelFuture를 돌려주는데 이는 Netty가 기본적으로 Non-blocking하게 동작하기때문이다. 만약 바인딩 성공후 어떤 작업을 수행하고싶다면 Callback 함수를 작성해주면된다.

실제 바인딩 작업을 수행하는 doBind()는 아래와 같이 새로운 Channel을 생성 (initAndRegister())하고 이를 EventLoop에 등록한 후 Socket에 바인딩(doBind0)을 수행한다.

AbstractBottstrap.doBind()
AbstractBottstrap.doBind()

새로운 Channel 생성과 등록 (regFuture)이 이미 완료되었다면 isDone()으로 확인하여 바로 소켓 바인딩을 위임하고, 아니라면 Channel 생성과 등록 요청에 Callback으로 바인딩 위임을 하게되어있다.

정리하면 ServerSocketChannel 생성부터 초기화 그리고 바인딩은 아래와 같은 과정을 거치게된다.

  1. initAndRegister() - Channel 및 Pipeline 생성 및 설정.
    • ServerSocketChannel 생성과 Pipeline 생성
      • NioServerSocketChannel 인스턴스화 -> 생성자내 Pipeline 생성.
      • Pipeline 생성.
    • init() - 설정 및 초기화
      • 생성된 ServerSocketChannel config 설정 적용
      • Boss Group내 EventLoop Channel에 Pipeline 초기화 및 설정
        • 기본적으로 Bootstrap.handler() 에 설정한 handler를 Pipeline에 추가한다.
        • 그외, ServerBootstrap의 경우 ServerBootstrapAcceptor를 추가로 매핑한다.
    • register() - 등록
      • Channel을 EventLoop내 selector에 등록
  2. doBind0(regFuture, channel, localAddress, promise) - Socket에 Channel 바인딩.

더 간단하게 순서로 보면 다음과 같다.

Channel 생성 -> Pipeline 생성 -> Bootstrap에 설정한 내용 기반 Channel 설정 및 Pipeline 설정 -> Pipeline과 설정이 완료된 Channel을 EventLoop내 Selector에 등록 -> 특정 Socket Port에 Channel 바인딩.


5-1 ServerSocketChannel 및 Pipeline 생성

새로운 Channel은 channelFactory (정확히는 ReflectiveChannelFactory)의 reflection로직을 통해 인스턴스화된다.

AbstractBootstrap.initAndRegister()
AbstractBootstrap.initAndRegister()

앞서 bootstrap을 설정하고 인스턴스화할 때 bootstrap.channel(NioServerSocketChannel.class)를 설정했었는데, 그 클래스 파일을 reflection으로 가져와서 인스턴스화한다고 보면 된다.

ReflectionChannelFactory.newChannel()
ReflectionChannelFactory.newChannel()

즉, bootstrap.channel()로 설정한 Channel 클래스파일이 인스턴스화되는 것인데, 이번 글은 Nio 기준이므로 NioServerSocketChannel를 인스턴스화한다. 그럼 아래 생성자가 실행되면서 인스턴스화된다.

NioServerSocketChannel 생성자
NioServerSocketChannel 생성자

상위 클래스인 AbstractNioMessageChannel 생성자를 호출하면서 OP_ACCEPT 이벤트에 관심있다는 내용을 전달한다. (새로운 커넥션에 대한 이벤트에 관심있다는 의사를 표현한 것)

새로운 커넥션 요청을 받아 처리하는 AbstractNioMessageChannel의 생성자를 거쳐 AbstractChannel 생성자까지 호출되면서 NioServerSocketChannel은 인스턴스화된다.

AbstractChannel 생성자
AbstractChannel 생성자

ServerSocketChannel을 Non-blocking으로 설정하고 os (정확히는 socket)와 read/write를 해줄 unsafe 구현체를 설정한다. 그 다음 Channel에 대한 Pipeline을 설정하고 Channel을 식별할 ID를 할당한다.


5-1-1 ServerSocketChannel에서 사용할 Pipeline 생성

Netty는 앞서 정리했듯이 이벤트 루프 기반으로 동작하며, 각 Channel에서 발생한 변경 이벤트를 Selector가 감지하여 Channel에 매핑된 Pipeline (Handler 모음)에 넘겨 처리한다.

바로 위에서도 ServerSocketChannel을 생성하였으니 이 Channel에서 발생하는 이벤트를 처리할 Pipeline을 생성하여 Channel에 매핑해줘야한다.

그리고 실제로 모든 Channel의 상위 클래스인 AbstractChannel을보면 생성자에서 Pipeline을 생성 및 매핑해준다.

AbstractChannel 생성자
AbstractChannel 생성자

Pipeline을 생성하는 newChannelPipeline() 메서드를 따라가보면 DefaultChannelPipeline을 인스턴스화하여 반환한다.

DefaultChannelPipeline 생성자
DefaultChannelPipeline 생성자

생성자만 봐도 알 수 있듯이, ChannelPipeline은 Linked-List로 구성되어있으며, 아무런 handler를 추가하지 않아도 ChannelHandlerContext를 구현한 headContext와 tailContext를 기본적으로 구성하게된다. 그림으로보면 아래와 같다.


5-1-2 ServerSocketChannel에 매핑된 Pipeline내 Handler 설정

ServerSocketChannel을 생성하고 I/O 작업을 대신 해줄 Unsafe 인터페이스와 이벤트에 따른 처리를 수행하는 Pipeline을 생성하였다면, 이제 Pipeline내 실제 개발자가 작성한 Handler를 추가해준다.

다시 AbstractBootstrap으로 돌아와 코드를 살펴보면..

AbstractBootstrap.initAndRegister()
AbstractBootstrap.initAndRegister()

실제 초기화를 진행하는 init(channel) 메서드를 따라가면 아래와 같이 Bootstrap 구현체인 클라이언트와 서버에 따라 다르게 설정되는 것을 볼 수 있다.

AbstractBootstrap.init()
AbstractBootstrap.init()

  • Bootstrap (Netty Client Application)
  • ServerBootstrap (Netty Server Application)

두 구현체의 구현 부분을 보면 동일하게 개발자가 설정한 Handler를 Channel의 Pipeline내 추가한다.

다만, Client Application의 경우는 Bootstrap내 설정된 Handler만을 추가하고, Server Application의 경우 앞서 말했듯이 새로운 연결을 수립하는 역할을 수행하는 ServerBootstrapAcceptor를 추가한다.

ServerBootstrap.init()
ServerBootstrap.init()

위 코드에서 알 수 있듯이, 개발자가 기존 Bootstrap에 설정한 다양한 config를 가져와서 Channel과 Pipeline에 설정한다.

예를 들어..

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new ChannelInitializer<>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                // BossGroup에 속한 채널에 대한 설정
            }
        })
        .childHandler(new NettyChatServerInitializer());

위 그림에서 주황색 부분이 위 Bootstrap 코드중 handler()에서 설정한 Handler를 가져와서 Pipeline에 매핑하게된다. 즉, handler()를 통해 설정한 Handler는 Boss Group내 EventLoop Channel Pipeline에 Handler로써 매핑된다.

뒤에서 나오겠지만, childHandler는 Child Group EventLoop에 매핑된다.


그리고 Netty는 여러 Handler를 한번에 Bootstrap에 설정할 수 있게해주는데, 그 역할을 ChannelInitializer가 수행한다.

ChannelInitializer.initChannel()
ChannelInitializer.initChannel()

ChannelInitializer 구현체 예시
ChannelInitializer 구현체 예시

ChannelInitializer 구현체를 보면 기존의 ChannelInboundHandlerAdapter를 상속받는다. 그리고 handlerAdded 콜백이 실행될 때 initChannel 메서드를 수행하면서 사용자가 구현체내 구현한 initChannel을 실행하고 마지막으로 (finally) 자신을 제거(remove)하는 것을 볼 수 있다.

다시 말해, ChannelInitializer는 아래와 같이 initChannel 메서드내 구현한 작업을 수행하고 (보통 Handler 추가) 자기 자신을 Pipeline내에서 제거한다.


다시.. ServerBootstrap.init()로 돌아가면..

ServerBootstrap.init()
ServerBootstrap.init()

앞서 말했듯 주황색 부분에서 개발자가 설정한 Handler를 ChannelInitializer로 설정하고, 마지막으로 ServerBootstrapAcceptor를 Pipeline내 무조건 추가하게된다.

ServerBootstrapAcceptorChannelInboundHandlerAdaptor를 상속한 일반적인 Handler이며, 실질적으로 Boss Group의 역할인 새로운 연결을 수립하고, 연결된 이후부터는 Child Group에 해당 Channel을 등록(register)하여, read/write은 Child Group내 EventLoop가 수행하도록한다.

위 내용은 아래 ServerBootstrapAcceptor의 코드만봐도 쉽게 이해할 수 있다.

ServerBootstrapAcceptor.channelRead()
ServerBootstrapAcceptor.channelRead()


5-1-3 EventLoop Selector에 ServerSocketChannel 등록

Channel 및 Pipeline 생성과 초기화 그리고 설정까지 모두 완료되었다면, 이제 해당 Channel을 EventLoop내 Selector에 등록(register)해야한다. 그래야 해당 Channel에서 변경된 이벤트가 Selector에 의해 감지되어 Pipeline에 처리를 위임한다.

다시 AbstractBootstrap.initAndRegister()로 돌아오면 아래와 같이 register()하는 부분을 볼 수 있다.

AbstractBootstrap.initAndRegister()
AbstractBootstrap.initAndRegister()

앞선 Deep Dive에서 알 수 있듯이, BossGroup은 MultithreadEventLoopGroup을 상속한 NioEventLoopGroup이다. 이 클래스의 register 메서드를 보면 다음과 같이 정의되어있다.

MultithreadEventLoopGroup.register(Channel)
MultithreadEventLoopGroup.register(Channel)

next() 메서드를 통해 NioEventLoop를 선택하여 특정 EventLoop에 Channel을 등록하게한다.

Netty에서 Channel의 생명주기는 오직 하나의 EventLoop에서만 관리된다.

즉, 하나의 Channel이 두 EventLoop에 의해 동시간대에 관리될 일은 없다.

그리고 next()는 특정 EventLoop를 고르는 EventExecutorChooser를 사용하는데 Netty는 디폴트로 PowerOfTwoEventExecutorChooser를 사용한다. 알고리즘은 아래와 같으며 라운드로빈 방식과 동일하다고보면된다.

PowerOfTwoEventExecutorChooser.next()
PowerOfTwoEventExecutorChooser.next()

EventLoop를 골랐으니 이제 register(Channel)을 통해 EventLoop에 Channel을 등록한다.

SingleThreadEventLoop.register()
SingleThreadEventLoop.register()

앞에서 설명한 것처럼 모든 I/O 작업은 Unsafe 인터페이스에 위임하게된다. 위 코드에서도 unsafe().register(...)를 통해 위임한다. 그리고 아래 AbstractUnsafe의 register 메서드를 호출하게된다.

AbstractChannel.AbstractUnsafe.register()
AbstractChannel.AbstractUnsafe.register()

등록 과정은 먼저 EventLoop.inEventLoop()를 호출하여 현재 스레드가 EventLoop 실행 스레드인지 여부를 확인한다. 만약 EventLoop 실행 스레드라면 즉시 Channel 등록 메서드 (register0)를 호출하고, 만약 아니라면 EventLoop Queue에 Task로 넘긴다.

마지막으로 등록 과정을 살펴보려면 AbstractChannel.register()를 보면 된다.

AbstractChannel.register()
AbstractChannel.register()

코드에서 알 수 있듯이, invokeHandlerAddedIfNeeded()가 앞서 ServerBootstrapAcceptor와 Boss Group에 등록할 개발자 지정 handler를 등록하는 ChannelInitializer를 호출한다. (앞서 얘기했던 부분) 즉, 이때 개발자가 ChannelInitializer에 정의한 Handler들이 Pipeline에 추가되고 Initializer는 제거된다.

그리고 Pipeline내 channelRegistered, channelActive등의 Callback을 호출한다.

AbstractChannelHandlerContext.fireChannelRegistered()
AbstractChannelHandlerContext.fireChannelRegistered()

AbstractChannelHandlerContext.fireContextInbound()
AbstractChannelHandlerContext.fireContextInbound()

로직을 보면 findContextInbound를 통해 다음 실행할 Handler를 찾아 channelRegistered를 호출한다. 실제 처음 Channel이 등록되었으니 headContext부터 tail이 도달 할 때까지 Pipeline에 등록된 InboundHandler의 channelRegistered를 호출하게된다.

AbstractChannelHandlerContext.invokeChannelRegistered()
AbstractChannelHandlerContext.invokeChannelRegistered()

AbstractChannelHandlerContext.executor()
AbstractChannelHandlerContext.executor()

호출부분을 더 자세히보면 Netty를 이해하는데 중요한 부분이 나온다. 바로 Channel내 발생한 Event를 처리할 때 모든 이벤트 Callback 메서드 (chanellRegister, channelRead,… 등등)는 위와 같이 executor.inEventLoop()를 통해 바인딩된 EventExecutor Thread와 현재 메시지 전송을 요청하고있는 Thread가 동일한지 체크한다.

만약 다음 실행할 Handler (ChannelHandlerContext)에 executor(EventExecutor)가 설정되어있다면 해당 이벤트를 EventLoop의 executor가 아닌 따로 설정한 executor의 TaskQueue에 Task를 삽입하여 실행하게한다. 그리고 Queue에 쌓인 메시지는 이후에 해당 EventExecutor에 의해 처리되게된다.

다시 말해, 만약 사용자가 별도의 EventExecutor를 설정하지 않았다면 모든 Handler는 EventLoop Thread를 공유해서 사용하게된다. 그러므로 Pipeline의 tail외에는 Queue에 메세지가 버퍼링되는 일은 일어나지 않는다. 반면에 사용자가 ChannelHandlerContext에 특정 EventExecutor를 설정해주었다면, Executor가 달라지는 Handler에서는 Queue에 메시지가 버퍼링 된 후 서로 다른 EventExecutor에 의해 메시지가 비동기적으로 처리되게된다.


마지막으로 doRegister()를 통해 실제 EventLoop내 Selector에 Channel을 등록한다.

AbstractNioChannel.doRegister()
AbstractNioChannel.doRegister()

위 코드와 같이 doRegister()을 호출하여 NioServerSocketChannel을 EventLoop내 Selector에 등록한다.

위 코드는 기존 NIO에서 ServerSocketChannel을 Selector에 등록하는 것과 동일하다.


5-2 Socket에 ServerSocketChannel 바인딩

앞서 Reflection을 통해 NioServerSocketChannel 객체를 생성하고, 초기화하면서 Pipeline을 생성하고 초기화하였다. 그 과정에서 ServerBootstrapAcceptor와 개발자가 정의한 Handler들이 매핑되었으며, 마지막으로 ServerSocketChannel을 EventLoop내 Selector에 등록했다. 그리고 각 단계별로 수행되어야하는 Callback 함수들도 실행되는 것을 보았다.

이제 실제 ServerSocketChannel을 특정 Socket에 바인딩한다.

다시 AbstractBootstrapdoBind()로 돌아가면…

AbstractBootstrap.doBind()
AbstractBootstrap.doBind()

지금까지 initAndResiter()부분을 돌아본 것이며, 이제 바인딩해야하는 doBind()를 더 자세히 살펴보면..

AbstractBootstrap.doBind0()
AbstractBootstrap.doBind0()

Bootstrap에 설정한 IP와 Port를 대상으로 Channel을 바인딩한다. 그리고 이러한 작업은 EventLoop 실행 메서드를 통해 Task로 넘긴다.

EventLoop는 자신의 생명 주기에 따라 아래 bind 메서드를 실행한다.

AbstractChannel.bind()
AbstractChannel.bind()

DefaultChannelPipeline.bind()
DefaultChannelPipeline.bind()

AbstractChannel.bind() 메서드는 위와 같이 pipeline내 tailContext의 bind() 메서드를 호출한다.

눈치 챈 사람도 있겠지만.. bind()ChannelOutboundHandler의 스펙으로, tail부터 Pipeline내 모든 OutboundHandler를 거쳐 head까지 연쇄되어 호출된다. 그리고 그 과정에서 각각의 Handler는 아래 로직을 타게된다.

AbstractChannelHandlerContext.bind()
AbstractChannelHandlerContext.bind()

바로 위에서 살펴본 Inbound와 동일하게 findContextOutbound를 통해 실행할 Handler를 찾아 bind() 메서드 콜백을 호출하게된다.

이때도 당연히 EventExecutor가 EventLoop내 스레드면 바로 실행하고, 아니라면 EventLoop TaskQueue에 넣게된다.

그렇게 계속 호출하다보면 결국 headContext에 다다르는데 HeadContext를 살펴보면 아래와 같이 결국 unsafe.bind()를 호출하게된다.

HeadContext.bind()
HeadContext.bind()

그리고 unsafe의 구현체인 AbstractUnsafe를 통해 바인딩을 실행하게된다.

AbstractUnsafe.bind()
AbstractUnsafe.bind()

실제 바인딩을 진행하는 코드는 doBind()이고, 이는 서버의 경우 NioServerSocketChannel.doBind()를 호출한다.

NioServerSocketChannel.doBind()
NioServerSocketChannel.doBind()

코드에서 알 수 잇듯이, 지정된 포트에 바인딩을 진행한다. 그리고 이렇게 바인딩이 완료되면 AbstractUnsafe에서 pipeline.fireChannelActive()를 호출함으로써 channelActive()에 대한 이벤트를 InboundHandler들에 전파하기 시작한다.


정리해서 바인딩 과정을 도식화하면 아래와 같다.

tail부터 head까지 이벤트 전파되고, head에서 unsafe를 통해 bind된다.
tail부터 head까지 이벤트 전파되고, head에서 unsafe를 통해 bind된다.

참고로 모든 Outbound 콜백 메서드는 위와 동일하게 동작한다.


그리고 channelActive()가 호출되면서 발생하는 Inbound를 도식화하면 아래와 같다.

head부터 tail까지 이벤트 전파.
head부터 tail까지 이벤트 전파.

모든 Inbound 이벤트도 위와 같이 동일하게 동작하게된다.


5-3 Channel 생성 및 소멸 생명주기

지금까지 ServerSocket을 생성하고 Socket에 바인딩하는 과정을 살펴보았다.

그리고 그 과정에서 여러가지 Channel에 대한 콜백 메서드가 실행되었는데, 이를 통해 Channel의 생성 및 소멸 생명주기를 간단히 정리하면 아래와 같다.

  • ChannelUnregistered: Channel은 만들어졌지만, EventLoop엔 매핑되지 않는 단계를 말한다.
    • ex. 새로운 커넥션이 들어와 SocketChannel이 만들어졌지만, EventLoop엔 매핑되지 않는 단계.
  • ChannelRegistered: Channel이 만들어졌으며, EventLoop에 등록된 단계를 말한다.
    • ex. 새로운 커넥션이 들어와 SocketChannel이 만들어졌으며, EventLoop에 등록된 단계
  • ChannelActive: Channel이 활성화된 단계. 원격 디바이스와 연결된 상태.
  • ChannelInactive: Channel이 비활성화된 단계. 원격 디바이스와 연결이 끊긴 상태이다.

위 내용은 생성 및 소멸에 대한 생명주기이며, Socket으로부터의 read/write 생명주기는 아래에서 더 자세히 정리한다.


6 Deep Dive - EventLoop 실행

지금까지 아래 내용에 대해서 Deep Dive 해보았다.

  • Channel 생성 및 설정
  • ServerSocketChannel EventLoop에 등록 그리고 Socket에 바인딩
  • Pipeline내 이벤트가 어떻게 전파되는지?
    • Inbound/Outbound Handlers

이제 실제 EventLoop내 Executor로부터 Thread를 할당받아 EventLoop를 실행시킬 차례다.

즉, Netty내 EventLoop는 어떻게 동작하는지 Deep Dive 해본다.


Netty내 EventLoop의 상위 클래스인 SingleThreadEventExecutor는 새 작업 (Runnable Task)이 Push할 때 사용하도록 실행 메서드 (execute())를 제공한다.

이 메서드는 특히 Channel을 EventLoop에 등록할 때 호출되며, 앞서 Channel 생성하고 EventLoop에 등록하고나서 Socket에 바인딩할 때도 Channel.eventLoop().execute(Task)를 통해 호출되는 것을 자주 볼 수 있었다.

SingleThreadEventExecutor.execute()
SingleThreadEventExecutor.execute()

로직에서 알 수 있듯이, 먼저 EventLoop내 TaskQueue에 Task를 추가한다. (addTask)

그리고 만약 작업을 요청한 Thread가 EventLoop Thread가 아닌 경우, Task를 추가하고 startThread를 통해 EventLoop Thread를 시작시킨다. (startThread())

이와 관련해서 더 자세히 코드로 살펴보자면..


💁‍♂️ addTask

addTask의 구현부분을 자세히보면 아래와 같다.

SingleThreadEventExecutor.addTask()
SingleThreadEventExecutor.addTask()

별다른 처리는 없고, 그저 EventLoop내 taskQueue에 Task를 offer (삽입)한다.


💁‍♂️ startThread

앞서 보았듯이 EventLoop는 addTask하고 만약 EventLoop.execute(Runnable)을 호출한 Thread가 다른 Thread라면 바로 startThread()를 호출한다.

Channel을 EventLoop에 등록하는 Task등은 보통 다른 Thread이므로, EventLoop를 종료하거나 EventLoop내 Thread를 시작시키는 것외엔 Task 요청은 모두 외부 Thread로부터 호출된다.

SingleThreadEventExecutor.startThread()
SingleThreadEventExecutor.startThread()

로직은 비교적 복잡한데, 간단히보면 EventLoop의 Thread가 아직 실행되지않았다면 실행시킨다. 그리고 Thread 실행은 doStartThread()에 위임한다.


💁‍♂️ doStartThread

실질적으로 EventLoop의 Thread를 실행시켜 EventLoop 로직을 트리거거는 메서드는 doStartThread()이다.

SingleThreadEventExecutor.doStartThread()
SingleThreadEventExecutor.doStartThread()

코드는 비교적 길게 작성되어있는데 간단히보면 EventLoop 생성하면서 할당했던 executor (Thread Pool)에 execute()를 호출하여 Thread를 할당받는다.

ThreadPerTaskExecutor.execute()
ThreadPerTaskExecutor.execute()

위 구현체는 NioEventLoopGroupNioEventLoop를 생성하면서 할당하는 Executor다.

기본적으로 DefaultThreadFactory를 사용하게 되는데, 코드를 보면 매번 새로운 Thread를 생성하여 반환한다.

그리고 할당 받은 Thread에서 로직을 수행하는데 가장 중요한 부분은 아래 부분이다.

가장 중요한 이유는 실질적인 EventLoop의 동작 방식이 모두 run()에 정의되어있기 때문이다.

그리고 run()은 추상 메서드로서 아래와 같이 EventLoop 구현체에 따라 구현 부분을 가지고 있으며, 실제 EventLoop를 실행시키는 메서드다.

SingleThreadEventExecutor.run()
SingleThreadEventExecutor.run()

run()을 호출함으로써 구현체에따라 EventLoop내 스레드가 동작하게된다.

이번 글은 NioEventLoop 대상으로 작성되었다.


7 DeepDive - EventLoop 처리 과정

앞서 run()을 호출하여 EventLoop를 실행시키는 부분까지 살펴보았다.

코드를 살펴보기전에 EventLoop가 처리되는 과정을 다시 살펴보면 아래와 같다.

  1. Selector.select()
  2. processSelectedKeys
  3. runAllTasks()

그리고 위 EventLoop의 처리 과정은 NioEventLoop.run()을 보면 그대로 구현되어있다.


7-1 Selector.select()

NioEventLoop.run()
NioEventLoop.run()

NioEventLoop 스레드가 실행되면 무한 반복 (for ( ; ; ) {})하면서 EventLoop 과정을 반복한다.

이때 첫번째로 등록된 Channel내 변경된 이벤트를 모니터링하여 처리하는 select()가 호출된다.

EventLoop내 Queue에 Task가 없으면 (if(!hasTasks())) Selector내 등록된 Channel중 변경사항이 있는지 select() 를 호출함으로써 확인한다.

selector.select(curDeadlineNanos): 하나 혹은 여러 개의 Channel내 들어오는 이벤트를 모니터링한다. curDeadlineNanos 만큼의 시간이 지나면 selectorNow()를 호출함으로써 Queue내 Task가 영원히 기다리는것을 방지하기위해 사용된다. 디폴트로는 사용되지않는다.


💁‍♂️ Event Type

select() 를 통해 감지하는 이벤트는 아래와 같다.

  • OP_ACCEPT: Accepting client connection.
  • OP_READ: Channel Ready for Read operation.
  • OP_WRITE: Channel is writable.
  • OP_CONNECT: TCP connection has been established.

7-2 processSelectedKeys()

NioEventLoop.run()
NioEventLoop.run()

selectStrategy에 따라 감지된 I/O 이벤트는 processSelectedKeys()에 의해 처리된다. 그리고 먼저 select된 Event에 대한 처리를하고, finally로 runAllTasks() 를 호출함으로써 EventLoop내 Task를 실행한다.

processSelectedKeys()에 대해서 더 자세히 알아보자면..

NioEventLoop.processSelectedKeys()
NioEventLoop.processSelectedKeys()

만약 selected된 key가 존재한다면 processSelectedKeysOptimized 를 호출한다.

반대로 selected된 key가 없다면 selector.selectedKeys()로 감지된 (selected)된 key를 재차 찾아 selected된 key에 대한 처리를 진행한다.

NioEventLoop.processSelectedKeysOptimized()
NioEventLoop.processSelectedKeysOptimized()

그리고 selected된 keys를 반복하며 processSelectedeKey()를 호출한다.

NioEventLoop.processSelectedKeys()
NioEventLoop.processSelectedKeys()

Nio에 대해서 익숙한 개발자라면 한눈에 바로 어떤 코드인지 이해가 될 것이다.

간단히보면 selected된 I/O 이벤트가 어떤 타입이냐에 따라 처리를 알맞는 unsafe 인터페이스 메서드에 위임한다.


7-2-1 CONNECT 이벤트

TCP Connection이 수립되었을 때 발생하는 이벤트.

해당 채널내 NioUnsafe.finishConnect() 메서드를 호출한다.

AbstractNioChannel.finishConnect()
AbstractNioChannel.finishConnect()

그리고 doFinishConnect()를 호출하여 어떤 Channel 구현체인지에 따라 몇가지 validate를 진행하고 fulfillConnectPromise 메서드를 호출한다.

AbstractNioChannel.fulfillConnectPromise()
AbstractNioChannel.fulfillConnectPromise()

코드에서 알 수 있듯이 Channel에 연결된 ChannelPipeline내 InboundHandler의 생명주기중 channelActive 를 콜백하게된다. 당연 처음 연결된 Channel의 경우 HeadContext의 fireChannelActive를 호출하게된다.

그리고 pipeline.fireChannelActive가 호출됨에 따라, 구현체를 쭉 안으로 들어가면 아래와 같이 나오게된다.

AbstractChannelHandlerContext.invokeChannelActive()
AbstractChannelHandlerContext.invokeChannelActive()

headContext가 아닌경우는 그저 해당 Handler의 channelActive() 를 호출한다. 반면, headContext즉, Pipeline의 가장 앞 ChannelHandler에선 아래와 같이 Channel이 등록된 Selector에 이제 이 채널은 READ 이벤트에 관심있어라고 호출하게됨으로써, Selector가 해당 Channel내 읽을 부분이 있을지 이벤트를 감지하기 시작한다.

AbstractChannelHandlerContext.invokeChannelActive()
AbstractChannelHandlerContext.invokeChannelActive()

그리고 HeadContext의 경우는 아래와 같이 Unsafe.beginRead()를 호출한다.

HeadContext.read()
HeadContext.read()

마지막으로 unsafe의 구현체중 Connection과 관련된 AbstractNioChannel.doBeginRead()를 보면 아래와 같이 Selector에 READ_OP을 관심 이벤트로 등록하게되면서 CONNECT 이벤트의 처리는 끝난다.

AbstractNioChannel.doBeginRead()
AbstractNioChannel.doBeginRead()

정리하면 CONNECT 이벤트인 경우

  • Channel내 연결된 pipeline의 fireChannelActive를 호출함으로써 pipeline내 Head Inbound Handler의 channelActive가 콜백된다.
  • 보통 가장 처음 HeadContext를 호출하게되는데, HeadContext는 설정된 다음 Handler의 channelActive를 콜백한 후, Unsafe를 통해 Selector에 이제 READ 이벤트를 받을 준비가 되었다고 알려준다. (interestOp ⇒ READ_OP)

7-2-2 READ 이벤트

다시 아래와 같이 NioEventLoop내 processSelectedKeys 메서드로 돌아와서 코드를 살펴본다.

만약 Selector로부터 Channel내 READ 이벤트가 감지된 경우 unsafe.read() 가 호출된다.

NioEventLoop.processSelectedKeys()
NioEventLoop.processSelectedKeys()

이 unsafe 인터페이스는 여기서 Channel에 따라 서로 다른 두 개의 구현체를 사용한다.

  • NioServerSocketChannel의 경우
  • NioSocketChannel의 경우

💁‍♂️ NioServerSocketChannel의 경우

NioServerSocketChannel은 서버 Socket Channel이다.

AbstractNioMessageChannel을 상속한 AbstractNioMessageChannelread()가 실행된다.

AbstractNioMessageChannel.NioMessageUnsafe.read()
AbstractNioMessageChannel.NioMessageUnsafe.read()

코드에서 볼 수 있듯이, NioServerSocketChannel.doReadMessage에 데이터 읽기를 위임한다.

위임받는 메서드에선 아래와 같이 먼저 채널에 대해 연결 수락 (accept())을 수행한 후, 연결이 수립된 NioSocketChannel을 생성한다.

NioServerSocketChannel.doReadMessages
NioServerSocketChannel.doReadMessages

그리고 위 SocketChannel은 바로 뒤에서 ServerBootStrapAcceptor.channelRead에 의해 worker eventloop group (child group)에 등록된다.

ServerBootStrapAcceptor.channelRead()
ServerBootStrapAcceptor.channelRead()


💁‍♂️ NioSocketChannel의 경우

NioSocketChannel은 클라이언트에서 요청한 Socket이거나 ServerSocketChannel에 의해 TCP 연결이 수립되면서 생성된 Channel이다.

TCP 연결이 수립된 후, 해당 Channel이 worker group내 EventLoop에 의해 관리되기 시작하면, unsafe의 구현체는 AbstractNioByteChannel.read()를 호출한다.

AbstractNioByteChannel.read()
AbstractNioByteChannel.read()

read하는 과정은 다음과 같다.

  1. Channel의 config를 통해 두 메모리 할당자를 가져온다.
    • ByteBuf 메모리 할당자 (allocator) - Nio의 ByteBuf를 생성하거나 할당받는 역할. 실제 Netty내에서 사용될 요청 데이터를 저장하는 버퍼 역할. (pooled, unpooled)
    • RecvByteBuf 메모리 할당자 (allocHandle) - 읽기 처리에 어느정도의 메모리를 할당해야하는지 계산하여 최적의 버퍼 용량을 예측하는데 사용된다.
      • allocHandle.allocate() 메서드를 통해 모든 인바운드 데이터를 읽을 수 있을 만큼 크고 공간을 낭비하지 않을 만큼 작은 용량의 새 수신 버퍼를 만든다.
  2. RecvByteBufAllocator로부터 새로 생성한 handle을 reset한다.
    • 새로운 읽기 작업의 최적의 버퍼 사이즈를 찾기위해 이전에 작업한 내용을 모두 reset.
  3. 읽기 루프를 돌입한다.
    • allocHandle.allocate()를 호출함으로써 모든 인바운드 데이터를 읽어들일 ByteBuf를 할당한다. 이때 내부적으로 RecvByteBuf.Handle.guess()를 호출하여 실제 할당을 수행하지 않고 필요한 버퍼 크기를 추측하여 추측된 사이즈의 ByteBuf를 할당한다.
    • doReadBytes(byteBuf)를 통해 실제 데이터를 Socket Read 버퍼로부터 받아온다.
    • 만약 Socket이 close되었거나, Socket으로부터 더이상 읽을 게 없을때 루프는 종료 (break) 된다.
    • 무한 반복될 수 없으니, ChannelMetadata내의 read 속성 (디폴트는 16번)에 횟수따라 루프를 반복한다. 그리고 TCP 전송이 너무 크면 데이터 스트림 기반으로 여러 전송으로 분할된다. 그 한번의 전송이 루프내 한번의 channelRead 콜백이며, 데이터가 너무크면 여러 번의 파이프라인 channelRead가 호출된다. 따라서 channelRead 메서드를 사용할 때 주의가 필요하다. 데이터 양이 많은 경우엔 캐시에 데이터를 넣어두고 channelReadComplete에서 처리하도록하는게 좋다.
  4. 읽기 루프를 빠져나온 후 allocHandlereadComplete 메서드를 호출하여 읽기가 완료되었음을 나타내고 다음번 메모리 할당을 위해 읽은 기록을 기록한다. 그리고 파이프라인 channelReadComplete를 호출한다.

이와 관련하여 잘 정리된 글 - The unSafe.read Method of Netty Source Code Analysis


7-2-3 WRITE 이벤트

다시 NioEventLoop내 precessSelectedKeys()메서드로 돌아와서 WRITE 이벤트를 살펴본다.

NioEventLoop.processSelectedKeys()
NioEventLoop.processSelectedKeys()

코드에서 알 수 있듯이, 만약 Selector로부터 Channel내 write 이벤트가 감지된 경우 unsafe.forceFlush()를 호출한다.

forceFlush() 코드를 따라가보면 AbstractChannel.flush0()를 호출하며 코드는 다음과 같다.

AbstractChannel.flush0
AbstractChannel.flush0

코드를 간단히보면 ChannelOutboundBuffer를 가져와서 doWrite(outboundBuffer)를 호출한다.

doWrite의 구현체는 모든 Nio, Epoll 등등 SocketChannel의 구현체마다 구현하고있으며, 실제 flush를 통해 Socket에 데이터를 기록하는 작업을 수행한다.


ChannelOutboundBuffer

Netty의 모든 Channel Socket는 ChannelOutboundBuffer를 가진다. 정확히는 Channel Socket의 Unsafe 구현체가 이 버퍼를 가지고 있다. 그리고 Channel에서의 모든 output 작업은 모두 ChannelOutboundBuffer를 거치게된다. 이렇게 하는 이유는 네트워크의 처리량을 향상시키기 위함이며, 실제로 외부에 write 작업을 요청하면, 그 데이터는 Socket에 바로 기록되지않고, ChannelOutboundBuffer에 먼저 기록되고나서, flush가 호출되어야 실제 Socket에 기록된다.

즉, ChannelHandlerContext에서 write()을 호출하면 ChannelOutboundBuffer에 데이터가 쌓이고, flush()를 호출하거나, EventLoop내 WRITE 이벤트가 감지되면 ChannelOutboundBuffer내 Socket에 데이터를 기록하여 Socket이 연결된 리모트에 데이터가 전송된다.


💁‍♂️ ChannelOutboundBuffer 구조와 동작 과정

ChannelOutboundBuffer (이하 버퍼)는 Entry Node를 요소로 가진 Linked-List 구조로 되어있다.

동작 과정을 도식화하면 아래와 같다.

write할 때 addMessage 메서드를 호출하여 Entry를 생성하고 리스트의 TailEntry 노드에 append한다. 그리고 addFlush가 호출되었을 때 unflushedEntry의 참조를 flushedEntry에 할당한 다음, unflushedEntry의 참조는 null로 변경한다.

참고로 addFlush는 실제 Socket에 데이터를 기록하진않는다. 대신 unflushedEntry의 참조를 flushedEntry에 할당함으로써 곧 flush된다는 의사만 나타낸다.

이렇게 하는 이유는 Netty는 비동기가 기본이라 모든 요청에 대해 Promise를 제공하는데, 이는 취소될 수 있다. 따라서 addFlush가 호출된후엔 flush하기 전에 취소할 수 없다는 점을 약속하기위해 호출된다한다.

addFlush()메서드가 호출되고나선, Channel은 실제 Socket에 데이터를 flush하기위해 flush0()를 호출한다.

보통 NIO의 경우 따로 WRITE 이벤트를 등록하지않아도, TCP 버퍼에 공간이 있다면 자동으로 flush0()를 호출한다. 오직 TCP 버퍼가 water level에 도달 했을때만 해주면 된다고 한다.


💁‍♂️ ChannelOutboundBuffer 사용이유

실제 코드와 여러 문서를 보면서 내가 느낀 ChannelOutboundBuffer가 존재하는 가장 큰 이유는 네트워크에 대한 Backpressure 때문인 듯 하다. 즉, ChannelOutboundBuffer를 사용함으로써 동시에 처리할 수 있는 것보다 더 빠른 속도로 네트워크나 수신자에게 데이터가 넘치치 않도록 보장하기 위해 사용한다. 실제로 Netty는 TCP 버퍼가 한계까지 도달했는지 isFlushPending()을 통해 확인한다.

그외에도 Netty는 기본이 비동기이기 때문에, write할 데이터를 버퍼에 저장함으로써 Thread가 Blocking되지 않도록하기위해 버퍼를 사용하는 듯하다.


💁‍♂️ Channel Watermark

한가지 주의할 점은 TCP 커넥션의 상대방이 데이터를 느리게 수신할 경우 버퍼에 해제할 수 없는 데이터양이 많아 OOM이 발생할 수 있다는 점이다. 다시 말해, ChannelOutboundBuffer가 Linked-List로 구현되어있어 무제한으로 Node가 추가될 수 있어 OOM이 발생할 수 있다. Netty는 이러한 문제를 해결하기위해 High Watermark와 Low Watermark를 설정할 수 있게하여 ChannelOutboundBuffer에 용량을 제한한다. 그리고 메모리 크기가 임계치보다 높아지면 Netty는 Channel의 상태를 unwritable로 설정하고, Channel내 Pipeline의 fireChannelWritabilityChanged를 트리거한다. 이때, 병목이 해결될 수 있도록, channelWritabilityChanged내 데이터를 느리게 처리하도록 설정할 수 있다.

이로인해 Netty는 기본적으로 버퍼내 pending 데이터가 설정한 High Watermark만큼 도달했는지 확인하는 isWritable 스위치를 사용한다.

ChannelOutboundBuffer.isWritable()
ChannelOutboundBuffer.isWritable()

마지막으로 버퍼의 데이터가 임계치값보다 낮아지면 스위치는 꺼지고 fireChannelWritabilityChanged도 호출되지 않는다. 그러므로 이 두 값을 잘 설정하여 사용하는 것이 중요하다.

이와 관련하여 잘 정리된 글 - Netty outbound buffer ChannelOutboundBuffer source code analysis


7-3 runAllTasks()

이제 마지막 단계는 TaskQueue에 있는 Task를 polling하여 실행하는 runAllTasks()이다.

NioEventLoop.runAllTasks()
NioEventLoop.runAllTasks()

Task를 실행하는 과정은 간단하다. Queue로부터 Task를 polling후 Task에 해당하는 Runnablerun()을 호출함으로써 바로 실행한다.


7-3-1 언제 Queue에 Task가 쌓이는 것인가?

이때 한가지 의문이 드는데, 앞서 processSelectedKey 단계에서 I/O에 해당하는 connect, read, write 이벤트에 대한 처리를 EventLoop내 Thread가 모두 처리하는 것처럼 보이며, 어디에도 Queue에 Task를 추가하는 건 보이지 않았다.


🤔 그렇다면 언제 Queue엔 언제 Task가 쌓이며 어떤 Task가 쌓이는 것일까?

이와 관련해서는 ChannelHandler들을 실행하는 주체인 ChannelHandlerContext내 write 코드를 살펴보면 이해할 수 있다.

AbstractChannelHandlerContext.write()
AbstractChannelHandlerContext.write()

코드에서 볼 수 있듯이, EventLoop는 크게 두 가지 방식으로 Task를 실행한다.

  1. Task가 EventLoop Thread에서 발생한 경우
    • EventLoop의 TaskQueue에 들어가지않고 select()후 즉시 실행되게된다.
  2. Task가 다른 Thread에서 발생한 경우
    • EventLoop내 Thread가 아닌 다른 Thread에서 비동기로 실행되어 추가된 Task는 TaskQueue에 들어가 EventLoop Thread가 runAllTasks()를 처리하는 시점에 처리된다.

실제로 inEventLoop()의 구현부를 살펴보면 아래와 같다.

즉, 아래와 같이 다른 스레드에서 특정 비즈니스를 실행하고 context에 write하는 코드는 TaskQueue에 쌓여 순서에 따라 I/O 처리가 진행된다.

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Executors.newFixedThreadPool(3).execute(() -> {
        ctx.writeAndFlush(Unpooled.copiedBuffer(" binghe netty client!", CharsetUtil.UTF_8));
    });
}

8 DeepDive - Socket에 데이터 Write하는 두 가지 방법

이 글의 마지막으로 Netty가 어떻게 Socket에 데이터를 write 하는지 그 과정을 살펴본다.

앞서 ChannelOutboundBuffer에서 언급했듯이, Netty내 write 메서드는 실제로 데이터를 원격 Socket에 전송하지 않고, 단지 ChannelOutboundBuffer에 데이터를 추가하고 실제 데이터는 flush되어야 전송된다.

정확히는 ChannelHandlerContext에서 write()을 호출하면 ChannelOutboundBuffer에 데이터가 쌓이고, flush()를 호출하거나, EventLoop내 WRITE 이벤트가 감지되면 ChannelOutboundBuffer내 Socket에 데이터를 기록하여 Socket이 연결된 리모트에 데이터가 전송된다.

그리고 Netty는 쓰기 작업을 요청할 때 두 가지 방법을 제공한다.

  1. ctx.writeAndFlush()
  2. ctx.channel().writeAndFlush()

두 방법의 유일한 차이점은 ctx.channel().writeAndFlush()은 호출되면 tailContext부터 시작하여 pipeline에 정의된 모든 OutboundHandler를 통해 headContext까지 역순회를 시작한다는 것이다.

반대로 ctx.writeAndFlush()는 호출된 Handler에서 바로 headContext로 OutboundHandler를 찾아 역순회를 시작한다.


8-1 ctx.channel().writeAndFlush()

먼저 ctx.channel().writeAndFlush()에서 channel()AbstractChannel를 반환하게 되는데 writeAndFlush(Object msg) 코드를 보면 아래와 같다.

AbstractChannel.writeAndFlush()
AbstractChannel.writeAndFlush()

파이프라인에 위임하고있으며, 코드를 따라가보면..

DefaultChannelPipeline
DefaultChannelPipeline

위 코드에서도 알 수 있듯이, tail (tailContext)writeAndFlush()를 요청한다.

즉, 앞서 말했듯이 ctx.channel().writeAndFlush()은 호출되면 tailContext부터 시작하여 pipeline에 정의된 모든 OutboundHandler를 통해 headContext까지 역순회한다.

그림으로보면 아래와 같다.

Inbound B에서 ctx.channel().writeAndFlush()가 호출된 예시

ctx.channel().writeAndFlush()
ctx.channel().writeAndFlush()

마지막으로 tailContext는 AbstractChannelHandlerContext의 구현체이므로, tail의 위치에서 AbstractChannelHandlerContext.writeAndFlush()를 호출한다.


8-2 ctx.writeAndFlush()

반대로 ctx.writeAndFlush()는 바로 ChannelHandlerContext을 호출한다. 더 정확히는 호출된 위치에서의 AbstractChannelHandlerContext.writeAndFlush()를 바로 호출한다.

AbstractChannelHandlerContext.writeAndFlush()
AbstractChannelHandlerContext.writeAndFlush()

그림으로 보면 아래와 같다.

Inbound B에서 ctx.writeAndFlush()가 호출된 예시

ctx.writeAndFlush()
ctx.writeAndFlush()


8-3 writeAndFlush()

위에서 정리한 두 방법은 pipeline내 어떤 위치부터 writeAndFlush()를 트리거하냐의 차이가 있었다.

결국 두 방법 모두 ChannelHandlerContext의 writeAndFlush()이 호출되어 pipeline내 OutboundHandler를 연쇄적으로 호출하게된다.

AbstractChannelHandlerContext.write()
AbstractChannelHandlerContext.write()

AbstractChannelHandlerContext.findContextOutbound
AbstractChannelHandlerContext.findContextOutbound

ChannelHandlerContext.writeAndFlush()가 호출되면 위와 같이 findContextOutbound를 통해 다음 트리거할 OutboundHandler를 찾는다. 그리고 위에서 계속 얘기했던.. handler를 실행하는 Thread가 EventLoop 스레드인지 확인하고 맞다면 invokeWriteAndFlush()를 바로 실행하고, 아니라면 write task를 생성하고 TaskQueue에 넣어 나중에 실행될 수 있도록한다.

AbstractChannelHandlerContext.invokeWriteAndFlush()
AbstractChannelHandlerContext.invokeWriteAndFlush()

invokeWriteAndFlush()의 코드에서 알 수 있듯이, invokeWrite0invokeFlush0()를 호출한다.


💁‍♂️ invokeWrite0

AbstractChannelHandlerContext.invokeWrite0()
AbstractChannelHandlerContext.invokeWrite0()

headContext에 도달할 때까지 write 이벤트를 계속해서 트리거한다. 그리고 headContext에 도달하면 아래와 같이 unsafe.write(msg, promise)를 호출한다.

DefaultChannelPipeline.write()
DefaultChannelPipeline.write()

그리고 unsafe내 write 코드는 아래와 같이 ChannelOutboundBuffer에 addMessage함으로써 버퍼내 전송할 데이터를 추가한다.

AbstractUnsafe.write()
AbstractUnsafe.write()

앞서 말했듯이 이는 버퍼에 데이터만 추가할 뿐, 실제 OS Socket Buffer에 데이터를 보내진 않는다. 실제론 flush해야 Socket Buffer에 데이터를 보낸다.


💁‍♂️ invokeFlush0

AbstractChannelHandlerContext.invokeFlush0()
AbstractChannelHandlerContext.invokeFlush0()

flush도 동일하게 headContext에 도달할 때까지 flush 이벤트를 계속해서 트리거한다. 그리고 headContext에 도달하면 아래와 같이 unsafe.flush()를 호출한다.

DefaultChannelPipeline.flush()
DefaultChannelPipeline.flush()

그리고 unsafe내 flush 코드는 아래와 같이 ChannelOutboundBuffer에 addFlush()를 호출한다.

AbstractUnsafe.flush()
AbstractUnsafe.flush()

addFlush()의 의미는 이전에 버퍼에 추가한 메시지를 Flush 된 것으로 표시된 것으로 처리하기 위함이다. 당연히 바로 flush0()를 호출하여 Socket에 실제 데이터를 전송한다.

그리고 그 코드는 아래와 같다.

AbstractChannel.flush0()
AbstractChannel.flush0()

가장 중요한 부분은 doWrite()이며, 이는 데이터를 주고 받는 AbstractNioByteChannel.doWrite()를 호출하게된다. 그리고 호출함에 따라 지정된 데이터를 Socket Buffer에 flush한다. 그리고 이는 OS에 의해 Socket에 연결된 원격 디바이스에 전송되게된다.

AbstractNioByteChannel.doWrite()
AbstractNioByteChannel.doWrite()

Socket Buffer에 데이터를 flush하는건 데이터 타입에 따라 아래와 같이 다르게 처리하게 된다.

AbstractNioByteChannel.doWriteInternal()
AbstractNioByteChannel.doWriteInternal()

보통은 ByteBuf를 보낼 텐데, ByteBuf는 doWriteBytes()를 통해 주어진 ByteBuf의 바이트를 Socket Buffer에 write한다.

NioSocketChannel.doWriteBytes()
NioSocketChannel.doWriteBytes()


💁‍♂️ ChannelOutboundBuffer의 데이터가 High Watermark (최대 임계치)를 넘은 경우

High Watermark에 도달하여 write를 완료할 수 없을 때, Netty는 flush 작업을 예약한다.

예약하는 부분은 아래의 incompleteWrite 코드를 호출하면서 진행된다.

AbstractNioByteChannel.doWrite()
AbstractNioByteChannel.doWrite()

AbstractNioByteChannel.incompleteWrite()
AbstractNioByteChannel.incompleteWrite()

코드에서 알 수 있듯이, flush 예약은 바로 Selector에 채널 이벤트로 OP_WRITE를 전달함으로써 EventLoop가 ch.unsafe.forceFlush()를 호출하게한다.

EventLoop 처리 과정중 OP_WRITE은 forceFlush()를 호출한다
EventLoop 처리 과정중 OP_WRITE은 forceFlush()를 호출한다

이 내용은 EventLoop의 실행과정중 write 이벤트에 해당한다.


참조


© mark-kim.blog Built with Gatsby