Commit 2745cceb authored by wanghao's avatar wanghao

1 netty 接收 机械臂 回复状态功能实现,及 传送带 状态测试。

parent 533cce1e
...@@ -6,9 +6,9 @@ spring: ...@@ -6,9 +6,9 @@ spring:
druid: druid:
# 主库数据源 # 主库数据源
master: master:
url: jdbc:mysql://localhost:3306/ry?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 url: jdbc:mysql://localhost:3306/zh-mes-device-db?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&allowMultiQueries=true
username: root username: root
password: root password: Zh@123456
# 从库数据源 # 从库数据源
slave: slave:
# 从数据源开关/默认关闭 # 从数据源开关/默认关闭
...@@ -90,8 +90,16 @@ zehong: ...@@ -90,8 +90,16 @@ zehong:
# 实例演示开关 # 实例演示开关
demoEnabled: true demoEnabled: true
# 文件路径 示例( Windows配置D:/zehong/uploadPath,Linux配置 /home/zehong/uploadPath) # 文件路径 示例( Windows配置D:/zehong/uploadPath,Linux配置 /home/zehong/uploadPath)
profile: /home/zehong/uploadPath profile: D:/zehong/uploadPath
# 获取ip地址开关 # 获取ip地址开关
addressEnabled: false addressEnabled: false
# 验证码类型 math 数组计算 char 字符验证 # 验证码类型 math 数组计算 char 字符验证
captchaType: math captchaType: math
\ No newline at end of file
# Netty配置
netty:
port: 6001 # 服务端口
boss-group-thread-count: 1 # 主线程数
worker-group-thread-count: 8 # 工作线程数
max-frame-length: 65535 # 最大帧长度
heartbeat-timeout: 60 # 心跳超时时间(秒)
\ No newline at end of file
...@@ -94,4 +94,12 @@ zehong: ...@@ -94,4 +94,12 @@ zehong:
# 获取ip地址开关 # 获取ip地址开关
addressEnabled: false addressEnabled: false
# 验证码类型 math 数组计算 char 字符验证 # 验证码类型 math 数组计算 char 字符验证
captchaType: math captchaType: math
\ No newline at end of file
# Netty配置
netty:
port: 6001 # 服务端口
boss-group-thread-count: 1 # 主线程数
worker-group-thread-count: 8 # 工作线程数
max-frame-length: 65535 # 最大帧长度
heartbeat-timeout: 60 # 心跳超时时间(秒)
\ No newline at end of file
...@@ -88,8 +88,15 @@ zehong: ...@@ -88,8 +88,15 @@ zehong:
# 实例演示开关 # 实例演示开关
demoEnabled: true demoEnabled: true
# 文件路径 示例( Windows配置D:/zehong/uploadPath,Linux配置 /home/zehong/uploadPath) # 文件路径 示例( Windows配置D:/zehong/uploadPath,Linux配置 /home/zehong/uploadPath)
profile: D:/zehong/uploadPath profile: /home/zehong/uploadPath
# 获取ip地址开关 # 获取ip地址开关
addressEnabled: false addressEnabled: false
# 验证码类型 math 数组计算 char 字符验证 # 验证码类型 math 数组计算 char 字符验证
captchaType: math captchaType: math
\ No newline at end of file # Netty配置
netty:
port: 6001 # 服务端口
boss-group-thread-count: 1 # 主线程数
worker-group-thread-count: 8 # 工作线程数
max-frame-length: 65535 # 最大帧长度
heartbeat-timeout: 60 # 心跳超时时间(秒)
\ No newline at end of file
...@@ -26,7 +26,7 @@ spring: ...@@ -26,7 +26,7 @@ spring:
# 国际化资源文件路径 # 国际化资源文件路径
basename: i18n/messages basename: i18n/messages
profiles: profiles:
active: test active: dev
# 文件上传 # 文件上传
servlet: servlet:
multipart: multipart:
......
...@@ -125,6 +125,11 @@ ...@@ -125,6 +125,11 @@
<version>3.0.3</version> <version>3.0.3</version>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -16,7 +16,11 @@ ...@@ -16,7 +16,11 @@
</description> </description>
<dependencies> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- SpringBoot Web容器 --> <!-- SpringBoot Web容器 -->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
......
package com.zehong.framework.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @author lenovo
* @date 2025/7/31
* @description Netty配置类
*/
@Component
@ConfigurationProperties(prefix = "netty")
public class NettyConfig {
/**
* 服务端口
*/
private int port = 6001;
/**
* 主线程数
*/
private int bossGroupThreadCount = 1;
/**
* 工作线程数
*/
private int workerGroupThreadCount = Runtime.getRuntime().availableProcessors() * 2;
/**
* 最大帧长度
*/
private int maxFrameLength = 65535;
/**
* 心跳检测超时时间(秒)
*/
private int heartbeatTimeout = 60;
// getter和setter方法
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getBossGroupThreadCount() {
return bossGroupThreadCount;
}
public void setBossGroupThreadCount(int bossGroupThreadCount) {
this.bossGroupThreadCount = bossGroupThreadCount;
}
public int getWorkerGroupThreadCount() {
return workerGroupThreadCount;
}
public void setWorkerGroupThreadCount(int workerGroupThreadCount) {
this.workerGroupThreadCount = workerGroupThreadCount;
}
public int getMaxFrameLength() {
return maxFrameLength;
}
public void setMaxFrameLength(int maxFrameLength) {
this.maxFrameLength = maxFrameLength;
}
public int getHeartbeatTimeout() {
return heartbeatTimeout;
}
public void setHeartbeatTimeout(int heartbeatTimeout) {
this.heartbeatTimeout = heartbeatTimeout;
}
}
package com.zehong.framework.netty;
import com.zehong.framework.config.NettyConfig;
import com.zehong.framework.netty.handler.NettyUdpServerHandler;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PreDestroy;
import java.util.concurrent.TimeUnit;
/**
* @author lenovo
* @date 2025/7/31
* @description TODO
*/
@Component
public class NettyUdpServer {
private static final Logger log = LoggerFactory.getLogger(NettyUdpServer.class);
@Autowired
private NettyConfig nettyConfig;
@Autowired
private NettyUdpServerHandler nettyUdpServerHandler;
private EventLoopGroup group;
private ChannelFuture channelFuture;
/**
* 启动UDP服务器
*/
public void start() {
// 避免重复启动
if (channelFuture != null && channelFuture.channel().isActive()) {
log.warn("Netty UDP服务器已启动,无需重复启动");
return;
}
try {
// UDP只需要一个事件循环组
group = new NioEventLoopGroup(nettyConfig.getWorkerGroupThreadCount());
// UDP使用Bootstrap而非ServerBootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
// 设置UDP通道类型
.channel(NioDatagramChannel.class)
// UDP支持广播
.option(ChannelOption.SO_BROADCAST, true)
// 设置接收缓冲区大小
.option(ChannelOption.SO_RCVBUF, 1024 * 1024)
// 设置发送缓冲区大小
.option(ChannelOption.SO_SNDBUF, 1024 * 1024)
// 设置处理器
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
ch.pipeline()
// UDP也可以使用心跳检测,但意义不如TCP大
.addLast(new IdleStateHandler(
0,
0,
nettyConfig.getHeartbeatTimeout(),
TimeUnit.SECONDS))
// 帧解码,解决粘包拆包问题
.addLast(new LengthFieldBasedFrameDecoder(
nettyConfig.getMaxFrameLength(),
0, 4, 0, 4))
// 帧编码
.addLast(new LengthFieldPrepender(4))
// 字符串解码
.addLast(new StringDecoder(CharsetUtil.UTF_8))
// 字符串编码
.addLast(new StringEncoder(CharsetUtil.UTF_8))
// 自定义UDP处理器
.addLast(nettyUdpServerHandler);
}
});
// 绑定端口,UDP不需要监听连接
channelFuture = bootstrap.bind(nettyConfig.getPort()).sync();
log.info("Netty UDP服务器启动成功,监听端口: {}", nettyConfig.getPort());
// 等待通道关闭
channelFuture.channel().closeFuture().addListener(future -> {
log.info("Netty UDP服务器通道关闭");
stop();
});
} catch (Exception e) {
log.error("Netty UDP服务器启动失败", e);
stop();
}
}
/**
* 停止UDP服务器
*/
@PreDestroy
public void stop() {
try {
if (channelFuture != null) {
channelFuture.channel().close().sync();
}
} catch (Exception e) {
log.error("Netty UDP通道关闭异常", e);
} finally {
if (group != null) {
group.shutdownGracefully();
}
log.info("Netty UDP服务器已停止");
}
}
/**
* 重启UDP服务器
*/
public void restart() {
log.info("准备重启Netty UDP服务器...");
stop();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
start();
}
}
package com.zehong.framework.netty.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.concurrent.locks.ReentrantLock;
import java.io.*;
import java.util.Date;
/**
* @author lenovo
* @date 2025/7/31
* @description TODO
*/
@Component
@Sharable
public class NettyUdpServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
private static final Logger log = LoggerFactory.getLogger(NettyUdpServerHandler.class);
// 日志文件保存目录
// 写法1:使用双反斜杠
private static final String LOG_DIR = "D:\\BaiduNetdiskDownload\\udp_message_logs\\";
// 日期格式器,用于生成文件名和日志时间
private static final SimpleDateFormat DATE_FORMATTER = new SimpleDateFormat("yyyyMMdd");
private static final SimpleDateFormat TIME_FORMATTER = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 线程安全锁,确保文件写入安全
private final ReentrantLock fileLock = new ReentrantLock();
/**
* 接收UDP消息
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
try {
// 获取消息内容的字节缓冲区
ByteBuf content = packet.content();
// 分配与消息内容长度相同的字节数组
byte[] bytes = new byte[content.readableBytes()];
// 将缓冲区内容复制到字节数组
content.readBytes(bytes);
// 尝试多种编码方式解码,用于排查问题
String messageGbk = new String(bytes, "GBK");
// 日志打印多种编码结果,帮助确定发送端使用的编码
log.info("收到来自{}的UDP消息:", packet.sender());
log.info("GBK解码: {}", messageGbk);
// 根据实际情况选择正确的编码方式
// 这里假设通过日志观察后确定发送端使用GBK编码
String correctMessage = messageGbk; // 初始值,根据实际情况修改
// 检测哪种编码更可能是正确的(简单判断)
if (containsValidChinese(messageGbk)) {
correctMessage = messageGbk;
}
// 保存消息到文件
saveMessageToFile(packet.sender().toString(), correctMessage);
// 处理消息逻辑
String response = "服务器已收到UDP消息:" + correctMessage;
// 回复客户端,明确使用UTF-8编码
byte[] responseBytes = response.getBytes(StandardCharsets.UTF_8);
ctx.writeAndFlush(new DatagramPacket(
io.netty.buffer.Unpooled.copiedBuffer(responseBytes),
packet.sender()));
} catch (Exception e) {
log.error("处理UDP消息异常", e);
}
}
/**
* 将消息保存到本地文件
*
* @param clientAddress 客户端地址
* @param message 消息内容
*/
private void saveMessageToFile(String clientAddress, String message) {
// 创建日志目录(如果不存在)
File dir = new File(LOG_DIR);
if (!dir.exists()) {
dir.mkdirs();
}
// 按日期生成文件名,每天一个文件
String fileName = LOG_DIR + "udp_log_" + DATE_FORMATTER.format(new Date()) + ".log";
// 构建日志内容
String logContent = String.format("[%s] 客户端[%s]:%s%n",
TIME_FORMATTER.format(new Date()),
clientAddress,
message);
// 使用锁确保多线程写入安全
fileLock.lock();
BufferedWriter writer = null;
try {
// 以追加模式写入文件
writer = new BufferedWriter(new OutputStreamWriter(
new FileOutputStream(fileName, true),
StandardCharsets.UTF_8));
writer.write(logContent);
writer.flush();
} catch (IOException e) {
log.error("保存UDP消息到文件失败", e);
} finally {
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
log.error("关闭文件写入流失败", e);
}
}
fileLock.unlock();
}
}
/**
* 简单判断字符串是否包含有效的中文字符
*/
private boolean containsValidChinese(String str) {
if (str == null || str.isEmpty()) {
return false;
}
for (char c : str.toCharArray()) {
// 中文字符的Unicode范围
if (c >= 0x4E00 && c <= 0x9FA5) {
return true;
}
}
return false;
}
/**
* 异常处理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("UDP通道异常:{}", cause.getMessage(), cause);
// UDP无连接,通常不需要关闭通道
}
/**
* 心跳检测
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.ALL_IDLE) {
log.info("UDP服务器超过规定时间未收到数据");
// UDP无连接,一般不关闭通道
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
package com.zehong.framework.netty.listener;
import com.zehong.framework.netty.NettyUdpServer;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
/**
* @author lenovo
* @date 2025/7/31
* @description TODO
*/
@Component
@Order(1) // 控制启动顺序,数值越小越先执行
public class NettyServerListener implements ApplicationRunner {
private static final Logger log = LoggerFactory.getLogger(NettyServerListener.class);
@Autowired
private NettyUdpServer nettyServer;
/**
* 应用启动后执行,启动Netty服务器
*/
@Override
public void run(ApplicationArguments args) throws Exception {
try {
log.info("准备启动Netty服务器...");
// 启动Netty服务
nettyServer.start();
// 注册钩子,在JVM关闭时优雅停止Netty服务
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("JVM即将关闭,准备停止Netty服务器...");
nettyServer.stop();
}));
} catch (Exception e) {
log.error("启动Netty服务器失败", e);
// 启动失败时可以选择退出应用或进行其他处理
// System.exit(1);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment