本文最后更新于:June 30, 2023 pm
前面分析了Netty整体的流程 和 Channel的结构,我们看到 Netty 中有很多的异步调用,所以在介绍更多 NIO 相关的内容之前,我们来看看它的异步接口是怎么实现的。
本文来源于:【https://github.com/ztgreat/blog-docs.git】
编者仅仅做了简单排版和搬运收藏,非商业用途,最终知识版权归原作者所有。
回顾
前面我们在介绍 Echo 例子的时候,已经用过了 ChannelFuture 这个接口了,接下来我们就来看看 Netty 中的异步调用是如何实现的。
客户端
// Start the client.
ChannelFuture future = b.connect(HOST, PORT);
future.sync();
// Wait until the connection is closed.
Channel channel= future.channel();
ChannelFuture closeFuture=channel.closeFuture();
closeFuture.sync();
服务端
// Start the server.
ChannelFuture future = b.bind(PORT);
future.sync();
// Wait until the server socket is closed.
Channel channel = future.channel();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
特意把代码拆开,方便理解,可以看到 其实客户端和服务端 结构是差不多的,相信分析了Future 结构后,我们对上面的代码理解会更加的深刻。
JDK 中的 Future
关于 Future 接口,常用的就是在使用 Java 的线程池 ThreadPoolExecutor 的时候了。在 submit 一个任务到线程池中的时候,返回的就是一个 Future 实例,通过它来获取提交的任务的执行状态和最终的执行结果。
下面是 JDK 中的 Future 接口 java.util.concurrent.Future
:
public interface Future<V> {
// 取消该任务
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已取消
boolean isCancelled();
// 任务是否已完成
boolean isDone();
// 阻塞获取任务执行结果
V get() throws InterruptedException, ExecutionException;
// 带超时参数的获取任务执行结果
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Netty 中的 Future 接口继承了 JDK 中的 Future 接口,然后添加了一些方法:
Netty 中的Future
io.netty.util.concurrent.Future
:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 是否成功
boolean isSuccess();
// 是否可取消
boolean isCancellable();
// 如果任务执行失败,这个方法返回异常信息
Throwable cause();
// 添加 Listener 来进行回调
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除 Listener
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 阻塞等待任务结束,如果任务失败,将“导致失败的异常”重新抛出来
Future<V> sync() throws InterruptedException;
// 不响应中断的 sync()
Future<V> syncUninterruptibly();
// 阻塞等待任务结束,和 sync() 功能是一样的,不过如果任务失败,它不会抛出执行过程中的异常
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 获取执行结果,不阻塞,如果没数据,返回 NULL。
V getNow();
// 取消任务执行
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
我们可以发现, Netty 的 Future 接口 扩展了 JDK 中 Future 接口,它加了 sync() 和 await() 用于阻塞等待,还加了 Listeners,只要任务结束去回调 Listener 们就可以了。
这里顺便说下 sync() 和 await() 的区别:sync() 内部会先调用 await() 方法,等 await() 方法返回后,会检查下这个任务是否失败,如果失败,重新将导致失败的异常抛出来,这个我们将在后面的实现类中看到。
Future 体系
上面罗列了Future的部分结构,未全部列出来,稍后我们将分析 ChannelFuture,已经Promise。
ChannelFuture
Future 接口的子接口 ChannelFuture,它将和 IO 操作中的 Channel 关联在一起了,用于异步处理 Channel 中的事件。
public interface ChannelFuture extends Future<Void> {
// ChannelFuture 关联的 Channel
Channel channel();
// 覆写以下几个方法,使得它们返回值为 ChannelFuture 类型
@Override
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelFuture sync() throws InterruptedException;
@Override
ChannelFuture syncUninterruptibly();
@Override
ChannelFuture await() throws InterruptedException;
@Override
ChannelFuture awaitUninterruptibly();
boolean isVoid();
}
我们看到,ChannelFuture 接口相对于 Future 接口,除了将 channel 关联进来,没有增加什么东西,其他几个都是方法覆写,为了让返回值类型变为 ChannelFuture。
Promise
接下来 我们来介绍下 Promise 接口,它和 ChannelFuture 接口无关,而是和前面的 Future 接口相关,也就是说Promise的和ChannelFuture是独立的,在js 或者 nodejs 中都有一个 Promise的概念。
Promise 这个接口其实可以算作是一个异步任务的抽象,这个我们在后面具体编程的时候,再来看。
Promise 接口和 ChannelFuture 一样,也继承了 Netty 的 Future 接口,然后加了一些 Promise 的内容:
public interface Promise<V> extends Future<V> {
// 设置 该 future 成功及设置其执行结果,然后通知所有的 listeners。
// 如果该操作失败,将抛出异常
Promise<V> setSuccess(V result);
// 和 setSuccess 方法一样,只不过如果失败,它不抛异常,返回 false
boolean trySuccess(V result);
// 设置 该 future 失败,及其失败原因。
// 如果该操作失败,将抛出异常
Promise<V> setFailure(Throwable cause);
// 设置 该 future 失败,及其失败原因。
// 如果该操作失败,返回 false,不抛出异常
boolean tryFailure(Throwable cause);
// 设置该 future 不可以被取消
boolean setUncancellable();
// 覆写,返回 Promise 类型的实例
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override
Promise<V> await() throws InterruptedException;
@Override
Promise<V> awaitUninterruptibly();
@Override
Promise<V> sync() throws InterruptedException;
@Override
Promise<V> syncUninterruptibly();
}
ChannelPromise
接下来,我们再来看下 ChannelPromise,它继承了前面介绍的 ChannelFuture 和 Promise 接口,这样ChannelPromise 就联系上了Chaneel了。
/**
* Special {@link ChannelFuture} which is writable.
*/
public interface ChannelPromise extends ChannelFuture, Promise<Void> {
//覆写 ChannelFuture 中的 channel() 方法
@Override
Channel channel();
//覆写,目的是为了返回值类型是 ChannelPromise
@Override
ChannelPromise setSuccess(Void result);
ChannelPromise setSuccess();
boolean trySuccess();
@Override
ChannelPromise setFailure(Throwable cause);
//覆写也是为了得到 ChannelPromise 类型的实例
@Override
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise addListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise removeListener(GenericFutureListener<? extends Future<? super Void>> listener);
@Override
ChannelPromise removeListeners(GenericFutureListener<? extends Future<? super Void>>... listeners);
@Override
ChannelPromise sync() throws InterruptedException;
@Override
ChannelPromise syncUninterruptibly();
@Override
ChannelPromise await() throws InterruptedException;
@Override
ChannelPromise awaitUninterruptibly();
/**
* Returns a new {@link ChannelPromise} if {@link #isVoid()} returns {@code true} otherwise itself.
*/
ChannelPromise unvoid();
}
我们可以看到,它综合了 ChannelFuture 和 Promise 中的方法,只不过通过覆写将返回值都变为 ChannelPromise 了而已,没有增加什么新的功能。
我们上面介绍了几个接口,Future 以及它的子接口 ChannelFuture 和 Promise,然后是 ChannelPromise 接口同时继承了 ChannelFuture 和 Promise。
接下来,我们需要来一个实现类,这样才能比较直观地看出它们是怎么使用的,因为上面的这些都是接口定义,具体还得看实现类是怎么工作的。
DefaultPromise
下面,我们来介绍下 DefaultPromise 这个实现类,内容有点多,我们就介绍几个关键的内容。
首先,我们看下它有哪些属性:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
// 任务的执行结果
private volatile Object result;
// 执行任务的线程池,promise 持有 executor 的引用
private final EventExecutor executor;
// 监听者,回调函数
private Object listeners;
// 调用sync()/await()进行等待的线程数量
private short waiters;
// 是否正在唤醒等待线程,用于防止重复执行唤醒,不然会重复执行 listeners 的回调方法
private boolean notifyingListeners;
......
}
DefaultPromise 实现了 Promise,但是没有实现 ChannelFuture,所以它还没有和 Channel 联系起来。
我们 看到 DefaultPromise 中持有了一个 线程池引用。
在 DefaultPromise 中 我们看到有如下注释:
/**
* Get the executor used to notify listeners when this promise is complete.
* <p>
* It is assumed this executor will protect against {@link StackOverflowError} exceptions.
* The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
* depth exceeds a threshold.
* @return The executor used to notify listeners when this promise is complete.
*/
protected EventExecutor executor() {
return executor;
}
当任务执行后,会进行 Listener 的调用,而Listener 的调用逻辑 这个是不清楚的,有可能是同步的,也可能是异步的,因此用线程池去执行,这样将 回调任务 和 和 任务的执行 进行了分割。
我们来看看 DefaultPromise 中的一些方法:
addListener
添加监听者(回调函数)
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//同步处理
synchronized (this) {
addListener0(listener);
}
//添加完后,判断任务是否完成了,如果是,则需要进行通知
if (isDone()) {
notifyListeners();
}
return this;
}
isDone
判断任务是否执行完成了,这个判断 结果值类型就可以了,很简单。
@Override
public boolean isDone() {
return isDone0(result);
}
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
setSuccess
设置 该 future 成功及设置其执行结果,并且会通知所有的 listeners。
如果该操作失败,将抛出异常
@Override
public Promise<V> setSuccess(V result) {
//设置结果
if (setSuccess0(result)) {
// 如果设置成功,则开始进行回调处理
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this);
}
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
checkNotifyWaiters();
return true;
}
return false;
}
trySuccess
设置 该 future 成功及设置其执行结果,并且会通知所有的 listeners。
如果该操作失败,返回false,不抛出异常。
@Override
public boolean trySuccess(V result) {
if (setSuccess0(result)) {
notifyListeners();
return true;
}
return false;
}
setFailure
设置 该 future 失败,及其失败原因。
如果该操作失败,将抛出异常
@Override
public Promise<V> setFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return this;
}
throw new IllegalStateException("complete already: " + this, cause);
}
tryFailure
设置 该 future 失败,及其失败原因。
如果该操作 失败,返回false,不抛出异常。
@Override
public boolean tryFailure(Throwable cause) {
if (setFailure0(cause)) {
notifyListeners();
return true;
}
return false;
}
上面几个方法都非常简单,先设置好值,然后执行监听者们的回调方法。notifyListeners() 方法感兴趣的读者也可以看一看,不过它还涉及到 Netty 线程池的一些内容,我们还没有介绍到线程池,这里就不展开了。上面的代码,在 setSuccess0 或 setFailure0 方法中都会唤醒阻塞在 sync() 或 await() 的线程
另外,就是可以看下 sync() 和 await() 的区别,其他的我觉得随便看看就好了。
sync
@Override
public Promise<V> sync() throws InterruptedException {
await();
// 如果任务是失败的,重新抛出相应的异常
rethrowIfFailed();
return this;
}
我们看到 sync 内部会调用await 方法,只是如果 await 执行失败,那么会再次抛出异常问题。
实例代码
我们通过一个实例,来了解 Promise的使用,以及监听器的使用:
public static void main(String[] args){
// 构造线程池
EventExecutor executor = new DefaultEventExecutor();
// 创建 DefaultPromise 实例
Promise promise = new DefaultPromise(executor);
// 添加 一个 listener
promise.addListener(new GenericFutureListener<Future<String>>() {
@Override
public void operationComplete(Future future) throws Exception {
if (future.isSuccess()) {
System.out.println("任务成功,结果:" + future.get());
} else {
System.out.println("任务失败,异常:" + future.cause());
}
}
});
// 提交任务到线程池
executor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
// 设置 promise 的结果
// promise.setFailure(new RuntimeException());
promise.setSuccess("success");
}
});
// 主线程线程阻塞等待执行结果
try {
promise.sync();
} catch (InterruptedException e) {
}
}
运行代码, 3 秒后将输出:
任务成功,结果:success
这里我们也可以试一下 sync() 和 await() 的区别,开启 promise.setFailure(new RuntimeException()) 代码,同时最后调用await 方法,对比区别,这里就不展示了,读者可以自行去尝试。
小结一下
对比JDK 中的Future 如果我们需要获取结果,我们需要调用get 方法获取,或者通过isDone 来判断任务是否完成,这都是主动轮询的方式。
而Netty中的Future 我们 可以用 await(),等 await() 方法返回后,得到 promise 的执行结果,然后处理它;
另一种就是提供 Listener 实例,我们不太关心任务什么时候会执行完,只要它执行完了以后会去执行 listener 中我们定义的逻辑就可以了。
在 DefaultPromise 中以及 上面的代码中,我们看到promise 持有了 线程池引用,这个是为什么呢?
在 DefaultPromise 中 我们看到有如下注释:
/**
* Get the executor used to notify listeners when this promise is complete.
* <p>
* It is assumed this executor will protect against {@link StackOverflowError} exceptions.
* The executor may be used to avoid {@link StackOverflowError} by executing a {@link Runnable} if the stack
* depth exceeds a threshold.
* @return The executor used to notify listeners when this promise is complete.
*/
protected EventExecutor executor() {
return executor;
}
当任务执行后,会进行 Listener 的调用,而Listener 的调用逻辑 这个是不清楚的,有可能是同步的,也可能是异步的,因此用线程池去执行,这样将 回调任务 和 和 任务的执行 进行了分割。
DefaultChannelPromise
DefaultChannelPromise 基本上都是调用 DefaultPromise的方法 ,实现了ChannelPromise 接口,将Channle 关联了进来。
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private long checkpoint;
/**
* Creates a new instance.
*
* @param channel
* the {@link Channel} associated with this future
*/
public DefaultChannelPromise(Channel channel) {
this.channel = checkNotNull(channel, "channel");
}
//... 省略其它方法
}
有了上面的认识,下面 我们回过头来再看客户端中Future的调用:
// Start the client.
ChannelFuture future = b.connect(HOST, PORT);
future.sync();
// Wait until the connection is closed.
Channel channel= future.channel();
ChannelFuture closeFuture=channel.closeFuture();
closeFuture.sync();
当客户端进行 connect 后,返回了一个 Future,这个connect 是异步的,因此还不知道任务执行如何,因此这里我们调用 future的sync,等待connect 完成。
当connect 后,我们从future 中获取 Channel, 再从这个 Channle 获取一个 Future,而这个Future 是监听 Channle 是否关闭的,通过sync 方法,我们可以一直等待Channle的关闭,因此如果不进行closeFuture.sync()
,那么主线程就会直接执行完毕,不会阻塞在最后。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!