type
status
date
slug
summary
tags
category
icon
password
出站--->Socket通道--->入站--->客户端
客户端发数据给服务端:
客户端--->出站--->Socket通道--->入站--->服务端
notion image
下面是Netty官方源码给的图,我个人觉的不是太好理解,上面的图好理解一些

ByteToMessageDecoder的小细节

  1. 由于发送的字符串是16字节,根据上面注释说的内容,decode会被调用两次
如下图验证结果:
notion image
  1. 同时又引出了一个小问题
    1. notion image
当我们MyClientHandler传一个Long时,会调用我们的MyLongToByteEncoder的编码器。那么控制台就会打印这样一句话:MyLongToByteEncoder encode 被调用。但是这里并没有调用编码器,这是为什么呢?
  1. MyClientHandler这个处理器的后一个处理器是MyLongToByteEncoder
  1. MyLongToByteEncoder的父类是MessageToByteEncoder,在MessageToByteEncoder中有下面的一个方法
3.当我们以这样的形式发送数据
notion image
这两个类型并不匹配,也就不会走编码器。因此我们编写 Encoder 是要注意传入的数据类型和处理的数据类型一致
结论:
  • 不论解码器 handler 还是编码器 handler 即接收的消息类型必须与待处理的消息类型一致,否则该 handler 不会被执行
  • 在解码器进行数据解码时,需要判断缓存区(ByteBuf)的数据是否足够,否则接收到的结果会期望结果可能不一致。

解码器 - ReplayingDecoder

  1. public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
  1. ReplayingDecoder 扩展了 ByteToMessageDecoder 类,使用这个类,我们不必调用 readableBytes() 方法,也就不用判断还有没有足够的数据来读取。参数 S 指定了用户状态管理的类型,其中 Void 代表不需要状态管理
  1. 应用实例:使用 ReplayingDecoder 编写解码器,对前面的案例进行简化[案例演示]
  1. ReplayingDecoder
    1. 使用方便,但它也有一些局限性:
      • 并不是所有的 ByteBuf 操作都被支持,如果调用了一个不被支持的方法,将会抛出一个 UnsupportedOperationException
      • ReplayingDecoder 在某些情况下可能稍慢于 ByteToMessageDecoder,例如网络缓慢并且消息格式复杂时,消息会被拆成了多个碎片,速度变慢

其它编解码器

notion image
  1. LineBasedFrameDecoder:这个类在 Netty 内部也有使用,它使用行尾控制字符(\n或者\r\n)作为分隔符来解析数据。
  1. DelimiterBasedFrameDecoder:使用自定义的特殊字符作为消息的分隔符。
  1. HttpObjectDecoder:一个 HTTP 数据的解码器
  1. LengthFieldBasedFrameDecoder:通过指定长度来标识整包消息,这样就可以自动的处理黏包和半包消息。
    1. Log4j 整合到 Netty

  1. Maven 中添加对 Log4j 的依赖在 pom.xml
  1. 配置 Log4j,在 resources/log4j.properties
  1. 演示整合
notion image

TCP 粘包和拆包及解决方案

TCP 粘包和拆包基本介绍

  1. TCP 是面向连接的,面向流的,提供高可靠性服务。收发两端(客户端和服务器端)都要有一一成对的 socket,因此,发送端为了将多个发给接收端的包,更有效的发给对方,使用了优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,因为面向流的通信是无消息保护边界的
  1. 由于 TCP 无消息保护边界,需要在接收端处理消息边界问题,也就是我们所说的粘包、拆包问题,看一张图
  1. TCP 粘包、拆包图解
notion image
假设客户端分别发送了两个数据包 D1D2 给服务端,由于服务端一次读取到字节数是不确定的,故可能存在以下四种情况:
  1. 服务端分两次读取到了两个独立的数据包,分别是 D1D2,没有粘包和拆包
  1. 服务端一次接受到了两个数据包,D1D2 粘合在一起,称之为 TCP 粘包
  1. 服务端分两次读取到了数据包,第一次读取到了完整的 D1 包和 D2 包的部分内容,第二次读取到了 D2 包的剩余内容,这称之为 TCP 拆包
  1. 服务端分两次读取到了数据包,第一次读取到了 D1 包的部分内容 D1_1,第二次读取到了 D1 包的剩余部分内容 D1_2 和完整的 D2 包。

TCP 粘包和拆包现象实例

在编写 Netty 程序时,如果没有做处理,就会发生粘包和拆包的问题
看一个具体的实例:

