Netty3 源码分析

1. 总体结构

Netty是Jboss旗下一款优秀的网络编程框架,作者是Trustin Lee,韩国人,80后。Netty简化了Java环境下的网络程序开发,它屏蔽了底层具体实现方式的差异和细节,为上层应用程序提供一个抽象的一致的接口。不管使用的传输层协议是TCP还是UDP,底层JDK API是NIO还是OIO,甚至是在同一JVM内部的通信,这些差异细节都被netty屏蔽在其内部,上层只需要跟netty提供的网络编程模型打交道,并指定要使用何种具体实现(这在netty内被称为transport service)即可,要切换时也非常方便。Netty的总体结构如下:

Alt text

这里再引用netty官方文档中的一段话:

“The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance and high scalability protocol servers and clients. In other words, Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP/IP socket server.”

从这段话中我们可以看出Netty几个最主要的特性:

  1. 异步编程模型
  2. 事件驱动
  3. (主要基于)NIO实现

在本文后续对Netty源代码分析的过程中中会针对这三点进行详细的讲述。

2. Netty的API

Netty自己引入了一套网络模型的抽象,要想深入Netty,首先要对这套API有较为深入的了解。以下是使用Netty编程时所接触的上层接口及它们的交互方式:

Alt text

处在整套模型的核心位置的是Channel,Channel通常是对底层Socket的抽象表示(在TCP实现中),但也可能是其他可以进行IO操作的数据源。可以把它想象成一个管道,应用程序可以打开一个管道,连接到某处(通常是另一个Channel),通过管道向目的地写数据,也可以从管道读另一端发送过来的数据,类似这样:

Alt text

之前说过Netty是事件驱动的,每个Channel都有一个ChannelPipeline,在Channel上发起的所有事件都会被送到ChannelPipeline中流转一遍得到处理。这些事件由ChannelEvent表示,按事件在ChannelPipeline中的流向又分为Upstream和Downstream两种事件。Pipeline中保存着一系列ChannelHandler,事件在这些Handler之间被处理并传递。同时,每个ChannelPipeline都会被指定一个ChannelSink,挂在Handler链的最末端,事件被所有Handler处理完成后最终会流向ChannelSink被其处理,这也是它为什么要“Sink(沉没)”的原因。关于Channel的事件及其流转这一整套体系还是比较复杂的,后面会详细讲到。

Channel大部分情况下是由ChannelFactory生产的。每个ChannelFactory一般都会在内部创建一个ChanelSink,在生产Channel的时候,会负责将Pipeline、Channel、Sink这三者装配在一起,这意味着同一个ChannelFactory生产的Channel,它们的Pipeline虽然是不一样的,但ChannelSink却是公用一个,在后面的代码中我们也能发现这一点。

在Channel上发起的所有IO操作都是“异步”的。“异步”这个词语很容易引起混淆,根据《Unix网络编程 卷一》一书所述,以输入为例,包含两个阶段:

  • 数据从硬件底层传递到内核缓冲区;
  • 从内核缓冲区向用户进程空间复制数据。

根据POSIX的定义,只要在以上两步中引起了用户进程的阻塞,这样的IO均称为同步IO,反之则称为异步IO。按照这样的定义,Unix下的5种IO模型:阻塞式IO、非阻塞IO、IO复用、信号驱动、异步IO,只有最后一种才能被称为是“异步IO模型”,并且Linux内核直到2.6版本才提供了AIO的支持。JDK在1.4之前的提供的IO库是面向流的阻塞式IO(Old IO),1.4开始引入的NIO(New IO)提供了非阻塞IO和IO复用(IO multiplexing),并不真正支持异步IO(JDK 7之前),我们经常可以在别人的文章中看到“NIO带来了异步IO”等等这样的说法,实际上都是错误的。

回到本文,在Netty的语境下的“异步”,全名应该是“异步编程模型”或者“异步通知机制”,它的含义其实更接近于“非阻塞”,并且它也是基于NIO实现的。在Channel上调用的IO操作会立刻返回一个ChannelFuture对象而不保证在方法返回的时刻操作已经完成,后者保存着这个操作的执行状态,是成功还是失败,取消还是异常诸如此类。“异步”意味着调用线程不会由于IO操作而阻塞,但只能通过ChannelFuture跟踪操作的执行情况。

这里可以简单地解释一下这套异步编程模型的具体实现,Channel上发起的IO请求,实际都会被转换成ChannelEvent事件(大部分是DownStream的)交由Pipeline最终到Sink获得处理,同时,每个ChannelEvent内部都有一个ChannelFuture对象保存着该事件的处理情况。Netty内部使用NIO的非阻塞方法进行实际的IO,实时更新相应的ChannelFuture并最终返回给调用者。举个例子,假如我们向Channel提交一个2M数据的写请求,事件经由Pipeline流转处理,最终Netty调用NIO在本次只写入了8K,则ChannelFuture被更新为“进行中,8K”类似的状态并返回。当下次Channel可写的时候再写入比如4K,并再次更新ChannelFuture,如此往复直到完成。这种“异步”的调用方式要求调用方通过为ChannelFuture添加监听回调的形式编程,对应的接口是ChannelFutureListener。相反,如果底层切换到阻塞式的OIO实现,则异步的特性就不复存在了,仍然举刚刚的例子:当Netty内部通过OIO向Channel写入2M数据时,会一直阻塞直到这2M数据全部写完才返回。

最后, Channel通过ChannelConfig保存自身的配置信息,此外,ChannelConfig还保存着该Channel使用的ChannelBufferFactory和ChannelPipelineFactory。ChannelBufferFactory负责创建Channel在IO过程中使用的ChannelBuffer。Netty使用ChannelBuffer来存储并操作读写的网络数据,它有点像NIO中的ByteBuffer但比后者更优秀。ChannelPipelineFactory是创建ChannelPipeline的工厂,一般由用户指定。Netty提供了默认的Pipeline的实现,一般来说我们只需要往里面加Handler就可以了,通过Handler实现我们想要的逻辑,非常方便。

3. Channel

从本节开始我们将深入探索Netty内部,详细了解Netty接口之下具体的实现,首先是Channel,以下是它整个产品体系的类图:

Alt text

从图中可以看到,Channel产品族非常清晰的划分成4大种类:

3.1 ServerChannel

