1. 概述
EventLoop 是 Reactor 模式中的执行者,首先它持有 Selector 对象,监听多路 SocketChannel 的网络 I/O 事件,并对 I/O 事件分发处理。同时,它持有一个 Thread 对象,除了监听网络 I/O 事件, EventLoop 也可以执行提交的任务,包括定时任务,总结来说,EventLoop 具有如下三大功能:
- 负责监听 SocketChannel 对象的 I/O 事件;
- 处理分发 I/O 事件;
- 执行任务,包括定时任务。
整体的处理流程如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31protected void run() {
int selectCnt = 0;
for (;;) {
try {
// 1、监听 Channel 网络事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
final int ioRatio = this.ioRatio;
final long ioStartTime = System.nanoTime();
// 2、处理网络事件
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
// 3、执行任务
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} catch (Throwable t) {
handleLoopException(t);
} finally {
// ...
}
}
}
说明: 代码以 NioEventLoop 为例。
2. 流程
上文分析了 EventLoop 的主要功能,在这部分内容中主要讲述这三大功能及线程的创建,我们先从线程的创建开始。
2.1 线程创建
线程相关的变量:
1 | // EventLoop 线程对象 |
EventLoop 中持有一个线程的引用,在第一次执行任务的时候启动,提供任务的线程如果已经是 EventLoop 线程,将任务提交给任务即可,如果是非 EventLoop 线程,则需要启动 EventLoop 线程。
1 | private void execute(Runnable task, boolean immediate) { |
EventLoop 中通过 state 字段来标识是否启动,通过判断该字段来决定是否启动线程,其中 state 字段是 volatile 类型,并通过 AtomicIntegerFieldUpdater 进行原子更新,保证线程的安全。
1 | private void startThread() { |
线程的启动是通过 executor 添加一个任务,在 executor 中启动一个线程,再把该线程赋值给 EventLoop,等同于使用了只有一个线程的线程池来生成线程。可以看到,任务的主体是 SingleThreadEventExecutor.this.run() 方法,该方法就是前文说到的 EventLoop 主体业务逻辑。
1 | private void doStartThread() { |
executor 使用的是 ThreadPerTaskExecutor 对象,在 NioEventLoopGroup 初始化的时候生成。
1 | // Executor 为 null |
ThreadPerTaskExecutor 执行器会生成一个新的线程来执行新的任务,该线程就是 FastThreadLocalThread 对象。
1 | public final class ThreadPerTaskExecutor implements Executor { |
通过上面的代码分析,EventLoop 中的线程由 ThreadPerTaskExecutor 执行器生成,线程对象为 DefaultThreadFactory。
2.3 监听网络 I/O 事件
Selector 相关的变量:
1 | // NioEventLoop |
在这个阶段有三个重点:
- 设置 selector 的超时时间,主要是以下一个定时任务执行的时间间隔作为参考来设置超时时间,避免阻塞定时任务的准时执行;
- selector 唤醒的机制,如果超时时间过长,中途有任务插入,需要执行,此时需要中断 selector;
- 重建 selector,解决 bug 8566。
1 | protected void run() { |
计算下一个定时任务执行的时间间隔逻辑比较简单,将 ScheduledFutureTask 任务添加到 scheduledTaskQueue 队列中,而 scheduledTaskQueue 是一个优先级队列,它已经将 ScheduledFutureTask 根据执行时间进行了排序,取出的第一个元素便是最近将要执行的任务,计算它还需要多久需要执行便可。
1 | // 计算下一个定时任务执行的时间间隔 |
外部线程执行新任务的时候,任务需要立即执行的话,需要唤醒 selector,避免因 selector 长时间等待错过执行时机。
1 | // 执行新任务 |
重建 selector,解决 bug 8566 的逻辑,后续有时间再另外分析。
2.4 处理及分发网络 I/O 事件
在 NioEventLoop 中,处理 I/O 事件的时间与执行任务的时间比率为 1:1,即两者的执行时间是相等的。
1 | // 默认情况下处理 I/O 事件的时间比率为 50%,即 I/O 处理的时间占执行时间的 50 %。 |
现在分析 I/O 事件的处理,这里提供了两种方式,一种是经过 Netty 优化过数据结构的方式,一种是 Java 原生的方式,它们之间的区别主要是存放 SelectionKey 对象的底层数据结构的差异,而处理流程没有变。
1 | private void processSelectedKeys() { |
优化的方法主要是通过反射的方式,将 Selector 中 selectedKeys 和 publicSelectedKeys 字段替换为 Netty 版的 SelectedSelectionKeySet。数据结构做了那些优化可以再深入分析源码。
1 | final Selector unwrappedSelector; |
处理 I/O 事件的大致流程如下:
- 遍历 selectedKeys 集合,处理所有 Channel 的 I/O 事件,一个 SelectionKey 对象代表一个 Channel 的 I/O 事件;
- 取出 SelectionKey 对象中的附件,该附件由 AbstractNioChannel.doRegister 方法注册到 Selector 对象上,附件就是 AbstractNioChannel 自身,触发 I/O 事件时,再由 SelectionKey 对象返回;
- 根据附件对象的不同,调用不同的处理逻辑,这里主要是处理 Channel 的 I/O 事件
1 | private void processSelectedKeysOptimized() { |
真正的处理逻辑在 processSelectedKey 方法中处理,这里有两个重点:
- 写缓存空间充足,注册 OP_WRITE 事件会频繁触发,导致 cpu 空转,所以正常情况下,不需要注册 OP_WRITE 事件,只有在写缓存满的时候才会注册该事件,触发之后进行刷新操作;
- 在 Netty 中,将 OP_ACCEPT 当作读操作,只不过它读取的数据比较特殊,是 SocketChannel 对象。
事件的处理逻辑包含在 AbstractNioChannel.NioUnsafe 中,由该方法调用 ChannelPipeline 中的回调方法,至此,将事件处理由网络层传递给 Netty 框架层。
1 | private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { |
以 unsafe.read() 为例,在 NioMessageUnsafe 类的实现中,读取操作会调用 AbstractNioChannel 子类的 doReadMessages 方法读取网络数据,并写入到 readBuf 中,再调用 ChannelPipeline 中的 fireChannelRead 方法将数据传递给 Netty 框架层。
1 | private final class NioMessageUnsafe extends AbstractNioUnsafe { |
在 NioServerSocketChannel 类 doReadMessages 实现中,读取到的数据是 SocketChannel,该对象会分配给 WorkerGroup 中,由 WorkerGroup 中的 EventLoop 去读取网络数据。
1 | // NioServerSocketChannel |
而在 NioSocketChannel 类的 doReadBytes 实现中,读取到的是 ByteBuf,传递给上层的是 ByteBuf 对象,上层对象再对其进行反序列化、业务处理等操作。
1 | // NioSocketChannel |
至此,根据 I/O 事件的类型,将数据分发到了上层中,上层业务可以继续处理。
2.5 执行任务
2.5.1 执行流程
执行任务相关的变量:
1 | // 任务队列 |
在 EventLoop 中,有两类任务,一是常规的任务,没有时间属性,二是周期性或延时的定时任务,它们分别存放到两个不同的队列。任务执行时,先将到期的定时任务从 scheduledTaskQueue 队列移动到 taskQueue 中,再统一执行 taskQueue 队列中的任务。
任务执行的大致如下:
- 将到期的定时任务移动到 taskQueue 中;
- 计算此次执行的时长,如果执行的时间超过设定的执行时长,则退出进行下一轮的事件处理;
- 遍历执行 taskQueue 中的任务,在两种情况下退出任务的执行:1)任务的执行时长超过了设定的执行时长;2)taskQueue 队列为空;
1 | protected boolean runAllTasks(long timeoutNanos) { |
移动到期的定时任务逻辑相对简单:遍历 scheduledTaskQueue 队列,将到期的任务从 scheduledTaskQueue 队列中移除,再添加到 taskQueue 队列中,如果添加失败,则再添加回 scheduledTaskQueue 队列,等待下次再操作。
1 | private boolean fetchFromScheduledTaskQueue() { |
在定时任务任务中超时的判断是基于相对时间的,起始时间为程序启动的时间。在 scheduledTask 中关联有一个任务执行的截止时间,将这个截止时间与当前计算的时间进行比较,小于当前的时间则说明已经过期,满足执行的条件,则需要将该任务移动到 taskQueue。
另外,scheduledTaskQueue 是一个优先级队列,已经根据截止时间排序,队首的元素是最先到期的任务,如果取到了未到期的任务,则停止遍历,因为后面的任务截止时间更大,没有必要进行比较了。
1 | protected final Runnable pollScheduledTask(long nanoTime) { |
2.5.2 任务的添加
上面分析了任务执行的流程,下面看下这两类任务怎么添加到任务队列中。
1、常规任务
常规任务是通过 execute 方法添加的,该方法含义上有执行的意思,但实际上执行该方法,只是将任务添加到 taskQueue 中,任务的执行最终是在 EventLoop 线程中完成的。
1 | // 添加一个常规任务 |
2、定时任务
定时任务有两种:1)延时任务;2)周期性任务。它们是通过 schedule 方法添加,方法定义如下所示:
1 |
|
两种类型的定时任务统一封装为 ScheduledFutureTask 任务,添加一个定时任务实际就是添加一个 ScheduledFutureTask 对象到 scheduledTaskQueue 中。添加 ScheduledFuture 的过程中,如果当前线程就是 EventLoop 线程,则直接操作即可,如果当前线程不是 EventLoop 线程,则添加一个常规任务,用来执行该操作。这样设计,应该是出于线程安全的考虑,保证只有 EventLoop 线程执行添加操作。在这里,还需要考虑定时任务已经过期,需要唤醒 EventLoop 线程执行任务。
1 | private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { |
ScheduledFutureTask 对象封装了三个功能:
- 执行添加任务,将 ScheduledFutureTask 对象本身添加到 scheduledTaskQueue 队列;
- 执行延时任务,由于延时任务只会执行一次,执行完便结束;
- 执行周期性任务,执行完本轮的任务之外,还需要将 ScheduledFutureTask 添加回 scheduledTaskQueue 队列,等待下一轮执行。
1 |
|
至此,任务的执行分析完毕。
3. 总结
EventLoop 担当了网络层与 Netty 框架间的桥梁作用,本质是一个事件循环,不断监听 Channel 的网络 I/O 事件,并进行分发处理。另外,也承担了执行任务的作用,包括常规的任务及定时任务。理解 EventLoop 的事件循环会极大加深对 Netty 的理解。