MyServer

MyServerInitializer

MyServerHandler

MyClient

MyClientInitializer

MyClientHandler

效果
第一次运行:
Client
notion image
Server
notion image
第二次运行:
Client
notion image
Server
notion image
可以看到第一次运行时,服务器一次性将10个数据都接收了,第二次运行时分六次接收的,这就很形象的看出了TCP的粘包现象。

TCP 粘包和拆包解决方案

  1. 常用方案:使用自定义协议+编解码器来解决
  1. 关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。
看一个具体的实例
  1. 要求客户端发送 5Message 对象,客户端每次发送一个 Message 对象
  1. 服务器端每次接收一个 Message,分 5 次进行解码,每读取到一个 Message,会回复一个 Message 对象给客户端。
notion image

MessageProtocol

MyServer

MyServerInitializer

MyServerHandler

MyClient

MyClientInitializer

MyClientHandler

MyMessageEncoder

MyByteToLongDecoder2

效果

Client输出
Server输出
无论运行几次,Server都是分5次接收的,这样就解决了TCP粘包问题。

Netty 心跳(heartbeat)服务源码剖析

源码剖析目的
作为一个网络框架,提供了诸多功能,比如编码解码等,
还提供了非常重 要的一个服务-----心跳机制
。通过心跳检查对方是否有效,这是
框架 中是必不可少的功能。下面我们分析一下Netty内部 心跳服务源码实现。
notion image
源码剖析
说明
  1. Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三 个Handler 检测连接的有效性,重点分析 IdleStateHandler .
  1. 如图 3)ReadTimeout事件和WriteTimeout事件都会自动关闭连接,而且,属于异常处理,所以,这里只是介绍以下,我们重点看IdleStateHandler。
    1. notion image

IdleStateHandler分析

handlerAdded方法 当该handler被添加到pipeline中时,则调用initialize方法
只要给定的参数大于0,就创建一个定时任务,每个事件都创建。同时,将state状态设置为1,防止重复初始化调用initOutputChanged方法,初始化“监控出站数据属性”。 三个任务类
notion image
这3个定时任务分别对应读,写,读或者写事件。共有一个父类(AbstractIdleTask)。这个父类提供了一个模板方法
说明: 1)得到用户设置的超时时间。 2)如果读取操作结束了(执行了channelReadComplete方法设置),就用当前时间减去给定时间和最后一次读(执操作的时间行了channelReadComplete方法设置),如果小于O,就触发事件。反之,继续放入队 列。间隔时间是新的计算时间。 3)触发的逻辑是:首先将任务再次放到队列,时间是刚开始设置的时间,返回一个promise对象,用于做 取消操作。然后,设置first属性为false,表示,下一次读取不再是第一次了,这个属性在channelRead方 法会被改成rue。 4)创建一个IdleStateEvent类型的写事件对象,将此对象传递给用户的UserEventTriggered方法。完成触 发事件的操作。 5)总的来说,每次读取操作都会记录一个时间,定时任务时间到了,会计算当前时间和最后一次读的时间的间隔,如果间隔超过了设置的时间,就触发UserEventTriggered方法。∥前面介绍IdleStateHandler说过,可以看一下
写事件的run方法(即VriterIdleTimeoutTask的run方法)分析
说明: 写任务的代码逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对出站较慢数据的判断 hasOutputChanged
所有事件的run方法(即AllldleTimeoutTask的rum方法)分析
说明: 1)表示这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致: 2)需要大家注意的地方是 long nextDelay allldleTimeNanos; if (!reading){ ∥当前时间减去最后一次写或读的时间,若大于0,说明超时了 nextDelay -ticksInNanos()-Math.max(lastReadTime,last Write Time); 3)这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。
10.小结Nety的心跳机制 l)IdleStateHandler可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会 触发用户handler的userEventTriggered方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关 闭连接。 2)IdleStateHandler的实现基于EventLoop的定时任务,每次读写都会记录一个值,在定时任务运行的时候, 通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。 3)内部有3个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。 4)同时,IdleStateHandler内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的 空闲时间。Netty通过构造方法中的observeOutput属性来决定是否对出站缓冲区的情况进行判断。 5)如果出站缓慢,Ny不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一 次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成OOM,OOM比空闲的问题更大。 6)所以,当你的应用出现了内存溢出,OOM之类,并且写空闲极少发生(使用了observeOutput为true), 那么就需要注意是不是数据出站速度过慢。 7)还有一个注意的地方:就是ReadTimeoutHandler,它继承自IdleStateHandler,当触发读空闲事件的时候, 就触发ctx.fireExceptionCaught方法,并传入一个ReadTimeoutException,然后关闭Socket。 8)而WriteTimeoutHandler的实现不是基于IdleStateHandler的,他的原理是,当调用write方法的时候,会 创建一个定时任务,任务内容是根据传入的promise的完成情况来判断是否超出了写的时间。当定时任务根据指 定时间开始运行,发现promise的isDone方法返回false,表明还没有写完,说明超时了,则抛出异常。当write 方法完成后,会打断定时任务。