所有的网络程序都分为服务器端和客户端, 在使用基于连接的通信协议时,Netty在服务器端遵循这样的抽象模型:使用一个listening channel(Netty内部称之为Server-side channel,这里就用listening channel这个说法,个人感觉更贴切一些)不停地监听客户端发起的连接,每当接受了一个连接请求后就创建一个accepted channel用于后续实际和客户端的网络通信。该抽象模型其实和JDK提供的OIO(java.net.ServerSocket和java.net.Socket)、NIO(java.nio.channels.ServerSocketChannel和java.nio.channels.SocketChannel)编程模型是一致的。

ServerChannel代表的就是上述模型中的listening channel,它一般只由ServerChannelFactory创建,且自身又被细化成两个产品族:

ServerSocketChannel

使用TCP协议时,服务器端的listening channel。接口名称中的“Socket“指的是底层使用伯克利套接字进行TCP通信。它有OIO和NIO两套实现,分别是java.net.ServerSocket和java.nio.channels.SocketChannel的包装。
该种类型的Channel在接收连接后会生产出SocketChannel,这对应于Channel另一个子产品体系,稍后会详细谈到。

LocalServerChannel

通过In-VM通信的网络程序使用与其他依赖真实网络进行通信的程序一样的API,但实际的数据交换过程却是发生在同一个JVM中的,并且也使用上述的网络模型。LocalServerChannel是这种情况下服务器端的listening channel,只由一个默认实现。在本次分析源码的过程中没有太多了解。它在接受连接后生产LocalChannel,也是Channel的另一子产品族。

3.2 SocketChannel

Alt text

SocketChannel代表底层使用TCP/IP套接字通信的 非Server-side Channel。
这个产品族分成两种角色,这也体现在名字上:

ClientSocketChannel

在使用TCP协议通信的Client端生成的Channel,这类Channel一般只由ClientSocketChannelFactory创建:

  • NioClientSocketChannel
    使用NIO方式的客户端Channel
  • OioClientSocketChannel
    使用OIO方式的客户端Channel
  • HttpTunnelingClientSocketChannel
    这里要提到一种“HTTP tunnel “技术。Http隧道是为了突破防火墙的限制,它和Http协议一样使用80端口通信。HttpTunnelingClientSocketChannel在客户端与Server端的HttpTunnelingServlet进行通信。在服务器端可以将基于Netty的真正的Server应用绑定到HttpTunnelingServlet指定的endpoint上(二者必须要在同一个JVM中),当HttpTunnelingServlet在收到客户端请求时会将该请求转发过去,相当于做了一次代理的工作,反之亦然。这个过程也涉及到In-VM形式的通信,会用到Local transport service相关的类。在本文中对该主题不做深入展开。

AcceptedSocketChannel

上文我们提到了使用TCP协议时Netty在服务器端使用的模型:Listening Channel和Accepted Channel,前者对应ServerSocketChannel,后者则是这里的** AcceptedSocketChannel来表示。注意,这类Channel没有对应的ChannelFactory,而是由Listening Channel负责创建。类似的,也有NIO和OIO两种实现方式(上图中绿色的两个类):
NioServerSocketChannel — NioAcceptedSocketChannel
OioServerSocketChannel — OioAcceptedSocketChannel

3.3 LocalChannel

Alt text

正如在介绍LocalServerChannel时介绍的,它最终创建的Accepted Channel即为LocalChannel,亦仅有一个默认实现。

但同时,LocalChannel也充当着”LocalClientChannel”的角色在Client端被使用。Netty没有区分这二者,而是让LocalChannel承担这两个角色。

3.4 DatagramChannel

Alt text

DatagramChannel代表了底层使用UDP协议通信的Channel,它只由DatagramChannelFactory创建。
同样的,底层也有NIO和OIO两种实现。

3.5 Channel的方法

Channel的方法分为两种:

  • 同步的getter,查看Channel的各种状态;
  • 异步的IO方法如connect、bind、write等,仔细查看代码可以发现没有open和read方法,这是因为一个Channel被创建出来时就会默认被打开;并且应用程序一般不会主动向Channel发起read,而是由Netty内部在有数据到达时生成ChannelEvent在pipeline中向上传递。

Channel是有父子关系的,典型的,Server端的ListeningChannel和Accepted Channel就是父子关系,因为前者间接地导致后者的创建。

综上所述,Channel根据底层使用的协议(TCP/UDP)、角色(Server端的Listening Channel和Accepted Channel,Client端的Channel)、使用何种JDK IO类库实现(NIO/OIO)等特性被逐层细分为几大子产品体系,从上而下逐渐由抽象到具体。再看该套体系的实现,相似产品的相同逻辑被大量抽取到抽象类中,每个具体实现一方面实现自身特殊的接口,一方面继承该抽象类获得对通用接口的实现,在自己代码中只需实现少量扩展方法,通过这种方式做到了最大程度的逻辑重用,典型的例子有AbstractChannel抽象类及其子类。这种“接口继承体系 + 默认抽象实现类“实现一套多层次的产品体系的模式在其他场景也是非常常见的。

4. ChannelFactory

顾名思义,ChannelFactory主要的职责即负责创建各种各样的Channel,这也不难理解为什么ChannelFactory和Channel体系几乎有完全一样的结构了,它的类图和整个产品体系如下:

Alt text
Alt text

ChannelFactory接口只有两个方法,其中方法newChannel负责创建Channel,并将指定的Pipeline绑定到该Channel。实际的实现常常在这一步还会完成pipeline和ChannelSink的绑定。
同样的,ChannelFactory也分为4种:

4.1 ServerChannelFactory

Alt text

ServerChannelFactory负责创建服务器端的listening channel,即以ServerChannel接口开始的一系列产品。它的继承体系和ServerChannel的体系是一一对应的,分为ServerSocketChannelFactory和LocalServerChannelFactory,前者又有NIO和OIO两种实现方式。这里不再赘述。
需要注意的是,ServerChannelFactory并不负责创建Accepted Channel。

4.2 ClientSocketChannelFactory

Alt text

ClientSocketChannelFactory负责创建SocketChannel中的**ClientSocketChannel,它的继承体系和SocketChannel中Client-Side SocketCHannel是一致的,这里也没太多花招。
上面说过,SocketChannel的另外一种角色是在TCP服务器端的Accepted Channel,这种Channel不是由工厂创建出来的,在ChannelFactory产品族中没有对应的工厂类。

4.3 LocalChannelFactory

Alt text

LocalChannelFactory负责创建LocalChannel,值得一提的是, LocalChannel同时承担Server端Accepted Channel和Client Channel的角色,这里的工厂仅负责创建后者。

4.4 DatagramChannelFactory

Alt text

UDP相关的Channel的工厂类,没怎么深入过。

