1. 概述
Reactor 模式是一种服务器网络编程模式,它根据网络数据接收的特点,将连接的建立、网络数据的读写分离,用 mainReactor 线程处理网络的连接,用 subReactor 处理数据的读写,同时为了有效利用 CPU 多核的优势,subActor 可以有多个。它的整体结构如下图所示:
特点:
- 客户端的所有连接请求统一由 mainReactor 线程处理,同时将收到请求转交 subReactor 处理;
- subReactor 线程处理连接的读写,为了实现处理的负载,可以有多个 subReactor,通过一定的算法分配网络连接;
- 考虑到连接的 I/O 读写比较耗时,为了提高吞吐量,读写操作可以交由线程池处理。
说明:
文中说到的“网络连接”与下文说到的 “channel” 和 “socketChannel” 是一个概念。
另外,这篇文章主要包含三个部分的内容:1)Reactor 概念的介绍;2)Reactor 的模拟;3)Netty 中的实现;现在我们用 Java 模拟一个 Reactor 的实现。
2. 原理
2.1 MainReactor
我们以一个例子来模拟一个 Reactor,先看 MainReactor 类的代码,它主要的功能是监听 9090 端口接收网络连接,并将网络请求注册到 SubReactor 类,代码如下所示:
1 | public class MainReactor implements Runnable { |
MainReactor 有几个主要的属性:
- Selector:Selector 对象,用于实现网络 I/O 事件的监听,它只监听网络请求事件;
- ServerSocketChannel:服务器套接字,用于接收网络请求;
- SelectorManager:用于分配 SocketChannel 到 subReactor,SelectorManager 存有多个 subReactor 对象。
2.2 SubReactor
SubReactor 主要是处理 SocketChannel 的读写,代码如下所示:
1 | public class SubReactor implements Runnable { |
SubReactor 的功能主要是负载监听 SocketChannel 的读写事件,然后分发给线程池去处理。
2.3 Channel 的分配
MainReactor 接收到新的连接,会产生一个 SocketChannel 对象,按照一定的算法分配给 SubReactor。这个分配主要由 SelectorManager 对象完成,我们分析下其代码:
1 |
|
在这里,SelectorManager 的功能主要包括两个方面:1)创建及初始化 SubReactor 数组;2)根据轮洵算法分配 Channel。
3. 实现
Netty 实现了 Reactor 模式,其整体结构如下所示:
在 Netty 服务启动的时候会配置两个 EventLoopGroup bossGroup 和 WrokerGroup,EventLoopGroup 可以包含一个或多处 EventLoop,每一个 EventLoop 包含一个 Selector (也可能是 epoll,取决于实现)对象,同时它是一个独立的线程,可独立负载 I/O 请求。对比 Reactor,bossGroup 相当于 MainReactor,这负责监听网络的连接请求(生成 SocketChannle对象),并将其分配给 workerGroup,在这里,只包含一个 EventLoop;workerGroup 相当于 subReactor,监听连接的读写请求。下面分析下 Netty 中关于 EventLoopGroup 的代码实现。
3.1 初始化
1、线程数设置
1 | // 配置 EventLoop |
在这里使用的是 NioEventLoopGroup,bossGroup 设置的线程数为 1,而 workerGroup 没有设置线程数,使用默认配置的数量:2 * cpu size。
1 | protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) { |
2、创建 EventLoop 数组
NioEventLoopGroup 是 EventLoop 对象的容器集合,持有多个 EventLoop 对象,它的数量与线程数量一致,同时 NioEventLoopGroup 负责分配 SocketChannel,需要有一个分配的策略对象,这些是在其父类的构造函数中实现。
1 | protected MultithreadEventExecutorGroup(int nThreads, Executor executor, |
在 MultithreadEventExecutorGroup 构造函数中,主要做了三个工作:
- 定义 EventLoop 中的线程执行器,每一个 EventLoop 都包含一个线程,其线程由 ThreadPerTaskExecutor 生成;
- 初始化及生成 EventLoop 数组 ,newChild 方法由子类来实现,不同的模式有不同的实现;
- 定义 channel 的分配策略,根据 EventLoop 的数量有不同的实现。
在 NioEventLoopGroup 中,newChild 实现代码所示:1
2
3
4
5protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
newChild 方法的细节在后面的文章中再进行介绍。
3、Channel 分配策略
channel 的分配策略有两种,分别是:PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,它们本质上都是轮洵算法,只是当 EventLoop 的数量是 2 的幂次方时,对算法做了优化,使用位操作代替取余操作。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
// 使用位操作
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicLong idx = new AtomicLong();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
// 使用取余操作
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
}
3.2 Channel 注册
在 Nio 模式下,Channel 有两种类型,分别是:NioServerSocketChannel 和 NioSocketChannel,其中 NioServerSocketChannel 用于监听网络连接请求,生成 NioSocketChannel 连接,该 Channle 注册到 BossGroup 的 EventLoop 中,而 NioSocketChannel 负责真正的网络读写,注册到 WorkerGroup 的 EventLoop 中。
1、NioServerSocketChannel 注册
在 Netty 的服务器启动过程中,主要的流程是一个 bind 操作,其流程包括:
- 创建 NioServerSocketChannel 类,完成初始化的工作,其中包括添加 ChannelHandler 类;
- 将 NioServerSocketChannel 注册到 EventLoop 中,同时向 Selector 对象中注册,不过此时并没有注册 OP_ACCEPT 事件;
- 执行网络层的 bind 操作;
- 执行读操作,主要是向 Selector 注册 OP_ACCEPT 事件。执行该操作后,便可接收网线的连接请求了。
NioServerSocketChannel 注册穿插在上面的 4 个步骤中,主要包括 1)将 NioServerSocketChannel 注册到 EventLoop 中;2)向 Selector 对象注册 OP_ACCEPT 事件。
1 | // AbstractUnsafe 对象中的 register 操作 |
在上面的代码可以看到 registor 操作主要是分配一个 EventLoop,并将 EventLoop 赋值给 NioServerSocketChannel。向 Selector 注册 Channel 则分为两次,第一次注册时事件参数为 0,等于没有注册任何事件;第二次是在底层 Channel bind 操作之后,准备就绪之后,再注册 OP_ACCEPT 事件。
2、NioSocketChannel 注册
在 bind 操作的流程中,第一步是创建 NioServerSocketChannel 类,并进行初始化,此时会注册 ChannelHandler 类,其中就有一个 ServerBootstrapAcceptor handler 类,它的主要功能就是收到网络请求之后对NioSocketChannel 类进行参数配置,将其注册到 workerGroup 中,其核心代码如下所示:
1 | public void channelRead(ChannelHandlerContext ctx, Object msg) { |
NioSocketChannel 和 NioServerSocketChannel 注册流程是一致的,差别只是注册到不同的 EventLoopGroup 及注册不同的 I/O 事件,其中 NioSocketChannel 注册的是 OP_READ 事件,而 NioServerSocketChannel 注册的是 OP_ACCEPT 事件。
3.3 事件循环
EventLoop 本质是一个事件循环,不断地从 Selector (Epoll) 对象中获取 I/O 事件,执行解码/反序列化操作后,再分发到上层的业务线程进行处理。另外一方面它可以执行用户自定义任务,如定时进行 Channel 空闲状态的检测,其核心代码如下所示:
1 | protected void run() { |
从上面的代码可以看到,根据计算的执行策略,可以为 I/O 事件处理及自定义任务分配不同的执行时间,详尽的代码在后面的文章介绍。
1 | private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
I/O 事件的处理本质是处理 channel 的各种 I/O 事件,其中将 OP_ACCEPT 抽象为 Netty的 read 事件,可以理解为读取的数据是 NioSocketChannel 对象,其代码如下所示:
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = this.javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable var6) {
logger.warn("Failed to create a new channel from an accepted socket.", var6);
try {
ch.close();
} catch (Throwable var5) {
logger.warn("Failed to close a socket.", var5);
}
}
return 0;
}
可以看出来,NioServerSocketChannel 收到 OP_ACCEPT 事件后,会生成 SocketChannel 对象,然后通过 ServerBootstrapAcceptor handle 类处理后,注册到 workerGroup 中,再监听 SocketChannel 对象的 OP_READ 事件,最终实现网络数据的读写。
4. 总结
通过对 Netty 中 Reactor 模型的分析,对 Netty 的线程模型及 I/O 的事件处理有了一个初步的认识,后续的文章将对涉及到的模块进行详尽的分析,希望能够深入理解 Netty 的设计思路。