Netty使用初步
Posted in framework, java on 六月 19th, 2010 by kafka0102
1、简介
Java1.4提供了NIO使开发者可以使用Java编写高性能的服务端程序,但使用原生的NIO API就像Linux C中网络编程一样,还是需要做IO处理、协议处理等低层次工作。所以,就像C服务端程序大量使用libevent作为网络应用框架一样,Java社区也不断涌现出基于NIO的网络应用框架。在这其中,Jboss出品的Netty就是个中翘楚。Netty是个异步的事件驱动网络应用框架,具有高性能、高扩展性等特性。Netty提供了统一的底层协议接口,使得开发者从底层的网络协议(比如TCP/IP、UDP)中解脱出来。就使用来说,开发者只要参考 Netty提供的若干例子和它的指南文档,就可以放手开发基于Netty的服务端程序了。
在Java社区,最知名的开源Java NIO框架要属Mina和Netty,而且两者渊源颇多,对两者的比较自然不少。实际上,Netty的作者原来就是Mina作者之一,所以可以想到,Netty和Mina在设计理念上会有很多共同点。我对Mina没什么研究,但其作者介绍,Netty的设计对开发者有更友好的扩展性,并且性能方面要优于Mina,而Netty完善的文档也很吸引人。所以,如果你在寻找Java NIO框架,Netty是个很不错的选择。本文的内容就是围绕一个demo介绍使用Netty的点点滴滴。
2、服务端程序
2.1、ChannelHandler
服务端程序通常的处理过程是:解码请求数据、业务逻辑处理、编码响应。从框架角度来说,可以提供3个接口来控制并调度该处理过程;从更通用的角度来说,并不特化处理其中的每一步,而把每一步当做过滤器链中的一环,这也是Netty的做法。Netty对请求处理过程实现了过滤器链模式(ChannelPipeline),每个过滤器实现了ChannelHandler接口。Netty中有两种请求事件流类型也做了细分:
1)downstream event:其对应的ChannelHandler子接口是ChannelDownstreamHandler。downstream event是说从头到尾执行ChannelPipeline中的ChannelDownstreamHandler,这一过程相当于向外发送数据的过程。 downstream event有:”write”、”bind”、”unbind”、 “connect”、 “disconnect”、”close”。
2)upstream event:其对应的ChannelHandler子接口是ChannelUpstreamHandler。upstream event处理的事件方向和downstream event相反,这一过程相当于接收处理外来请求的过程。upstream event有:”messageReceived”、 “exceptionCaught”、”channelOpen”、”channelClosed”、 “channelBound”、”channelUnbound”、 “channelConnected”、”writeComplete”、”channelDisconnected”、”channelInterestChanged”。
Netty中有个注释@interface ChannelPipelineCoverage,它表示被注释的ChannelHandler是否能添加到多个ChannelPipeline中,其可选的值是”all”和”one”。”all”表示ChannelHandler是无状态的,可被多个ChannelPipeline共享,而”one”表示ChannelHandler只作用于单个ChannelPipeline中。但ChannelPipelineCoverage只是个注释而已,并没有实际的检查作用。对于ChannelHandler是”all”还是”one”,还是根据逻辑需要而定。比如,像解码请求handler,因为可能解码的数据不完整,需要等待下一次读事件来了之后再继续解析,所以解码请求handler就需要是”one”的(否则多个Channel共享数据就乱了)。而像业务逻辑处理hanlder通常是”all”的。
下面以一个简单的例子说明如何编写“解码请求数据、业务逻辑处理、编码响应”这一过程中涉及的ChannelHandler。该例子实现的协议格式很简单,请求和响应流中头4个字节表示后面跟的内容长度,根据该长度可得到内容体。
首先看下解码器的实现:
public class MessageDecoder extends FrameDecoder { @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes() < 4) { return null;//(1) } int dataLength = buffer.getInt(buffer.readerIndex()); if (buffer.readableBytes() < dataLength + 4) { return null;//(2) } buffer.skipBytes(4);//(3) byte[] decoded = new byte[dataLength]; buffer.readBytes(decoded); String msg = new String(decoded);//(4) return msg; } }
MessageDecoder继承自FrameDecoder,FrameDecoder是Netty codec包中的辅助类,它是个ChannelUpstreamHandler,decode方法是FrameDecoder子类需要实现的。在上面的代码中,有:
(1)检查ChannelBuffer中的字节数,如果ChannelBuffer可读的字节数少于4,则返回null等待下次读事件。
(2)继续检查ChannelBuffer中的字节数,如果ChannelBuffer可读的字节数少于dataLength + 4,则返回null等待下次读事件。
(3)越过dataLength的字节。
(4)构造解码的字符串返回。
@ChannelPipelineCoverage("all") public class MessageServerHandler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger( MessageServerHandler.class.getName()); @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { if (!(e.getMessage() instanceof String)) { return;//(1) } String msg = (String) e.getMessage(); System.err.println("got msg:"+msg); e.getChannel().write(msg);//(2) } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) { logger.log( Level.WARNING, "Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); } }
MessageServerHandler是服务端业务处理handler,其继承自SimpleChannelUpstreamHandler,并主要实现messageReceived事件。关于该类,有如下注解:
(1)该upstream事件流中,首先经过MessageDecoder,其会将decode返回的解码后的数据构造成 MessageEvent.getMessage(),所以在handler上下文关系中,MessageEvent.getMessage()并不一定都返回ChannelBuffer类型的数据。
(2)MessageServerHandler只是简单的将得到的msg再写回给客户端。e.getChannel().write(msg);操作将触发DownstreamMessageEvent事件,也就是调用下面的MessageEncoder将编码的数据返回给客户端。
@ChannelPipelineCoverage("all") public class MessageEncoder extends OneToOneEncoder { @Override protected Object encode( ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { if (!(msg instanceof String)) { return msg;//(1) } String res = (String)msg; byte[] data = res.getBytes(); int dataLength = data.length; ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2) buf.writeInt(dataLength); buf.writeBytes(data); return buf;//(3) } }
MessageEncoder是个ChannelDownstreamHandler。对该类的注解如下:
(1)如果编码的msg不是合法类型,就直接返回该msg,之后OneToOneEncoder会调用 ctx.sendDownstream(evt);来调用下一个ChannelDownstreamHandler。对于该例子来说,这种情况是不应该出现的。
(2)开发者创建ChannelBuffer的用武之地就是这儿了,通常使用dynamicBuffer即可,表示得到的ChannelBuffer可动态增加大小。
(3)返回编码后的ChannelBuffer之后,OneToOneEncoder会调用Channels.write将数据写回客户端。
2.2、MessageServerPipelineFactory
创建了3个ChannelHandler,需要将他们注册到ChannelPipeline,而ChannelPipeline又是和Channel对应的(是全局单例还是每个Channel对应一个ChannelPipeline实例依赖于实现)。可以实现ChannelPipeline的工厂接口 ChannelPipelineFactory实现该目的。MessageServerPipelineFactory的代码如下:
public class MessageServerPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new MessageDecoder()); pipeline.addLast("encoder", new MessageEncoder()); pipeline.addLast("handler", new MessageServerHandler()); return pipeline; } }
2.3、MessageServer
服务端程序就剩下启动代码了,使用Netty的ServerBootstrap三下五除二完成之。
public class MessageServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the default event pipeline. bootstrap.setPipelineFactory(new MessageServerPipelineFactory()); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } }
稍加补充的是,该Server程序并不完整,它没有处理关闭时的资源释放,尽管暴力的来看并不一定需要做这样的善后工作。
3、客户端程序
客户端程序和服务端程序处理模型上是很相似的,这里还是付上代码并作简要说明。
3.1、 ChannelHandler
客户端是先发送数据到服务端(downstream事件流),然后是处理从服务端接收的数据(upstream事件流)。这里有个问题是,怎么把需要发送的数据送到downstream事件流里呢?这就用到了ChannelUpstreamHandler的channelConnected事件了。实现的 MessageClientHandler代码如下:
@ChannelPipelineCoverage("all") public class MessageClientHandler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger( MessageClientHandler.class.getName()); @Override public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e) { String message = "hello kafka0102"; e.getChannel().write(message); } @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. System.err.println("messageReceived send message "+e.getMessage()); try { Thread.sleep(1000*3); } catch (Exception ex) { ex.printStackTrace(); } e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) { // Close the connection when an exception is raised. logger.log( Level.WARNING, "Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); } }
对于编码和解码Handler,复用MessageEncoder和MessageDecoder即可。
3.2、 MessageClientPipelineFactory
public class MessageClientPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("decoder", new MessageDecoder()); pipeline.addLast("encoder", new MessageEncoder()); pipeline.addLast("handler", new MessageClientHandler()); return pipeline; } }
3.3、MessageClient
public class MessageClient { public static void main(String[] args) throws Exception { // Parse options. String host = "127.0.0.1"; int port = 8080; // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. bootstrap.setPipelineFactory(new MessageClientPipelineFactory()); // Start the connection attempt. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is closed or the connection attempt fails. future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); } }
在写客户端例子时,我想像的代码并不是这样的,对客户端的代码我也没做过多的研究,所以也可能没有找到更好的解决方案。在上面的例子中,bootstrap.connect方法中会触发实际的连接操作,接着触发 MessageClientHandler.channelConnected,使整个过程运转起来。但是,我想要的是一个连接池,并且如何写数据也不应该在channelConnected中,这样对于动态的数据,只能在构造函数中传递需要写的数据了。但到现在,我还不清楚如何将连接池和 ChannelPipeline有效的结合起来。或许,这样的需求可以跨过Netty来实现。
4、总结
关于Netty的初步使用,尚且总结到这里。关于这篇文章,写得断断续续,以至于到后来我都没兴趣把内容都整理出来。当然,这多少也是因为我是先整理 Netty原理方面的东西所致。我也只能卑微的期望,该文对Netty入门者会有些许帮助。
=============================== 华丽的终止符 ================================
相关日志
26 Responses
留下评论
六月 19th, 2010 at 12:21 下午
[...] Netty是JBoss出品的高效的Java NIO开发框架,关于其使用,可参考我的另一篇文章 netty使用初步。本文将主要分析Netty实现方面的东西,由于精力有限,本人并没有对其源码做了极细致的研 究。如果下面的内容有错误或不严谨的地方,也请指正和谅解。对于Netty使用者来说,Netty提供了几个典型的example,并有详尽的API doc和guide doc,本文的一些内容及图示也来自于Netty的文档,特此致谢。 [...]
六月 24th, 2010 at 11:49 下午
[...] Netty是JBoss出品的高效的Java NIO开发框架,关于其使用,可参考我的另一篇文章 netty使用初步。本文将主要分析Netty实现方面的东西,由于精力有限,本人并没有对其源码做了极细致的研 究。如果下面的内容有错误或不严谨的地方,也请指正和谅解。对于Netty使用者来说,Netty提供了几个典型的example,并有详尽的API doc和guide doc,本文的一些内容及图示也来自于Netty的文档,特此致谢。 [...]
七月 13th, 2010 at 12:15 上午
博客写的非常不错,O(∩_∩)O谢谢,飘过!~~~~~~~~
[回复]
七月 13th, 2010 at 12:17 上午
我如果用他来开发的一个web和后天聊天系统怎么样?
有什么注意的问题,我们的一期项目,经常会出现掉线的问题,不知道如何解决!
大虾!
[回复]
kafka0102 回复:
七月 13th, 2010 at 1:39 上午
netty来保持长连接是没有问题的。你说的掉线是在在哪一层次呢?是客户端到web server还是web server到netty?对于聊天室的实现,你使用的是哪种服务器推(拉)方式?这种session的中断可能实现上的偏差造成,可以跟踪连接断开的异常排查。像netty程序,继承SimpleChannelUpstreamHandler的handler有channelDisconnected、channelClosed、exceptionCaught,可以检查是否有这3种异常的事件出现。
[回复]
十月 2nd, 2010 at 12:21 上午
[...] Netty使用初步 [...]
十一月 18th, 2010 at 12:55 上午
关于你说的“并且如何写数据也不应该在channelConnected中,这样对于动态的数据,只能在构造函数中传递需要写的数据了”,其实例子只是展现了一种客户端向服务器发送数据的方式,可能这么写省事吧。其实你的客户端是可以这样写的:
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
Channel channel = future.awaitUninterruptibly().getChannel();
if (future.isSuccess()) {
channel.write(“哈哈,发一次”)
}
下面就是失败的处理以及释放资源
[回复]
kafka0102 回复:
十一月 18th, 2010 at 11:28 上午
@jombowang,
你说的操作Channel是可以的,但它还是太底层了,应该有个更好的封装。当然,netty毕竟是个服务端框架,它没有做客户端连接池的封装也很正常。
[回复]
十一月 27th, 2010 at 12:55 下午
您好,我现在遇到一个问题:
在发送方 channel.write(message) 成功,不过这里message比较大,比如是4096个字节。
在接收方如果使用:
if (buffer.readableBytes() < 4096) {
return null;//(1)
}
有时会发生这样的情况,即 buffer.readableBytes 一直都无法读够4096,比如只能读到 1500 个字节,那么这样的话,本次接受到的消息将无法被解析,返回null。
而我的需求是每次必须读到4096字节才往下运行,有什么方法可以做到吗?
比如设置参数?或者什么其它的变通的方法?或者说这种情况是不可避免的,即 write 成功,接收端receive到的字节数总有可能小于 write 的字节数?
thanks!
[回复]
kafka0102 回复:
十一月 27th, 2010 at 6:38 下午
@leeing,
就你的问题来说,是服务端读的方式不对。如果是自己操作channel buffer读数据,读数据显然不能读一次,而是在一个超时范围内不断的while循环,直到读到完整的数据或者超时关闭连接。如果使用netty,netty的几个decoder已经做了这样的数据,而你只要取回数据就行。
[回复]
十一月 27th, 2010 at 8:20 下午
谢谢您的回复。实际上,我正是不断用while循环来操作的:
发送方:channel.write(message) , message 长度是4096,在这里,我想实现的是文件分块传输的功能,这里的message是指某个文件的一部份,其大小是4096字节。
然后在接收端,我要将这段文件重新组装成一个文件:
while (buf.readableBytes() < 4096) {
buf = (ChannelBuffer) e.getMessage();
// log.debug("buffer size:"+buf.capacity());
}
问题就在于,发送方已经成功地 write,但是接收方运行到这一段语句的时候,如果去掉注释,会发现一个情况,就是buf的大小有可能始终在1500个字节,然后就陷入死循环无法继续读取下去,不断地会打印:buffer size : 1500。
——–
如果发送端和接收端的都在同一台机器上,若message的比较小,比如128个字节的话,是不会出现死循环的情况的;然而,默认情况下,如果message到了1024个字节就已经会陷入死循环。
如果,单机测试时加一段这样的配置代码:
channelconfig.setOption("receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory(8192));
那么单机上就可以实现4096个字节的传输,但注意,5000字节又可能会出现死循环。(所以我也不清楚这句话中的8192究竟起到什么作用)。
不过,最麻烦的地方是,如果发送端和接收端在不同的机器上,即使加了上面的配置代码,也仍然无效,无法传输4096个字节,经常在1500个字节左右就停止了,再也没能新的数据过来。
不知道描述得是否清楚,总之,就是发送方write成功,但接收方可能收不到对方write的所有字节(如果坚持读取,可能陷入死循环),似乎有时会出现数据缺失的现象。单机上配置一下参数,可以暂时地不会出现这个问题,但切换到多机情况下,又不行了。
[回复]
十一月 27th, 2010 at 8:34 下午
所以,我的问题是:这样的情况是否是正常的?即发送方write成功,但接收方无法收到对方发送的完整数据,就算发生这样情况的概率比较小。
如果是这样,那么,根据您前面说的,只能在超时后要求重传吗?
如果不是,那么,应该采取什么样的策略来避免这样的情况发生呢?
启动时,我使用的是:ServerBootstrap,Netty 版本是:3.2.2
[回复]
kafka0102 回复:
十一月 28th, 2010 at 10:48 上午
@leeing,
看你的描述,现象是服务端接受大数据会有丢包的情况。你可以检查下网络情况是否正常,丢包的常见情况是网络连接差造成。另一方面,这1500个字节大小是默认的MTU,你可以检查两端机器的MTU大小是否都是1500,尤其是服务端,如果MTU大时,可能会丢包。对于客户端,可以设置tcp nodelay,确保数据都传输过去。
[回复]
十一月 28th, 2010 at 3:50 下午
非常感谢!我按照您的提示检查了一下,正是因为这个问题。
经检测,MTU 在我的机器上最多是 1492 个字节,在 windows xp 中,cmd中运行:
> ping -l 1492 -f localhost
是可以正确ping成功的。
如果用:
> ping -l 1493 -f localhost
则会回应说:
Packet needs to be fragmented but DF set.
[回复]
kafka0102 回复:
十一月 28th, 2010 at 6:27 下午
@leeing,
不客气。共同进步。
[回复]
六月 20th, 2011 at 4:36 下午
public class MessageDecoder extends FrameDecoder {
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
return null;//(1)
}
int dataLength = buffer.getInt(buffer.readerIndex());
if (buffer.readableBytes() < dataLength + 4) {
return null;//(2)
}
buffer.skipBytes(4);//(3)
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);
String msg = new String(decoded);//(4)
return msg;
}
}
这段代码的readerIndex ,writerIndex的指针如何变化,十分迷惑,不知道兄弟能否具体说明一下,谢了
[回复]
kafka0102 回复:
六月 20th, 2011 at 5:42 下午
好久的文章了,手边没有netty的文档和源码,简单的回复一下,具体的你可以参考他的文档。
1、传来的数据包格式应该是头四个字节(int型)表示真正的内容的长度,所以先要判断 if (buffer.readableBytes() < 4) 不满足条件就返回。
2、buffer.readableBytes() < dataLength + 4 这里加4就是整个数据包长度,而实际的内容长度是dataLength。
3、接下来就越过表示长度的头四个字节:skipBytes
4、最后就是读内容了。
至于你这writerIndex,我没从这篇文章grep到,还是自行理解吧。
[回复]
六月 20th, 2011 at 6:43 下午
非常感谢兄弟的及时来信!
1。兄弟提到的writerIndex是netty的api上提到的啊
2.根据netty的上述图中api中所示
readableBytes= this.writerIndex – this.readerIndex
所以,我们必须知道,此时writerIndex和readerIndex的pointer位置啊
根据netty api的描述,每次读的时候指针readerIndex是会移动变化的啊
3.楼上上面提到了头4个字节表示“真正的内容长度”, buffer.getInt(buffer.readerIndex())表示”实际的内容长度”,这两个概念如何具体理解?有些迷惑!
int dataLength = buffer.getInt(buffer.readerIndex());
if (buffer.readableBytes() < dataLength + 4) {
return null;//(2)
}
[回复]
kafka0102 回复:
六月 21st, 2011 at 11:00 上午
2、netty的那个图已经算是比较清晰了。netty的buffer维护了两个位置信息,writerIndex表示从客户端读到的数据写入buffer的最高位置,readerIndex表示程序读buffer数据的最新位置。比如客户端要传入100字节数据,一开始readerIndex和writerIndex为0,netty首先读了100长度的数据进来,这时writerIndex为100,实际可用的buffer数据就在(writerIndex-readerIndex)之间,接着程序端做实际的逻辑处理就要不断读buffer,那么readerIndex就不断增长,但可用的buffer会一直是(writerIndex-readerIndex)之间,知道无可用的数据为止。
3、以一个例子来说,假如客户端要发送信息”hello”,那么实际的数据包是:前4个字节是5,表示后面的实际数据长度,紧接着是5个字节的”hello”,我说的就是这个意思。
[回复]
六月 20th, 2011 at 6:47 下午
netty的api中描述如下
Sequential Access Indexing
ChannelBuffer provides two pointer variables to support sequential read and write operations – readerIndex for a read operation and writerIndex for a write operation respectively. The following diagram shows how a buffer is segmented into three areas by the two pointers:
+——————-+——————+——————+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+——————-+——————+——————+
| | | |
0 <= readerIndex <= writerIndex <= capacity
readableBytes
int readableBytes()
Returns the number of readable bytes which is equal to (this.writerIndex – this.readerIndex).
writableBytes
int writableBytes()
Returns the number of writable bytes which is equal to (this.capacity – this.writerIndex).
[回复]
七月 3rd, 2011 at 6:09 上午
看了楼主关于netty的两篇博文,lz写得很好。
只是针对MessageClient你说的问题,我觉得用不着跨过netty来实现啊。 其实连接一但建立,对客户端和服务器端得处理来说,没有什么区别啊。
“但是,我想要的是一个连接池,并且如何写数据也不应该在channelConnected中,这样对于动态的数据,只能在构造函数中传递需要写的数据了。”
你说的连接池是connection的连接池吧?这个你只需要将建立成功的channel放到一个池里即可啊。
写数据是不一定在channelConnected中啊,只不过对于客户端而言,一般都是协议请求的发起者,所以一旦建立好就发送数据,但是这个不是强制的,完全可以根据自己协议来处理的。
[回复]
十一月 19th, 2011 at 12:54 上午
服务端如何实现心跳检测功能呢?客户端又如何做发送心跳?刚接触,小白了
[回复]
kafka0102 回复:
十一月 21st, 2011 at 8:38 下午
@ixqbar, 心跳检测分多种,如果只想检测server是否还在运行,可以发个简单的请求命令,如果涉及到运行状态的检测,就需要单独处理了。
[回复]
十一月 21st, 2011 at 6:36 下午
当MessageServer意外关闭或者中止后MessageServer中代码应该怎么做才能释放相应资源
[回复]
kafka0102 回复:
十一月 21st, 2011 at 8:40 下午
@ixqbar, 进程挂掉?那os自动就释放资源了。
[回复]
十二月 5th, 2011 at 1:38 上午
如果客户度和服务器都是走本地的127.0.0.1做测试,实时发送速率可接近10M,此时建议在Client使用16M大小的固定
channelconfig.setOption(“receiveBufferSizePredictorFactory”, new FixedReceiveBufferSizePredictorFactory(16776960));
否则,请在两端限速。
[回复]