做网站模板赚钱,笔记本做网站外网访问,中国建筑人才网证书查询,客户关系crm管理系统前言
近期笔者基于Netty接收UDP报文进行业务数据统计的功能#xff0c;因为Netty默认情况下处理UDP收包只能由一个线程负责#xff0c;无法像TCP协议那种基于主从reactor模型实现多线程监听端口#xff0c;所以笔者查阅网上资料查看是否有什么方式可以接收UDP收包的性能瓶颈…前言
近期笔者基于Netty接收UDP报文进行业务数据统计的功能因为Netty默认情况下处理UDP收包只能由一个线程负责无法像TCP协议那种基于主从reactor模型实现多线程监听端口所以笔者查阅网上资料查看是否有什么方式可以接收UDP收包的性能瓶颈遂以此文来记录一下笔者的解决过程。
简介Linux内核3.9的新特性对Netty的影响
常规的Netty处理UDP包我们只能用按个NIOEventLoop线程接收传输的数据包从底层来看即只使用一个socket线程监听网络端口通过这一个线程将数据传输到应用层上这一切使得我们唯一能够调优的方式就是在Socket监听传输时尽可能快速将发送给应用程序让应用程序及时处理完以便NIOEventLoop线程能够及时处理下一个UDP数据包。亦或者我们也可以直接通过增加服务器的数量通过集群的方式提升系统整体的吞吐量。 然而事实真是如此吗在Linux内核3.9版本新增了一个SO_REUSEPORT的特性它使得单台Linux的端口可以被多个Socket线程监听这一特性使得Netty在高并发场景下的UDP数据包能够及时被多个线程及时处理尽可能的避免了丢包线程且最大化的利用了CPU核心实现内核层面的负载均衡。 Netty实现Linux下UDP端口复用步骤
引入Netty依赖
为了使用Netty我们必须先引入对应的maven依赖这里笔者选择了4.1.58的最终版读者可以按需选择自己的版本。 !--netty--dependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.58.Final/version/dependency编写启动类和启动逻辑
然后我我们需要编写Netty的启动类代码模板如下因为Netty默认使用的是Java NIO而在Linux支持epoll模型相比与常规的Java NIO这种通过来回在用户态和内核态来回拷贝事件数组fd的方式epoll内部自己维护了事件的数组并可以将自行去询问连接状态并将结果返回到用户态显得更加高效。 所以笔者在启动类的编写时会判断当前服务器是否支持epoll的逻辑并通过该判断顺手解决了是否基于SO_REUSEPORT开启多线程监听的功能(注:这段代码读者必须自行查阅一下服务器内核版本是否大于等于3.9)。
/*** netty服务*/
Component
public class NettyUdpServer {private static final Logger LOG LoggerFactory.getLogger(NettyUdpServer.class);private EventLoopGroup bossLoopGroup;private Channel serverChannel;/*** netty初始化*/public void init(int port) {LOG.info(Epoll.isAvailable():{}, Epoll.isAvailable());//表示服务器连接监听线程组专门接受 accept 新的客户端client 连接bossLoopGroup Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();try {//1、创建netty bootstrap 启动类Bootstrap serverBootstrap new Bootstrap();//2、设置boostrap 的eventLoopGroup线程组serverBootstrap.group(bossLoopGroup)//3、设置NIO UDP连接通道.channel(Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class)//4、设置通道参数 SO_BROADCAST广播形式.option(ChannelOption.SO_BROADCAST, true).option(ChannelOption.SO_RCVBUF, 1024 * 1024)//5、设置处理类 装配流水线.handler(new NettyUdpHandler());// linux平台下支持SO_REUSEPORT特性以提高性能if (Epoll.isAvailable()) {LOG.info(SO_REUSEPORT);serverBootstrap.option(EpollChannelOption.SO_REUSEPORT, true);}// 如果支持epoll则说明是Linux版本则利用SO_REUSEPORT创建多个线程if (Epoll.isAvailable()) {// linux系统下使用SO_REUSEPORT特性使得多个线程绑定同一个端口int cpuNum Runtime.getRuntime().availableProcessors();LOG.info(using epoll reuseport and cpu: cpuNum);for (int i 0; i cpuNum; i) {LOG.info(worker-{} bind, i);//6、绑定server通过调用sync方法异步阻塞直到绑定成功ChannelFuture future serverBootstrap.bind(port).sync();if (!future.isSuccess()) {LOG.error(bootstrap bind fail port is port);throw new Exception(String.format(Fail to bind on [host %s , port %d]., 192.168.2.128, port), future.cause());} else {LOG.info(bootstrap bind success );}}} else {ChannelFuture future serverBootstrap.bind(port).sync();if (!future.isSuccess()) {LOG.error(bootstrap bind fail port is port);throw new Exception(String.format(Fail to bind on [host %s , port %d]., 127.0.0.1, port), future.cause());} else {LOG.info(bootstrap bind success );}}} catch (Exception e) {LOG.error(报错了错误原因:{}, e.getMessage(), e);}}}因为该代码是编写在spring boot项目中所以我们还需要添加一下启动的逻辑。
Component
public class InitTask implements CommandLineRunner {private static final Logger LOG LoggerFactory.getLogger(InitTask.class);Autowiredprivate NettyUdpServer nettyUdpServer;Overridepublic void run(String... args) {LOG.info(netty服务器初始化成功端口号:{}, 7000);nettyUdpServer.init(7000);}}封装业务处理类
处理类的逻辑比较简单了收到内容后打印后原子类自增一下该原子类是用于后续压测统计是否丢包用的。
/*** 报文处理器*/
Component
ChannelHandler.Sharable
public class NettyUdpHandler extends SimpleChannelInboundHandlerDatagramPacket {private static final Logger LOG LoggerFactory.getLogger(NettyUdpHandler.class);private static AtomicInteger atomicIntegernew AtomicInteger(0);Overrideprotected void channelRead0(ChannelHandlerContext ctx, DatagramPacket dp) {try {int length dp.content().readableBytes();//分配一个新的数组来保存具有该长度的字节数据byte[] array new byte[length];//将字节复制到该数组dp.content().getBytes(dp.content().readerIndex(), array);LOG.info(收到UDP报文报文内容:{} 包处理个数:{}, new String(array),atomicInteger.incrementAndGet());} catch (Exception e) {LOG.error(报文处理失败失败原因:{}, e.getMessage(), e);}}
}基于jmeter完成压测统计丢包率
自此我们项目都编写完成了我们不妨使用jmeter进行一次压测可以看到笔者会一次性发送100w个数据包查看最终的收包数。 而UDP包的格式以及目的地址和内容如下 最终压测结果如下可以看到服务器都及时的收到了数据包并不存在丢包的现象。 为了可以看到性能的提升笔者将代码还原回单线程监听的老代码段:
/*** netty初始化*/public void init(int port) {LOG.info(Epoll.isAvailable():{}, Epoll.isAvailable());//表示服务器连接监听线程组专门接受 accept 新的客户端client 连接bossLoopGroup Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup();try {//1、创建netty bootstrap 启动类Bootstrap serverBootstrap new Bootstrap();//2、设置boostrap 的eventLoopGroup线程组serverBootstrap.group(bossLoopGroup)//3、设置NIO UDP连接通道.channel(Epoll.isAvailable() ? EpollDatagramChannel.class : NioDatagramChannel.class)//4、设置通道参数 SO_BROADCAST广播形式.option(ChannelOption.SO_BROADCAST, true).option(ChannelOption.SO_RCVBUF, 1024 * 1024)//5、设置处理类 装配流水线.handler(new NettyUdpHandler());ChannelFuture future serverBootstrap.bind(port).sync();if (!future.isSuccess()) {LOG.error(bootstrap bind fail port is port);throw new Exception(String.format(Fail to bind on [host %s , port %d]., 127.0.0.1, port), future.cause());} else {LOG.info(bootstrap bind success );}} catch (Exception e) {LOG.error(报错了错误原因:{}, e.getMessage(), e);}}根据老的压测结果来看单线程监听的情况下确实会存在一定的丢包所以如果在高并发场景下使用Netty接收UDP数据包的小伙伴建立利用好Linux内核3.9的特性提升程序的吞吐量哦。 参考文献
Linux下Netty实现高性能UDP服务(SO_REUSEPORT): https://blog.csdn.net/monokai/article/details/108453746
Netty网络传输简记: https://www.sharkchili.com/pages/710071/#前言