Loading...

文章背景图

Netty

2024-05-08
2
-
- 分钟
|

Netty 是由JBOSS 提供的一个 Java 开源框架,现为 Github上的独立项目。主要针对在TCP协议下,面向Clients端的高并发应用,或者Peer-to-Peer场景下的大量数据持续传输的应用。Netty是一个异步的、基于事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络 IO 程序。Netty本质是一个NIO框架,适用于服务器通讯相关的多种应用场景

1 I/O 基础

1.1 Linux网络I/O模型简介

Linux的内核将所有外部设备都看做一个文件来操作,对一个文件的读写操作会调用内核提供的系统命令,返回一个file descriptor(fd,文件描述符)。而对一个socket的读写也会有相应的描述符,称为socketfd(socket描述符),描述符就是一个数字,它指向内核中的一个结构体(文件路径,数据区等一些属性)。

根据UNIX网络编程对I/O模型的分类,UNIX提供了5种I/O模型,分别如下。

  1. 阻塞I/O模型:最常用的I/O模型就是阻塞I/O模型,缺省情形下,所有文件操作都是阻塞的。我们以套接字接口为例来讲解此模型:在进程空间中调用recvfrom,其系统调用直到数据包到达且被复制到应用进程的缓冲区中或者发生错误时才返回,在此期间一直会等待,进程在从调用recvfrom开始到它返回的整段时间内都是被阻塞的,因此被称为阻塞I/O模型

image-20240404160841402

  1. 非阻塞I/O模型:recvfrom从应用层到内核的时候,如果该缓冲区没有数据的话,就直接返回一个EWOULDBLOCK错误,一般都对非阻塞I/O模型进行轮询检查这个状态,看内核是不是有数据到来

image-20240404160930004

  1. I/O复用模型:Linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮我们侦测多个fd是否处于就绪状态。select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,因此性能更高。当有fd就绪时,立即回调函数rollback

image-20240404161353725

  1. 信号驱动I/O模型:首先开启套接口信号驱动I/O功能,并通过系统调用sigaction执行一个信号处理函数(此系统调用立即返回,进程继续工作,它是非阻塞的)。当数据准备就绪时,就为该进程生成一个SIGIO信号,通过信号回调通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据

image-20240404161535712

  1. 异步I/O:告知内核启动某个操作,并让内核在整个操作完成后(包括将数据从内核复制到用户自己的缓冲区)通知我们。这种模型与信号驱动模型的主要区别是:信号驱动I/O由内核通知我们何时可以开始一个I/O操作;异步I/O模型由内核通知我们I/O操作何时已经完成

image-20240404161624990

1.2 I/O多路复用技术

在I/O编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者I/O多路复用技术进行处理。I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。与传统的多线程/多进程模型比,I/O多路复用的最大优势是系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降低了系统的维护工作量,节省了系统资源

目前支持I/O多路复用的系统调用有select、pselect、poll、epoll,在Linux网络编程过程中,很长一段时间都使用select做轮询和网络事件通知,然而select的一些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select的替代方案,最终选择了epoll。epoll与select的原理比较类似,为了克服select的缺点,epoll作了很多重大改进

  1. 支持一个进程打开的socket描述符(FD)不受限制(仅受限于操作系统的最大文件句柄数)。select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设置,默认值是1024。epoll并没有这个限制,它所支持的FD上限是操作系统的最大文件句柄数,具体的值可以通过cat /proc/sys/fs/file-max察看,通常情况下这个值跟系统的内存关系比较大。
  2. **I/O效率不会随着FD数目的增加而线性下降。**传统select/poll的另一个致命弱点,就是当你拥有一个很大的socket集合时,由于网络延时或者链路空闲,任一时刻只有少部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作——这是因为在内核实现中,epoll是根据每个fd上面的callback函数实现的。那么,只有“活跃”的socket才会去主动调用callback函数,其他idle状态的socket则不会。在这点上,epoll实现了一个伪AIO。
  3. **使用mmap加速内核与用户空间的消息传递。**无论是select、poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存复制就显得非常重要,epoll是通过内核和用户空间mmap同一块内存来实现的。
  4. **epoll的API更加简单。**包括创建一个epoll描述符、添加监听事件、阻塞等待所监听的事件发生、关闭epoll描述符等。

1.3 Java I/O模型

Java 共支持 3 种网络编程模型/IO模式:BIO、NIO、AIO

  1. Java BIO:同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
  2. Java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理
  3. Java AIO:异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程

2 NIO

2.1 NIO基本介绍

Java NIO 全称 java non-blocking IO,是指 JDK 提供的新API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的。它有三个核心组件Channel(通道),Buffer(缓冲区),Selector(选择器)

NIO相关类都被放在 java.nio 包及子包下,并且对原 java.io包中的很多类进行改写。NIO是面向缓冲区 ,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络。

Java NIO的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情

2.2 NIO的三大组件

2.2.1 缓冲区 - Buffer

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

缓冲区实质上是一个数组。通常它是一个字节数组(ByteBuffer),也可以使用其他种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问以及维护读写位置(limit)等信息。

最常用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组功能用于操作byte数组。除了ByteBuffer,还有其他的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区,

image-20240404203003502

2.2.2 通道 - Channal

Channel是一个通道,它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而通道可以用于读、写或者二者同时进行。

因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。

Channel可以分为两大类:用于网络读写的SelectableChannel和用于文件操作的FileChannel。FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和 SocketChannel 用于 TCP的数据读写。