用 Netty 自己实现简单的RPC

RPC 基本介绍

  1. RPC(Remote Procedure Call)—远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
  1. 两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样(如图)
notion image
过程:
  1. 调用者(Caller),调用远程API(Remote API)
  1. 调用远程API会通过一个RPC代理(RpcProxy)
  1. RPC代理再去调用RpcInvoker(这个是PRC的调用者)
  1. RpcInvoker通过RPC连接器(RpcConnector)
  1. RPC连接器用两台机器规定好的PRC协议(RpcProtocol)把数据进行编码
  1. 接着RPC连接器通过RpcChannel通道发送到对方的PRC接收器(RpcAcceptor)
  1. PRC接收器通过PRC协议进行解码拿到数据
  1. 然后将数据传给RpcProcessor
  1. RpcProcessor再传给RpcInvoker
  1. RpcInvoker调用Remote API
  1. 最后推给被调用者(Callee)
  1. 常见的 RPC 框架有:比较知名的如阿里的 DubboGooglegRPCGo 语言的 rpcxApachethriftSpring 旗下的 SpringCloud
notion image

我们的RPC 调用流程图

notion image
RPC 调用流程说明
  1. 服务消费方(client)以本地调用方式调用服务
  1. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  1. client stub 将消息进行编码并发送到服务端
  1. server stub 收到消息后进行解码
  1. server stub 根据解码结果调用本地的服务
  1. 本地服务执行并将结果返回给 server stub
  1. server stub 将返回导入结果进行编码并发送至消费方
  1. client stub 接收到消息并进行解码
  1. 服务消费方(client)得到结果
小结:RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用

己实现 Dubbo RPC(基于 Netty)

需求说明

  1. Dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架
  1. 模仿 Dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty 4.1.20

设计说明

  1. 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。
  1. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  1. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
  1. 开发的分析图
notion image

代码

封装的RPC

可以把这块代码理解成封装的dubbo

NettyServer

NettyServerHandler

NettyClientHandler

NettyClient

接口HelloService

HelloServiceImpl

ServerBootStrap

ClientBootStrap

调用过程
  1. ClientBootstrap#main发起调用
  1. 走到下面这一行代码后
  1. 调用NettyClient#getBean,在此方法里与服务端建立链接。
  1. 于是就执行NettyClientHandler#channelActive
  1. 接着回到NettyClient#getBean调用NettyClientHandler#setPara,调用完之后再回到NettyClient#getBean,用线程池提交任务
  1. 因为用线程池提交了任务,就准备执行NettyClientHandler#call线程任务
  1. NettyClientHandler#call中发送数据给服务提供者
    1. 由于还没收到服务提供者的数据结果,所以wait住
  1. 来到了服务提供者这边,从Socket通道中收到了数据,所以执行NettyServerHandler#channelRead,然后因为此方法中执行了
    1. 就去HelloServiceImpl#hello中执行业务逻辑,返回数据给NettyServerHandler#channelReadNettyServerHandler#channelRead再把数据发给客户端
    1. NettyClientHandler#channelRead收到服务提供者发来的数据,唤醒之前wait的线程
    1. 所以之前wait的线程从NettyClientHandler#call苏醒,返回result给NettyClient#getBean
    1. NettyClient#getBeanget()到数据,ClientBootstrap#main中的此函数调用返回,得到服务端提供的数据。
      13.至此,一次RPC调用结束。

      效果

      ClientBootstrap打印
      ServerBootstrap打印
      Typora,最后一个免费版本!JAVA【Netty】第二讲
      IT小舟
      IT小舟
      微信公众号:IT小舟
      公告
      type
      status
      date
      slug
      summary
      tags
      category
      icon
      password
      notion image
      你好,我是小舟。 欢迎来到我的博客,我喜欢运动、阅读、音乐和写作。如果有什么想要交流的,可以加我微信:upcodezhou。再次感谢你的光临!
      notion image