ChannelFactory除了负责扮演工厂的角色,也同时是资源管理者,方法releaseExternalResources释放工厂外部资源。所有不是工厂自己创建的资源都属于“外部资源“,典型的如在构造器中为工厂指定的线程池等等。当要关闭ChannelFactory时,必须先关闭它所创建的所有Channel(通过ChannelGroup#close()进行批处理),其次调用releaseExternalResources方法释放所有外部资源。
最后以一张Channel和ChannelFactory产品体系及它们之间的对应关系的图作为总结:

Alt text

5.ChannelPipeline

一旦Channel被创建出来,其ChannelPipeline就准备开始接受事件处理了。ChannelPipeline实现了Intercepting Filter模式,内部是一系列的ChannelHandler,Handler控制事件的处理、传递。
Pipeline通常通过用户自己实现的ChannelPipelineFactory创建,Netty内部提供了Pipeline的一个默认实现:DefaultChannelPipeline,通常情况下这已经满足我们的需求了。通过Channels工具类的pipeline()静态方法可以得到一个空的DefaultChannelPipeline实例,我们只需往里面添加自定义的Handler就可以了。

ChannelEvent在Pipeline中按照流向被分为UpStream和DownStream两种类型,Handler被分为相应的两种类型。PipeLine在Downstream或者Upstream类型的网络事件发生时,会从某一端开始调用匹配事件类型的Handler响应这种调用。ChannelPipeline维持有所有handler有序链表,并且由handler自身控制是否继续流转到下一个handler(通过ChannelHandlerContext#sendDownstream(e),这样设计有个好处就是随时终止流转,业务目的达到无需继续流转到下一个handler)。
Pipeline中事件的流转如下图所示(摘自Netty的API文档):

Alt text

UpStream事件由Netty内部的IO线程发起,通常是Netty内部完成某些IO操作之后,或有数据可读时主动反馈给Channel。Upstream事件由Pipeline低端开始,往上依次流经各UpstreamHandler,如果事件被传递出Handler链最顶端,则事件被忽略。

Downstream事件是Channel向Netty内部提交的IO操作请求,比如关闭、绑定、写数据请求等等。Channel接口的IO方法,其实质是通过Channels工具类的fireXXXEvent触发事件机制。在Downstream Handler的最末端其实还挂接着一个ChannelSink(图中没有画出),所有Downstream事件在走完Handler链后都会进入这个ChannelSink中得到处理,并最终由Netty内部IO线程进行处理。这里有一个小的trik:对Channel真正的IO操作必须要由Netty内部的IO线程执行,如果是用户线程提交Downstream事件,最终会被加入IO线程池的任务队列,等待后者循环执行而非立刻执行。这个说法其实不太准确,但大概的原理是这样的,关于这一点后面还在进行源代码分析时还会详细介绍。

为了对Pipeline的工作原理有个更直观的理解,我们来看看Netty提供的默认实现DefaultChannelPipeline,它的类图如下图所示:

Alt text

Netty还提供了另一个实现StaticChannelPipeline,这是一个不可变的pipeline。一旦创建之后,任何对他的添加删除Handler操作都会抛出异常。它的性能更加优异。

属性channel和sink保存pipeline被绑定到的Channel和ChannelSink,name2ctx是内部维护的一个 Handler名 – DefaultChannelHandlerContext对象 的Map,便于快速通过名称查找。

DefaultChannelPipeline在内部维护着一个DefaultChannelContext类型(是的,不是ChannelHandler而是它的Context)的双向链表,属性Head和Tail是它的首尾。ChannelContext是Handler的包装器,Handler可以通过它的Context将事件向下或向上传递,或者得到Pipeline对其进行修改。Context还为Handler提供了Attachment能力让其可以在几次被调用期间存储一些stateful信息。DefaultChannelContext是DefaultChannelPipeline的内部类,它是ChannelContext的默认实现,在Netty的pipeline默认实现中使用的都是这个类。

虽然在ChannelPipeline的文档中画的图中显示,upstream和downstream事件的流向是分开的,但实际上内部只有一个Handler链而非两个。调用addLast(handler)等方法向其中添加Handler,经过几次后链表可能是这样:
Alt text

当调用sendUpstream(ChannelEvent e)向pipeline中丢进去一个upstream类型的事件时,首先调用getActualUpstreamContext(this.head)从Head开始向后找到第一个Upstream类型的Handler(实际上是找它的Context,毕竟链表中的元素都是HandlerContext),将event交给它处理,该Handler很有可能在执行完逻辑后调用其context的sendUpstream(event)方法将事件继续向上传递,后者继续调用getActualUpstreamContext从自己出发找下一个Upstream Handler,整个Upstream事件的传递流程就像这样:
Alt text

如果是Downstream类型的事件,整个处理流程与上面相反,是从Tail开始往前的,并且还有一个不同点,当最后一个Downstream Handler将事件继续向下传递时,事件最终会汇聚到挂在pipeline最下端的ChannelSink,被它的eventSunk方法处理。这个步骤是在DefaultChannelHandlerContext#sendDownstream(ChannelEvent)中实现的。如果没有为pipeline指定sink,则使用一个默认的实现DiscardingChannelSink,这个实现只调用logger记录一下,什么也不干。Downstream事件的流向如下图所示:
Alt text

DefaultChannelPipeline还提供了callBeforeAdd、callAfterRemove等方法,在添加删除Handler前后调用。这些方法要求具体ChannelHandler实现了LifeCycleAwareChannelHandler接口。实现了该接口的Handler能够在自己被添加或移除的时候被通知到,亦即调用相应的方法。
DefaultChannelPipeline几乎所有的方法都是Synchronized,它是一个线程安全的类。

6. ChannelEvent

在Pipeline中流转的事件分为两类,Upstream表示对Channel通知底层IO状态,Downstream表示Channel向底层提交IO请求。在Netty中ChannelEvent的产品体系如下图所示:
Alt text

以下是Netty运行过程中会触发的事件:
Alt text

Upstream事件名称均表示状态,比如closed、received之类,Downstream事件名都是动词,表示IO请求。其中两类典型的事件:

表示读写事件的MessageEvent
messageReceived:当Netty内部IO线程在某个Channel上读取到了数据,便将其包装成ChannelBuffer对象并发起一个UpstreamMessageEvent事件进入Pipeline向上流转,这是一个典型的读取数据过程。

write:写请求的提交是个逆向的过程,Channel#write(Object)最终将要写入的数据包裹成一个DownstreamMessageEvent事件,提交到Pipeline中,最终进入Sink。

Downstream事件中没有read的事件,因为在Netty中,读Channel的请求不是由用户主动发起的,而是由内部IO线程不停地读,一旦有数据再告知用户程序。

表示Channel状态改变的ChannelStateEvent
用户调用Channel#bound/connect等方法时,实际上是触发相应的Downstream事件,Sink接到事件处理完毕后再触发Upstream的事件通知上层IO操作的完成情况。

Downstream是没有Open事件的,因为一个Channel在生产出来的时候默认就是打开状态,相应的具体实现应该在其构造器中实现打开Channel这个行为,通常还会触发ChannelOpen的Upstream事件。

Netty都是通过Channels这个工具类中的fireXXXEvent静态方法触发事件的,Channels将所有事件类型的触发汇总到一起,整个工程里无论何时何地都可以调用这些方法在指定的 Channel上触发事件流转。

ChannelEvent接口只有两个方法:
Alt text

  • getChannel()返回该Event关联的Channel;
  • getFuture()返回该Event关联的ChannelFuture对象。每个ChannelEvent在触发的时候都会为其绑定一个ChannelFuture,表示这个事件的执行状态。如果事件是Upstream的,那么返回的总会是SucceededChannelFuture实例表示执行成功,因为Upstream事件触发的时候已经是完成某种IO操作或已经到达某种状态了。反之,如果是Downstream的(即IO请求),则返回的ChannelFuture会被Netty内部随着实际的IO进度而不时的更新状态,即在IO请求执行成功或失败的时候得到“通知“。

7. ChannelSink

ChannelSink被挂在ChannelPipeline的末端,处理所有从Handler链流入的Downstream事件,即对IO请求(如bind、connect、write)做汇总处理的地方(但不一定在这里做最终的IO操作),也会在处理完后触发Upstream事件。一般在实现中ChannelSink在ChannelFactory内被创建,且只有一个实例,这意味着该ChannelFactory创建的N个Channel可能使用N个不同的Pipeline实例,但所有的Pipeline使用一个共同的Sink。

ChannelSink由底层使用的transport provider提供,它的类图和继承体系如下:

Alt text

  1. eventSunk(pipeline, event):处理Downstream事件;
  2. exceptionCaught(pipeline, event, channelPipelineException):当Pipeline中抛出异常时会调用该方法进行处理

Alt text

ChannelSink被分为4个产品族,根据使用场景分为:

TCP Client端

TCP Server端

注意: Server端存在两种Channel:Listening和Accepted。二者使用的Sink并不是分开的,而是共用一个Server-side ChannelSink,Sink里同时存在处理Listening Channel和Accepted Channel的逻辑。

UDP协议

其实仔细对照下ChannelFactory体系可以发现,这二者的结构几乎是一一对应的,除了Sink少了几个抽象类。

8. ChannelBuffer

Alt text

ChannelBuffer是Netty自己实现的一套类似Java NIO ByteBuffer系统,它是byte[]数组或ByteBuffer的封装,主要的实现类有3个:

ByteBufferBackedChannelBuffer

直接封装了NIO的ByteBuffer系统,具体是Direct Buffer还是Non-direct Buffer要看实例化时传入的ByteBuffer类型;

HeapChannelBuffer

内部就是个byte数组,这里的Heap就是JVM堆的意思。这个实现其实跟NIO的Non-direct ByteBuffer类似,内部都是JVM堆上的一个byte[]。根据网络字节续的不同,HeapChannelBuffer又分为BigEndianHeapChannelBuffer和 LittleEndianHeapChannelBuffer,默认使用的是BigEndianHeapChannelBuffer。Netty在读网络数据时默认使用的是HeapChannelBuffer,HeapChannelBuffer是个大小固定的buffer,Netty每次读网络数据时都会根据前面读取时使用的Buffer大小来预测要分配的空间。

DynamicChannelBuffer

可自适应大小的Buffer,它和HeapChannelBuffer的关系很像ArrayList和数组的关系。它内部具体使用什么样的机制实现是由其内部的ChannelBufferFactory决定的,默认是HeapChannelBufferFactory。当容量不够用时会使用该Factory创建一个更大的buffer扩容,并对原数据进行复制。对于在DecodeHandler中的写数据操作,在数据大小未知的情况下通常使用DynamicChannelBuffer。

复合Buffer:CompositeChannelBuffer

它的内部是个ChannelBuffer数组,CompositeChannelBuffer在若干个Buffer之上为上层提供一个统一的接口,上层可以“无缝“地,就像操作一个连续的ChannelBuffer一样使用多个异构Buffer。它的目的主要是减少内存拷贝,提高性能。

WrappedChannelBuffer

该接口的实现类均是ChannelBuffer的包装器,它们的用途是为底层offer提供“另一种视图“。比如用ReadOnlyChannelBuffer把一个Buffer包装一下,原本可读可写的现在就成了只读的了,但是底层使用的Buffer还是原来那个。实际上是为了重用底层buffer,减少数据的拷贝和内存开销。

9. ChannelBufferFactory

ChannelBuffer对应也有一套简单的工厂体系:
Alt text

具体的实现类有DirectChannelBufferFactory和HeapChannelBufferFactory,前者用于创建ByteBufferBackedChannelBuffer,底层使用NIO的DirectBuffer;后者创建HeapChannelBuffer。这里就涉及到一个问题,Direct Buffer和Non-direct Buffer有什么区别?什么情况下应该用哪种呢?

底层操作系统在只能对进程虚拟空间的一片连续区域内进行IO操作,而在JVM内部,一个byte[]反应到JVM进程的虚拟空间中很可能不是连续的,并且GC可以随意地将其移动。因为数组在JVM中是一个对象,而对象是怎样存储的依赖于具体的JVM实现。DirectBuffer是分配在JVM进程虚拟空间(实际上是native堆)的非堆(JVM Heap)区域的一块连续存储空间,OS内核可以直接使用它作为底层IO操作的输入输出而不需要额外的内存拷贝动作。此外,DirectBuffer不受GC影响,不会在高负载情况下受到频繁的GC对应用线程造成的中断影响。但是它相对堆上的对象而言,创建和销毁的代价都远远高于后者。

如果使用Non-direct buffer,则每次IO的过程,都会使用一个被缓存的小的DirectBuffer对象,先将数据拷贝到后者中,然后再进行实际的IO — 发生了数据拷贝,这正是在网络编程中所要极力避免的。因此,在频繁IO且对性能有要求的场景,我们可以使用DirectBuffer,并在代码中缓存它以避免创建和销毁的开销。这也是对创建和销毁都非常耗资源的“重型对象”的常见处理方式,如线程池、数据库连接池等等。

回到Netty,DirectChannelBufferFactory作为DirectBuffer的生产工厂,会在第一次请求Buffer的时候一次性分配一块较大的ByteBufferBackedChannelBuffer(默认1M),以后每次请求都在它上面slice一块出来返回,当这块“蛋糕”被分完了,则再生产一块“大蛋糕”供使用,这样有效降低了生产DirectBuffer的次数。相比之下,HeapChannelBufferFactory的实现就没这么多花招了,只是简单地调用ChannelBuffers.buffer或ChannelBuffers.wrappedBuffer工具方法,最终创建一个HeapChannelBuffer返回。

对于开发者创建 ChannelBuffer,可使用实用类ChannelBuffers中的工厂方法。

10. Reactor模式

终于介绍完了Netty的上层API及它们的产品体系,现在是时候深入到它的内部探索它的代码实现了。虽然Netty的实现有3种transport service,但我们最常用的是NIO实现方式进行TCP协议之上的编程,因此对代码的分析会主要集中在NIO Server端这一块。

Netty使用了经典的Reactor模式,在开始分析代码之前先来复习下理论知识。

使用Old IO库,阻塞IO方式进行网络服务器开发,最简单的模型就是 1 thread per socket,即一个主线程负责Accept连接,一直处于阻塞状态;每收到一个连接即为其分配一个线程进行处理,如下图所示:
Alt text

这种模式的缺点在于:

  1. 每个连接一个线程,当并发用户较多时线程数量急剧增长,大大增加系统对线程的管理开销,这将成为主要瓶颈;
  2. 网络是不稳定的,此外,如果是长连接,客户端很多时候是沉默的,连接上并不是一直都有数据流动,大部分时候是空闲的。但如果使用阻塞IO和上述线程模型,即使某个连接没有数据可读,该连接占用的线程也会因此而阻塞。有可能在同一时刻存在大量IO阻塞的线程,浪费资源。

假如在读写网络的时候,socket不满足条件就返回,不用阻塞着傻傻地等待结果,那么该连接占用的线程就可以去为其他连接服务了。但过段时间socket满足IO条件了(比如有数据过来或者可以写),我们怎么得到通知呢。假想下如果有一个线程专门负责轮询所有的连接监控他们的状态,一旦发现有可读数据或可写空间,就立刻交给某个线程去进行实际的IO操作和业务处理,后者读写直到没有可读数据或无法继续写入,如果还没有完成IO操作,则继续将该连接交还给监控线程,等待下一次状态满足被处理。

这正是NIO带给我们的最大价值:IO多路复用和事件模型、非阻塞IO。NIO的Selector可以监控N个连接的状态,相当于我们设想中的监控线程。我们把连接注册到Selector上时会声明感兴趣的状态,Selector在一次select的过程中发现某些连接正处在它被声明的状态,则把他们挑选出来并将IO事件分发给用户处理。用户程序使用非阻塞IO对其进行“最大努力的”读写,若本次处理没有完成,则继续交由Selector监控等待下次处理。这种由“主动阻塞等待”改为“被动通知非阻塞处理”的方式有效规避了前一种模型中IO阻塞导致大量线程浪费的问题,工作线程每次处理连接都能快速返回,我们可以最大化的利用线程资源,用少的多的线程应对相同规模的连接数。

Reactor模式中,原来的read-decode-compute…处理序列被打碎成N个独立的Handler, Reactor负责多路分离套接字(监控连接状态)并将IO事件分发给相应的Handler,后者进行非阻塞的IO读写,或进行编码解码业务处理。这种模型虽然占用资源少,但也有缺点:

  1. 事件分发的过程可能很慢,要根据连接和事件找到正确的Handler。
  2. 增加了代码编写的难度,处理逻辑被强制要求拆分成一个个的Handler,并且由于Handler可能不能一次性做完全部IO工作,必须保存处理状态等待下次被通知。

最简单的Reactor模型如下所示:
Alt text

单线程单Reactor,只有一个Selector同时负责监控Server Socket和Accepted Socket,并将IO事件派发给Acceptor和处理IO的Handler,Acceptor接受连接请求创建Accepted Socket并注册到Reactor。Reactor分发事件和执行Handler的过程是串行的,无法充分利用多核资源,这种模型实际用的不多。

第二种Reactor模型在Handler的处理中加入了线程池,编码、业务计算、解码等非IO操作的过程被创建成一个个任务提交到线程池中:
Alt text

第三种模型将Reactor分成两个部分,MainReactor通常只有一个,Listening Socket注册在它上面,专门监听连接请求,并将创建的Accepted Socket注册到SubReactor上,SubReactor可以有多个,负责处理已连接的socket,其中业务处理的部分仍然交给线程池进行,如下图所示:
Alt text

Netty的NIO实现采用的模型是第三种的变种,业务部分Handler链并没有使用连接池,而是类似第一种模型中采用串行处理。注意,SubReactor“分派事件 – Handler处理 – 分派下一次事件”这个处理过程也是串行的,只有几个Reactor是并行的,所以在 Netty3 中,一定不要在 handler 中做耗时或者阻塞的动作,否则 IO 线程会被 block,后续连接得不到响应。会引起阻塞的动作应放在单独的线程中异步去做

11. Netty的NIO Server端实现

首先我们来看看Server端Listening Channel的创建和MainReactor的启动,通常我们使用ServerBootstrap启动服务器:

// Configure the server.
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        // Set up the pipeline factory.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new EchoServerHandler());
            }
        });

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(8080));

第一句为ServerBootstrap指定了ChannelFactory,后续会用这个ChannelFactory在Server端创建一个类型为NioServerSocketChannel的Listening Channel。

第二步指定了PipelineFactory,所有Accepted Channel在创建时会使用该工厂创建一个Pipeline实例用于响应事件。生产Pipeline时先使用Channels.pipeline静态工具方法创建一个DefaultChannelPipeline实例,并往里添加一个我们自己实现的EchoServerHandler。

最后一句将bootstrap绑定到一个端口,整个过程就完成了,就这么简单~!

当然,大量的细节被隐藏在内部了。先看Channel工厂:

private final WorkerPool<NioWorker> workerPool;
private final BossPool<NioServerBoss> bossPool;
private final NioServerSocketPipelineSink sink;
  1. NioServerSocketChannelFactory维护所有的Boss/Worker,即IO线程。

    构造器的两个参数指定Boss/Worker使用的线程池:

    • BossExecutor:执行MainReactor的线程池,虽然大部分情况下只有一个Main Reactor,即线程池中只有一个线程(通常称之为Boss Thread),但仍然通过一个线程池进行管理,这是因为如果同一VM中存在多个Netty服务器端程序,可以把它们的Main Reactor使用的线程都集中起来到一个线程池中进行管理;
    • WorkerExecutor:执行SubReactor的线程池,Netty中的SubReactor是NioWorker类。
    • workerCount:SubReactor的个数。默认是CPU核数 * 2
  2. 其次,NioServerSocketChannelFactory维护一个NioServerSocketPipelineSink,这个Sink将会处理Listening Socket和所有Accepted Socket的Downstream事件。