2.2.3 多路复用器 - Selector

多路复用器提供选择已经就绪的任务的能力。简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

graph TD
subgraph selector 
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端

2.3 ByteBuffer

2.3.1 基本使用

  1. 向 buffer 写入数据,例如调用channel.read(buffer)
  2. 调用 flip() 切换至读模式
  3. 从buffer读取数据,例如调用 buffer.get()
  4. 调用 clear() 或 compact() 切换至写模式
  5. 重复 1~4 步骤

这里演示读取文件, 见代码块:

@Slf4j
public class IByteBuffer {

    private IByteBuffer() {}

    public static void read(String filePath) {
        try (FileInputStream is = new FileInputStream(filePath)) {
            FileChannel channel = is.getChannel();
            // 准备缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(5);
            // 从channel读取数据, 向buffer写入
            StringBuilder sb = new StringBuilder();
            while (true) {
                int len = channel.read(buffer);
                if (len == -1) {    // 读到末尾
                    break;
                }
                // 读写切换, 写模式 -> 读模式
                buffer.flip();
                while (buffer.hasRemaining()) { // 游标是否还有剩余数据
                    // 读取buffer中的数据
                    sb.append((char) buffer.get());
                }
                buffer.clear(); // 读写切换, 读模式 -> 写模式
            }
            log.info("Read data: {}", sb);
            channel.close();
        } catch (IOException e) {
            log.error("ReadFileError: ", e);
        }
    }

}

2.3.2 ByteBuffer结构

ByteBuffer 有以下重要属性

属性描述
capacity容量,即可以容纳的最大数据量;在缓冲区创建时被设定并且不能改变
Limit表示缓冲区的当前终点,不能对缓冲区超过极限的位置进行读写操作。且极限是可以修改的
Position位置,下一个要被读或写的元素的索引,每次读写缓冲区数据时都会改变改值,为下次读写作准备
Mark标记

0021

写模式下,position 是写入位置,limit 等于容量,下图表示写入了 4 个字节后的状态

0018

flip 动作发生后,position 切换为读取位置,limit 切换为读取限制

0019

读取 4 个字节后,状态

0020

clear 动作发生后,状态

0021

compact 方法,是把未读完的部分向前压缩,然后切换至写模式

0022

2.3.3 ByteBuffer常用方法

2.3.3.1 分配空间

可以使用 allocate 方法为ByteBuffer分配空间,其它 buffer 类也有该方法

Bytebuffer buf = ByteBuffer.allocate(16);

注意两种用法

// class java.nio.HeapByteBuffer
System.out.println(ByteBuffer.allocate(16).getClass());
// class java.nio.DirectByteBuffer
System.out.println(ByteBuffer.allocate(16).getClass());
  • HeapByteBuffer:Java堆内存,读写效率较低,会收到GC的影响
  • DirectByteBuffer:直接内存,读写效率较高(少一次数据的拷贝,在零拷贝会讲),分配效率较低,使用不当会造成内存泄漏,不会收到GC影响(使用系统内存)

2.3.3.2 向buffer写入数据

有两种办法

  • 调用 channel 的 read 方法
  • 调用 buffer 自己的 put 方法
int readBytes = channel.read(buf);

buf.put((byte)127);

2.3.3.3 从buffer读取数据

同样有两种办法

  • 调用 channel 的 write 方法
  • 调用 buffer 自己的 get 方法
int writeBytes = channel.write(buf);

byte b = buf.get();

get 方法会让 position 读指针向后走,如果想重复读取数据

  • 可以调用rewind方法将position重新置为 0
  • 或者调用get(int i)方法获取索引i的内容,它不会移动读指针

2.3.3.4 mark和reset

mark是在读取时,做一个标记,即使position改变,只要调用reset就能回到 mark 的位置

注意

rewind和flip都会清除mark位置

2.3.3.5 字符串与 ByteBuffer 互转

@Test
public void test3() {
    // 1. 直接赋值
    ByteBuffer buffer = ByteBuffer.allocate("abcde".getBytes().length);
    buffer.put("abcde".getBytes());
    buffer.flip();
    System.out.println(new String(buffer.array()));

    // 2. Charset
    ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("abcde");
    String str = StandardCharsets.UTF_8.decode(buffer1).toString();
    System.out.println(str);

    // 3. wrap
    ByteBuffer buffer2 = ByteBuffer.wrap("abcde".getBytes());
    System.out.println(new String(buffer2.array()));
}

2.3.3.6 Scattering Reads

分散读取,有一个文本文件 3parts.txt

onetwothree

使用如下方式读取,可以将数据填充至多个 buffer

try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
    FileChannel channel = file.getChannel();
    ByteBuffer a = ByteBuffer.allocate(3);
    ByteBuffer b = ByteBuffer.allocate(3);
    ByteBuffer c = ByteBuffer.allocate(5);
    channel.read(new ByteBuffer[]{a, b, c});
    a.flip();
    b.flip();
    c.flip();
    System.out.println(new String(a.array()));
    System.out.println(new String(b.array()));
    System.out.println(new String(c.array()));
} catch (IOException e) {
    e.printStackTrace();
}

2.3.3.7 Gathering Writes

使用如下方式写入,可以将多个 buffer 的数据填充至 channel

try (RandomAccessFile file = new RandomAccessFile("helloword/3parts.txt", "rw")) {
    FileChannel channel = file.getChannel();
    ByteBuffer d = ByteBuffer.allocate(4);
    ByteBuffer e = ByteBuffer.allocate(4);
    channel.position(11);

    d.put(new byte[]{'f', 'o', 'u', 'r'});
    e.put(new byte[]{'f', 'i', 'v', 'e'});
    d.flip();
    e.flip();
    System.out.println(new String(d.array()));
    System.out.println(new String(e.array()));
    channel.write(new ByteBuffer[]{d, e});
} catch (IOException e) {
    e.printStackTrace();
}

2.3.4 ByteBuffer大小分配

每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

  • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
  • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

2.4 文件编程

FileChannel只能工作在阻塞模式下,不能直接打开 FileChannel,必须通过 FileInputStream、FileOutputStream 或者 RandomAccessFile 来获取 FileChannel,它们都有 getChannel 方法

  • 通过 FileInputStream 获取的 channel 只能读
  • 通过 FileOutputStream 获取的 channel 只能写
  • 通过 RandomAccessFile 是否能读写根据构造 RandomAccessFile 时的读写模式决定

2.4.1 读取数据

会从 channel 读取数据填充 ByteBuffer,返回值表示读到了多少字节,-1 表示到达了文件的末尾

int readBytes = channel.read(buffer);

2.4.2 写入数据

ByteBuffer buffer = ...;
buffer.put(...); // 存入数据
buffer.flip();   // 切换读模式

while(buffer.hasRemaining()) {
    channel.write(buffer);
}

在 while 中调用 channel.write 是因为 write 方法并不能保证一次将 buffer 中的内容全部写入 channel

2.4.3 关闭channel

channel 必须关闭,不过调用了 FileInputStream、FileOutputStream 或者 RandomAccessFile 的 close 方法会间接地调用 channel 的 close 方法

2.4.4 channal位置

获取当前位置

long pos = channel.position();

设置当前位置

long newPos = ...;
channel.position(newPos);

设置当前位置时,如果设置为文件的末尾

  • 这时读取会返回 -1
  • 这时写入,会追加内容,但要注意如果 position 超过了文件末尾,再写入时在新内容和原末尾之间会有空洞(00)

2.4.5 强制写入

操作系统出于性能的考虑,会将数据缓存,不是立刻写入磁盘。可以调用 force(true) 方法将文件内容和元数据(文件的权限等信息)立刻写入磁盘

2.4.6 两个Channel传输数据

两个Channel传输数据可以使用transferTo方法,transferTo一次最多拷贝2g的数据,示例如下

String FROM = "helloword/data.txt";
String TO = "helloword/to.txt";
long start = System.nanoTime();
try (FileChannel from = new FileInputStream(FROM).getChannel();
     FileChannel to = new FileOutputStream(TO).getChannel();
    ) {
    // 效率高,底层会利用操作系统的零拷贝进行优化
    long size = from.size();
    // left 变量代表还剩余多少字节
    for (long left = size; left > 0; ) {
        left -= from.transferTo((size - left), left, to);
    }
} catch (IOException e) {
    e.printStackTrace();
}

2.5 网络编程

2.5.1 非阻塞 vs 阻塞

2.5.1.1 阻塞模式

阻塞模式下,相关方法都会导致线程暂停

  • ServerSocketChannel.accept 会在没有连接建立时让线程暂停
  • SocketChannel.read 会在没有数据可读时让线程暂停
  • 阻塞的表现其实就是线程暂停了,暂停期间不会占用 cpu,但线程相当于闲置

单线程下,阻塞方法之间相互影响,几乎不能正常工作,需要多线程支持。但多线程下,有新的问题,体现在以下方面

  • 32 位 jvm 一个线程 320k,64 位 jvm 一个线程 1024k,如果连接数过多,必然导致 OOM,并且线程太多,反而会因为频繁上下文切换导致性能降低
  • 可以采用线程池技术来减少线程数和线程上下文切换,但治标不治本,如果有很多连接建立,但长时间 inactive,会阻塞线程池中所有线程,因此不适合长连接,只适合短连接

服务端代码:

public static void main(String[] args) {
    try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
        int port = NetworkUtil.findAvailablePort(8080);
        ssc.bind(new InetSocketAddress(port));
        log.info("Server start at port {}", port);
        // 线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 50, 1, TimeUnit.HOURS,
                                                         new LinkedBlockingDeque<>(20));
        while (true) {
            // 建立与客户端的连接, SocketChannel用来与客户端进行通信
            log.info("Waiting for client connect...");
            SocketChannel sc = ssc.accept();        // 阻塞方法
            log.info("Client connect at {}", sc.getRemoteAddress());
            pool.execute(() -> {
                try {
                    log.info("Thread handler {}", sc.getRemoteAddress());
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    while (sc.isConnected()) {
                        // 接收客户端发送的数据
                        int read = sc.read(buffer);     // 阻塞方法
                        if (read == -1) break;
                        String data = BufferUtil.readBuffer(buffer);
                        log.info("Receive data from client: {}", data);
                    }
                    log.info("Client Closed");
                } catch (IOException e) {
                    log.error("Error ", e);
                    IoUtil.close(sc);
                }
            });
        }
    } catch (IOException e) {
        log.error("Error ", e);
    }
}

客户端代码

public static void main(String[] args) {
    try (SocketChannel sc = SocketChannel.open()) {
        sc.connect(new InetSocketAddress("127.0.0.1", 8080));
        log.info("Client start");
        sc.write(Charset.defaultCharset().encode("123456"));
    } catch (IOException e) {
    }
}

2.5.1.2 非阻塞模式

