👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料:

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 地址:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 地址:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn

来源:blog.csdn.net/u013615903/article/details/129044283


项目背景

最近公司某物联网项目需要使用socket长连接进行消息通讯,捣鼓了一版代码上线,结果BUG不断,本猿寝食难安,于是求助度娘,数日未眠项目终于平稳运行了,本着开源共享的精神,本猿把项目代码提炼成了一个demo项目,尽量摒弃了其中丑陋的业务部分,希望与同学们共同学习进步。

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

正文

一、项目架构

本项目使用了netty、redis以及springboot2.2.0

二、项目模块

本项目目录结构如下图:

netty-tcp-core是公共模块,主要是工具类。netty-tcp-server是netty服务端,服务端仅作测试使用,实际项目中我们只使用了客户端。netty-tcp-client是客户端,也是本文的重点。

三、业务流程

我们实际项目中使用RocketMQ作为消息队列,本项目由于是demo项目于是改为了BlockingQueue。数据流为:

生产者->消息队列->消费者(客户端)->tcp通道->服务端->tcp通道->客户端。

当消费者接收到某设备发送的消息后,将判断缓存中是否存在该设备与服务端的连接,如果存在并且通道活跃则使用该通道发送消息,如果不存在则创建通道并在通道激活后立即发送消息,当客户端收到来自服务端的消息时进行响应的业务处理。

四、代码详解

1.消息队列

由于本demo项目移除了消息中间件,于是需要自己创建一个本地队列模拟真实使用场景

packageorg.example.client.controller;importorg.example.client.QueueHolder;importorg.example.client.model.NettyMsgModel;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RequestParam;importorg.springframework.web.bind.annotation.RestController;/***@authorReWind00*@date2023/2/1513:48*/@RestController@RequestMapping("/demo")publicclassDemoController{/***间隔发送两条消息*/@GetMapping("testOne")publicvoidtestOne(){QueueHolder.get().offer(NettyMsgModel.create("87654321","HelloWorld!"));try{Thread.sleep(5000);}catch(InterruptedExceptione){e.printStackTrace();}QueueHolder.get().offer(NettyMsgModel.create("87654321","HelloWorldToo!"));}/***任意发送消息**@paramimei*@parammsg*/@GetMapping("testTwo")publicvoidtestTwo(@RequestParamStringimei,@RequestParamStringmsg){QueueHolder.get().offer(NettyMsgModel.create(imei,msg));}/***连续发送两条消息第二条由于redis锁将会重新放回队列延迟消费*/@GetMapping("testThree")publicvoidtestThree(){QueueHolder.get().offer(NettyMsgModel.create("12345678","HelloWorld!"));QueueHolder.get().offer(NettyMsgModel.create("12345678","HelloWorldToo!"));}}

测试接口代码如上,调用testOne,日志如下:

可以看到第一条消息触发了客户端创建流程,创建后发送了消息,而5秒后的第二条消息直接通过已有通道发送了。

测试接口代码如上,调用testTwo,日志如下:

发送shutdown可以主动断开已有连接。

测试接口代码如上,调用testThree,日志如下:

可以看到第二条消息重新入列并被延迟消费了。

六、源码

https://gitee.com/jaster/netty-tcp-demo

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

后记

本demo项目仅作学习交流使用,如果要应用到生产环境还有些许不足,有问题的同学可以留言交流。


欢迎加入我的知识星球,全面提升技术能力。

👉加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

本篇文章来源于微信公众号: 芋道源码



微信扫描下方的二维码阅读本文

此作者没有提供个人介绍
最后更新于 2023-11-02