在bootstrap.bind内部,先手动构造一个Pipeline,它是为ServerSocketChannel服务的,因此也称为boss pipeline。这个管道中只有一个Handler—Binder,Binder是一个内部类,负责Server Channel绑定到某个端口的操作。

接下来通过factory创建NioServerSocketChannel ,并装配boss pipeline(以及factory内部的sink):

      ChannelHandler binder = new Binder(localAddress, futureQueue);
        ChannelHandler parentHandler = getParentHandler();

        ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }

        Channel channel = getFactory().newChannel(bossPipeline);

在newChannel方法中,除了打开一个ServerSocketChannel,还在该channel上触发一个ChannelOpen事件( 这印证了之前提到的,NioServerSocketChannel是基于ServerSocketChannel实现的,并且一个Channel被创建出来时就是Open的),Channels.fireChannelOpen方法创建一个UpstreamChannelStateEvent事件,进入boss pipeline流转:

  public static void fireChannelOpen(Channel channel) {
        // Notify the parent handler.
        if (channel.getParent() != null) {
            fireChildChannelStateChanged(channel.getParent(), channel);
        }

        channel.getPipeline().sendUpstream(
                new UpstreamChannelStateEvent(
                        channel, ChannelState.OPEN, Boolean.TRUE));
  }

Boss pipeline 的 Binder负责调用Channel的bind方法进行绑定:

evt.getChannel().bind(localAddress)

正如前面提到的,Channel的IO方法基本上都是通过Channels的工具方法触发一个Downstream的事件,是一种“提交请求”的性质,Channels.bind方法也是如此:

public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        ChannelFuture future = future(channel);
        channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
                channel, future, ChannelState.BOUND, localAddress));
        return future;
  }

该方法创建了一个DownStream的bind事件传入boss pipeline,并为其创建了一个ChannelFuture对象当做bind请求执行状态的容器。

Boss pipeline中并没有Handler处理Downstream事件(Binder是个UpstreamHandler),于是事件通过Pipeline“沉入”Sink,还记得在NioServerSocketChannelFactory的那个Sink么,对,就是它。在它的eventSunk方法中根据Channel的类型走两条不同的逻辑,分别对应Server端的两种Channel – Listening Channel和Accepted Channel:

  public void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);
        }
  }

handleServerSocket方法中只会处理上层提交过来的close、bind、unbind请求:

  switch (state) {
        case OPEN:
            if (Boolean.FALSE.equals(value)) {
                close(channel, future);
            }
            break;
        case BOUND:
            if (value != null) {
                bind(channel, future, (SocketAddress) value);
            } else {
                close(channel, future);
            }
            break;
        }

噢bind方法,终于到了一个干实事儿的地方了,在bind里将N久之前就实例化好了的ServerSocketChannel绑定到指定端口上,更新ChannelEvent的ChannelFuture对象,并最终启动MainReactor不停Accept连接,流程如下:

//NIO实际ServerSocketChannel.bind操作
channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
bound = true;
future.setSuccess();    //更新future
fireChannelBound(channel, channel.getLocalAddress()); //触发一个Upstream的事件,通知Pipeline已经绑定成功
//启动MainReactor
Executor bossExecutor = ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
DeadLockProofWorker.start(bossExecutor,new ThreadRenamingRunnable(new Boss(channel),"New I/O server boss #" + id + " (" + channel + ')'));

Netty的MainReactor是Boss类,这是NioServerSocketPipelineSink的一个内部类,它实现了Runnable接口。启动MainReactor,其实就是用ServerSocketChannel实例化一个Boss任务(一般来说一个Netty服务器端程序也只会有这一个Boss任务,这意味着BossExecutor中大部分情况下只有一个Boss Thread)并提交到NioServerSocketChannelFactory中的BossExecutor线程池中执行。

Boss类在实例化时立刻创建一个Selector并将ServerSocketChannel注册上去,监听其OP_ACCEPT事件。它的run方法是一个死循环,只做一件事,那就是不停地获取连接,创建Accepted Channel并将其注册到subReactor,即NioWorker上。

try {
    for (;;) {
        try {
            if (selector.select(1000) > 0) {
                selector.selectedKeys().clear();
            }

            SocketChannel acceptedSocket = channel.socket.accept();
            if (acceptedSocket != null) {
                registerAcceptedChannel(acceptedSocket, currentThread);
            }
        } catch (Throwable e) {
            logger.warn(
                    "Failed to accept a connection.", e);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                // Ignore
            }
        }
    }
} finally {
    channel.shutdownLock.unlock();
    closeSelector();
}

创建并注册Accepted Channel的过程是Boss#registerAcceptedChannel方法负责的:

ChannelPipeline pipeline =  channel.getConfig().getPipelineFactory().getPipeline();
NioWorker worker = nextWorker();
worker.register(new NioAcceptedSocketChannel(
  channel.getFactory(),
 pipeline,
 channel,
 NioServerSocketPipelineSink.this,
 acceptedSocket,
 worker, currentThread), null);

创建NioAcceptedSocketChannel使用的Pipeline是通过ServerBootstrap指定的PipelineFactory生产的,里面被填充了我们提供了自定义Handler。通过nextWorker方法决定该Channel要被提交到哪儿去,这个方法简单地轮流使用所有NioWorker。注意,NioAcceptedSocketChannel并没有用个工厂来创建,而是直接new的,这也印证了之前提到的,Server端的Accepted Channel是由ServerChannel间接地通过new生成。

小结:

  1. ChannelFactory除了负责Channel生产,还负责维护Boss和Worker – 职责太多,4中将这二者分开了;
  2. Sink负责汇总处理DownStream事件:
    • 对于write,判断是否当前是否IO线程,是则立即执行,否则进入NioWorker的队列排队。这是为了保证最终的write动作只由IO线程做;
    • 对于bind/close等,则直接操作socket进行IO动作。
  3. 在 NioServerSocketChannle 的 boss pipeline 中触发的两次事件完成了bootstrap:
    • open的UpStream事件,表明Server Channel已被创建;
    • boss pipeline 中的 Binder 接收倒 open事件后,触发一个DownStream的 bind 事件;
    • Sink进行实际的bind动作,启动Boss,Boss进入event loop。至此,整个流程完毕。