非阻塞模式下,相关方法都会不会让线程暂停

  • 在ServerSocketChannel.accept在没有连接建立时,会返回 null,继续运行
  • SocketChannel.read 在没有数据可读时,会返回 0,但线程不必阻塞,可以去执行其它 SocketChannel 的 read 或是去执行 ServerSocketChannel.accept
  • 写数据时,线程只是等待数据写入 Channel 即可,无需等 Channel 通过网络把数据发送出去

但非阻塞模式下,即使没有连接建立,和可读数据,线程仍然在不断运行,白白浪费了 cpu。数据复制过程中,线程实际还是阻塞的(AIO 改进的地方)

客户段代码类似, 这里看服务端代码

public static void main(String[] args) {
    try (ServerSocketChannel ssc = ServerSocketChannel.open()) {
        ssc.configureBlocking(false);       // 设置为非阻塞模式
        int port = NetworkUtil.findAvailablePort(8080);
        ssc.bind(new InetSocketAddress(port));
        log.info("Server start at port {}", port);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        List<SocketChannel> channels = new ArrayList<>();
        while (true) {
            // 设置为非阻塞模式后, 不会阻塞accept方法, 接收不到连接会返回null
            SocketChannel sc = ssc.accept();
            if (sc != null) {
                log.info("Client connect at {}", sc.getRemoteAddress());
                sc.configureBlocking(false);    // 设置为非阻塞模式
                channels.add(sc);
            }
            for (SocketChannel channel : channels) {
                // 接收客户端发送的数据
                // 非阻塞模式下 read方法不会阻塞, 返回0, 表示没有数据可读
                int read = channel.read(buffer);
                if (read <= 0) continue;
                String data = BufferUtil.readBuffer(buffer);
                log.info("Receive data from client: {}", data);
            }
        }
    } catch (IOException e) {
        log.error("Error ", e);
    }
}

2.5.2 多路复用

单线程可以配合 Selector 完成对多个 Channel 可读写事件的监控,这称之为多路复用。多路复用仅针对网络 IO、普通文件 IO 没法利用多路复用。如果不用 Selector 的非阻塞模式,线程大部分时间都在做无用功,而 Selector 能够保证

  • 有可连接事件时才去连接
  • 有可读事件才去读取
  • 有可写事件才去写入

限于网络传输能力,Channel 未必时时可写,一旦 Channel 可写,会触发 Selector 的可写事件

2.5.2.1 创建Selector

Selector selector = Selector.open();

2.5.2.2 绑定Channel事件

channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用

绑定的事件类型可以有

  • connect - 客户端连接成功时触发
  • accept - 服务器端成功接受连接时触发
  • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
  • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

2.5.2.3 监听Channel事件

可以通过下面三种方法来监听是否有事件发生,方法的返回值代表有多少 channel 发生了事件

  • 阻塞直到绑定事件发生
int count = selector.select();
  • 阻塞直到绑定事件发生,或是超时(时间单位为 ms)
int count = selector.select(long timeout);
  • 不会阻塞,也就是不管有没有事件,立刻返回,自己根据返回值检查是否有事件
int count = selector.selectNow();

select何时不阻塞

  • 事件发生时

  1. 客户端发起连接请求,会触发 accept 事件

  2. 客户端发送数据过来,客户端正常、异常关闭时,都会触发 read 事件,另外如果发送的数据大于 buffer 缓冲区,会触发多次读取事件

  3. channel可写,会触发 write 事件

  4. 在 linux 下 nio bug 发生时

  • 调用 selector.wakeup()
  • 调用 selector.close()
  • selector 所在线程 interrupt

事件发生后,要么处理,要么取消(cancel),不能什么都不做,否则下次该事件仍会触发,这是因为 nio 底层使用的是水平触发

2.5.2.4 处理Accept事件

监听Accept可以使用

channel.register(selector, SelectionKey.OP_ACCEPT);

当有accept发生时

Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
    SelectionKey key = iter.next();
    if (key.isAcceptable()) {   // 处理Accept事件
        handleAccept(key, selector);
    } 
    iter.remove();
}

handleAccept方法

private static void handleAccept(SelectionKey key, Selector selector) throws IOException {
    ServerSocketChannel c = (ServerSocketChannel) key.channel();
    // 必须处理
    SocketChannel sc = c.accept();
    sc.configureBlocking(false);
    // 监听Read事件
    sc.register(selector, SelectionKey.OP_READ);
    log.info("Accepted connection from {}", sc.getRemoteAddress());
}

因为 select 在事件发生后,就会将相关的 key 放入 selectedKeys 集合,但不会在处理完后从 selectedKeys 集合中移除,需要我们自己编码删除。例如

  • 第一次触发了 ssckey 上的 accept 事件,没有移除 ssckey
  • 第二次触发了 sckey 上的 read 事件,但这时 selectedKeys 中还有上次的 ssckey ,在处理时因为没有真正的 serverSocket 连上了,就会导致空指针异常

2.5.2.5 处理Read事件

当有Read发生时

Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
while (iter.hasNext()) {
    SelectionKey key = iter.next();
    if (key.isReadable()) {  // 处理Read事件
        handleRead(key);
    }
    iter.remove();
}

handleRead方法

private static void handleRead(SelectionKey key) {
    try {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(128);
        int read = sc.read(buffer);
        if(read == -1) {
            key.cancel();
            sc.close();
        } else {
            buffer.flip();
            log.info("Receive data: {}", Charset.defaultCharset().decode(buffer));
            buffer.clear();
        }
    } catch (IOException e) {
        log.error("Error ", e);
        // cancel 会取消注册在 selector 上的 channel,并从 keys 集合中删除 key 后续不会再监听事件
        key.cancel();
    }
}

