type
status
date
slug
summary
tags
category
icon
password
出站--->Socket通道--->入站--->客户端
客户端发数据给服务端:
客户端--->出站--->Socket通道--->入站--->服务端

下面是Netty官方源码给的图,我个人觉的不是太好理解,上面的图好理解一些
ByteToMessageDecoder的小细节
- 由于发送的字符串是16字节,根据上面注释说的内容,decode会被调用两次
 
如下图验证结果:

- 同时又引出了一个小问题
 

当我们
MyClientHandler传一个Long时,会调用我们的MyLongToByteEncoder的编码器。那么控制台就会打印这样一句话:MyLongToByteEncoder encode 被调用。但是这里并没有调用编码器,这是为什么呢?MyClientHandler这个处理器的后一个处理器是MyLongToByteEncoder
MyLongToByteEncoder的父类是MessageToByteEncoder,在MessageToByteEncoder中有下面的一个方法
3.当我们以这样的形式发送数据

这两个类型并不匹配,也就不会走编码器。因此我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
结论:
- 不论解码器 
handler还是编码器handler即接收的消息类型必须与待处理的消息类型一致,否则该handler不会被执行 
- 在解码器进行数据解码时,需要判断缓存区(
ByteBuf)的数据是否足够,否则接收到的结果会期望结果可能不一致。 
解码器 - ReplayingDecoder
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
ReplayingDecoder扩展了ByteToMessageDecoder类,使用这个类,我们不必调用readableBytes()方法,也就不用判断还有没有足够的数据来读取。参数S指定了用户状态管理的类型,其中Void代表不需要状态管理
- 应用实例:使用 
ReplayingDecoder编写解码器,对前面的案例进行简化[案例演示] 
ReplayingDecoder- 并不是所有的 
ByteBuf操作都被支持,如果调用了一个不被支持的方法,将会抛出一个UnsupportedOperationException。 ReplayingDecoder在某些情况下可能稍慢于ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢
使用方便,但它也有一些局限性:
其它编解码器

LineBasedFrameDecoder:这个类在Netty内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
HttpObjectDecoder:一个HTTP数据的解码器
- 在 
Maven中添加对Log4j的依赖在pom.xml 
- 配置 
Log4j,在resources/log4j.properties 
- 演示整合
 

TCP 粘包和拆包及解决方案
TCP 粘包和拆包基本介绍
TCP是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
- 由于 
TCP无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题,看一张图 
TCP粘包、拆包图解

假设客户端分别发送了两个数据包 
D1 和 D2 给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:- 服务端分两次读取到了两个独立的数据包,分别是 
D1和D2,没有粘包和拆包 
- 服务端一次接受到了两个数据包,
D1和D2粘合在一起,称之为TCP粘包 
- 服务端分两次读取到了数据包,第一次读取到了完整的 
D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这称之为TCP拆包 
- 服务端分两次读取到了数据包,第一次读取到了 
D1包的部分内容D1_1,第二次读取到了D1包的剩余部分内容D1_2和完整的D2包。 
TCP 粘包和拆包现象实例
在编写 
Netty 程序时,如果没有做处理,就会发生粘包和拆包的问题看一个具体的实例:
MyServer
MyServerInitializer
MyServerHandler
MyClient
MyClientInitializer
MyClientHandler
效果
第一次运行:
Client

Server

第二次运行:
Client

Server

可以看到第一次运行时,服务器一次性将10个数据都接收了,第二次运行时分六次接收的,这就很形象的看出了TCP的粘包现象。
TCP 粘包和拆包解决方案
- 常用方案:使用自定义协议+编解码器来解决
 
- 关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 
TCP粘包、拆包。 
看一个具体的实例
- 要求客户端发送 
5个Message对象,客户端每次发送一个Message对象 
- 服务器端每次接收一个 
Message,分5次进行解码,每读取到一个Message,会回复一个Message对象给客户端。 

MessageProtocol
MyServer
MyServerInitializer
MyServerHandler
MyClient
MyClientInitializer
MyClientHandler
MyMessageEncoder
MyByteToLongDecoder2
效果
Client输出
Server输出
无论运行几次,Server都是分5次接收的,这样就解决了TCP粘包问题。
Netty 心跳(heartbeat)服务源码剖析
源码剖析目的
作为一个网络框架,提供了诸多功能,比如编码解码等,
还提供了非常重 要的一个服务-----心跳机制
。通过心跳检查对方是否有效,这是
框架 中是必不可少的功能。下面我们分析一下Netty内部 心跳服务源码实现。

源码剖析
说明
- Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三 个Handler 检测连接的有效性,重点分析 
IdleStateHandler. 
- 如图  3)
ReadTimeout事件和WriteTimeout事件都会自动关闭连接,而且,属于异常处理,所以,这里只是介绍以下,我们重点看IdleStateHandler。 

IdleStateHandler分析
handlerAdded方法 当该handler被添加到pipeline中时,则调用initialize方法只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state状态设置为1,防止重复初始化调用initOutputChanged方法,初始化“监控出站数据属性”。 三个任务类