a. Sink为什么存在?
Downstream事件本质是对底层transport service的请求,必须要有个地方受理这些请求,Sink的作用就在此。对不同的Downstream事件,Sink可能直接处理,也可能提交给Worker线程。

b. Sink为什么会提交write请求到Worker而不立即执行?其他的如bind/close请求为何直接执行?
这涉及到netty的线程模型。事件发起后在pipeline中的流转是立即串行执行的,且任意线程都有可能随时在channel上发起事件,因此handler/sink可能运行在User Thread中,也可能在IO thread中。Write动作必须由IO thread执行以防止线程中断导致socket关闭的问题;其他动作在User Thread中做也没关系。

这样的设定给handler和sink带来了并发问题,netty4中改成了所有outbound event(而不仅是实际的write IO动作)都必须由IO thread执行,所有在user threaed中fire的事件连同其pipeline,全部得先进入IO thread的队列,稍后进行实际的事件流转。这就保证了handler/sink始终运行在某一个IO thread中,不用再担心并发的问题;Sink也可以省略了。

说完了MainReactor,我们接着来看看SubReactor。SubReactor负责所有AcceptedChannel的通信,它是延迟启动的,MainReactor调用NioWorker#register方法注册一个Channel时,先检查自己有没有启动,如果没有,则开启自己的selector,并把自己提交到workerExecutor线程池中执行(还记得NioServerSocketChannelFactory中的workerExecutor么?),最后生成一个RegisterTask即该Accepted Channel的注册任务,添加到NioWorker内部的registerTaskQueue(代码较长这里就不贴了)。

为什么不立刻使用channel.register(selector, ops, attachment)方法直接注册到Selector呢?网上查资料发现是处于效率问题的考虑,这种方式至少要加两次锁,其中Selector内部的key集合竞争非常激烈。但是在Netty中注册的动作只由MainReactor单线程进行,应该没有并发的场景才对。Anyway,Netty采取的方式就是使用一个缓冲任务队列,每次注册先进队列,由workerThread执行循环时会首先处理该队列,执行真正的注册动作。

NioWorker和Boss一样,都实现了Runnable接口被线程池执行,如果我们使用一个没有上限的线程池如Executors.newCachedThreadPool(),则每个NioWorker都会对应一个worker Thread。NioWorker#run()像Boss#run()一样,也是死循环。每次循环中首先在Selector select一下,然后依次执行内部的注册任务队列(registerTaskQueue)、写任务队列(writeTaskQueue,关于这个队列后续还会说明)、处理selectedKeys。最后,如果Selector上没有注册的连接了,就把自己关闭,从workerExecutor的任务队列中出队,释放自己占用的线程资源。这一部分的关键代码如下(有删减):

for (;;) {
    wakenUp.set(false);
    SelectorUtil.select(selector);
    if (wakenUp.get()) {
        selector.wakeup();
    }

    cancelledKeys = 0;
    processRegisterTaskQueue();
    processWriteTaskQueue();
    processSelectedKeys(selector.selectedKeys());

    // Exit the loop when there's nothing to handle.
    // The shutdown flag is used to delay the shutdown of this
    // loop to avoid excessive Selector creation when
    // connections are registered in a one-by-one manner instead of
    // concurrent manner.
    if (selector.keys().isEmpty()) {
        if (shutdown || executor instanceof ExecutorService && ((ExecutorService) executor).isShutdown()) {

            synchronized (startStopLock) {
                if (registerTaskQueue.isEmpty() && selector.keys().isEmpty()) {
                    started = false;
                    try {
                        selector.close();
                    } catch (IOException e) {
                        logger.warn("Failed to close a selector.", e);
                    } finally {
                        this.selector = null;
                    }
                    break;
                } else {
                    shutdown = false;
                }
            }
        } else {
            // Give one more second.
            shutdown = true;
        }
    } else {
        shutdown = false;
    }
}

我们先来看对select出来的keys是怎么处理的,这个逻辑在processSelectedKeys方法中,该方法循环selectedKeys,看他们是哪种类型,如果是read就绪,则调用read(SelectionKey)方法进行IO读,如果是write就绪,则走writeFromSelectorLoop(SelectionKey)进而调用内部的write0(NioSocketChannel)方法。

服务器端的逻辑基本上都是从读开始的,读的过程是由Netty内部发起的,先来看看read方法。它的基本流程是:

  1. 使用该Channel配置的读缓存预测器预测此次需要分配空间大小,向内部的recvBufferPool申请一块ByteBuffer;
  2. 由SocketChannel的内核缓冲区向Buffer中读数据,直到数据读取完毕或Buffer满;
  3. 使用该Channel配置的ChannelBufferFactory创建一块同样大小的ChannelBuffer,将Buffer内容拷贝过去—这里出现了一次内存拷贝操作;
  4. 向recvBufferPool归还申请的ByteBuffer,向读缓存预测器反馈此次读取数据大小;
  5. 触发一个Upstream的MessageReceived事件,将ChannelBuffer传递出去,逻辑由此进入Pipeline。

这里用到了一个有意思的数据结构SocketReceiveBufferPool,这是一个小的DirectBuffer缓存池,避免频繁的创建和销毁DirectBuffer,用于SocketChannel的IO读操作,每个NioWorker都有一个。它的内部是一个用软引用对象包裹起来的ByteBuffer数组,内存不够时池内的buffer可以被GC。

其基本原理为:

  1. 请求指定大小N的buffer
    遍历内部数组,找到第一个容量大于等于N的buffer返回。若数组为空或没有满足条件的buffer,则调用ByteBuffer.allocateDirect(int)分配大小为N的buffer返回。
  2. 归还buffer
    遍历数组,找到第一个空位,插入。若数组已满,找到第一个比它小的buffer,将其替换。

读的过程似乎乏善可陈,那么写呢?当调用Channel#write时会发生什么呢?

看到这里应该大家都心领神会了,肯定又是一个携带ChannelFuture对象以及ChannelBuffer的Downstream的事件。没错,依然是向pipeline发送一个DownstreamMessageEvent,如果在半路上没有其他Handler拦截了,最终会到Sink中被处理。