上面handlerRead方法有一个问题——消息边界问题

比如我的ByteBuffer只有四个字节, 服务端传来一个"你好啊",明显超出4个字节了,结果就会变成这样:

00:46:03.803 [main] INFO io.adrainty.study.nio.selector.Server -- Receive data: 你�
00:46:03.803 [main] INFO io.adrainty.study.nio.selector.Server -- Receive data: ���
00:46:03.803 [main] INFO io.adrainty.study.nio.selector.Server -- Receive data: �

这个时候我们就要处理消息的边界

0023

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
  • Http 1.1 是 TLV 格式
  • Http 2.0 是 LTV 格式
sequenceDiagram 
participant c1 as 客户端1
participant s as 服务器
participant b1 as ByteBuffer1
participant b2 as ByteBuffer2
c1 ->> s: 发送 01234567890abcdef3333\r
s ->> b1: 第一次 read 存入 01234567890abcdef
s ->> b2: 扩容
b1 ->> b2: 拷贝 01234567890abcdef
s ->> b2: 第二次 read 存入 3333\r
b2 ->> b2: 01234567890abcdef3333\r

首先ByteBuffer可以作为sc的附件

ByteBuffer buffer = ByteBuffer.allocate(16);
sc.register(selector, SelectionKey.OP_READ, buffer);

修改handleRead

private static void handleRead(SelectionKey key) {
    try {
        SocketChannel sc = (SocketChannel) key.channel();
        // 获取buffer的附件
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        int read = sc.read(buffer);
        if(read == -1) {
            key.cancel();
            sc.close();
            return;
        }
        split(buffer);
        if (buffer.position() == buffer.limit()) {  // 容量没有消耗完
            // 扩容
            ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
            buffer.flip();
            newBuffer.put(buffer);
            // 关联新的附件
            key.attach(newBuffer);
        }
    } catch (IOException e) {
        log.error("Error ", e);
        key.cancel();
    }
}

private static void split(ByteBuffer source) {
    source.flip();
    int oldLimit = source.limit();
    for (int i = 0; i < oldLimit; i++) {
        if (source.get(i) == '\n') {
            ByteBuffer target = ByteBuffer.allocate(i + 1 - source.position());
            // 0 ~ limit
            source.limit(i + 1);
            target.put(source); // 从source 读,向 target 写
            target.flip();
            log.info("Receive data: {}", Charset.defaultCharset().decode(target));
            target.clear();
            source.limit(oldLimit);
        }
    }
    source.compact();
}

2.5.2.6 处理Write事件

非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数),例如:

服务端:

public static void main(String[] args) {
    try (
        ServerSocketChannel ssc = ServerSocketChannel.open();
        Selector selector = Selector.open();
    ) {
        ssc.configureBlocking(false);
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        int port = NetworkUtil.findAvailablePort(8080);
        log.info("server start at port {}", port);
        ssc.bind(new InetSocketAddress(port));

        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);

                    StringBuilder sb = new StringBuilder();
                    // 向客户端发送大量数据
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    // 返回值代表实际写入的字节数
                    while (buffer.hasRemaining()){
                        int write = sc.write(buffer);
                        log.info("write {} bytes", write);
                    }
                }
            }
        }
    } catch (IOException e) {
    }
}

客户端:

public static void main(String[] args) {
    try (SocketChannel sc = SocketChannel.open()) {
        sc.connect(new InetSocketAddress("127.0.0.1", 8080));
        // 接收数据
        int count = 0;
        while (true) {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            count += sc.read(buffer);
            log.info("接收到 {} 字节", count);
            buffer.clear();
        }
    } catch (IOException e) {
    }
}

运行结果:

14:23:59.894 [main] INFO io.adrainty.study.nio.selector.WriteServer -- server start at port 8080
14:24:02.589 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 2883562 bytes
14:24:02.592 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 0 bytes
14:24:02.592 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 0 bytes
14:24:02.592 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 0 bytes
......
14:24:02.607 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 0 bytes
14:24:02.608 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 0 bytes
14:24:02.608 [main] INFO io.adrainty.study.nio.selector.WriteServer -- write 116438 bytes

这一直写入0肯定不符合要求,因为一直卡在while循环里面,处理不了其他请求的事件了,因此我们需要监听可写事件

用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略

  • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
  • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
  • 如果不取消,会每次可写均会触发 write 事件

修改代码如下:

while (true) {
    selector.select();
    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        if (key.isAcceptable()) {
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
            SelectionKey sckey = sc.register(selector, 0, null);

            StringBuilder sb = new StringBuilder();
            // 向客户端发送大量数据
            for (int i = 0; i < 3000000; i++) {
                sb.append("a");
            }
            ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
            // 返回值代表实际写入的字节数
            int write = sc.write(buffer);
            log.info("write {} bytes", write);
            if (buffer.hasRemaining()) {
                // 如果还有剩余未写入的字节,则继续注册OP_WRITE事件
                sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                // 将剩余未写入的字节保存到attachment中
                sckey.attach(buffer);
            }
        } else if (key.isWritable()) {
            SocketChannel sc = (SocketChannel) key.channel();
            ByteBuffer buffer = (ByteBuffer) key.attachment();
            int write = sc.write(buffer);
            log.info("write {} bytes", write);
            // 清理
            if (!buffer.hasRemaining()) {
                key.attach(null);
                key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
            }
        }
    }
}

2.5.2.7 多线程优化

前面的代码只有一个选择器,没有充分利用多核 cpu,如何改进呢?

分两组选择器

  • 单线程配一个选择器,专门处理 accept 事件
  • 创建 cpu 核心数的线程,每个线程配一个选择器,轮流处理 read 事件

Runtime.getRuntime().availableProcessors() 如果工作在 docker 容器下,因为容器不是物理隔离的,会拿到物理 cpu 个数,而不是容器申请时的个数。这个问题直到 jdk 10 才修复,使用 jvm 参数 UseContainerSupport 配置, 默认开启

@Slf4j
public class MultiThreadServer {

    public static void main(String[] args) {
        Thread.currentThread().setName("boss");
        try (
                ServerSocketChannel ssc = ServerSocketChannel.open();
                Selector boss = Selector.open();
        ) {
            ssc.configureBlocking(false);
            SelectionKey bossKey = ssc.register(boss, 0, null);
            bossKey.interestOps(SelectionKey.OP_ACCEPT);
            int port = 8080;
            port = NetworkUtil.findAvailablePort(port);
            ssc.bind(new InetSocketAddress(port));
            log.info("server start at port {}", port);
            MultiThreadWorkerFactory workerFactory = MultiThreadWorkerFactory.createWorkerFactory(5);
            while (true) {
                boss.select();
                Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isAcceptable()) {
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
                        log.info("accept a connection from {}", sc.getRemoteAddress());
                        MultiThreadWorker worker = workerFactory.getWorker();
                        worker.register(sc);
                    }
                }
            }
        } catch (IOException e) {
            log.error("", e);
        }
    }

}
@Slf4j
public class MultiThreadWorker implements Runnable {

    private Selector selector;

    private final String name;

    private volatile boolean initialized;

    private final ThreadPoolExecutor poolExecutor;

    private final Deque<Runnable> tasks;

    public MultiThreadWorker(String name, ThreadPoolExecutor pool) {
        this.name = name;
        this.poolExecutor = pool;
        this.initialized = false;
        this.tasks = new ConcurrentLinkedDeque<>();
    }

    public void register(SocketChannel sc) throws IOException {
        if (!initialized) {
            init();
        }
        tasks.add(() -> {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                sc.register(selector, SelectionKey.OP_READ, buffer);
                selector.selectNow();
            } catch (IOException e) {
                log.error("", e);
            }
        });
        // 唤醒selector方法
        selector.wakeup();
    }

    private void init() throws IOException {
        this.selector = Selector.open();
        poolExecutor.execute(this);
        initialized = true;
    }

    @Override
    public void run() {
        while (true) {
            try {
                selector.select();
                Runnable task = tasks.poll();
                if (task != null) {
                    task.run();
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        log.info("{} receive a read event", name);
                        handleRead(key);
                    }
                }
            } catch (IOException e) {
                log.error("", e);
            }
        }
    }
}
public class MultiThreadWorkerFactory {

    private final List<MultiThreadWorker> workers;

    private final int workNums;

    private int curIndex = 0;

    private MultiThreadWorkerFactory(List<MultiThreadWorker> workers) {
        this.workers = workers;
        this.workNums = workers.size();
    }

    public MultiThreadWorker getWorker() {
        MultiThreadWorker worker = workers.get(curIndex);
        curIndex = (curIndex + 1) % workNums;
        return worker;
    }

    public static MultiThreadWorkerFactory createWorkerFactory(int workNums) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 50, 1, TimeUnit.HOURS,
                new LinkedBlockingDeque<>(20));
        return createWorkerFactory(workNums, pool);
    }

    public static MultiThreadWorkerFactory createWorkerFactory(int workNums, ThreadPoolExecutor poolExecutor) {
        List<MultiThreadWorker> workerList = new ArrayList<>(workNums);
        for (int i = 0; i < workNums; i++) {
            workerList.add(new MultiThreadWorker(String.format("worker-%s", i), poolExecutor));
        }
        return new MultiThreadWorkerFactory(workerList);
    }

}

2.6 NIO vs BIO

2.6.1 stream vs channel

  • stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)
  • stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用
  • 二者均为全双工,即读写可以同时进行

2.6.2 零拷贝

2.6.2.1 传统IO问题

我们来看一段代码

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
file.read(buf);

Socket socket = ...;
socket.getOutputStream().write(buf);

内部工作流程是这样的:

  1. java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(Kernel)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access)来实现文件读,其间也不会使用 cpu

    DMA 也可以理解为硬件单元,用来解放 cpu 完成文件 IO

  2. 内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA

  3. 调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝

  4. 接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到中间环节较多,java 的 IO 实际不是物理设备级别的读写,而是缓存的复制,底层的真正读写是操作系统来完成的

  • 用户态与内核态的切换发生了 3 次,这个操作比较重量级
  • 数据拷贝了共 4 次

2.6.2.2 NIO优化

**通过 DirectByteBuf **

  • ByteBuffer.allocate(10) HeapByteBuffer 使用的还是 java 内存
  • ByteBuffer.allocateDirect(10) DirectByteBuffer 使用的是操作系统内存

大部分步骤与优化前相同,不再赘述。唯有一点:java 可以使用 DirectByteBuf 将堆外内存映射到 jvm 内存中来直接访问使用

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写
  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成两步
    • DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    • 通过专门线程访问引用队列,根据虚引用释放堆外内存
  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

