Commit c0abff62 authored by wanghao's avatar wanghao

1 定时任务写时间功能代码重构,主要针对在线程使用方面及使用modbus4j方面

parent f799feed
package com.zehong.quartz.config; package com.zehong.quartz.config;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory; import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import javax.annotation.Resource;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.util.Properties; import java.util.Properties;
......
...@@ -37,7 +37,8 @@ public class SysJobServiceImpl implements ISysJobService ...@@ -37,7 +37,8 @@ public class SysJobServiceImpl implements ISysJobService
@PostConstruct @PostConstruct
public void init() throws SchedulerException, TaskException public void init() throws SchedulerException, TaskException
{ {
scheduler.clear(); //20251021 wh 调整
//scheduler.clear();
List<SysJob> jobList = jobMapper.selectJobAll(); List<SysJob> jobList = jobMapper.selectJobAll();
for (SysJob job : jobList) for (SysJob job : jobList)
{ {
......
...@@ -9,8 +9,10 @@ import com.serotonin.modbus4j.ip.tcp.TcpMaster; ...@@ -9,8 +9,10 @@ import com.serotonin.modbus4j.ip.tcp.TcpMaster;
import com.serotonin.modbus4j.msg.ModbusResponse; import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest; import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse; import com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse;
import com.zehong.system.domain.PalletDeviceBinding;
import com.zehong.system.domain.TEquipmentAlarmData; import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.domain.TStoreyInfo; import com.zehong.system.domain.TStoreyInfo;
import com.zehong.system.mapper.PalletDeviceBindingMapper;
import com.zehong.system.mapper.TStoreyInfoMapper; import com.zehong.system.mapper.TStoreyInfoMapper;
import com.zehong.system.modbus.handler.ModbusResultHandler; import com.zehong.system.modbus.handler.ModbusResultHandler;
import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto; import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto;
...@@ -22,16 +24,15 @@ import org.slf4j.Logger; ...@@ -22,16 +24,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Calendar;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.net.Socket;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate; import java.util.stream.Collectors;
/** /**
* 设备通信Job(修复版:显式暴露错误,避免Trigger变ERROR) * 设备通信Job(修复版:显式暴露错误,避免Trigger变ERROR)
...@@ -54,6 +55,14 @@ public class DeviceCommunicationJob implements Job { ...@@ -54,6 +55,14 @@ public class DeviceCommunicationJob implements Job {
private static final int REG_START_ADDR = 0; private static final int REG_START_ADDR = 0;
private static final int REG_READ_COUNT = 10; private static final int REG_READ_COUNT = 10;
// 全局线程池 - 避免重复创建
private static final ExecutorService GLOBAL_DEVICE_EXECUTOR = new ThreadPoolExecutor(
50, 100, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
r -> new Thread(r, "global-modbus-device"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 工厂(单例) // 工厂(单例)
private static final ModbusFactory modbusFactory = new ModbusFactory(); private static final ModbusFactory modbusFactory = new ModbusFactory();
// -------------------------- 依赖注入 -------------------------- // -------------------------- 依赖注入 --------------------------
...@@ -63,304 +72,228 @@ public class DeviceCommunicationJob implements Job { ...@@ -63,304 +72,228 @@ public class DeviceCommunicationJob implements Job {
private TStoreyInfoMapper tStoreyInfoMapper; private TStoreyInfoMapper tStoreyInfoMapper;
@Autowired @Autowired
private ModbusResultHandler resultHandler; private ModbusResultHandler resultHandler;
@Resource
private PalletDeviceBindingMapper palletDeviceBindingMapper;
// -------------------------- 核心执行逻辑 -------------------------- // -------------------------- 核心执行逻辑 --------------------------
@Override @Override
public void execute(JobExecutionContext context) { public void execute(JobExecutionContext context) {
// 关键:不隐藏任何致命异常,让Quartz感知错误(但捕获后包装为JobExecutionException)
long taskStartTime = System.currentTimeMillis();
String storeyIdStr = getStoreyIdFromContext(context); String storeyIdStr = getStoreyIdFromContext(context);
ExecutorService deviceExecutor = null; // 线程池:每次任务新建,结束后关闭 long startTime = System.currentTimeMillis();
try { try {
// 1. 初始化线程池(每次任务新建,避免泄漏) TStoreyInfo storeyInfo = validateAndGetStoreyInfo(storeyIdStr);
deviceExecutor = new ThreadPoolExecutor(
3, 3, // 核心=最大=3(对应3个端口,1:1映射) // 并行处理3个端口
0, TimeUnit.SECONDS, List<CompletableFuture<Void>> portFutures = Arrays.asList(
new LinkedBlockingQueue<>(20), // 队列20个(足够72个设备) processPort(storeyInfo, 501, Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27)),
r -> new Thread(r, "modbus-port-pool-" + storeyIdStr), // 线程名含设备ID,便于追踪 processPort(storeyInfo, 502, Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54)),
new ThreadPoolExecutor.AbortPolicy() // 队列满时抛异常,显式暴露问题 processPort(storeyInfo, 503, Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72))
); );
// 2. 执行核心逻辑(带总超时) // 等待所有端口完成,带总超时
boolean executeSuccess = executeWithTotalTimeout(context, deviceExecutor, taskStartTime); CompletableFuture<Void> allPorts = CompletableFuture.allOf(
if (!executeSuccess) { portFutures.toArray(new CompletableFuture[0])
log.info("DeviceCommunicationJob执行超时:fStoreyId={},总耗时={}ms",storeyIdStr,TOTAL_TASK_TIMEOUT_SEC); );
}
log.info("DeviceCommunicationJob执行成功:fStoreyId={},总耗时={}ms", allPorts.get(TOTAL_TASK_TIMEOUT_SEC, TimeUnit.SECONDS);
storeyIdStr, System.currentTimeMillis() - taskStartTime);
log.info("任务执行成功: fStoreyId={}, 耗时={}ms",
} catch (JobExecutionException e) { storeyIdStr, System.currentTimeMillis() - startTime);
// 主动抛出的任务异常:Quartz会记录日志,但Trigger状态仍为NORMAL(需配置Misfire)
log.error("DeviceCommunicationJob执行异常(已主动抛出,避免Trigger变ERROR):fStoreyId={}", } catch (TimeoutException e) {
storeyIdStr, e); log.warn("任务执行超时: fStoreyId={}", storeyIdStr);
recordAlarm(null, storeyIdStr, "任务执行超时");
} catch (Throwable e) { } catch (Exception e) {
// 未预期的致命异常:包装为JobExecutionException,显式暴露 log.error("任务执行异常: fStoreyId={}", storeyIdStr, e);
String errMsg = "DeviceCommunicationJob发生未预期致命异常:fStoreyId=" + storeyIdStr; recordAlarm(null, storeyIdStr, "任务执行异常: " + e.getMessage());
log.error(errMsg, e);
recordAlarm(null, storeyIdStr, errMsg + ":" + e.getMessage());
} finally {
// 关键:无论成功/失败,关闭线程池,避免资源泄漏
if (deviceExecutor != null && !deviceExecutor.isShutdown()) {
deviceExecutor.shutdownNow(); // 强制关闭,中断所有未完成任务
try {
// 等待线程池关闭,避免JVM退出前资源未释放
if (!deviceExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("线程池关闭超时,强制终止:fStoreyId={}", storeyIdStr);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("线程池关闭被中断:fStoreyId={}", storeyIdStr, ie);
}
}
log.debug("DeviceCommunicationJob最终清理完成:fStoreyId={}", storeyIdStr);
} }
} }
/** /**
* 带总超时的核心执行逻辑 * 处理单个端口的所有设备
*/ */
private boolean executeWithTotalTimeout(JobExecutionContext context, ExecutorService deviceExecutor, long taskStartTime) throws Exception { private CompletableFuture<Void> processPort(TStoreyInfo storeyInfo, int port, List<Integer> deviceIds) {
String storeyIdStr = getStoreyIdFromContext(context); return CompletableFuture.runAsync(() -> {
TStoreyInfo storeyInfo = validateAndGetStoreyInfo(storeyIdStr); String ip = storeyInfo.getfIp();
String storeyIdStr = storeyInfo.getfStoreyId().toString();
// 用自定义线程池执行端口通信(避免混用CompletableFuture默认线程池) log.info("开始端口通信: ip={}, port={}, 设备数={}", ip, port, deviceIds.size());
CountDownLatch portLatch = new CountDownLatch(3); // 3个端口,计数器3
AtomicInteger portErrorCount = new AtomicInteger(0);
// 端口501通信 AtomicInteger errorCount = new AtomicInteger(0);
deviceExecutor.submit(() -> {
try {
executeSinglePort(storeyInfo.getfIp(), 501,
Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27),
storeyIdStr, storeyInfo);
} catch (Exception e) {
portErrorCount.incrementAndGet();
log.error("端口501通信异常:fStoreyId={}", storeyIdStr, e);
} finally {
portLatch.countDown();
}
});
// 端口502通信 // 并行处理该端口的所有设备
deviceExecutor.submit(() -> { List<CompletableFuture<Boolean>> deviceFutures = deviceIds.stream()
try { .map(deviceId -> processDeviceWithWrite(ip, port, deviceId, storeyIdStr, errorCount))
executeSinglePort(storeyInfo.getfIp(), 502, .collect(Collectors.toList());
Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54),
storeyIdStr, storeyInfo);
} catch (Exception e) {
portErrorCount.incrementAndGet();
log.error("端口502通信异常:fStoreyId={}", storeyIdStr, e);
} finally {
portLatch.countDown();
}
});
// 端口503通信
deviceExecutor.submit(() -> {
try { try {
executeSinglePort(storeyInfo.getfIp(), 503, // 等待该端口所有设备完成
Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72), CompletableFuture<Void> allDevices = CompletableFuture.allOf(
storeyIdStr, storeyInfo); deviceFutures.toArray(new CompletableFuture[0])
);
// 端口超时 = 设备数 * 单设备超时 / 并发因子
int portTimeout = Math.max(30, deviceIds.size() * SINGLE_DEVICE_TIMEOUT_SEC / 5);
allDevices.get(portTimeout, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.warn("端口{}通信超时: ip={}, fStoreyId={}", port, ip, storeyIdStr);
recordAlarm(storeyInfo, "端口" + port + "通信超时");
} catch (Exception e) { } catch (Exception e) {
portErrorCount.incrementAndGet(); log.error("端口{}通信异常: ip={}, fStoreyId={}", port, ip, storeyIdStr, e);
log.error("端口503通信异常:fStoreyId={}", storeyIdStr, e);
} finally {
portLatch.countDown();
} }
});
// 等待所有端口完成,带总超时
boolean allPortCompleted = portLatch.await(SINGLE_PORT_TIMEOUT_SEC, TimeUnit.SECONDS);
long totalCost = System.currentTimeMillis() - taskStartTime;
// 检查总耗时是否超任务超时
if (totalCost > TOTAL_TASK_TIMEOUT_SEC * 1000) {
log.warn("任务总耗时超阈值:fStoreyId={},耗时={}ms(阈值={}ms)",
storeyIdStr, totalCost, TOTAL_TASK_TIMEOUT_SEC * 1000);
recordAlarm(storeyInfo, "任务总超时:" + totalCost + "ms(阈值" + TOTAL_TASK_TIMEOUT_SEC + "秒)");
return false;
}
// 检查端口错误数 if (errorCount.get() > 0) {
if (portErrorCount.get() > 0) { log.warn("端口{}部分设备失败: 失败数={}, fStoreyId={}",
String errMsg = "部分端口通信失败:fStoreyId=" + storeyIdStr + ",失败端口数=" + portErrorCount.get(); port, errorCount.get(), storeyIdStr);
log.error(errMsg); }
recordAlarm(storeyInfo, errMsg);
// 此处不抛出异常,仅记录告警,避免Trigger变ERROR
}
return allPortCompleted; log.info("端口通信完成: ip={}, port={}, fStoreyId={}", ip, port, storeyIdStr);
}, GLOBAL_DEVICE_EXECUTOR);
} }
/** /**
* 验证参数并获取设备信息(参数错误直接抛出异常 * 处理单个设备(读取 + 条件写入
*/ */
private TStoreyInfo validateAndGetStoreyInfo(String storeyIdStr) { private CompletableFuture<Boolean> processDeviceWithWrite(String ip, int port, int deviceId,
// 1. 校验storeyIdStr String storeyIdStr, AtomicInteger errorCount) {
if (StringUtils.isBlank(storeyIdStr)) { return CompletableFuture.supplyAsync(() -> {
String errMsg = "fStoreyId参数为空,任务终止"; ModbusMaster master = null;
log.error(errMsg); try {
recordAlarm(null, "unknown", errMsg); // 1. 读取设备数据
} int[] result = readDeviceWithRetry(ip, port, deviceId, storeyIdStr);
// 2. 转换storeyId
Long storeyId = null;
try {
storeyId = Long.parseLong(storeyIdStr);
} catch (NumberFormatException e) {
String errMsg = "fStoreyId格式错误:" + storeyIdStr;
log.error(errMsg, e);
recordAlarm(null, storeyIdStr, errMsg);
}
// 3. 查询设备信息
TStoreyInfo storeyInfo = tStoreyInfoMapper.selectTStoreyInfoById(storeyId);
if (storeyInfo == null) {
String errMsg = "未查询到设备信息:fStoreyId=" + storeyId;
log.error(errMsg);
recordAlarm(null, storeyIdStr, errMsg);
}
// 4. 校验设备IP // 2. 查询设备绑定信息
assert storeyInfo != null; PalletDeviceBinding binding = palletDeviceBindingMapper.selectByTrayIdAndIndex(ip, deviceId);
if (StringUtils.isBlank(storeyInfo.getfIp())) { if (binding == null) {
String errMsg = "设备IP为空:fStoreyId=" + storeyId; log.warn("未找到设备绑定: ip={}, deviceId={}", ip, deviceId);
log.error(errMsg); recordAlarm(null, "ip:" + ip + ",port:" + port + ",deviceId:" + deviceId, "未找到设备绑定");
recordAlarm(storeyInfo, errMsg); errorCount.incrementAndGet();
} return false;
}
return storeyInfo; // 3. 更新设备状态
} binding.setStatus(String.valueOf(result[1]));
/** // 4. 条件写入时间
* 单个端口通信(含设备级超时和重试) if (result[1] == 1 || result[1] == 3 || result[1] == 4) {
*/ // 重用之前的master连接进行写操作
private void executeSinglePort(String ip, int port, List<Integer> deviceIds, master = createModbusMaster(ip, port);
String storeyIdStr, TStoreyInfo storeyInfo) throws Exception { writeCurrentTimeToDevice(master, deviceId, binding, ip, port);
log.info("开始端口通信:ip={},port={},设备数={},fStoreyId={}",
ip, port, deviceIds.size(), storeyIdStr);
CountDownLatch deviceLatch = new CountDownLatch(deviceIds.size());
AtomicInteger deviceErrorCount = new AtomicInteger(0);
for (int deviceId : deviceIds) {
// 每个设备用独立线程执行,带超时
Executors.newSingleThreadExecutor(r ->
new Thread(r, "modbus-device-" + storeyIdStr + "-" + port + "-" + deviceId)
).submit(() -> {
try {
// 单个设备通信(带超时)
executeSingleDeviceWithTimeout(ip, port, deviceId, storeyIdStr);
} catch (Exception e) {
deviceErrorCount.incrementAndGet();
log.error("设备通信异常:ip={},port={},deviceId={},fStoreyId={}",
ip, port, deviceId, storeyIdStr, e);
recordAlarm(storeyInfo, "设备" + deviceId + "通信异常:" + e.getMessage());
} finally {
deviceLatch.countDown();
} }
});
}
// 等待该端口所有设备完成,带超时 // 5. 更新数据库
boolean allDeviceCompleted = deviceLatch.await((long) SINGLE_DEVICE_TIMEOUT_SEC * deviceIds.size() / 10, TimeUnit.SECONDS); palletDeviceBindingMapper.updatePalletDeviceBinding(binding);
if (!allDeviceCompleted) {
String errMsg = "端口" + port + "部分设备超时:ip=" + ip + ",未完成设备数=" + deviceLatch.getCount() + ",fStoreyId=" + storeyIdStr;
log.error(errMsg);
recordAlarm(storeyInfo, errMsg);
}
if (deviceErrorCount.get() > 0) { log.debug("设备{}处理完成: ip={}, port={}, status={}", deviceId, ip, port, result[1]);
String errMsg = "端口" + port + "部分设备通信失败:ip=" + ip + ",失败数=" + deviceErrorCount.get() + ",fStoreyId=" + storeyIdStr; return true;
log.error(errMsg);
}
log.info("端口通信完成:ip={},port={},fStoreyId={}", ip, port, storeyIdStr); } catch (Exception e) {
log.error("设备{}处理异常: ip={}, port={}", deviceId, ip, port, e);
errorCount.incrementAndGet();
return false;
} finally {
destroyModbusMaster(master, deviceId);
}
}, GLOBAL_DEVICE_EXECUTOR);
} }
/** /**
* 单个设备通信(带超时和重试 * 验证参数并获取设备信息(参数错误直接抛出异常
*/ */
private void executeSingleDeviceWithTimeout(String ip, int port, int deviceId, String storeyIdStr) { private TStoreyInfo validateAndGetStoreyInfo(String storeyIdStr) {
// 单个设备通信超时:SINGLE_DEVICE_TIMEOUT_SEC秒 if (StringUtils.isBlank(storeyIdStr)) {
Future<?> deviceFuture = Executors.newSingleThreadExecutor().submit(() -> { log.error("fStoreyId参数为空");
try { return null;
// 带重试的设备通信 }
int[] result = readDeviceWithRetry(ip, port, deviceId, storeyIdStr);
// 处理结果(若有)
if (resultHandler != null) {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, deviceId, result));
}
} catch (Exception e) {
log.info("设备" + deviceId + "通信异常");
}
});
try { try {
deviceFuture.get(SINGLE_DEVICE_TIMEOUT_SEC, TimeUnit.SECONDS); Long storeyId = Long.parseLong(storeyIdStr);
} catch (TimeoutException e) { TStoreyInfo storeyInfo = tStoreyInfoMapper.selectTStoreyInfoById(storeyId);
deviceFuture.cancel(true);
String errMsg = "设备" + deviceId + "通信超时(>" + SINGLE_DEVICE_TIMEOUT_SEC + "秒):ip=" + ip + ",port=" + port; if (storeyInfo == null || StringUtils.isBlank(storeyInfo.getfIp())) {
log.info(errMsg); log.error("设备信息无效: fStoreyId={}", storeyIdStr);
} catch (Exception e) { return null;
String errMsg = "设备" + deviceId + "通信异常:ip=" + ip + ",port=" + port; }
log.info(errMsg);
return storeyInfo;
} catch (NumberFormatException e) {
log.error("fStoreyId格式错误: {}", storeyIdStr);
return null;
} }
} }
/** /**
* 带重试的设备寄存器读取(取消Modbus内置重试,统一自定义重试) * 带重试的设备读取
*/ */
private int[] readDeviceWithRetry(String ip, int port, int deviceId, String storeyIdStr) throws Exception { private int[] readDeviceWithRetry(String ip, int port, int deviceId, String storeyIdStr) {
ModbusMaster master = null; ModbusMaster master = null;
int[] result = null;
boolean readSuccess = false;
for (int retry = 0; retry <= CUSTOM_RETRY_TIMES; retry++) { for (int retry = 0; retry <= CUSTOM_RETRY_TIMES; retry++) {
try { try {
// 1. 创建Modbus连接(取消内置重试,master.setRetries(0))
master = createModbusMaster(ip, port); master = createModbusMaster(ip, port);
// 2. 读取寄存器 int[] result = readDeviceRegisters(master, deviceId);
result = readDeviceRegisters(master, deviceId);
// 3. 检查结果(若有停止条件) // 检查停止条件
Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition(); if (resultHandler != null && ModbusResultHandler.createDefaultStopCondition().test(result)) {
if (stopCondition.test(result)) { log.debug("设备{}读取成功: ip={}, port={}", deviceId, ip, port);
readSuccess = true; return result;
log.debug("设备读取成功(满足停止条件):ip={},port={},deviceId={},重试次数={},fStoreyId={}", }
ip, port, deviceId, retry, storeyIdStr);
break; if (retry < CUSTOM_RETRY_TIMES) {
} else if (retry < CUSTOM_RETRY_TIMES) { Thread.sleep(200);
log.debug("设备读取结果不满足条件,准备重试:ip={},port={},deviceId={},重试次数={},fStoreyId={}",
ip, port, deviceId, retry, storeyIdStr);
Thread.sleep(RETRY_DELAY_MS);
} }
} catch (Exception e) { } catch (Exception e) {
if (retry < CUSTOM_RETRY_TIMES) { if (retry < CUSTOM_RETRY_TIMES) {
log.warn("设备读取异常,准备重试:ip={},port={},deviceId={},重试次数={},fStoreyId={}", log.debug("设备{}读取失败,准备重试: ip={}, port={}", deviceId, ip, port);
ip, port, deviceId, retry, storeyIdStr, e);
Thread.sleep(RETRY_DELAY_MS);
} else { } else {
String errMsg = "设备读取重试耗尽(共" + CUSTOM_RETRY_TIMES + "次):ip=" + ip + ",port=" + port + ",deviceId=" + deviceId; log.error("设备{}读取重试耗尽: ip={}, port={}", deviceId, ip, port, e);
log.error(errMsg, e); throw new RuntimeException("设备读取失败", e);
} }
} finally { } finally {
// 每次重试后销毁连接,避免资源泄漏
destroyModbusMaster(master, deviceId); destroyModbusMaster(master, deviceId);
} }
} }
if (!readSuccess) { throw new RuntimeException("设备读取未满足条件");
String errMsg = "设备读取未满足停止条件(重试" + CUSTOM_RETRY_TIMES + "次):ip=" + ip + ",port=" + port + ",deviceId=" + deviceId; }
log.error(errMsg); /**
* 写入当前时间到设备
*/
private void writeCurrentTimeToDevice(ModbusMaster master, int deviceId,
PalletDeviceBinding binding, String ip, int port) {
try {
Calendar cal = Calendar.getInstance();
int year = cal.get(Calendar.YEAR);
int month = cal.get(Calendar.MONTH) + 1;
int day = cal.get(Calendar.DATE);
int hour = cal.get(Calendar.HOUR_OF_DAY);
int minute = cal.get(Calendar.MINUTE);
// 写入时间寄存器
boolean success;
success = Modbus4jUtils.writeRegister(master, deviceId, 4, (short) year);
success &= Modbus4jUtils.writeRegister(master, deviceId, 5, (short) month);
success &= Modbus4jUtils.writeRegister(master, deviceId, 6, (short) day);
success &= Modbus4jUtils.writeRegister(master, deviceId, 7, (short) hour);
success &= Modbus4jUtils.writeRegister(master, deviceId, 8, (short) minute);
if (success) {
binding.setRecordYear(String.valueOf(year));
binding.setRecordMonth(String.valueOf(month));
binding.setRecordDate(String.valueOf(day));
binding.setRecordHour(String.valueOf(hour));
binding.setRecordMinute(String.valueOf(minute));
log.debug("设备{}时间写入成功", deviceId);
} else {
recordAlarmByBinding(binding, "设备时间写入失败");
}
} catch (Exception e) {
log.error("设备{}时间写入异常", deviceId, e);
recordAlarmByBinding(binding, "设备时间写入异常: " + e.getMessage());
} }
return result;
} }
// -------------------------- Modbus工具方法(显式抛出异常)-------------------------- // -------------------------- Modbus工具方法(显式抛出异常)--------------------------
/** /**
* 创建Modbus连接(取消内置重试,统一自定义重试) * 创建Modbus连接(取消内置重试,统一自定义重试)
...@@ -372,9 +305,8 @@ public class DeviceCommunicationJob implements Job { ...@@ -372,9 +305,8 @@ public class DeviceCommunicationJob implements Job {
TcpMaster master = (TcpMaster) modbusFactory.createTcpMaster(params, true); TcpMaster master = (TcpMaster) modbusFactory.createTcpMaster(params, true);
master.setTimeout(MODBUS_CONN_TIMEOUT_MS); master.setTimeout(MODBUS_CONN_TIMEOUT_MS);
master.setRetries(0); // 取消内置重试,避免和自定义重试冲突 master.setRetries(0);
master.init(); master.init();
log.debug("Modbus连接创建成功:ip={},port={},master={}", ip, port, master);
return master; return master;
} }
...@@ -406,55 +338,13 @@ public class DeviceCommunicationJob implements Job { ...@@ -406,55 +338,13 @@ public class DeviceCommunicationJob implements Job {
* 销毁Modbus连接(反射失败直接抛出异常,显式暴露问题) * 销毁Modbus连接(反射失败直接抛出异常,显式暴露问题)
*/ */
private void destroyModbusMaster(ModbusMaster master, int deviceId) { private void destroyModbusMaster(ModbusMaster master, int deviceId) {
if (master == null) return; if (master != null) {
try {
// 反射获取Socket(若失败,直接抛异常,不隐藏)
Socket socket = getUnderlyingSocket(master);
if (socket != null && !socket.isClosed()) {
socket.close();
log.trace("设备{}:Modbus底层Socket已关闭", deviceId);
}
} catch (Exception e) {
String errMsg = "设备" + deviceId + ":销毁Modbus Socket异常(可能导致连接泄漏)";
log.error(errMsg, e);
// 此处不抛出异常(避免中断销毁流程),但记录严重告警
recordAlarm(null, String.valueOf(deviceId), errMsg + ":" + e.getMessage());
} finally {
try { try {
master.destroy(); master.destroy();
log.trace("设备{}:ModbusMaster已销毁", deviceId);
} catch (Exception e) { } catch (Exception e) {
log.error("设备{}:销毁ModbusMaster异常", deviceId, e); log.debug("设备{}: ModbusMaster销毁异常", deviceId, e);
}
}
}
/**
* 反射获取TcpMaster的Socket(失败直接抛出异常,显式暴露问题)
*/
private Socket getUnderlyingSocket(ModbusMaster master) {
if (!(master instanceof TcpMaster)) {
log.info("TcpMaster类型转换失败:" + master.getClass().getName());
}
TcpMaster tcpMaster = (TcpMaster) master;
try {
// 反射获取socket字段(根据你的TcpMaster源码确认字段名,必须正确)
Field socketField = TcpMaster.class.getDeclaredField("socket");
socketField.setAccessible(true);
Socket socket = (Socket) socketField.get(tcpMaster);
if (socket == null) {
log.warn("TcpMaster的socket字段为null(未建立连接)");
} }
return socket;
} catch (NoSuchFieldException e) {
log.info("反射获取Socket失败:TcpMaster中不存在'socket'字段(版本不匹配");
} catch (IllegalAccessException e) {
log.info("反射获取Socket失败:无访问权限(可能被安全管理器拦截)");
} }
return null;
} }
// -------------------------- 辅助方法(日志/告警)-------------------------- // -------------------------- 辅助方法(日志/告警)--------------------------
...@@ -494,4 +384,27 @@ public class DeviceCommunicationJob implements Job { ...@@ -494,4 +384,27 @@ public class DeviceCommunicationJob implements Job {
private void recordAlarm(TStoreyInfo storeyInfo, String alarmData) { private void recordAlarm(TStoreyInfo storeyInfo, String alarmData) {
recordAlarm(storeyInfo, storeyInfo != null ? storeyInfo.getfStoreyCode() : "unknown", alarmData); recordAlarm(storeyInfo, storeyInfo != null ? storeyInfo.getfStoreyCode() : "unknown", alarmData);
} }
/**
* 统一告警记录(修复字段错误,确保写入成功)
*/
private void recordAlarmByBinding(PalletDeviceBinding binding, String alarmMsg) {
String equipmentCode = binding != null ? binding.getDeviceCode() : "unknown";
recordAlarmByBingding(equipmentCode, alarmMsg);
}
private void recordAlarmByBingding(String equipmentCode, String alarmMsg) {
try {
TEquipmentAlarmData alarm = new TEquipmentAlarmData();
alarm.setfAlarmType("04"); // 04.点位告警
alarm.setfEquipmentCode(equipmentCode);
alarm.setfAlarmData(alarmMsg);
alarm.setfCreateTime(new Date()); // 修复字段错误:用fCreateTime而非createTime
alarmDataService.insertTEquipmentAlarmData(alarm);
log.debug("告警记录成功:equipmentCode={}, msg={}", equipmentCode, alarmMsg);
} catch (Exception e) {
log.error("告警记录失败:equipmentCode={}, msg={}", equipmentCode, alarmMsg, e);
}
}
} }
\ No newline at end of file
package com.zehong.system.task; package com.zehong.system.task;
import com.zehong.quartz.config.QuartzTaskMonitor; import com.zehong.quartz.config.QuartzTaskMonitor;
import com.zehong.system.task.DeviceCommJob.*; import com.zehong.system.task.DeviceCommJob.DeviceComm501Device1Job;
import com.zehong.system.task.DeviceCommJob.DeviceComm501Device2Job;
import com.zehong.system.task.DeviceCommJob.DeviceComm501Device3Job;
import org.quartz.*; import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher; import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.terracotta.quartz.wrappers.TriggerWrapper;
import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
...@@ -153,6 +153,7 @@ public class DeviceTaskScheduler { ...@@ -153,6 +153,7 @@ public class DeviceTaskScheduler {
SimpleTrigger trigger = TriggerBuilder.newTrigger() SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey) .withIdentity(triggerKey)
.forJob(jobKey) .forJob(jobKey)
.withDescription("设备" + fStoreyId + "最终任务触发器,触发时间是:" + triggerTime)
.startAt(triggerTime) .startAt(triggerTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule() .withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withRepeatCount(0) // 只执行一次 .withRepeatCount(0) // 只执行一次
...@@ -268,7 +269,7 @@ public class DeviceTaskScheduler { ...@@ -268,7 +269,7 @@ public class DeviceTaskScheduler {
SimpleTrigger trigger = TriggerBuilder.newTrigger() SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey) .withIdentity(triggerKey)
.forJob(jobKey) .forJob(jobKey)
.withDescription("设备" + fStoreyId + "最终任务触发器") .withDescription("设备" + fStoreyId + "最终任务触发器,触发时间是:" + executeTime)
.startAt(executeTime) .startAt(executeTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule() .withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionFireNow() // 错过立即执行 .withMisfireHandlingInstructionFireNow() // 错过立即执行
......
...@@ -126,7 +126,11 @@ export default { ...@@ -126,7 +126,11 @@ export default {
this.selectedMenu = index; this.selectedMenu = index;
}, },
goToAdmin() { goToAdmin() {
this.$router.push('/index') // 或者 '/dashboard' 如果已经登录 // 先跳转到目标页面
this.$router.push('/index').then(() => {
// 跳转成功后刷新页面
window.location.reload()
})
}, },
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment