Commit 4f3434bd authored by wanghao's avatar wanghao

1 测试 上电后通信 和 最终完成 定时任务功能

parent bc577bd4
...@@ -5,6 +5,7 @@ import com.serotonin.modbus4j.ModbusMaster; ...@@ -5,6 +5,7 @@ import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ModbusInitException; import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException; import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.serotonin.modbus4j.ip.IpParameters; import com.serotonin.modbus4j.ip.IpParameters;
import com.serotonin.modbus4j.ip.tcp.TcpMaster;
import com.serotonin.modbus4j.msg.ModbusResponse; import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest; import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse; import com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse;
...@@ -15,6 +16,7 @@ import com.zehong.system.service.ITEquipmentAlarmDataService; ...@@ -15,6 +16,7 @@ import com.zehong.system.service.ITEquipmentAlarmDataService;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.net.Socket; import java.net.Socket;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
...@@ -149,48 +151,88 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -149,48 +151,88 @@ public class DeviceStatusReaderAndTimeSetter {
params.setHost(ip); params.setHost(ip);
params.setPort(port); params.setPort(port);
ModbusMaster master = modbusFactory.createTcpMaster(params, true); // 显式声明为TcpMaster,避免后续反射类型转换问题
master.setTimeout(3000); TcpMaster master = (TcpMaster) modbusFactory.createTcpMaster(params, true);
master.setRetries(1); master.setTimeout(3000); // 3秒连接超时
master.setRetries(1); // 1次重试
master.init(); master.init();
log.debug("Modbus连接创建成功:IP={}, 端口={}, Master={}", ip, port, master);
return master; return master;
} }
/**
* 核心修复:反射获取TcpMaster底层Socket(处理私有字段权限)
*/
private static Socket getUnderlyingSocket(ModbusMaster master) {
if (!(master instanceof TcpMaster)) {
log.error("ModbusMaster不是TcpMaster类型,无法获取Socket");
return null;
}
TcpMaster tcpMaster = (TcpMaster) master;
try {
// 1. 获取TcpMaster的private字段"transport"(类型:TcpTransport)
Field transportField = TcpMaster.class.getDeclaredField("transport");
transportField.setAccessible(true); // 取消私有字段访问检查
Object transport = transportField.get(tcpMaster);
if (transport == null) {
log.error("TcpMaster的transport字段为null");
return null;
}
// 2. 获取TcpTransport的private字段"socket"(类型:Socket)
// 注意:TcpTransport是Modbus4j的内部类,包路径为com.serotonin.modbus4j.ip.tcp.TcpTransport
Class<?> transportClass = Class.forName("com.serotonin.modbus4j.ip.tcp.TcpTransport");
Field socketField = transportClass.getDeclaredField("socket");
socketField.setAccessible(true); // 取消私有字段访问检查
Socket socket = (Socket) socketField.get(transport);
log.debug("成功获取Socket:{}", socket);
return socket;
} catch (NoSuchFieldException e) {
log.error("反射获取字段失败(可能Modbus4j版本不兼容),请检查字段名", e);
} catch (IllegalAccessException e) {
log.error("反射访问字段权限失败", e);
} catch (ClassNotFoundException e) {
log.error("TcpTransport类未找到(Modbus4j版本不兼容)", e);
} catch (Exception e) {
log.error("获取底层Socket异常", e);
}
return null;
}
/** /**
* 启动多设备监控(核心方法) * 启动多设备监控(核心方法)
*/ */
public void startMultiDeviceMonitoring( public void startMultiDeviceMonitoring(
String ip, String ip, int port, List<Integer> deviceIds,
int port,
List<Integer> deviceIds,
Consumer<DeviceStatusReaderDto> resultHandler, Consumer<DeviceStatusReaderDto> resultHandler,
Predicate<int[]> stopCondition ) { Predicate<int[]> stopCondition) {
if (deviceIds == null || deviceIds.isEmpty()) { if (deviceIds == null || deviceIds.isEmpty()) {
log.warn("⚠️ 设备ID列表为空,不执行监控"); log.warn("⚠️ 设备ID列表为空,不执行监控");
return; return;
} }
final CountDownLatch latch = new CountDownLatch(deviceIds.size()); final CountDownLatch latch = new CountDownLatch(deviceIds.size());
log.info("开始监控设备:IP={}, 端口={}, 设备数={}(线程池活跃线程:{})", log.info("开始监控设备:IP={}, 端口={}, 设备数={}(活跃线程:{})",
ip, port, deviceIds.size(), ((ThreadPoolExecutor) FIXED_THREAD_POOL).getActiveCount()); ip, port, deviceIds.size(), ((ThreadPoolExecutor) FIXED_THREAD_POOL).getActiveCount());
for (int deviceId : deviceIds) { for (int deviceId : deviceIds) {
final int devId = deviceId; final int devId = deviceId;
FIXED_THREAD_POOL.submit(() -> { FIXED_THREAD_POOL.submit(() -> {
ModbusMaster threadMaster = null; ModbusMaster threadMaster = null;
Socket underlyingSocket = null; // 新增:跟踪底层Socket Socket underlyingSocket = null;
try { try {
// 1. 创建Modbus连接并获取底层Socket(反射方式,根据Modbus4j版本调整) // 1. 创建Modbus连接
threadMaster = createModbusMaster(ip, port); threadMaster = createModbusMaster(ip, port);
// 注意:以下反射代码依赖Modbus4j内部实现,不同版本可能需要调整 // 2. 反射获取底层Socket(修复私有字段访问问题)
// 目的是获取底层Socket,确保关闭 underlyingSocket = getUnderlyingSocket(threadMaster);
Object transport = threadMaster.getClass().getDeclaredField("transport").get(threadMaster);
underlyingSocket = (Socket) transport.getClass().getDeclaredField("socket").get(transport);
// 2. 读取数据(带超时控制) // 3. 读取设备数据
int[] result = readWithConditionalRetry(threadMaster, ip, port, devId, stopCondition); int[] result = readWithConditionalRetry(threadMaster, ip, port, devId, stopCondition);
// 3. 处理结果(判空避免NPE) // 4. 回调处理结果
if (resultHandler != null) { if (resultHandler != null) {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result)); resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result));
} }
...@@ -199,8 +241,9 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -199,8 +241,9 @@ public class DeviceStatusReaderAndTimeSetter {
log.error("设备{}: 监控任务异常", devId, e); log.error("设备{}: 监控任务异常", devId, e);
String alarmMsg = e instanceof OutOfMemoryError ? "内存溢出:" + e.getMessage() : "监控任务异常:" + e.getMessage(); String alarmMsg = e instanceof OutOfMemoryError ? "内存溢出:" + e.getMessage() : "监控任务异常:" + e.getMessage();
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, alarmMsg); recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, alarmMsg);
} finally { } finally {
// 关键修复:优先关闭底层Socket,再销毁Master // 5. 优先关闭Socket,再销毁ModbusMaster(彻底释放资源)
if (underlyingSocket != null) { if (underlyingSocket != null) {
try { try {
if (!underlyingSocket.isClosed()) { if (!underlyingSocket.isClosed()) {
...@@ -211,6 +254,7 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -211,6 +254,7 @@ public class DeviceStatusReaderAndTimeSetter {
log.error("设备{}: Socket关闭失败", devId, e); log.error("设备{}: Socket关闭失败", devId, e);
} }
} }
// 6. 销毁ModbusMaster
if (threadMaster != null) { if (threadMaster != null) {
try { try {
threadMaster.destroy(); threadMaster.destroy();
...@@ -220,23 +264,23 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -220,23 +264,23 @@ public class DeviceStatusReaderAndTimeSetter {
} }
} }
latch.countDown(); latch.countDown();
log.info("设备{}: 监控任务完成(剩余:{})", devId, latch.getCount()); log.info("设备{}: 监控任务完成(剩余任务:{})", devId, latch.getCount());
} }
}); });
} }
// 等待任务完成,超时后中断未完成任务 // 等待所有任务完成,超时后记录告警
try { try {
if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) { if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn("IP{}: 部分设备监控超时(未完成:{}),强制中断", ip, latch.getCount()); log.warn("IP{}: 部分设备监控超时(未完成:{})", ip, latch.getCount());
recordAlarm("03", "ip:" + ip + ",port:" + port, "部分设备监控超时(未完成:" + latch.getCount() + ")"); recordAlarm("03", "ip:" + ip + ",port:" + port, "部分设备监控超时(未完成:" + latch.getCount() + ")");
// 注意:线程池任务无法直接中断,需通过任务内部的中断机制(如Thread.interrupted())
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("IP{}: 监控任务被中断", ip, e); log.error("IP{}: 监控任务被中断", ip, e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
/** /**
* 统一告警记录(抽离避免重复代码) * 统一告警记录(抽离避免重复代码)
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment