1. 概述
Netty 中所有的的 I/O 操作都是异步的。I/O 操作是比较耗时的,为了不阻塞调用线程,Netty 提供了 ChannelFuture 接口,使用 addListener()方法注册一个 ChannelFutureListener 监听器,可以在 I/O 操作结束之后进行通知返回结果。在下面的代码中,bind 操作返回一个 ChannelFuture 对象,可以继续执行后续操作,也可以调用 sync() 方法同步等待执行结果,给程序开发带来了更多的开发模式,结合不同的业务场景,可以方便选择异步还是同步模式。
1 | // Configure the server. |
这篇文章的主要目的是分析 ChannelFuture 在 Netty 中的实现原理。
2. 原理
ChannelFutrue 本质上是线程间交换数据的方式,一个线程等待另外一个线程的处理结果,取得结果一般有两种方式:1)同步等待,如同 get() 方法;2)注册回调,在设置结果的同时调用回调函数。其伪代码如下所示:
1 | public class ChannelFutrue { |
通过持有 ChannelFutrue 类,调用方可以同步或异步获取执行的结果,在这个例子中,为了简化操作,我们使用 CountDownLatch 进行同步,而在 ChannelFutrue 使用 synchronized + notify/await 来实现线程的同步。
3. Netty 实现
我们以 ServerBootstrap 中的 bind 方法为例,分析 ChannelFuture 在这个流程中的使用方式,bind 方法的主要流程如下所示(其中的实现细节在后续篇章介绍):
在 bind 方法中主要包含在 4 个步骤:
- 生成 NioServerSocketChannel 对象;
- 将 NioServerSocketChannel 对象注册到 EventLoop 中;
- 执行 bind 操作;
- 同步等待 bind 操作执行完成。
3.1 register 流程
可以看到第 2 和 3 步都是一个 I/O 操作,为了避免调用线程被阻塞,它们都被提交到 EventLoop 线程(每一个 EventLoop 对象都会绑定一个线程)中执行,并返回一个 ChannelFuture 对象,一个 I/O 操作会对应一个ChannelFuture 对象,调用线程与 EventLoop 通过该对象完成执行结果的交换。下面以 register 方法为例,分析下 ChannelFuture 对象的使用。
1、生成 ChannelFuture 对象
调用 register 之后返回一个 DefaultChannelPromise 对象,该对象是 ChannelFuture 的子类。
1 | // SingleThreadEventLoop |
2、提交异步注册任务
提交注册任务的逻辑在 AbstractChannel.AbstractUnsafe 中,提交的时候会判断当前线程,如果当前线程是 eventLoop 线程,直接执行即可,如果不是,则提交一个任务到 eventLoop 线程 中。
1 | // AbstractChannel.AbstractUnsafe |
3、执行注册逻辑
register0 是在 EventLoop 线程中执行的,与调用注册方法的线程不是同一个。注册的逻辑通过子类的 doRegister() 方法实现,注册完成之后通过 safeSetSuccess(promise) 和 safeSetFailure(promise, t) 通知注册结果。
1 | // AbstractChannel.AbstractUnsafe |
4、结果通知
结果通知主要包含两个操作:
- 设置处理结果,唤醒所有等待的线程;
- 调用注册到 ChannelFuture 中的监听器;
1 | // AbstractChannel.AbstractUnsafe |
3.2 异步操作的协同
在上面的操作中,bind 操作依赖 register 操作的结果,由于这两个操作都是异步操作,如何进行协同?即在 register 操作成功执行 bind 操作。正常情况下,有两种办法:1)同步等待操作执行完成;2)通过添加 GenericFutureListener 监听器,执行完由 EventLoop 线程进行回调。在这里是通过第二种方式来操作的。
在执行 initAndRegister 操作之后,会得到一个 ChannelFuture regFuture 对象,此时 register 已经提交给 EventLoop 执行,不一定执行完成,需要判断执行结果,如果未完成,则向 regFuture 对象中添加监听器,在监听器中调用 bind 操作,而监听器会中注册完成之后调用。
1 | // AbstractBootstrap |
3.3 sync 同步操作
由于 bind 操作是一个异步操作,此时在调用线程中需要等待绑定的结果,所以调用了 sync 方法。另外,在程序的最后,也使用了一个 ChannelFuture,用于等待 Channel 关闭事件。
1 | /// 绑定端口并等待完成 |
3.3 ChannelFuture 线程同步
ChannelFuture 中的线程同步方式是 synchronized 同步块,如下代码如下:
1 |
|
在调用 await 操作时,如果没有结果(操作未完成),则会调用 wait 方法阻塞该线程,同时增加等待的线程数;操作完成之后会调用 notifyAll 方法,通知所有等待的线程继续执行,这样完成了调用结果在不同线程间的交互。
4. 总结
ChannelFuture 本质是线程间通信的一种工具,通过 ChannelFuture,可以实现 I/O 的异步操作,并完成操作结果的通知功能。