进一步优化( linux 2.1)

两个 channel 调用 transferTo/transferFrom 方法拷贝数据(底层采用了 linux 2.1 后提供的 sendFile 方法)

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
  3. 最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

可以看到

  • 只发生了一次用户态与内核态的切换
  • 数据拷贝了 3 次

进一步优化(linux 2.4)

  1. java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
  2. 只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
  3. 使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程仅只发生了一次用户态与内核态的切换,数据拷贝了 2 次。所谓的零拷贝,并不是真正无拷贝,而是在不会拷贝重复数据到 jvm 内存中,零拷贝的优点有

  • 更少的用户态与内核态的切换
  • 不利用 cpu 计算,减少 cpu 缓存伪共享
  • 零拷贝适合小文件传输

2.6.3 AIO

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

2.6.3.1 文件AIO

@Slf4j
public class AioDemo1 {

    public static void main(String[] args) throws IOException {
        try (
                AsynchronousFileChannel s = AsynchronousFileChannel.open(
                Paths.get("1.txt"), StandardOpenOption.READ))
        {
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 参数1 ByteBuffer
            // 参数2 读取的起始位置
            // 参数3 附件
            // 参数4 回调对象
            s.read(buffer, 0, buffer, new CompletionHandler<>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {  // 读取正确完毕
                    log.debug("read completed...{}", result);
                    BufferUtil.readBuffer(buffer);
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {  // 读取出现异常
                    log.debug("read failed...");
                }
            });

        } catch (IOException e) {
            log.error("read error...", e);
        }
        System.in.read();
    }
}

响应文件读取成功的是另一个线程,主线程并没有 IO 操作阻塞

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束

2.6.3.2 网络AIO

@Slf4j
public class AioServer {
    public static void main(String[] args) throws IOException {
        try (AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open()) {
            ssc.bind(new InetSocketAddress(8080));
            ssc.accept(null, new AcceptHandler(ssc));
            System.in.read();
        }
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
        try {
            log.info("[{}] {} close", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
        } catch (IOException e) {
            log.error("close channel error", e);
        }
    }

    private record ReadHandler(AsynchronousSocketChannel sc) implements CompletionHandler<Integer, ByteBuffer> {

        @Override
            public void completed(Integer result, ByteBuffer attachment) {
                try {
                    if (result == -1) {
                        closeChannel(sc);
                        return;
                    }
                    log.info("[{}] {} read", Thread.currentThread().getName(), sc.getRemoteAddress());
                    attachment.flip();
                    log.info("{}", Charset.defaultCharset().decode(attachment));
                    attachment.clear();
                    // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                    sc.read(attachment, attachment, this);
                } catch (IOException e) {
                    log.error("read error", e);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeChannel(sc);
                log.error("read error", exc);
            }
        }

    private record WriteHandler(AsynchronousSocketChannel sc) implements CompletionHandler<Integer, ByteBuffer> {

        @Override
            public void completed(Integer result, ByteBuffer attachment) {
                // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
                if (attachment.hasRemaining()) {
                    sc.write(attachment);
                }
            }
    
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                log.error("write error", exc);
                closeChannel(sc);
            }
        }

    private record AcceptHandler(AsynchronousServerSocketChannel ssc) implements CompletionHandler<AsynchronousSocketChannel, Object> {

        @Override
            public void completed(AsynchronousSocketChannel sc, Object attachment) {
                try {
                    log.info("[{}] {} connected", Thread.currentThread().getName(), sc.getRemoteAddress());
                } catch (IOException e) {
                    log.error("get remote address error", e);
                }
                ByteBuffer buffer = ByteBuffer.allocate(16);
                // 读事件由 ReadHandler 处理
                sc.read(buffer, buffer, new ReadHandler(sc));
                // 写事件由 WriteHandler 处理
                sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
                // 处理完第一个 accept 时,需要再次调用 accept 方法来处理下一个 accept 事件
                ssc.accept(null, this);
            }
    
            @Override
            public void failed(Throwable exc, Object attachment) {
                log.error("accept error", exc);
            }
        }
}

3 Netty入门

3.1 Netty概述

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

基于事件驱动:底层使用了Selector,多路复用

异步的:这里并不是指异步IO模型,只要指使用多线程,将调用结果和接收结果区分开来

Netty基于NIO,相比NIO,解决了以下问题

  • 需要自己构建协议
  • 解决 TCP 传输问题,如粘包、半包
  • epoll 空轮询导致 CPU 100%
  • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

3.2 快速开始

加入依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.39.Final</version>
</dependency>

服务器端代码:

// ServerBootstrap服务端的启动器, 负责组装Netty组件, 启动服务器
new ServerBootstrap()
    // BossEventLoopGroup, WorkerEventLoopGroup(Selector, thread)
    .group(new NioEventLoopGroup())
    // 选择服务器ServerSocketChannel实现
    .channel(NioServerSocketChannel.class)
    // childHandler: boss负责处理连接, worker负责处理读写, 决定了worker(child)能执行那些操作(handler)
    // Channel: 客户端进行数据处理的通道, lInitializer: 初始化, 负责添加别的handler
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline()
                .addLast(new StringDecoder())   // 将ByteBuf转换成字符串
                .addLast(new ChannelInboundHandlerAdapter() {   // 添加自定义handler
                    @Override       // 读事件
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        // 打印上一步转换好的字符串
                        log.info("receive msg: {}", msg);
                    }
                });
        }
    })
    // 绑定监听端口
    .bind(8080);

