netty之Netty心跳服务与断线重连

news/2024/10/4 20:49:14 标签: python, 开发语言

前言


使用netty中,需要监测服务是否稳定以及在网络异常链接断开时候可以自动重连。需要实现监听;f.addListener(new MyChannelFutureListener())
代码目录结构
在这里插入图片描述

package com.lm.demo.netty.client;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;

import java.util.concurrent.TimeUnit;


public class MyChannelFutureListener implements ChannelFutureListener {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("client start done. {关注明哥,获取源码}");
            return;
        }
        final EventLoop loop = channelFuture.channel().eventLoop();
        loop.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    new NettyClient().connect("127.0.0.1", 7397);
                    System.out.println("client start done. {关注明哥,获取源码}");
                    Thread.sleep(500);
                } catch (Exception e) {
                    System.out.println("client start error go reconnect ... {关注明哥,获取源码}");
                }
            }
        }, 1L, TimeUnit.SECONDS);
    }
}

package com.lm.demo.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;

import java.text.SimpleDateFormat;
import java.util.Date;


public class MyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("断开链接重连" + ctx.channel().localAddress().toString());
        //使用过程中断线重连
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new NettyClient().connect("127.0.0.1", 7397);
                    System.out.println(" client start done. {关注明哥,获取源码}");
                    Thread.sleep(500);
                } catch (Exception e){
                    System.out.println("client start error go reconnect ... {关注明哥,获取源码}");
                }
            }
        }).start();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("异常信息,断开重连:\r\n" + cause.getMessage());
        ctx.close();
    }

}

public class NettyClient {

    public static void main(String[] args) {
        new NettyClient().connect("127.0.0.1", 7397);
    }

    public void connect(String inetHost, int inetPort) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new MyChannelInitializer());
            ChannelFuture f = b.connect(inetHost, inetPort).sync();
            f.addListener(new MyChannelFutureListener()); //添加监听,处理重连
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}



public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            if (e.state() == IdleState.READER_IDLE) {
                System.out.println("关注明哥提醒=> Reader Idle");
                ctx.writeAndFlush("读取等待:公众号关注明哥,客户端你在吗[ctx.close()]{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE) {
                System.out.println("关注明哥提醒=> Write Idle");
                ctx.writeAndFlush("写入等待:公众号关注明哥,客户端你在吗{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
            } else if (e.state() == IdleState.ALL_IDLE) {
                System.out.println("关注明哥提醒=> All_IDLE");
                ctx.writeAndFlush("全部时间:公众号关注明哥,客户端你在吗{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
            }
        }
        ctx.flush();
    }

    /**
     * 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        SocketChannel channel = (SocketChannel) ctx.channel();
        System.out.println("链接报告开始");
        System.out.println("链接报告信息:有一客户端链接到本服务端");
        System.out.println("链接报告IP:" + channel.localAddress().getHostString());
        System.out.println("链接报告Port:" + channel.localAddress().getPort());
        System.out.println("链接报告完毕");
        //通知客户端链接建立成功
        String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
        ctx.writeAndFlush(str);
    }

    /**
     * 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
        //通知客户端链消息发送成功
        String str = "服务端收到:" + new Date() + " " + msg + "\r\n";
        ctx.writeAndFlush(str);
    }

    /**
     * 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
        System.out.println("异常信息:\r\n" + cause.getMessage());
    }

}

package com.lm.demo.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;


public class NettyServer {

    public static void main(String[] args) {
        new NettyServer().bing(7397);
    }

    private void bing(int port) {
        //配置服务端NIO线程组
        EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        EventLoopGroup childGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)    //非阻塞模式
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new MyChannelInitializer());
            ChannelFuture f = b.bind(port).sync();
            System.out.println("server start done. {关注明哥,获取源码}");
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            childGroup.shutdownGracefully();
            parentGroup.shutdownGracefully();
        }

    }

}

测试结果
启动NettyServer *在心跳中设置ctx.close();模拟断开链接,等待重连
在这里插入图片描述
启动NettyClient
在这里插入图片描述
好了到这里就结束了netty之Netty心跳服务与断线重连的学习,大家一定要跟着动手操作起来。需要的源码的 可si我获取;


http://www.niftyadmin.cn/n/5690314.html

相关文章

MATLAB中数据导入与导出的全面指南

在MATLAB中,数据的导入与导出是数据处理工作流中的两个基本步骤。导入是将外部数据加载到MATLAB工作区的过程,而导出则是将工作区中的数据保存到外部文件中。这两个步骤对于数据分析、可视化和结果共享至关重要。本文将详细介绍如何在MATLAB中进行数据的…

PostgreSQL常用数值处理函数简介

PostgreSQL 提供了许多用于数值处理的函数,涵盖数学运算、取整、取余、随机数生成等操作。以下是一些常用的数值处理函数及其使用示例: 1. ABS() - 取绝对值 返回数字的绝对值。 SELECT ABS(-10); -- 返回 10 SELECT ABS(3.5); -- 返回 3.52. CEIL(…

【React】入门Day04 —— 项目搭建及登录与表单校验、token 管理、路由鉴权实现

项目搭建 创建项目 # 使用npx创建项目 npx create-react-app my-react-app # 进入项目目录 cd my-react-app # 创建项目目录结构 mkdir -p src/{apis,assets,components,pages,store,utils} touch src/{App.js,index.css,index.js} 使用npx create-react-app创建项目&#xff0…

苹果盛宴:iPhone 16系列领衔,智能穿戴新潮流来袭

在科技界备受瞩目的苹果秋季发布会上,众多新品悉数亮相,从全新的Apple Watch系列到AirPods系列,再到备受期待的iPhone 16系列,每一款产品都以其独特的创新和卓越的性能,再次定义了智能设备的高标准。 本文将带您领略这…

Yolov11项目实战1:道路缺陷检测系统设计【Python源码+数据集+运行演示】

一、项目背景 随着城市化进程的加速和交通网络的不断扩展,道路维护成为城市管理中的一个重要环节。道路缺陷(如裂缝、坑洞、路面破损等)不仅影响行车安全,还会增加车辆的磨损和维修成本。传统的道路缺陷检测方法主要依赖人工巡检…

ACT调试pycharm报错

在运行ACT 代码时,根据官方readme使用命令行需要在wandb选择的时候输入3 但是,使用pycharm运行的时候会报错 wandb.errors.UsageError: api_key not configured (no-tty). call wandb.login(key[your_api_key]) 网上搜索都是说要注册什么key&#xf…

工程机械车辆挖掘机自卸卡车轮式装载机检测数据集VOC+YOLO格式2644张3类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):2644 标注数量(xml文件个数):2644 标注数量(txt文件个数):2644 标注…

NettyServerHandler注入其他类为null

NettyServerHandler中注入其他类为空解决办法: 增加如下代码 public static NettyServerHandler nettyServerHandler;PostConstructpublic void init() {nettyServerHandler this;}整个类代码如下: Component public class NettyServerHandler extends ChannelIn…