Netty 线程模型是指 Netty 框架为了提供高性能、高并发的网络通信,而设计的管理和利用线程的策略和机制。
Netty 线程模型被称为 Reactor(响应式)模型/模式,它是基于 NIO 多路复用模型的一种升级,它的核心思想是将 IO 事件和业务处理进行分离,使用一个或多个线程来执行任务的一种机制。
1.Reactor三大组件Reactor 包含以下三大组件:
其中:
Reactor(反应器):Reactor 负责监听和分发事件,它是整个 Reactor 模型的调度中心。Reactor 监视一个或多个输入通道,如监听套接字上的连接请求或读写事件。当检测到事件发生时,Reactor 会将其分发给预先注册的处理器(Handler)进行处理。在 Netty 中,这个角色经常是由 EventLoop 或其相关的 EventLoopGroup 来扮演,它们负责事件的循环处理、任务调度和 I/O 操作。Acceptor(接收器):用于处理 IO 连接请求。当 Reactor 检测到有新的客户端连接请求时,会通知 Acceptor,后者通过 accept() 方法接受连接请求,并创建一个新的 SocketChannel(在 Netty 中是 Channel)来表示这个连接。随后,Acceptor 通常会将这个新连接的 Channel 注册到 Worker Reactor 或 EventLoop 中,以便进一步处理该连接上的读写事件。Handlers(处理器):Handlers 负责具体的事件处理逻辑,即执行与事件相关的业务操作。在 Netty 中,Handler 是一个或多个 ChannelHandler 的实例,它们形成一个责任链(ChannelPipeline),每个 Handler 负责处理一种或一类特定的事件(如解码、编码、业务逻辑处理等)。数据或事件在 ChannelPipeline 中从一个 Handler 传递到下一个,直至处理完毕或被消费。Handler 可以分为入站(inbound)和出站(outbound)两种,分别处理流入的数据或流出的数据。2.Reactor三大模型Reactor 模式支持以下三大模型:
单线程模型多线程模型主从多线程模型具体内容如下。
2.1 单线程模型在单线程模型中,所有的事件处理操作都由单个 Reactor 实例在单个线程下完成。Reactor 负责监控事件、分发事件和执行事件处理程序(Handlers),如下图所示:
单线程模型的实现 Demo 如下:
// 假设有一个单线程的Reactor,负责监听、接收连接、读写操作class SingleThreadReactor { EventLoop eventLoop; // 单个事件循环线程 SingleThreadReactor() { eventLoop = new EventLoop(); // 初始化单个事件循环 } void start(int port) { ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); // 绑定端口 eventLoop.execute(() -> { // 在事件循环中执行 while (true) { SocketChannel clientSocket = serverSocket.accept(); // 接受连接 if (clientSocket != null) { handleConnection(clientSocket); // 处理连接 } } }); eventLoop.run(); // 启动事件循环 } void handleConnection(SocketChannel clientSocket) { // 读写操作,这里简化处理 ByteBuffer buffer = ByteBuffer.allocate(1024); while (clientSocket.read(buffer) > 0) { // 处理读取的数据 buffer.flip(); // 假设处理数据逻辑... buffer.clear(); } // 写操作逻辑类似 }}优缺点分析优点:简单、线程安全性好、适合编写简单的网络应用。缺点:处理能力受限于单个线程的处理能力,无法充分利用多核 CPU,可能会影响性能。2.2 多线程模型在多线程模型中,连接 Acceptor 和业务处理(Handlers)是由不同线程分开执行的,其中 Handlers 是由线程池(多个线程)来执行的,如下图所示:
多线程模型的实现 Demo 如下:
// 假设有两个线程,一个用于监听连接,一个用于处理连接后的操作class MultiThreadReactor { EventLoop acceptLoop; EventLoop workerLoop; MultiThreadReactor() { acceptLoop = new EventLoop(); // 接收连接的线程 workerLoop = new EventLoop(); // 处理连接的线程 } void start(int port) { ServerSocketChannel serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); acceptLoop.execute(() -> { // 在接受线程中监听 while (true) { SocketChannel clientSocket = serverSocket.accept(); if (clientSocket != null) { workerLoop.execute(() -> handleConnection(clientSocket)); // 将新连接交给工作线程处理 } } }); acceptLoop.run(); // 启动接受线程 workerLoop.run(); // 启动工作线程 } // handleConnection 方法与单线程模型中的相同}优缺点分析优点:此模式可以提高并发性能,充分利用多核 CPU,并且保持简单的编程模型。缺点:多线程数据共享和数据同步比较复杂,并且 Reactor 需要处理所有事件监听和响应,在高并发场景依然会出现性能瓶颈。2.3 主从多线程模型主从多线程模型是一个主 Reactor 线程加多个子 Reactor 子线程,以及多个工作线程池来处理业务的,如下图所示:
主从多线程模型的实现 Demo 如下:
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public MainReactorModel { public static void main(String[] args) { // 主Reactor,用于接受连接 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 从Reactor,用于处理连接后的读写操作 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { // 在这里添加业务处理器,如解码器、编码器、业务逻辑处理器 ch.pipeline().addLast(new MyBusinessHandler()); } }); ChannelFuture future = bootstrap.bind(8080).sync(); System.out.println("Server started at port 8080"); future.channel().closeFuture().sync(); // 等待服务器关闭 } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}优缺点分析优点:可以充分利用多核 CPU 的资源,提高系统的整体性能和并发处理能力。缺点:模型相对复杂,实现和维护成本较高。课后思考NioEventLoop 是如何实现的?它能够保证 Channel 操作的线程安全吗?为什么?
本文已收录到我的面试小站 [www.javacn.site](https://www.javacn.site),其中包含的内容有:Redis、JVM、并发、并发、MySQL、Spring、Spring MVC、Spring Boot、Spring Cloud、MyBatis、设计模式、消息队列等模块。