客户端代码:

// 创建客户端启动器
new Bootstrap()
    // 添加EventLoop
    .group(new NioEventLoopGroup())
    // 选择客户端channel实现
    .channel(NioSocketChannel.class)
    // 添加处理器
    .handler(new ChannelInitializer<NioSocketChannel>() {
        @Override       // 在连接建立后会调用
        protected void initChannel(NioSocketChannel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    // 连接到服务器
    .connect("127.0.0.1", 8080)
    // 阻塞方法, 直到连接建立
    .sync()
    // 获取连接对象
    .channel()
    // 向服务器发送数据
    .writeAndFlush("hello, world");

3.3 Netty的组件

3.3.1 EventLoop

EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。

它的继承关系比较复杂

  • 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另一条线是继承自 netty 自己的 OrderedEventExecutor,提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop提供了 parent 方法来看看自己属于哪个 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)

常用的EventLoopGroup有

  • NioEventLoopGroup:处理IO事件、普通任务、定时任务
  • DefaultEventLoopGroup:处理普通任务、定时任务

EventLoopGroup有一个参数的构造器,表示这个线程池有多少个线程,默认是当前CPU核心数 * 2

public NioEventLoopGroup() {
    this(0);
}

// ...

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

执行普通任务可以如下:

public void testNormalEventGroup() {
    NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);
    log.debug("server start...");
    nioWorkers.execute(()->{
        log.debug("normal task...");
    });
}

执行定时任务可以如下:

public void testCronEventGroup() {
    NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2);

    // 参数1: Runnable, 执行的任务
    // 参数2: 初始延迟时间, 延迟多久执行
    // 参数3: 时间周期, 多久执行一次
    // 参数4: 时间单位
    nioWorkers.scheduleAtFixedRate(() -> {
        log.debug("running...");
    }, 0, 1, TimeUnit.SECONDS);
}

处理IO事件可以类似于3.2

之前我们group用了一个参数,Boss和Worker都使用同一个NioEventLoopGroup,group还可以指定两个参数,第一个参数是Boss的NioEventLoopGroup(处理Accept事件),第二个参数是Worker的NioEventLoopGroup(处理读写事件)

new ServerBootstrap()
                .group(new NioEventLoopGroup(), new NioEventLoopGroup())

ChannelHandler也可以指定当前handler用的是哪一个eventGroup

EventLoopGroup group = new DefaultEventLoop();
new ServerBootstrap()
    .group(new NioEventLoopGroup(), new NioEventLoopGroup())
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<NioSocketChannel>() {
        @Override
        protected void initChannel(NioSocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.defaultCharset()));
                    // 这个方法表示交给下一个handler处理,如果不加会在这里直接结束
                    ctx.fireChannelRead(msg);
                }
                // 指定这个handler使用group线程来执行
            }).addLast(group, new ChannelInboundHandlerAdapter(){
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    log.info(buf.toString(Charset.defaultCharset()));
                }
            });
        }
    })
    .bind(8080);

hanler执行过程中,可能会涉及到不同EventLoop的切换,比如EventLoop1执行完之后,是怎么交给EventLoop2的呢?这里我们看一下源码

// ChannelInboundHandlerAdapter.java
@Skip
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext.java
public ChannelHandlerContext fireChannelRead(Object msg) {
    invokeChannelRead(this.findContextInbound(32), msg);
    return this;
}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 返回下一个handler的eventLoop
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {	// 当前handler中的线程是否和eventLoop是同一个线程
        next.invokeChannelRead(m);	// 同一个线程,直接方法调用
    } else {						// 不是同一个线程
        executor.execute(new Runnable() {	// 将要执行低代码作为任务提交给下一个事件循环处理
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }

}

3.3.2 Channel

channel 的主要方法有

  • close() 可以用来关闭 channel
  • closeFuture() 用来处理 channel 的关闭
    • sync 方法作用是同步等待 channel 关闭
    • 而 addListener 方法是异步等待 channel 关闭
  • pipeline() 方法添加处理器
  • write() 方法将数据写入
  • writeAndFlush() 方法将数据写入并刷出

3.3.2.1 ChannelFuture

ChannelFuture channelFuture = new Bootstrap()
    .group(new NioEventLoopGroup())
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) {
            ch.pipeline().addLast(new StringEncoder());
        }
    })
    .connect("127.0.0.1", 8080); // 1

connect方法是异步非阻塞的,返回的是 ChannelFuture 对象,它的作用是利用 channel() 方法来获取 Channel 对象,由main方法发起了调用,真正执行的方法是NioEventLoop,onnect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能立刻获得到正确的 Channel 对象。

System.out.println(channelFuture.channel()); // 1
channelFuture.sync(); // 2
System.out.println(channelFuture.channel()); // 3
  • 执行到 1 时,连接未建立,打印 [id: 0x2e1884dd]
  • 执行到 2 时,sync 方法是同步等待连接建立完成
  • 执行到 3 时,连接肯定建立了,打印 [id: 0x2e1884dd, L:/127.0.0.1:57191 - R:/127.0.0.1:8080]

使用sync方法同步处理结果, 会阻塞在当前线程, 直到nio线程连接建立完毕

除了调用sync方法外,还可以使用addListener方法异步处理结果

channelFuture.addListener((ChannelFutureListener) channelFuture1 -> {
    Channel channel = channelFuture1.channel();
    channel.writeAndFlush("Hello, World");
});

我们如果想关闭channelFuture怎么操作呢

评论交流

文章目录