这3个定时任务分别对应读,写,读或者写事件。共有一个父类(
AbstractIdleTask)。这个父类提供了一个模板方法说明: 1)得到用户设置的超时时间。 2)如果读取操作结束了(执行了channelReadComplete方法设置),就用当前时间减去给定时间和最后一次读(执操作的时间行了channelReadComplete方法设置),如果小于O,就触发事件。反之,继续放入队 列。间隔时间是新的计算时间。 3)触发的逻辑是:首先将任务再次放到队列,时间是刚开始设置的时间,返回一个promise对象,用于做 取消操作。然后,设置first属性为false,表示,下一次读取不再是第一次了,这个属性在channelRead方 法会被改成rue。 4)创建一个IdleStateEvent类型的写事件对象,将此对象传递给用户的UserEventTriggered方法。完成触 发事件的操作。 5)总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发UserEventTriggered方法。∥前面介绍IdleStateHandler说过,可以看一下
写事件的run方法(即VriterIdleTimeoutTask的run方法)分析
说明: 写任务的代码逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对出站较慢数据的判断 hasOutputChanged
所有事件的run方法(即AllldleTimeoutTask的rum方法)分析
说明: 1)表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致: 2)需要大家注意的地方是 long nextDelay allldleTimeNanos; if (!reading){ ∥当前时间减去最后一次写或读的时间,若大于0,说明超时了 nextDelay -ticksInNanos()-Math.max(lastReadTime,last Write Time); 3)这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。
10.小结Nety的心跳机制 l)IdleStateHandler可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会 触发用户handler的userEventTriggered方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关 闭连接。 2)IdleStateHandler的实现基于EventLoop的定时任务,每次读写都会记录一个值,在定时任务运行的时候, 通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。 3)内部有3个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。 4)同时,IdleStateHandler内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的 空闲时间。Netty通过构造方法中的observeOutput属性来决定是否对出站缓冲区的情况进行判断。 5)如果出站缓慢,Ny不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一 次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成OOM,OOM比空闲的问题更大。 6)所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了observeOutput为true), 那么就需要注意是不是数据出站速度过慢。 7)还有一个注意的地方:就是ReadTimeoutHandler,它继承自IdleStateHandler,当触发读空闲事件的时候, 就触发ctx.fireExceptionCaught方法,并传入一个ReadTimeoutException,然后关闭Socket。 8)而WriteTimeoutHandler的实现不是基于IdleStateHandler的,他的原理是,当调用write方法的时候,会 创建一个定时任务,任务内容是根据传入的promise的完成情况来判断是否超出了写的时间。当定时任务根据指 定时间开始运行,发现promise的isDone方法返回false,表明还没有写完,说明超时了,则抛出异常。当write 方法完成后,会打断定时任务。
用 Netty 自己实现简单的RPC
RPC 基本介绍
RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
- 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)
 

过程:
- 调用者(
Caller),调用远程API(Remote API) 
- 调用远程API会通过一个RPC代理(
RpcProxy) 
- RPC代理再去调用
RpcInvoker(这个是PRC的调用者) 
RpcInvoker通过RPC连接器(RpcConnector)
- RPC连接器用两台机器规定好的PRC协议(
RpcProtocol)把数据进行编码 
- 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)
 
- PRC接收器通过PRC协议进行解码拿到数据
 
- 然后将数据传给
RpcProcessor 
RpcProcessor再传给RpcInvoker
RpcInvoker调用Remote API
- 最后推给被调用者(Callee)
 
- 常见的 
RPC框架有:比较知名的如阿里的Dubbo、Google的gRPC、Go语言的rpcx、Apache的thrift,Spring旗下的SpringCloud。 

我们的RPC 调用流程图

RPC 调用流程说明
- 服务消费方(
client)以本地调用方式调用服务 
client stub接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
client stub将消息进行编码并发送到服务端
server stub收到消息后进行解码
server stub根据解码结果调用本地的服务
- 本地服务执行并将结果返回给 
server stub 
server stub将返回导入结果进行编码并发送至消费方
client stub接收到消息并进行解码
- 服务消费方(
client)得到结果 
小结:
RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用己实现 Dubbo RPC(基于 Netty)
需求说明
Dubbo底层使用了Netty作为网络通讯框架,要求用Netty实现一个简单的RPC框架
- 模仿 
Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用Netty 4.1.20 
设计说明
- 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
 
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
 
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 
Netty请求提供者返回数据 
- 开发的分析图
 

代码
封装的RPC
可以把这块代码理解成封装的dubbo
NettyServer
NettyServerHandler
NettyClientHandler
NettyClient
接口HelloService
HelloServiceImpl
ServerBootStrap
ClientBootStrap
调用过程
ClientBootstrap#main发起调用
- 走到下面这一行代码后
 
- 调用
NettyClient#getBean,在此方法里与服务端建立链接。 
- 于是就执行
NettyClientHandler#channelActive 
- 接着回到
NettyClient#getBean调用NettyClientHandler#setPara,调用完之后再回到NettyClient#getBean,用线程池提交任务 
- 因为用线程池提交了任务,就准备执行
NettyClientHandler#call线程任务 
- 在
NettyClientHandler#call中发送数据给服务提供者 
由于还没收到服务提供者的数据结果,所以wait住
- 来到了服务提供者这边,从Socket通道中收到了数据,所以执行
NettyServerHandler#channelRead,然后因为此方法中执行了 
- 就去
HelloServiceImpl#hello中执行业务逻辑,返回数据给NettyServerHandler#channelRead,NettyServerHandler#channelRead再把数据发给客户端 
NettyClientHandler#channelRead收到服务提供者发来的数据,唤醒之前wait的线程
- 所以之前wait的线程从
NettyClientHandler#call苏醒,返回result给NettyClient#getBean 
NettyClient#getBeanget()到数据,ClientBootstrap#main中的此函数调用返回,得到服务端提供的数据。
13.至此,一次RPC调用结束。
效果
ClientBootstrap打印
ServerBootstrap打印
- 作者:IT小舟
 - 链接:https://www.codezhou.top/article/JAVA%E3%80%90Netty%E3%80%91%E7%AC%AC%E4%B8%89%E8%AE%B2%EF%BC%88%E6%8C%81%E7%BB%AD%E6%9B%B4%E6%96%B0%EF%BC%89
 - 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。
 

