网络管理中心网站,企业网站建设方案书模板,域名备案查询,建筑工程资质查询平台前文需求回顾
完成对红酒窖的室内温度采集及监控功能。由本地应用程序温度传感器定时采集室内温度上报至服务器#xff0c;如果温度 20 C 则由服务器下发重启空调指令#xff0c;如果本地应用长时间不上传温度给服务器#xff0c;则给户主手机发送一条预警短信。 Netty…前文需求回顾
完成对红酒窖的室内温度采集及监控功能。由本地应用程序温度传感器定时采集室内温度上报至服务器如果温度 20 °C 则由服务器下发重启空调指令如果本地应用长时间不上传温度给服务器则给户主手机发送一条预警短信。 Netty入门篇-从双向通信开始「上文」
上篇算是完成简单的双向通信了我们接着看看 “如果本地应用长时间不上传温度给服务器…”很明显客户端有可能挂了嘛所以怎么实现客户端与服务端的长连接就是本文要实现的了。
什么是心跳机制
百度百科心跳机制是定时发送一个自定义的结构体(心跳包)让对方知道自己还活着以确保连接的有效性的机制。
简单说这个心跳机制是由客户端主动发起的消息每隔一段时间就向服务端发送消息告诉服务端自己还没死可不要给户主发送预警短信啊。
如何实现心跳机制
1、客户端代码修改
我们需要改造一下上节中客户端的代码首先是在责任链中增加一个心跳逻辑处理类HeartbeatHandler
public class NettyClient {private static String host 127.0.0.1;public static void main(String[] args) {NioEventLoopGroup workerGroup new NioEventLoopGroup();Bootstrap bootstrap new Bootstrap();bootstrap// 1.指定线程模型.group(workerGroup)// 2.指定 IO 类型为 NIO.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)// 3.IO 处理逻辑.handler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)).addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new HeartbeatHandler()).addLast(new NettyClientHandler());}});// 4.建立连接bootstrap.connect(host, 8070).addListener(future - {if (future.isSuccess()) {System.out.println(连接成功!);} else {System.err.println(连接失败!);}});}
}没什么变化主要是增加了HeartbeatHandler我们来看看这个类
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.Charset;
import java.time.LocalTime;public class HeartbeatHandler extends ChannelInboundHandlerAdapter {Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent (IdleStateEvent) evt;if (idleStateEvent.state() IdleState.WRITER_IDLE) {System.out.println(10秒了需要发送消息给服务端了 LocalTime.now());//向服务端送心跳包ByteBuf buffer getByteBuf(ctx);//发送心跳消息并在发送失败时关闭该连接ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(捕获的异常 cause.getMessage());ctx.channel().close();}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 获取二进制抽象 ByteBufByteBuf buffer ctx.alloc().buffer();String time heartbeat:客户端心跳数据 LocalTime.now();// 2. 准备数据指定字符串的字符集为 utf-8byte[] bytes time.getBytes(Charset.forName(utf-8));// 3. 填充数据到 ByteBufbuffer.writeBytes(bytes);return buffer;}}还是继承自ChannelInboundHandlerAdapter不过这次重写的是userEventTriggered()方法这个方法在客户端的所有ChannelHandler中如果10s内没有发生write事件时触发所以我们在该方法中给服务端发送心跳消息。
业务逻辑处理类NettyClientHandler没有改动代码如下
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;public class NettyClientHandler extends ChannelInboundHandlerAdapter {Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(new Date() : 客户端写出数据);// 1. 获取数据ByteBuf buffer getByteBuf(ctx);// 2. 写数据ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 获取二进制抽象 ByteBufByteBuf buffer ctx.alloc().buffer();Random random new Random();double value random.nextDouble() * 14 8;String temp 获取室内温度 value;// 2. 准备数据指定字符串的字符集为 utf-8byte[] bytes temp.getBytes(Charset.forName(utf-8));// 3. 填充数据到 ByteBufbuffer.writeBytes(bytes);return buffer;}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(new Date() : 客户端读到数据 - msg.toString());}}对如上代码不了解的可以回看上一节Netty入门篇-从双向通信开始
2、服务端代码修改
服务端代码主要是开启TCP底层心跳机制支持.childOption(ChannelOption.SO_KEEPALIVE, true) 其他的代码并没有改动
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup new NioEventLoopGroup();NioEventLoopGroup workerGroup new NioEventLoopGroup();ServerBootstrap serverBootstrap new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 1024)//设置TCP长连接,一般如果两个小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文.childOption(ChannelOption.SO_KEEPALIVE, true)//将小的数据包包装成更大的帧进行传送提高网络的负载.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializerNioSocketChannel() {Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler());}});serverBootstrap.bind(8070);}}我们再来看看服务端的业务处理类 NettyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;public class NettyServerHandler extends ChannelInboundHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf (ByteBuf) msg;String message byteBuf.toString(Charset.forName(utf-8));System.out.println(new Date() : 服务端读到数据 - message);/** 心跳数据是不发送数据 **/if(!message.contains(heartbeat)){ByteBuf out getByteBuf(ctx);ctx.channel().writeAndFlush(out);}}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {byte[] bytes 我是发送给客户端的数据请重启冰箱!.getBytes(Charset.forName(utf-8));ByteBuf buffer ctx.alloc().buffer();buffer.writeBytes(bytes);return buffer;}}对channelRead() 方法增加了一个 if 判断判断如果包含heartbeat字符串就认为这是客户端发过来的心跳这种判断是非常low的因为到目前为止我们一直是用简单字符串来传递数据的上边传递的数据就直接操作字符串那么问题来了如果我们想传递对象怎么搞呢下节写。我们先来看一下如上代码客户端与服务端运行截图
服务端 客户端
至此整个心跳机制就完成了这样每隔10秒客户端就会给服务端发送一个心跳消息下节我们通过了解通协议以完善心跳机制的代码。 18年专科毕业后我创建了一个java相关的公众号用来记录自己的学习之路感兴趣的小伙伴可以关注一下小伟后端笔记