Sink#handleAcceptedSocket方法中,对IO写请求的处理逻辑如下:

    MessageEvent event = (MessageEvent) e;
    NioSocketChannel channel = (NioSocketChannel) event.getChannel();
    boolean offered = channel.writeBuffer.offer(event);
    assert offered;
    channel.worker.writeFromUserCode(channel);
首先把该写事件放入Channel内部的写请求缓冲队列--writeBuffer中,然后调用NioWorker# writeFromUserCode方法,后者的实现也很简单:
  if (!channel.isConnected()) {
    cleanUpWriteBuffer(channel);
    return;
}

if (scheduleWriteIfNecessary(channel)) {
    return;
}

// From here, we are sure Thread.currentThread() == workerThread.

if (channel.writeSuspended) {
    return;
}

if (channel.inWriteNowLoop) {
    return;
}

write0(channel);

这里出现了一个奇怪的方法scheduleWriteIfNecessary,该方法的会先判断当前线程是不是WorkerThread,如果不是,就把该Channel内部的一个writeTask添加到NioWorker的writeTaskQueue中并返回true,接下来就不会走write0方法而是直接返回了。WriteTask是NioSocketChannel的一个内部类,它的主要作用就是flush Channel的整个写缓冲队列(writeBuffer)。在NioWorker#run()中,下一次循环会首先执行writeTaskQueue队列中的所有任务,这个时候才会发生真正的IO写操作。

这种很奇怪的方式是为了保证,Netty中所有实际的IO写操作都发生在WorkerThread中,这是因为:

nio的socket channel都实现了java.nio.channels.InterruptibleChannel接口。这意味着:
1、socketChannel可以被异步被关闭,在它上阻塞的线程会被唤醒并抛出AsynchronousCloseException异常;
2、如果一个在该channel上阻塞的线程被thread#interrupt()方法中断,channel也会被关闭,阻塞线程会被唤醒并抛出ClosedByInterruptException异常。并且,如果在调用 channel的IO方法之前,线程已经设置了中断状态,同样会引起channel关闭和抛出ClosedByInterruptException。

因此,在用户线程中进行IO操作,有可能因为线程终端而引起socket关闭,基于此,netty保证所有的IO操作均在workerThread中完成,代价是写操作不是实时的,可能会有一定时间的延迟。

write0(Channel)方法是真正对Socket进行写操作的地方,同样,用于IO写操作的ByteBuffer也有一个池缓存着—SocketSendBufferPool。write0循环channel内部的写缓存队列,对每个写事件“尽可能地”写入到Socket中,并同时更新该Event对应的Future对象。如果本次没有写完,则继续监听该Channel的OP_WRITE事件,下次唤醒接着继续写入,否则取消OP_WRITE监听。

我们平常所说的“对socket的读写”其实最终是和底层伯克利套接字的内核缓冲区打交道,读是从该内核缓冲区中读,写是往该缓冲区中写,这之下的具体网络传输对上层应用程序而言完全是透明和异步的,假设接收方的接收速率慢于Server端的发送速率,最终会导致Client端Socket的内核缓冲区满,进而导致Server端的内核缓冲区满,Server端无法继续发送数据。在NIO编程中必须要小心处理OP_WRITE事件,因为只要内核缓冲区有空间就会触发该事件,一般的作法是在事件处理中首先将该事件监听取消,如有需要最后再重新监听。

在写数据时也有一个小优化,在高负载情况经常出现明明触发OP_WRITE事件但实际写入0字节的情况,Netty采用了一种类似“自旋锁”的策略,少量循环测试若干次,如果测试成功就开始写数据,否则才放弃,这段代码如下:

long localWrittenBytes = 0;
/*
 * 将SenBuffer写入channel。
 * 这里用了一个小优化,类似并发编程中的自旋锁,当发现内核写缓存满,即write返回
   * 0时会少量循环尝试写入若干次(默认16),直到某次写入成功。
 */
for (int i = writeSpinCount; i > 0; i--) {
    localWrittenBytes = buf.transferTo(ch);
    if (localWrittenBytes != 0) {
        writtenBytes += localWrittenBytes;
        break;
    }
    if (buf.finished()) {
        break;
    }
}

线程模型 的参考资料:

Github上的WIKI:Thread Model

To put simply, for a channel:

  1. Regardless of its transport and type, its all upstream (i.e. inbound) events must be fired from the thread that performs I/O for the channel (i.e. I/O thread).

  2. All downstream (i.e. outbound) events can be triggered from any thread including the I/O thread and non-I/O threads. However, any upstream events triggered as a side effect of the downstream event must be fired from the I/O thread. (e.g. If Channel.close() triggers channelDisconnected, channelUnbound, and channelClosed, they must be fired by the I/O thread.

Current problems (UGLY - causes a race condition in an upstream handler, BAD - does not cause a race condition but violates the expected thread model):

  1. UGLY: The upstream events triggered as a side effect of a downstream event is triggered by the caller thread,
  1. UGLY: The local transport always uses a caller thread to trigger an event.
  2. BAD: channelOpen is triggered by the thread that called ChannelFactory.newChannel(), which is not an I/O thread. It’s kind of bad but otherwise its not possible to limit the concurrent active channels by closing the channel here. If we would do this in the IO-Thread it would not be that efficient.
  3. BAD: Client-side channels are run by two I/O threads. One that makes a connection attempts and the other that does actual I/O.

Netty architecture - questions about NioWorker loop again

There are two questions in this post.

The first question is: is it possible handler queues too long and cannot do process SelectKeys() in time?

Yes. However, it doesn’t seem to happen unless your handler implementation abuses intentionally.

The second question is: why is write operation always performed in the I/O loop thread?

Otherwise 1) you will see a lot of contention between writer threads if you write from different threads, 2) you will see various socket exceptions due to possible race conditions (connection reset, etc), and 3) Netty internal will become more complicated to deal with such conditions.

Please note that the thread model became more strict and event loop implementation became much simpler in Netty 4, so you might want to take a look in there, too.

Netty requires sending in the same thread as select() which delays sends
Is it possible for Netty to create a worker thread that does all the sends for a group of sockets? It appears that currently, netty posts outbound messages to a queue and attempts to wake up the selector which then copies the data into an unused buffer and sends it. This takes time. Is it possible to send directly from a different thread?

It is not possible. Netty has no idea about from which thread a user will call write(). Therefore, it needs a write request queue and a dedicated loop to perform writes. Otherwise the application will suffer from contention depending on how a user wrote his/her application. Under load, having a dedicated I/O loop and running a protocol with pipelining seem to yield higher throughput.

Loading Disqus comments...
目录