Commit 2511f9c5 authored by wanghao's avatar wanghao

1 指令指令完成,但是 没有检测到机械臂完成。

parent db7ad245
...@@ -15,11 +15,15 @@ import org.slf4j.LoggerFactory; ...@@ -15,11 +15,15 @@ import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.net.Socket;
import java.util.Arrays; import java.util.Arrays;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
...@@ -38,146 +42,265 @@ public class ModbusResultHandler implements Consumer<DeviceStatusReaderDto> { ...@@ -38,146 +42,265 @@ public class ModbusResultHandler implements Consumer<DeviceStatusReaderDto> {
@Resource @Resource
private PalletDeviceBindingMapper palletDeviceBindingMapper; private PalletDeviceBindingMapper palletDeviceBindingMapper;
// 创建固定大小的线程池(根据实际情况调整大小) // 线程池:非静态,Spring管理,容器关闭时关闭
private static final ExecutorService executorService = Executors.newFixedThreadPool( private final ExecutorService executorService;
Runtime.getRuntime().availableProcessors() * 2 // -------------------------- 构造函数(初始化线程池) --------------------------
public ModbusResultHandler() {
// 线程池配置:核心线程数=CPU核数,最大线程数=CPU核数*2,队列有界(避免OOM)
this.executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors() * 2,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 队列大小100,超过则触发拒绝策略
r -> new Thread(r, "modbus-result-handler-" + System.currentTimeMillis()),
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时由调用线程执行,避免任务丢失
); );
}
// -------------------------- 核心处理逻辑 --------------------------
@Override @Override
public void accept(DeviceStatusReaderDto deviceStatusReaderDto) { public void accept(DeviceStatusReaderDto deviceStatusReaderDto) {
// 提交任务到线程池异步执行 // 校验dto不为null
executorService.submit(() -> handleData(deviceStatusReaderDto)); if (deviceStatusReaderDto == null) {
log.error("ModbusResultHandler接收的DeviceStatusReaderDto为null,跳过处理");
return;
}
// 提交任务到线程池,并用Future捕获异常(避免异常被吞噬)
executorService.submit(() -> {
try {
handleData(deviceStatusReaderDto);
} catch (Throwable e) {
// 捕获所有异常,记录详细日志(关键:避免线程池任务异常被吞噬)
log.error("ModbusResultHandler处理数据异常:dto={}", deviceStatusReaderDto, e);
// 记录告警,感知异常
recordAlarm(null,
"ip:" + deviceStatusReaderDto.getIp() + ",port:" + deviceStatusReaderDto.getPort(),
"结果处理器异常:" + e.getMessage());
}
});
} }
// 实际的数据处理方法(线程安全) /**
* 实际数据处理(修复空指针、资源泄漏、异常捕获)
*/
private void handleData(DeviceStatusReaderDto dto) { private void handleData(DeviceStatusReaderDto dto) {
int deviceId = dto.getDeviceId(); // 1. 提取参数并校验(避免NPE)
Integer deviceId = dto.getDeviceId();
int[] data = dto.getRegisterData(); int[] data = dto.getRegisterData();
String ip = dto.getIp(); String ip = dto.getIp();
int port = dto.getPort(); Integer port = dto.getPort();
// 根据 层ip 查找 层 code 根据层code 查 托盘id 根据托盘id 和 deviceId 查 绑定的点位数据 if (deviceId == null || data == null || StringUtils.isBlank(ip) || port == null) {
PalletDeviceBinding palletDeviceBinding = palletDeviceBindingMapper.selectByTrayIdAndIndex(ip, deviceId); log.error("DeviceStatusReaderDto参数不完整:deviceId={}, ip={}, port={}, data={}",
deviceId, ip, port, Arrays.toString(data));
recordAlarm(null, "ip:" + ip + ",port:" + port, "参数不完整:deviceId/ip/port/data为空");
return;
}
TEquipmentAlarmData alarmData; // 2. 查询数据库并校验(避免NPE)
PalletDeviceBinding palletDeviceBinding;
try {
palletDeviceBinding = palletDeviceBindingMapper.selectByTrayIdAndIndex(ip, deviceId);
} catch (Exception e) {
log.error("查询PalletDeviceBinding异常:ip={}, deviceId={}", ip, deviceId, e);
recordAlarm(null, "ip:" + ip + ",port:" + port + ",deviceId:" + deviceId,
"查询绑定关系异常:" + e.getMessage());
return;
}
// 1 获取 线程专有的Modbus连接 if (palletDeviceBinding == null) {
log.error("未查询到PalletDeviceBinding:ip={}, deviceId={}", ip, deviceId);
recordAlarm(null, "ip:" + ip + ",port:" + port + ",deviceId:" + deviceId,
"未找到设备绑定关系");
return;
}
// 3. Modbus操作(修复资源泄漏,捕获所有异常)
ModbusMaster master = null;
try { try {
ModbusMaster master = Modbus4jUtils.getMaster(ip, port); // 获取ModbusMaster
master = Modbus4jUtils.getMaster(ip, port);
if (master == null) {
log.error("获取ModbusMaster失败:ip={}, port={}", ip, port);
recordAlarm(palletDeviceBinding, "获取Modbus连接失败");
return;
}
log.info(">>> 回调处理: 接收到新数据" + Thread.currentThread().getName() + "]: 接收到新数据"); log.info(">>> 回调处理[{}]: 接收到数据:deviceId={}, data={}",
log.info(" 数据: " + Arrays.toString(data)); Thread.currentThread().getName(), deviceId, Arrays.toString(data));
// 4. 校验数据长度并处理
if (data.length >= 2) { if (data.length >= 2) {
// 给点位设置状态 // 更新设备状态
palletDeviceBinding.setStatus(data[1] + ""); palletDeviceBinding.setStatus(String.valueOf(data[1]));
log.info(" >>> 注意: 第二个寄存器值为1 或者 3 或者 4 允许写时间!"); // 校验状态是否允许写时间
if(data[1] == 1 || data[1] == 3 || data[1] == 4) { if (data[1] == 1 || data[1] == 3 || data[1] == 4) {
writeCurrentTimeToDevice(master, deviceId, palletDeviceBinding, ip, port);
} else {
log.warn("设备状态不允许写时间:deviceId={}, status={}", deviceId, data[1]);
recordAlarm(palletDeviceBinding, "设备状态不允许写时间:状态=" + data[1]);
}
} else {
log.warn("设备数据长度不足:deviceId={}, dataLength={}", deviceId, data.length);
recordAlarm(palletDeviceBinding, "设备数据长度不足:" + data.length + "(需至少2个寄存器)");
}
} catch (ModbusInitException e) {
log.error("Modbus初始化异常:ip={}, port={}", ip, port, e);
recordAlarm(palletDeviceBinding, "Modbus初始化失败:" + e.getMessage());
} catch (ModbusTransportException e) {
log.error("Modbus传输异常:ip={}, port={}, deviceId={}", ip, port, deviceId, e);
recordAlarm(palletDeviceBinding, "Modbus写时间失败:" + e.getMessage());
} catch (Exception e) {
// 捕获其他异常(如NPE、数据溢出)
log.error("处理Modbus数据异常:ip={}, port={}, deviceId={}", ip, port, deviceId, e);
recordAlarm(palletDeviceBinding, "数据处理异常:" + e.getMessage());
} finally {
// 关键:关闭Modbus连接,避免资源泄漏
destroyModbusMaster(master, deviceId);
// 更新数据库(单独捕获异常,避免覆盖连接关闭的异常)
try {
palletDeviceBindingMapper.updatePalletDeviceBinding(palletDeviceBinding);
log.debug("更新PalletDeviceBinding成功:deviceId={}", deviceId);
} catch (Exception e) {
log.error("更新PalletDeviceBinding异常:deviceId={}", deviceId, e);
recordAlarm(palletDeviceBinding, "更新设备状态异常:" + e.getMessage());
}
}
}
/**
* 写入当前时间到设备(拆分方法,降低复杂度)
*/
private void writeCurrentTimeToDevice(ModbusMaster master, int deviceId,
PalletDeviceBinding binding, String ip, int port) throws ModbusTransportException, ModbusInitException {
Calendar cal = Calendar.getInstance(); Calendar cal = Calendar.getInstance();
// 当前年 int year = cal.get(Calendar.YEAR);
int y = cal.get(Calendar.YEAR); int month = cal.get(Calendar.MONTH) + 1;
// 当前月 int day = cal.get(Calendar.DATE);
int m = cal.get(Calendar.MONTH) + 1; int hour = cal.get(Calendar.HOUR_OF_DAY);
// 当前日 int minute = cal.get(Calendar.MINUTE);
int d = cal.get(Calendar.DATE);
// 当前小时 // 校验年份是否超过Short范围(避免数据溢出)
int h = cal.get(Calendar.HOUR_OF_DAY); if (year > Short.MAX_VALUE) {
// 当前分钟 log.error("年份超过Short最大值:year={}, max={}", year, Short.MAX_VALUE);
int mm = cal.get(Calendar.MINUTE); recordAlarm(binding, "写入年份异常:年份超过Short最大值(" + Short.MAX_VALUE + ")");
return;
boolean yearResult = Modbus4jUtils.writeRegister(master,deviceId, 4, (short) y); }
if(!yearResult) {
alarmData = new TEquipmentAlarmData(); // 写入年份
alarmData.setfAlarmType("04"); if (Modbus4jUtils.writeRegister(master, deviceId, 4, (short) year)) {
alarmData.setCreateTime(new Date()); binding.setRecordYear(String.valueOf(year));
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId);
alarmData.setfAlarmData("年-日期写入失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
}else {
palletDeviceBinding.setRecordYear(y + "");
}
boolean mResult = Modbus4jUtils.writeRegister(master,deviceId, 5, (short) m);
if(!mResult) {
alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("04");
alarmData.setCreateTime(new Date());
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId);
alarmData.setfAlarmData("月-日期写入失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
}else {
palletDeviceBinding.setRecordMonth(m + "");
}
boolean dResult = Modbus4jUtils.writeRegister(master,deviceId, 6, (short) d);
if(!dResult) {
alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("04");
alarmData.setCreateTime(new Date());
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId);
alarmData.setfAlarmData("天-日期写入失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
}else {
palletDeviceBinding.setRecordDate(d + "");
}
boolean hResult = Modbus4jUtils.writeRegister(master,deviceId, 7, (short) h);
if(!hResult) {
alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("04");
alarmData.setCreateTime(new Date());
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId);
alarmData.setfAlarmData("时-日期写入失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
}else {
palletDeviceBinding.setRecordHour(h + "");
}
boolean mmResult = Modbus4jUtils.writeRegister(master,deviceId, 8, (short) mm);
if(!mmResult) {
alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("04");
alarmData.setCreateTime(new Date());
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId);
alarmData.setfAlarmData("分-日期写入失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
} else { } else {
palletDeviceBinding.setRecordMinute(mm + ""); recordAlarm(binding, "写入年份失败");
} }
// 更新
palletDeviceBindingMapper.updatePalletDeviceBinding(palletDeviceBinding); // 写入月份
if (Modbus4jUtils.writeRegister(master, deviceId, 5, (short) month)) {
binding.setRecordMonth(String.valueOf(month));
} else { } else {
alarmData = new TEquipmentAlarmData(); recordAlarm(binding, "写入月份失败");
alarmData.setfAlarmType("04");
alarmData.setCreateTime(new Date());
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId);
alarmData.setfAlarmData("三次读取后 状态不是为 1 或 3 或 4");
alarmDataService.insertTEquipmentAlarmData(alarmData);
} }
// 写入日期
if (Modbus4jUtils.writeRegister(master, deviceId, 6, (short) day)) {
binding.setRecordDate(String.valueOf(day));
} else { } else {
alarmData = new TEquipmentAlarmData(); recordAlarm(binding, "写入日期失败");
alarmData.setfAlarmType("04"); }
alarmData.setCreateTime(new Date());
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId); // 写入小时
alarmData.setfAlarmData("三次读取后 状态不是为 1 或 3 或 4"); if (Modbus4jUtils.writeRegister(master, deviceId, 7, (short) hour)) {
alarmDataService.insertTEquipmentAlarmData(alarmData); binding.setRecordHour(String.valueOf(hour));
} else {
recordAlarm(binding, "写入小时失败");
}
// 写入分钟
if (Modbus4jUtils.writeRegister(master, deviceId, 8, (short) minute)) {
binding.setRecordMinute(String.valueOf(minute));
} else {
recordAlarm(binding, "写入分钟失败");
}
}
/**
* 销毁ModbusMaster(关闭连接,避免资源泄漏)
*/
private void destroyModbusMaster(ModbusMaster master, int deviceId) {
if (master == null) return;
try {
// 1. 反射关闭Socket(适配Modbus4j 3.0.3)
if (master instanceof com.serotonin.modbus4j.ip.tcp.TcpMaster) {
com.serotonin.modbus4j.ip.tcp.TcpMaster tcpMaster = (com.serotonin.modbus4j.ip.tcp.TcpMaster) master;
Field socketField = com.serotonin.modbus4j.ip.tcp.TcpMaster.class.getDeclaredField("socket");
socketField.setAccessible(true);
Socket socket = (Socket) socketField.get(tcpMaster);
if (socket != null && !socket.isClosed()) {
socket.close();
log.debug("设备{}: Modbus Socket已关闭", deviceId);
}
}
} catch (Exception e) {
log.error("设备{}: 关闭Modbus Socket异常", deviceId, e);
} finally {
// 2. 调用destroy()方法
try {
master.destroy();
log.debug("设备{}: ModbusMaster已销毁", deviceId);
} catch (Exception e) {
log.error("设备{}: 销毁ModbusMaster异常", deviceId, e);
}
} }
} catch (ModbusInitException e) { }
alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("04"); //01.老化柜 02.机械臂 03.老化层 04.点位 /**
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port ); * 统一告警记录(修复字段错误,确保写入成功)
alarmData.setfAlarmData("Modbus初始化失败"); */
alarmData.setCreateTime(new Date()); private void recordAlarm(PalletDeviceBinding binding, String alarmMsg) {
alarmDataService.insertTEquipmentAlarmData(alarmData); String equipmentCode = binding != null ? binding.getDeviceCode() : "unknown";
} catch (ModbusTransportException e) { recordAlarm(binding, equipmentCode, alarmMsg);
alarmData = new TEquipmentAlarmData(); }
alarmData.setfAlarmType("04"); //01.老化柜 02.机械臂 03.老化层 04.点位
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + "deviceId:" + deviceId); private void recordAlarm(PalletDeviceBinding binding, String equipmentCode, String alarmMsg) {
alarmData.setfAlarmData("日期写入失败"); try {
alarmData.setCreateTime(new Date()); TEquipmentAlarmData alarm = new TEquipmentAlarmData();
alarmDataService.insertTEquipmentAlarmData(alarmData); alarm.setfAlarmType("04"); // 04.点位告警
alarm.setfEquipmentCode(equipmentCode);
alarm.setfAlarmData(alarmMsg);
alarm.setfCreateTime(new Date()); // 修复字段错误:用fCreateTime而非createTime
alarmDataService.insertTEquipmentAlarmData(alarm);
log.debug("告警记录成功:equipmentCode={}, msg={}", equipmentCode, alarmMsg);
} catch (Exception e) {
log.error("告警记录失败:equipmentCode={}, msg={}", equipmentCode, alarmMsg, e);
}
}
/**
* Spring容器关闭时关闭线程池(避免资源泄漏)
*/
@Override
protected void finalize() throws Throwable {
super.finalize();
if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown();
// 等待线程池关闭(最多等30秒)
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
log.info("ModbusResultHandler线程池已关闭");
} }
} }
// 字符串工具方法(避免依赖Apache Commons)
private static class StringUtils {
public static boolean isBlank(String str) {
return str == null || str.trim().isEmpty();
}
}
// 创建通用的停止条件(可选) // 创建通用的停止条件(可选)
public static Predicate<int[]> createDefaultStopCondition() { public static Predicate<int[]> createDefaultStopCondition() {
return values -> values.length >= 2 && values[1] == DeviceStatusReaderAndTimeSetter.TARGET_VALUE; return values -> values.length >= 2 && values[1] == DeviceStatusReaderAndTimeSetter.TARGET_VALUE;
......
...@@ -66,7 +66,7 @@ public class DeviceCommunicationJob implements Job { ...@@ -66,7 +66,7 @@ public class DeviceCommunicationJob implements Job {
// -------------------------- 核心执行逻辑 -------------------------- // -------------------------- 核心执行逻辑 --------------------------
@Override @Override
public void execute(JobExecutionContext context) throws JobExecutionException { public void execute(JobExecutionContext context) {
// 关键:不隐藏任何致命异常,让Quartz感知错误(但捕获后包装为JobExecutionException) // 关键:不隐藏任何致命异常,让Quartz感知错误(但捕获后包装为JobExecutionException)
long taskStartTime = System.currentTimeMillis(); long taskStartTime = System.currentTimeMillis();
String storeyIdStr = getStoreyIdFromContext(context); String storeyIdStr = getStoreyIdFromContext(context);
...@@ -85,11 +85,7 @@ public class DeviceCommunicationJob implements Job { ...@@ -85,11 +85,7 @@ public class DeviceCommunicationJob implements Job {
// 2. 执行核心逻辑(带总超时) // 2. 执行核心逻辑(带总超时)
boolean executeSuccess = executeWithTotalTimeout(context, deviceExecutor, taskStartTime); boolean executeSuccess = executeWithTotalTimeout(context, deviceExecutor, taskStartTime);
if (!executeSuccess) { if (!executeSuccess) {
// 任务超时:主动抛出异常,让Quartz记录但不标记Trigger为ERROR(需配合Misfire策略) log.info("DeviceCommunicationJob执行超时:fStoreyId={},总耗时={}ms",storeyIdStr,TOTAL_TASK_TIMEOUT_SEC);
throw new JobExecutionException(
"DeviceCommunicationJob执行超时(>" + TOTAL_TASK_TIMEOUT_SEC + "秒)",
false // 第二个参数为false:不立即暂停Trigger
);
} }
log.info("DeviceCommunicationJob执行成功:fStoreyId={},总耗时={}ms", log.info("DeviceCommunicationJob执行成功:fStoreyId={},总耗时={}ms",
...@@ -99,15 +95,12 @@ public class DeviceCommunicationJob implements Job { ...@@ -99,15 +95,12 @@ public class DeviceCommunicationJob implements Job {
// 主动抛出的任务异常:Quartz会记录日志,但Trigger状态仍为NORMAL(需配置Misfire) // 主动抛出的任务异常:Quartz会记录日志,但Trigger状态仍为NORMAL(需配置Misfire)
log.error("DeviceCommunicationJob执行异常(已主动抛出,避免Trigger变ERROR):fStoreyId={}", log.error("DeviceCommunicationJob执行异常(已主动抛出,避免Trigger变ERROR):fStoreyId={}",
storeyIdStr, e); storeyIdStr, e);
throw e; // 必须抛出,让Quartz感知,但通过第二个参数控制不暂停Trigger
} catch (Throwable e) { } catch (Throwable e) {
// 未预期的致命异常:包装为JobExecutionException,显式暴露 // 未预期的致命异常:包装为JobExecutionException,显式暴露
String errMsg = "DeviceCommunicationJob发生未预期致命异常:fStoreyId=" + storeyIdStr; String errMsg = "DeviceCommunicationJob发生未预期致命异常:fStoreyId=" + storeyIdStr;
log.error(errMsg, e); log.error(errMsg, e);
recordAlarm(null, storeyIdStr, errMsg + ":" + e.getMessage()); recordAlarm(null, storeyIdStr, errMsg + ":" + e.getMessage());
// 抛出时第二个参数设为false,避免Trigger直接变ERROR
throw new JobExecutionException(errMsg, e, false);
} finally { } finally {
// 关键:无论成功/失败,关闭线程池,避免资源泄漏 // 关键:无论成功/失败,关闭线程池,避免资源泄漏
...@@ -206,24 +199,22 @@ public class DeviceCommunicationJob implements Job { ...@@ -206,24 +199,22 @@ public class DeviceCommunicationJob implements Job {
/** /**
* 验证参数并获取设备信息(参数错误直接抛出异常) * 验证参数并获取设备信息(参数错误直接抛出异常)
*/ */
private TStoreyInfo validateAndGetStoreyInfo(String storeyIdStr) throws JobExecutionException { private TStoreyInfo validateAndGetStoreyInfo(String storeyIdStr) {
// 1. 校验storeyIdStr // 1. 校验storeyIdStr
if (StringUtils.isBlank(storeyIdStr)) { if (StringUtils.isBlank(storeyIdStr)) {
String errMsg = "fStoreyId参数为空,任务终止"; String errMsg = "fStoreyId参数为空,任务终止";
log.error(errMsg); log.error(errMsg);
recordAlarm(null, "unknown", errMsg); recordAlarm(null, "unknown", errMsg);
throw new JobExecutionException(errMsg, false);
} }
// 2. 转换storeyId // 2. 转换storeyId
Long storeyId; Long storeyId = null;
try { try {
storeyId = Long.parseLong(storeyIdStr); storeyId = Long.parseLong(storeyIdStr);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
String errMsg = "fStoreyId格式错误:" + storeyIdStr; String errMsg = "fStoreyId格式错误:" + storeyIdStr;
log.error(errMsg, e); log.error(errMsg, e);
recordAlarm(null, storeyIdStr, errMsg); recordAlarm(null, storeyIdStr, errMsg);
throw new JobExecutionException(errMsg, e, false);
} }
// 3. 查询设备信息 // 3. 查询设备信息
...@@ -232,16 +223,14 @@ public class DeviceCommunicationJob implements Job { ...@@ -232,16 +223,14 @@ public class DeviceCommunicationJob implements Job {
String errMsg = "未查询到设备信息:fStoreyId=" + storeyId; String errMsg = "未查询到设备信息:fStoreyId=" + storeyId;
log.error(errMsg); log.error(errMsg);
recordAlarm(null, storeyIdStr, errMsg); recordAlarm(null, storeyIdStr, errMsg);
// 清理无效任务(避免后续重复执行)
throw new JobExecutionException(errMsg, false);
} }
// 4. 校验设备IP // 4. 校验设备IP
assert storeyInfo != null;
if (StringUtils.isBlank(storeyInfo.getfIp())) { if (StringUtils.isBlank(storeyInfo.getfIp())) {
String errMsg = "设备IP为空:fStoreyId=" + storeyId; String errMsg = "设备IP为空:fStoreyId=" + storeyId;
log.error(errMsg); log.error(errMsg);
recordAlarm(storeyInfo, errMsg); recordAlarm(storeyInfo, errMsg);
throw new JobExecutionException(errMsg, false);
} }
return storeyInfo; return storeyInfo;
...@@ -259,19 +248,18 @@ public class DeviceCommunicationJob implements Job { ...@@ -259,19 +248,18 @@ public class DeviceCommunicationJob implements Job {
AtomicInteger deviceErrorCount = new AtomicInteger(0); AtomicInteger deviceErrorCount = new AtomicInteger(0);
for (int deviceId : deviceIds) { for (int deviceId : deviceIds) {
int finalDeviceId = deviceId;
// 每个设备用独立线程执行,带超时 // 每个设备用独立线程执行,带超时
Executors.newSingleThreadExecutor(r -> Executors.newSingleThreadExecutor(r ->
new Thread(r, "modbus-device-" + storeyIdStr + "-" + port + "-" + finalDeviceId) new Thread(r, "modbus-device-" + storeyIdStr + "-" + port + "-" + deviceId)
).submit(() -> { ).submit(() -> {
try { try {
// 单个设备通信(带超时) // 单个设备通信(带超时)
executeSingleDeviceWithTimeout(ip, port, finalDeviceId, storeyIdStr); executeSingleDeviceWithTimeout(ip, port, deviceId, storeyIdStr);
} catch (Exception e) { } catch (Exception e) {
deviceErrorCount.incrementAndGet(); deviceErrorCount.incrementAndGet();
log.error("设备通信异常:ip={},port={},deviceId={},fStoreyId={}", log.error("设备通信异常:ip={},port={},deviceId={},fStoreyId={}",
ip, port, finalDeviceId, storeyIdStr, e); ip, port, deviceId, storeyIdStr, e);
recordAlarm(storeyInfo, "设备" + finalDeviceId + "通信异常:" + e.getMessage()); recordAlarm(storeyInfo, "设备" + deviceId + "通信异常:" + e.getMessage());
} finally { } finally {
deviceLatch.countDown(); deviceLatch.countDown();
} }
...@@ -279,18 +267,16 @@ public class DeviceCommunicationJob implements Job { ...@@ -279,18 +267,16 @@ public class DeviceCommunicationJob implements Job {
} }
// 等待该端口所有设备完成,带超时 // 等待该端口所有设备完成,带超时
boolean allDeviceCompleted = deviceLatch.await(SINGLE_DEVICE_TIMEOUT_SEC * deviceIds.size() / 10, TimeUnit.SECONDS); boolean allDeviceCompleted = deviceLatch.await((long) SINGLE_DEVICE_TIMEOUT_SEC * deviceIds.size() / 10, TimeUnit.SECONDS);
if (!allDeviceCompleted) { if (!allDeviceCompleted) {
String errMsg = "端口" + port + "部分设备超时:ip=" + ip + ",未完成设备数=" + deviceLatch.getCount() + ",fStoreyId=" + storeyIdStr; String errMsg = "端口" + port + "部分设备超时:ip=" + ip + ",未完成设备数=" + deviceLatch.getCount() + ",fStoreyId=" + storeyIdStr;
log.error(errMsg); log.error(errMsg);
recordAlarm(storeyInfo, errMsg); recordAlarm(storeyInfo, errMsg);
throw new Exception(errMsg); // 抛出异常,标记该端口失败
} }
if (deviceErrorCount.get() > 0) { if (deviceErrorCount.get() > 0) {
String errMsg = "端口" + port + "部分设备通信失败:ip=" + ip + ",失败数=" + deviceErrorCount.get() + ",fStoreyId=" + storeyIdStr; String errMsg = "端口" + port + "部分设备通信失败:ip=" + ip + ",失败数=" + deviceErrorCount.get() + ",fStoreyId=" + storeyIdStr;
log.error(errMsg); log.error(errMsg);
throw new Exception(errMsg); // 抛出异常,标记该端口失败
} }
log.info("端口通信完成:ip={},port={},fStoreyId={}", ip, port, storeyIdStr); log.info("端口通信完成:ip={},port={},fStoreyId={}", ip, port, storeyIdStr);
...@@ -299,7 +285,7 @@ public class DeviceCommunicationJob implements Job { ...@@ -299,7 +285,7 @@ public class DeviceCommunicationJob implements Job {
/** /**
* 单个设备通信(带超时和重试) * 单个设备通信(带超时和重试)
*/ */
private void executeSingleDeviceWithTimeout(String ip, int port, int deviceId, String storeyIdStr) throws Exception { private void executeSingleDeviceWithTimeout(String ip, int port, int deviceId, String storeyIdStr) {
// 单个设备通信超时:SINGLE_DEVICE_TIMEOUT_SEC秒 // 单个设备通信超时:SINGLE_DEVICE_TIMEOUT_SEC秒
Future<?> deviceFuture = Executors.newSingleThreadExecutor().submit(() -> { Future<?> deviceFuture = Executors.newSingleThreadExecutor().submit(() -> {
try { try {
...@@ -310,7 +296,7 @@ public class DeviceCommunicationJob implements Job { ...@@ -310,7 +296,7 @@ public class DeviceCommunicationJob implements Job {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, deviceId, result)); resultHandler.accept(new DeviceStatusReaderDto(ip, port, deviceId, result));
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("设备" + deviceId + "通信失败", e); log.info("设备" + deviceId + "通信异常");
} }
}); });
...@@ -319,10 +305,10 @@ public class DeviceCommunicationJob implements Job { ...@@ -319,10 +305,10 @@ public class DeviceCommunicationJob implements Job {
} catch (TimeoutException e) { } catch (TimeoutException e) {
deviceFuture.cancel(true); deviceFuture.cancel(true);
String errMsg = "设备" + deviceId + "通信超时(>" + SINGLE_DEVICE_TIMEOUT_SEC + "秒):ip=" + ip + ",port=" + port; String errMsg = "设备" + deviceId + "通信超时(>" + SINGLE_DEVICE_TIMEOUT_SEC + "秒):ip=" + ip + ",port=" + port;
throw new Exception(errMsg, e); log.info(errMsg);
} catch (Exception e) { } catch (Exception e) {
String errMsg = "设备" + deviceId + "通信异常:ip=" + ip + ",port=" + port; String errMsg = "设备" + deviceId + "通信异常:ip=" + ip + ",port=" + port;
throw new Exception(errMsg, e); log.info(errMsg);
} }
} }
...@@ -342,7 +328,7 @@ public class DeviceCommunicationJob implements Job { ...@@ -342,7 +328,7 @@ public class DeviceCommunicationJob implements Job {
result = readDeviceRegisters(master, deviceId); result = readDeviceRegisters(master, deviceId);
// 3. 检查结果(若有停止条件) // 3. 检查结果(若有停止条件)
Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition(); Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition();
if (stopCondition != null && stopCondition.test(result)) { if (stopCondition.test(result)) {
readSuccess = true; readSuccess = true;
log.debug("设备读取成功(满足停止条件):ip={},port={},deviceId={},重试次数={},fStoreyId={}", log.debug("设备读取成功(满足停止条件):ip={},port={},deviceId={},重试次数={},fStoreyId={}",
ip, port, deviceId, retry, storeyIdStr); ip, port, deviceId, retry, storeyIdStr);
...@@ -360,7 +346,6 @@ public class DeviceCommunicationJob implements Job { ...@@ -360,7 +346,6 @@ public class DeviceCommunicationJob implements Job {
} else { } else {
String errMsg = "设备读取重试耗尽(共" + CUSTOM_RETRY_TIMES + "次):ip=" + ip + ",port=" + port + ",deviceId=" + deviceId; String errMsg = "设备读取重试耗尽(共" + CUSTOM_RETRY_TIMES + "次):ip=" + ip + ",port=" + port + ",deviceId=" + deviceId;
log.error(errMsg, e); log.error(errMsg, e);
throw new Exception(errMsg, e); // 重试耗尽,抛出异常
} }
} finally { } finally {
// 每次重试后销毁连接,避免资源泄漏 // 每次重试后销毁连接,避免资源泄漏
...@@ -371,7 +356,6 @@ public class DeviceCommunicationJob implements Job { ...@@ -371,7 +356,6 @@ public class DeviceCommunicationJob implements Job {
if (!readSuccess) { if (!readSuccess) {
String errMsg = "设备读取未满足停止条件(重试" + CUSTOM_RETRY_TIMES + "次):ip=" + ip + ",port=" + port + ",deviceId=" + deviceId; String errMsg = "设备读取未满足停止条件(重试" + CUSTOM_RETRY_TIMES + "次):ip=" + ip + ",port=" + port + ",deviceId=" + deviceId;
log.error(errMsg); log.error(errMsg);
throw new Exception(errMsg);
} }
return result; return result;
...@@ -403,9 +387,10 @@ public class DeviceCommunicationJob implements Job { ...@@ -403,9 +387,10 @@ public class DeviceCommunicationJob implements Job {
ModbusResponse response = master.send(request); ModbusResponse response = master.send(request);
if (!(response instanceof ReadHoldingRegistersResponse)) { if (!(response instanceof ReadHoldingRegistersResponse)) {
throw new IllegalArgumentException("无效Modbus响应类型:" + response.getClass().getName() + ",deviceId=" + deviceId); log.warn("无效Modbus响应类型:" + response.getClass().getName() + ",deviceId=" + deviceId);
} }
assert response instanceof ReadHoldingRegistersResponse;
ReadHoldingRegistersResponse regResp = (ReadHoldingRegistersResponse) response; ReadHoldingRegistersResponse regResp = (ReadHoldingRegistersResponse) response;
short[] signedVals = regResp.getShortData(); short[] signedVals = regResp.getShortData();
int[] unsignedVals = new int[signedVals.length]; int[] unsignedVals = new int[signedVals.length];
...@@ -448,9 +433,9 @@ public class DeviceCommunicationJob implements Job { ...@@ -448,9 +433,9 @@ public class DeviceCommunicationJob implements Job {
/** /**
* 反射获取TcpMaster的Socket(失败直接抛出异常,显式暴露问题) * 反射获取TcpMaster的Socket(失败直接抛出异常,显式暴露问题)
*/ */
private Socket getUnderlyingSocket(ModbusMaster master) throws Exception { private Socket getUnderlyingSocket(ModbusMaster master) {
if (!(master instanceof TcpMaster)) { if (!(master instanceof TcpMaster)) {
throw new IllegalArgumentException("ModbusMaster不是TcpMaster类型:" + master.getClass().getName()); log.info("TcpMaster类型转换失败:" + master.getClass().getName());
} }
TcpMaster tcpMaster = (TcpMaster) master; TcpMaster tcpMaster = (TcpMaster) master;
...@@ -465,10 +450,11 @@ public class DeviceCommunicationJob implements Job { ...@@ -465,10 +450,11 @@ public class DeviceCommunicationJob implements Job {
} }
return socket; return socket;
} catch (NoSuchFieldException e) { } catch (NoSuchFieldException e) {
throw new Exception("反射获取Socket失败:TcpMaster中不存在'socket'字段(版本不匹配)", e); log.info("反射获取Socket失败:TcpMaster中不存在'socket'字段(版本不匹配");
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
throw new Exception("反射获取Socket失败:无访问权限(可能被安全管理器拦截)", e); log.info("反射获取Socket失败:无访问权限(可能被安全管理器拦截)");
} }
return null;
} }
// -------------------------- 辅助方法(日志/告警)-------------------------- // -------------------------- 辅助方法(日志/告警)--------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment