Commit 03b43c92 authored by wanghao

Revert "1 测试 上电后通信 和 最终完成 定时任务功能"

parent df387f58
...@@ -30,39 +30,38 @@ import java.util.function.Predicate; ...@@ -30,39 +30,38 @@ import java.util.function.Predicate;
*/ */
@Component @Component
public class DeviceStatusReaderAndTimeSetter { public class DeviceStatusReaderAndTimeSetter {
// 常量配置(优化重试次数和超时,避免任务堆积)
// 常量改为public以便外部访问
public static final int START_ADDRESS = 0; public static final int START_ADDRESS = 0;
public static final int REGISTER_COUNT = 10; public static final int REGISTER_COUNT = 10;
public static final int TARGET_VALUE = 1; public static final int TARGET_VALUE = 1;
public static final int MAX_RETRIES = 3; public static final int MAX_RETRIES = 2; // 减少重试次数,缩短总耗时
public static final int RETRY_DELAY = 500; public static final int RETRY_DELAY = 300; // 重试间隔300ms
public static final int TIMEOUT_MINUTES = 5; public static final int TASK_TIMEOUT_SECONDS = 20; // 单设备任务总超时
public static final int TIMEOUT_SECONDS = 30; // 总超时30秒
// 工厂 // 工厂
private static final ModbusFactory modbusFactory = new ModbusFactory(); private static final ModbusFactory modbusFactory = new ModbusFactory();
// 1. 关键优化:固定线程池大小(不随设备数变化,根据CPU核数设置,如10) // 1. 关键优化:固定线程池大小(不随设备数变化,根据CPU核数设置,如10)
// 线程池优化:添加线程工厂,便于问题追踪 // 设备通信线程池(Spring风格,避免静态线程池问题)
private static final ExecutorService FIXED_THREAD_POOL = new ThreadPoolExecutor( private final ExecutorService deviceExecutor = new ThreadPoolExecutor(
5, 8, // 核心线程8个(72个设备,8线程,每线程处理9个)
10, 10, // 最大线程10个
60, 60,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), new LinkedBlockingQueue<>(50),
new ThreadFactory() { new ThreadFactory() {
private int count = 0; private int count = 0;
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "modbus-pool-" + (count++)); Thread thread = new Thread(r, "modbus-device-pool-" + (count++));
thread.setDaemon(true); // 设为守护线程,避免阻塞JVM退出 thread.setDaemon(true);
return thread; return thread;
} }
}, },
new ThreadPoolExecutor.CallerRunsPolicy() new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时调用线程执行,避免任务丢失
); );
@Resource @Resource
private ITEquipmentAlarmDataService alarmDataService; private ITEquipmentAlarmDataService alarmDataService;
...@@ -159,72 +158,67 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -159,72 +158,67 @@ public class DeviceStatusReaderAndTimeSetter {
log.debug("Modbus连接创建成功:IP={}, 端口={}, Master={}", ip, port, master); log.debug("Modbus连接创建成功:IP={}, 端口={}, Master={}", ip, port, master);
return master; return master;
} }
/** /**
* 核心修复:反射获取TcpMaster底层Socket(处理私有字段权限 * 核心修复:反射获取TcpMaster底层Socket(内部类路径正确
*/ */
private static Socket getUnderlyingSocket(ModbusMaster master) { private static Socket getUnderlyingSocket(ModbusMaster master) {
if (!(master instanceof TcpMaster)) { if (!(master instanceof TcpMaster)) {
log.error("ModbusMaster不是TcpMaster类型,无法获取Socket"); log.error("ModbusMaster类型错误(实际:{}),无法获取Socket", master.getClass().getName());
return null; return null;
} }
TcpMaster tcpMaster = (TcpMaster) master; TcpMaster tcpMaster = (TcpMaster) master;
try { try {
// 1. 关键:内部类路径用 "$" 连接外部类(必须与你的Modbus4j版本匹配 // 关键:内部类路径必须加 "$"(Modbus4j 3.x/4.x 通用
String transportClassName = "com.serotonin.modbus4j.ip.tcp.TcpMaster"; String transportClassName = "com.serotonin.modbus4j.ip.tcp.TcpMaster$TcpTransport";
Class<?> transportClass = Class.forName(transportClassName); Class<?> transportClass = Class.forName(transportClassName);
log.debug("成功加载TcpMaster内部类:{}", transportClassName); log.debug("加载TcpMaster内部类成功:{}", transportClassName);
// 2. 获取TcpMaster的private字段 "transport"(存储内部类实例) // 获取TcpMaster的private字段 "transport"
Field transportField = TcpMaster.class.getDeclaredField("transport"); Field transportField = TcpMaster.class.getDeclaredField("transport");
transportField.setAccessible(true); // 取消私有字段访问限制 transportField.setAccessible(true);
Object transport = transportField.get(tcpMaster); Object transport = transportField.get(tcpMaster);
if (transport == null) { if (transport == null) {
log.error("TcpMaster的transport字段为null(连接未初始化?)"); log.error("TcpMaster的transport字段为空(连接未初始化)");
return null; return null;
} }
// 3. 获取内部类TcpTransport的private字段 "socket"(底层Socket) // 获取内部类的private字段 "socket"(若报错,确认字段名是否为"clientSocket")
// 注意:若报错"NoSuchFieldException",需打开Modbus4j源码确认字段名(如"clientSocket")
Field socketField = transportClass.getDeclaredField("socket"); Field socketField = transportClass.getDeclaredField("socket");
socketField.setAccessible(true); // 取消私有字段访问限制 socketField.setAccessible(true);
Socket socket = (Socket) socketField.get(transport); Socket socket = (Socket) socketField.get(transport);
log.debug("成功获取Socket:IP={}, 本地端口={}", socket.getInetAddress(), socket.getLocalPort()); log.debug("获取Socket成功:IP={}, 本地端口={}", socket.getInetAddress(), socket.getLocalPort());
return socket; return socket;
} catch (ClassNotFoundException e) { } catch (ClassNotFoundException e) {
log.error("=== 致命错误:TcpTransport类未找到!===\n" + log.error("=== 反射错误:TcpTransport类未找到!===\n" +
"原因:1. Modbus4j版本不兼容;2. 内部类路径错误(需确认是否为 TcpMaster$TcpTransport)\n" + "解决方案:1. 打开Modbus4j源码,确认内部类路径;2. 替换transportClassName为实际路径(如 com.xxx.TcpMaster$TcpTransport)", e);
"解决方案:1. 打开Modbus4j源码,找到TcpMaster的内部类TcpTransport,复制完整类名;\n" +
" 2. 将 transportClassName 改为实际路径(如 com.xxx.TcpMaster$TcpTransport)", e);
} catch (NoSuchFieldException e) { } catch (NoSuchFieldException e) {
log.error("=== 致命错误:字段未找到!===\n" + log.error("=== 反射错误:字段未找到!===\n" +
"原因:1. TcpMaster的transport字段名错误;2. TcpTransport的socket字段名错误\n" + "解决方案:1. 打开TcpMaster源码,确认transport字段名;2. 打开TcpTransport源码,确认socket字段名(可能为clientSocket)", e);
"解决方案:1. 打开TcpMaster源码,确认传输字段名(如'tcpTransport');\n" +
" 2. 打开TcpTransport源码,确认Socket字段名(如'clientSocket')", e);
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
log.error("反射访问字段权限失败(可能被安全管理器拦截)", e); log.error("反射访问权限失败(安全管理器拦截)", e);
} catch (Exception e) { } catch (Exception e) {
log.error("获取底层Socket异常", e); log.error("获取Socket异常", e);
} }
return null; return null;
} }
/** /**
* 销毁ModbusMaster并关闭底层Socket(确保资源彻底释放) * 销毁Modbus连接(先关Socket,再销毁Master,彻底释放)
*/ */
private static void destroyModbusMaster(ModbusMaster master, int deviceId) { private static void destroyModbusMaster(ModbusMaster master, int deviceId) {
if (master == null) return; if (master == null) return;
try { try {
// 1. 先关闭底层Socket(反射获取) // 1. 先关闭底层Socket
Socket socket = getUnderlyingSocket(master); Socket socket = getUnderlyingSocket(master);
if (socket != null && !socket.isClosed()) { if (socket != null && !socket.isClosed()) {
socket.close(); socket.close();
log.debug("设备{}: 底层Socket已强制关闭", deviceId); log.debug("设备{}: Socket已关闭", deviceId);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("设备{}: 关闭底层Socket异常", deviceId, e); log.error("设备{}: 关闭Socket异常", deviceId, e);
} finally { } finally {
// 2. 再销毁ModbusMaster // 2. 再销毁ModbusMaster
try { try {
...@@ -235,73 +229,68 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -235,73 +229,68 @@ public class DeviceStatusReaderAndTimeSetter {
} }
} }
} }
/** /**
* 启动多设备监控(反射方案,确保资源释放 * 启动多设备监控(优化版:资源管控、超时控制
*/ */
public void startMultiDeviceMonitoring( public void startMultiDeviceMonitoring(
String ip, int port, List<Integer> deviceIds, String ip, int port, List<Integer> deviceIds,
Consumer<DeviceStatusReaderDto> resultHandler, Consumer<DeviceStatusReaderDto> resultHandler,
Predicate<int[]> stopCondition) { Predicate<int[]> stopCondition) {
if (deviceIds == null || deviceIds.isEmpty()) { if (deviceIds == null || deviceIds.isEmpty()) {
log.warn("⚠️ 设备ID列表为空,不执行监控"); log.warn("设备ID列表为空,不执行监控");
return; return;
} }
final CountDownLatch latch = new CountDownLatch(deviceIds.size()); final CountDownLatch latch = new CountDownLatch(deviceIds.size());
log.info("开始监控设备:IP={}, 端口={}, 设备数={}(线程池活跃线程:{})", log.info("启动多设备监控:IP={}, 端口={}, 设备数={},活跃线程={}",
ip, port, deviceIds.size(), ((ThreadPoolExecutor) FIXED_THREAD_POOL).getActiveCount()); ip, port, deviceIds.size(), ((ThreadPoolExecutor) deviceExecutor).getActiveCount());
for (int deviceId : deviceIds) { for (int deviceId : deviceIds) {
final int devId = deviceId; final int devId = deviceId;
FIXED_THREAD_POOL.submit(() -> { // 提交设备任务(带超时控制)
deviceExecutor.submit(() -> {
ModbusMaster threadMaster = null; ModbusMaster threadMaster = null;
try { try {
// 1. 创建独立连接(每个设备任务一个连接,避免复用泄漏) // 创建独立连接(每个设备一个连接,避免复用泄漏)
threadMaster = createModbusMaster(ip, port); threadMaster = createModbusMaster(ip, port);
// 读取数据(带重试)
// 2. 读取数据(带重试和异常处理)
int[] result = readWithConditionalRetry(threadMaster, ip, port, devId, stopCondition); int[] result = readWithConditionalRetry(threadMaster, ip, port, devId, stopCondition);
// 回调结果(判空避免NPE)
// 3. 回调处理结果(判空避免NPE)
if (resultHandler != null) { if (resultHandler != null) {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result)); resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result));
} }
} catch (ModbusInitException e) { } catch (ModbusInitException e) {
log.error("设备{}: Modbus连接初始化失败", devId, e); log.error("设备{}: 连接初始化失败", devId, e);
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId,
"连接初始化失败:" + e.getMessage()); "连接初始化失败:" + e.getMessage());
} catch (Throwable e) { } catch (Throwable e) {
log.error("设备{}: 监控任务致命异常", devId, e); log.error("设备{}: 监控任务异常", devId, e);
String alarmMsg = e instanceof OutOfMemoryError ? "内存溢出:" + e.getMessage() : "监控任务异常:" + e.getMessage(); String alarmMsg = e instanceof OutOfMemoryError ? "内存溢出:" + e.getMessage() : "监控任务异常:" + e.getMessage();
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, alarmMsg); recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, alarmMsg);
} finally { } finally {
// 关键:无论成功/失败,都强制销毁连接(避免资源泄漏 // 强制销毁连接(无论成功/失败
destroyModbusMaster(threadMaster, devId); destroyModbusMaster(threadMaster, devId);
latch.countDown(); latch.countDown();
log.info("设备{}: 监控任务完成(剩余任务:{})", devId, latch.getCount()); log.debug("设备{}: 任务完成,剩余任务数:{}", devId, latch.getCount());
} }
}); });
} }
// 等待所有任务完成,超时后强制中断(避免长期阻塞 // 等待任务完成(超时后记录告警,不关闭线程池
try { try {
if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) { if (!latch.await(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn("IP{}: 部分设备监控超时(未完成任务数:{}),强制中断", ip, latch.getCount()); log.warn("IP{}: 部分设备监控超时(未完成:{})", ip, latch.getCount());
recordAlarm("03", "ip:" + ip + ",port:" + port, recordAlarm("03", "ip:" + ip + ",port:" + port,
"部分设备监控超时(未完成:" + latch.getCount() + ")"); "部分设备监控超时(未完成:" + latch.getCount() + ")");
// 超时后关闭线程池(避免任务堆积)
FIXED_THREAD_POOL.shutdownNow();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("IP{}: 监控任务被中断", ip, e); log.error("IP{}: 监控任务被中断", ip, e);
Thread.currentThread().interrupt(); // 恢复中断状态 Thread.currentThread().interrupt();
} }
} }
/** /**
* 统一告警记录(抽离避免重复代码) * 统一告警记录(抽离避免重复代码)
*/ */
......
...@@ -7,9 +7,7 @@ import com.zehong.system.modbus.business.DeviceStatusReaderAndTimeSetter; ...@@ -7,9 +7,7 @@ import com.zehong.system.modbus.business.DeviceStatusReaderAndTimeSetter;
import com.zehong.system.modbus.handler.ModbusResultHandler; import com.zehong.system.modbus.handler.ModbusResultHandler;
import com.zehong.system.service.ITEquipmentAlarmDataService; import com.zehong.system.service.ITEquipmentAlarmDataService;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.quartz.Job; import org.quartz.*;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -20,6 +18,7 @@ import java.util.Arrays; ...@@ -20,6 +18,7 @@ import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.function.Predicate;
/** /**
* @author lenovo * @author lenovo
...@@ -27,6 +26,7 @@ import java.util.concurrent.*; ...@@ -27,6 +26,7 @@ import java.util.concurrent.*;
* @description 上电以后 两分钟执行一次的逻辑 * @description 上电以后 两分钟执行一次的逻辑
*/ */
@Component @Component
@DisallowConcurrentExecution
public class DeviceCommunicationJob implements Job { public class DeviceCommunicationJob implements Job {
private static final Logger log = LoggerFactory.getLogger(DeviceCommunicationJob.class); private static final Logger log = LoggerFactory.getLogger(DeviceCommunicationJob.class);
...@@ -39,6 +39,11 @@ public class DeviceCommunicationJob implements Job { ...@@ -39,6 +39,11 @@ public class DeviceCommunicationJob implements Job {
@Autowired @Autowired
private ModbusResultHandler resultHandler; private ModbusResultHandler resultHandler;
// 单个端口通信超时时间(确保总时间 < Cron周期)
private static final int PORT_TIMEOUT_SECONDS = 8;
// 临时线程池(避免静态线程池关闭问题)
private final ExecutorService portExecutor = Executors.newFixedThreadPool(3); // 3个端口,3个线程
@Override @Override
public void execute(JobExecutionContext context) { public void execute(JobExecutionContext context) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
...@@ -49,10 +54,10 @@ public class DeviceCommunicationJob implements Job { ...@@ -49,10 +54,10 @@ public class DeviceCommunicationJob implements Job {
String ip = null; String ip = null;
try { try {
log.info("=== DeviceCommunicationJob 开始执行:fStoreyIdStr={}(当前线程:{}) ===", log.info("=== DeviceCommunicationJob 启动:fStoreyIdStr={},线程={} ===",
fStoreyIdStr, Thread.currentThread().getName()); fStoreyIdStr, Thread.currentThread().getName());
// 提取并校验参数 // 1. 提取参数(严格校验,避免后续异常)
data = context.getJobDetail().getJobDataMap(); data = context.getJobDetail().getJobDataMap();
if (data == null) { if (data == null) {
log.error("JobDataMap为空,终止执行"); log.error("JobDataMap为空,终止执行");
...@@ -61,94 +66,114 @@ public class DeviceCommunicationJob implements Job { ...@@ -61,94 +66,114 @@ public class DeviceCommunicationJob implements Job {
fStoreyIdStr = data.getString("fStoreyId"); fStoreyIdStr = data.getString("fStoreyId");
if (StringUtils.isBlank(fStoreyIdStr)) { if (StringUtils.isBlank(fStoreyIdStr)) {
log.error("fStoreyId参数为空,终止执行"); log.error("fStoreyId参数为空,终止执行");
recordAlarm(null, "fStoreyId为空", "通信任务异常:fStoreyId参数为空");
return; return;
} }
// 转换参数 // 2. 转换参数(处理格式错误)
try { try {
fStoreyId = Long.parseLong(fStoreyIdStr); fStoreyId = Long.parseLong(fStoreyIdStr);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
log.error("fStoreyId格式错误:{},终止执行", fStoreyIdStr); log.error("fStoreyId格式错误:{},终止执行", fStoreyIdStr);
recordAlarm(null, fStoreyIdStr, "通信任务异常:fStoreyId格式错误:" + fStoreyIdStr);
return; return;
} }
// 查询设备信息 // 3. 查询设备信息(双重校验+异常捕获)
tStoreyInfo = tStoreyInfoMapper.selectTStoreyInfoById(fStoreyId); try {
tStoreyInfo = tStoreyInfoMapper.selectTStoreyInfoById(fStoreyId);
} catch (Exception e) {
log.error("查询设备信息异常:fStoreyId={}", fStoreyId, e);
recordAlarm(null, fStoreyIdStr, "通信任务异常:查询设备信息失败:" + e.getMessage());
return;
}
if (tStoreyInfo == null) { if (tStoreyInfo == null) {
log.error("未查询到设备信息:fStoreyId={},终止执行并清理任务", fStoreyId); log.error("未查询到设备信息:fStoreyId={},清理无效任务", fStoreyId);
context.getScheduler().deleteJob(context.getJobDetail().getKey()); // 清理任务(单独捕获SchedulerException,避免异常穿透)
try {
context.getScheduler().deleteJob(context.getJobDetail().getKey());
} catch (SchedulerException e) {
log.error("清理无效任务失败:fStoreyId={}", fStoreyId, e);
}
recordAlarm(null, fStoreyIdStr, "通信任务异常:未查询到设备信息,已清理任务");
return; return;
} }
ip = tStoreyInfo.getfIp(); ip = tStoreyInfo.getfIp();
if (StringUtils.isBlank(ip)) { if (StringUtils.isBlank(ip)) {
log.error("设备IP为空:fStoreyId={},终止执行", fStoreyId); log.error("设备IP为空:fStoreyId={},终止执行", fStoreyId);
recordAlarm(tStoreyInfo, "设备IP为空,无法执行通信任务"); recordAlarm(tStoreyInfo, "设备IP为空", "通信任务异常:设备IP为空");
return; return;
} }
// 校验resultHandler // 4. 校验依赖组件(避免空指针)
if (resultHandler == null) { if (resultHandler == null) {
log.error("ModbusResultHandler未初始化,终止执行"); log.error("ModbusResultHandler未初始化,终止执行");
recordAlarm(tStoreyInfo, "ModbusResultHandler未初始化"); recordAlarm(tStoreyInfo, "ResultHandler未初始化", "通信任务异常:ModbusResultHandler未初始化");
return; return;
} }
// 校验停止条件(避免NullPointerException)
Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition();
// 将变量声明为final,供匿名内部类使用 // 5. 设备ID列表(拆分后便于管理)
final String finalIp = ip;
final Long finalFStoreyId = fStoreyId;
// 设备ID列表
List<Integer> offsets1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27); List<Integer> offsets1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27);
List<Integer> offsets2 = Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54); List<Integer> offsets2 = Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54);
List<Integer> offsets3 = Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72); List<Integer> offsets3 = Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72);
// 使用final变量 // 6. 多端口并行通信(使用临时线程池,避免静态线程池问题)
executeWithTimeout(() -> { final String finalIp = ip;
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(finalIp, 501, offsets1, resultHandler, ModbusResultHandler.createDefaultStopCondition()); final Long finalFStoreyId = fStoreyId;
log.info("Modbus 501端口通信完成:fStoreyId={}", finalFStoreyId); final TStoreyInfo finalTStoreyInfo = tStoreyInfo;
}, 10, TimeUnit.SECONDS, tStoreyInfo, "501端口通信超时", finalIp, finalFStoreyId); Future<?> port1Future = portExecutor.submit(() -> {
executePortCommunication(finalIp, 501, offsets1, resultHandler, stopCondition, finalFStoreyId, finalTStoreyInfo);
executeWithTimeout(() -> { });
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(finalIp, 502, offsets2, resultHandler, ModbusResultHandler.createDefaultStopCondition()); Future<?> port2Future = portExecutor.submit(() -> {
log.info("Modbus 502端口通信完成:fStoreyId={}", finalFStoreyId); executePortCommunication(finalIp, 502, offsets2, resultHandler, stopCondition, finalFStoreyId, finalTStoreyInfo);
}, 10, TimeUnit.SECONDS, tStoreyInfo, "502端口通信超时", finalIp, finalFStoreyId); });
Future<?> port3Future = portExecutor.submit(() -> {
executeWithTimeout(() -> { executePortCommunication(finalIp, 503, offsets3, resultHandler, stopCondition, finalFStoreyId, finalTStoreyInfo);
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(finalIp, 503, offsets3, resultHandler, ModbusResultHandler.createDefaultStopCondition()); });
log.info("Modbus 503端口通信完成:fStoreyId={}", finalFStoreyId);
}, 10, TimeUnit.SECONDS, tStoreyInfo, "503端口通信超时", finalIp, finalFStoreyId); // 等待所有端口完成(总超时 = 单个端口超时 * 1.2,避免无限等待)
port1Future.get(PORT_TIMEOUT_SECONDS * 1200, TimeUnit.MILLISECONDS);
// 校验执行时间 port2Future.get(PORT_TIMEOUT_SECONDS * 1200, TimeUnit.MILLISECONDS);
port3Future.get(PORT_TIMEOUT_SECONDS * 1200, TimeUnit.MILLISECONDS);
// 7. 校验执行时间(确保未超过Cron周期)
long costTime = System.currentTimeMillis() - startTime; long costTime = System.currentTimeMillis() - startTime;
if (costTime > 110000) { if (costTime > 240000) { // 超过4分钟(Cron为5分钟,留1分钟缓冲)
log.warn("任务执行时间过长:{}ms(接近Cron周期),可能导致任务叠加", costTime); log.warn("任务执行超时:{}ms(Cron周期5分钟),可能导致下一次任务延迟", costTime);
recordAlarm(tStoreyInfo, "任务执行时间过长:" + costTime + "ms"); recordAlarm(tStoreyInfo, "任务超时", "通信任务警告:执行时间过长:" + costTime + "ms");
} }
log.info("=== DeviceCommunicationJob 执行成功:fStoreyId={}(耗时:{}ms) ===", finalFStoreyId, costTime); log.info("=== DeviceCommunicationJob 成功:fStoreyId={},耗时={}ms ===", finalFStoreyId, costTime);
} catch (Throwable e) { } catch (Throwable e) {
log.error("=== DeviceCommunicationJob 执行致命异常:fStoreyIdStr={} ===", fStoreyIdStr, e); // 8. 捕获所有异常(确保不传播到Quartz)
try { log.error("=== DeviceCommunicationJob 致命异常:fStoreyIdStr={} ===", fStoreyIdStr, e);
if (tStoreyInfo != null && StringUtils.isNotBlank(tStoreyInfo.getfStoreyCode())) { recordAlarm(tStoreyInfo, fStoreyIdStr, "通信任务致命异常:" + e.getMessage());
recordAlarm(tStoreyInfo, "通信任务异常:" + e.getMessage());
} else {
TEquipmentAlarmData alarm = new TEquipmentAlarmData();
alarm.setfAlarmType("03");
alarm.setfEquipmentCode(fStoreyIdStr);
alarm.setfAlarmData("通信任务异常(设备信息缺失):" + e.getMessage());
alarm.setfCreateTime(new Date());
alarmDataService.insertTEquipmentAlarmData(alarm);
}
} catch (Exception alarmEx) {
log.error("=== 告警记录失败(不影响触发器) ===", alarmEx);
}
} finally { } finally {
// 9. 强制释放资源(避免内存泄漏)
portExecutor.shutdown(); // 关闭临时线程池
long costTime = System.currentTimeMillis() - startTime; long costTime = System.currentTimeMillis() - startTime;
log.info("=== DeviceCommunicationJob 执行结束:fStoreyIdStr={}(总耗时:{}ms) ===", fStoreyIdStr, costTime); log.info("=== DeviceCommunicationJob 结束:fStoreyIdStr={},总耗时={}ms ===", fStoreyIdStr, costTime);
}
}
/**
 * Runs the Modbus communication for a single port (kept as its own method so
 * that failures are contained per port).
 *
 * <p>Delegates to {@code deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring}
 * and logs before and after the call. Any {@link Exception} raised inside is
 * logged and turned into an alarm record via {@code recordAlarm}; it is not
 * rethrown to the caller.
 *
 * @param ip            device IP, passed through to the monitor
 * @param port          Modbus TCP port to talk to; also used in log and alarm text
 * @param deviceIds     device ids to poll on this port
 * @param resultHandler handler passed through to the monitor as its result callback
 * @param stopCondition predicate passed through to the monitor
 * @param fStoreyId     storey id, used only for log messages here
 * @param tStoreyInfo   storey record handed to {@code recordAlarm} on failure
 */
private void executePortCommunication(String ip, int port, List<Integer> deviceIds,
ModbusResultHandler resultHandler, Predicate<int[]> stopCondition,
Long fStoreyId, TStoreyInfo tStoreyInfo) {
try {
// deviceIds.size() is evaluated inside the try, so a null list is also
// routed to the catch block below instead of escaping this method.
log.info("开始端口{}通信:fStoreyId={},设备数={}", port, fStoreyId, deviceIds.size());
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, port, deviceIds, resultHandler, stopCondition);
log.info("端口{}通信完成:fStoreyId={}", port, fStoreyId);
} catch (Exception e) {
// Only Exception is caught; Errors still propagate to the caller.
log.error("端口{}通信异常:fStoreyId={}", port, fStoreyId, e);
recordAlarm(tStoreyInfo, "端口" + port + "异常", "Modbus通信异常:端口" + port + "失败:" + e.getMessage());
} }
} }
// 简化方法签名,不需要传入ip和fStoreyId参数 // 简化方法签名,不需要传入ip和fStoreyId参数
private void executeWithTimeout(Runnable task, long timeout, TimeUnit unit, private void executeWithTimeout(Runnable task, long timeout, TimeUnit unit,
TStoreyInfo tStoreyInfo, String timeoutMsg, TStoreyInfo tStoreyInfo, String timeoutMsg,
...@@ -160,23 +185,27 @@ public class DeviceCommunicationJob implements Job { ...@@ -160,23 +185,27 @@ public class DeviceCommunicationJob implements Job {
executor.shutdown(); executor.shutdown();
} catch (TimeoutException e) { } catch (TimeoutException e) {
log.error("{}:fStoreyId={}, ip={}", timeoutMsg, fStoreyId, ip, e); log.error("{}:fStoreyId={}, ip={}", timeoutMsg, fStoreyId, ip, e);
recordAlarm(tStoreyInfo, timeoutMsg + "(IP:" + ip + ",设备ID:" + fStoreyId + ")"); recordAlarm(tStoreyInfo, tStoreyInfo.getfEquipmentCode(),timeoutMsg + "(IP:" + ip + ",设备ID:" + fStoreyId + ")");
} catch (Exception e) { } catch (Exception e) {
log.error("{}执行异常:fStoreyId={}, ip={}", timeoutMsg, fStoreyId, ip, e); log.error("{}执行异常:fStoreyId={}, ip={}", timeoutMsg, fStoreyId, ip, e);
recordAlarm(tStoreyInfo, timeoutMsg + "执行异常(IP:" + ip + ",设备ID:" + fStoreyId + "):" + e.getMessage()); recordAlarm(tStoreyInfo, tStoreyInfo.getfEquipmentCode(),timeoutMsg + "执行异常(IP:" + ip + ",设备ID:" + fStoreyId + "):" + e.getMessage());
} }
} }
private void recordAlarm(TStoreyInfo tStoreyInfo, String alarmData) {
/**
* 统一告警记录(兼容设备信息为空的场景)
*/
private void recordAlarm(TStoreyInfo tStoreyInfo, String equipmentCode, String alarmData) {
try { try {
TEquipmentAlarmData alarm = new TEquipmentAlarmData(); TEquipmentAlarmData alarm = new TEquipmentAlarmData();
alarm.setfAlarmType("03"); alarm.setfAlarmType("03"); // 老化层告警
alarm.setfEquipmentCode(tStoreyInfo.getfStoreyCode()); alarm.setfEquipmentCode(tStoreyInfo != null ? tStoreyInfo.getfStoreyCode() : equipmentCode);
alarm.setfAlarmData(alarmData); alarm.setfAlarmData(alarmData);
alarm.setfCreateTime(new Date()); alarm.setfCreateTime(new Date());
alarmDataService.insertTEquipmentAlarmData(alarm); alarmDataService.insertTEquipmentAlarmData(alarm);
} catch (Exception e) { } catch (Exception e) {
log.error("记录告警失败:{}", alarmData, e); log.error("告警记录失败:{}", alarmData, e);
} }
} }
} }
package com.zehong.system.task; package com.zehong.system.task;
import org.quartz.*; import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -9,6 +10,7 @@ import javax.annotation.Resource; ...@@ -9,6 +10,7 @@ import javax.annotation.Resource;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Date; import java.util.Date;
import java.util.Set;
/** /**
* @author lenovo * @author lenovo
...@@ -76,39 +78,35 @@ public class DeviceTaskScheduler { ...@@ -76,39 +78,35 @@ public class DeviceTaskScheduler {
JobKey jobKey = new JobKey(jobId, JOB_GROUP); JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP); TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 若任务已存在且有效,跳过创建 // 1. 构建JobDetail(仅定义任务元数据,不涉及调度规则)
if (isJobValid(jobKey, triggerKey)) {
log.info("通信任务[{}]已存在且有效,跳过创建", jobId);
return;
}
// 1. 构建JobDetail(若依要求:Job类必须是Spring Bean,且无参构造)
JobDetail job = JobBuilder.newJob(DeviceCommunicationJob.class) JobDetail job = JobBuilder.newJob(DeviceCommunicationJob.class)
.withIdentity(jobKey) .withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString()) // 传递参数(String类型避免溢出) .withDescription("设备" + fStoreyId + "每5分钟Modbus通信任务")
.storeDurably(false) // 若依建议:非持久化(触发器删除后Job自动失效) .usingJobData("fStoreyId", fStoreyId.toString())
.withDescription("设备" + fStoreyId + "每2分钟通信任务") .storeDurably(false)
.requestRecovery(true) // 服务重启后恢复未完成的任务 .requestRecovery(true) // 服务重启后恢复未完成任务
.build(); .build(); // 移除错误的 withSchedule() 调用
// 2. 构建CronTrigger(每2分钟执行,兼容若依时间解析 // 2. 构建CronTrigger(调度规则在此配置
CronTrigger trigger = TriggerBuilder.newTrigger() CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey) .withIdentity(triggerKey)
.forJob(jobKey) // 显式绑定Job(避免触发器游离) .forJob(jobKey)
.withDescription("设备" + fStoreyId + "通信任务触发器(每5分钟)")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?") .withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?")
// 错过触发时:立即执行一次,后续按原计划(关键!) // 关键:Misfire策略(错过触发后忽略,避免集中执行)
.withMisfireHandlingInstructionFireAndProceed()) .withMisfireHandlingInstructionDoNothing())
.startNow() // 立即生效(无需等待第一个周期) .startNow()
.endAt(Date.from(Instant.now().plus(7, ChronoUnit.DAYS))) // 7天有效期
.build(); .build();
// 3. 原子操作:强制替换旧任务(避免残留状态) // 3. 原子操作:创建/更新任务
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
// 先删除旧任务,再创建新任务(彻底清理状态) Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
scheduler.deleteJob(jobKey); log.info("通信任务[{}]更新触发器成功,下次执行时间:{}", jobId, nextFireTime);
log.info("通信任务[{}]旧任务已删除,准备创建新任务", jobId); } else {
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("通信任务[{}]创建成功,下次执行时间:{}", jobId, nextFireTime);
} }
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("通信任务[{}]创建成功,下次执行时间:{}", jobId, nextFireTime);
} }
/** /**
...@@ -119,15 +117,10 @@ public class DeviceTaskScheduler { ...@@ -119,15 +117,10 @@ public class DeviceTaskScheduler {
JobKey jobKey = new JobKey(jobId, JOB_GROUP); JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP); TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 若任务已存在且有效,跳过创建 // 构建JobDetail
if (isJobValid(jobKey, triggerKey)) {
log.info("最终任务[{}]已存在且有效,跳过创建", jobId);
return;
}
// 1. 构建JobDetail
JobDetail job = JobBuilder.newJob(FinalExecutionJob.class) JobDetail job = JobBuilder.newJob(FinalExecutionJob.class)
.withIdentity(jobKey) .withIdentity(jobKey)
.withDescription("设备" + fStoreyId + "最终执行任务(仅一次)")
.usingJobData("fStoreyId", fStoreyId.toString()) .usingJobData("fStoreyId", fStoreyId.toString())
.usingJobData("fPowerOutageIp", fPowerOutageIp) .usingJobData("fPowerOutageIp", fPowerOutageIp)
.usingJobData("fPowerOutagePort", fPowerOutagePort.toString()) .usingJobData("fPowerOutagePort", fPowerOutagePort.toString())
...@@ -135,19 +128,19 @@ public class DeviceTaskScheduler { ...@@ -135,19 +128,19 @@ public class DeviceTaskScheduler {
.requestRecovery(true) .requestRecovery(true)
.build(); .build();
// 2. 构建SimpleTrigger(5分钟后执行,仅一次) // 构建SimpleTrigger
Date executeTime = Date.from(Instant.now().plus(15, ChronoUnit.MINUTES)); Date executeTime = Date.from(Instant.now().plus(15, ChronoUnit.MINUTES));
SimpleTrigger trigger = TriggerBuilder.newTrigger() SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey) .withIdentity(triggerKey)
.forJob(jobKey) // 显式绑定Job .forJob(jobKey)
.startAt(executeTime) // 精确执行时间 .withDescription("设备" + fStoreyId + "最终任务触发器")
.startAt(executeTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule() .withSchedule(SimpleScheduleBuilder.simpleSchedule()
// 错过触发时:立即执行(避免任务失效) .withMisfireHandlingInstructionFireNow() // 错过触发立即执行
.withMisfireHandlingInstructionFireNow() .withRepeatCount(0)) // 仅执行一次
.withRepeatCount(0)) // 仅执行一次(关键!)
.build(); .build();
// 3. 原子操作:创建/更新任务 // 原子操作:创建/更新
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger); Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
log.info("最终任务[{}]更新触发器成功,执行时间:{}", jobId, nextFireTime); log.info("最终任务[{}]更新触发器成功,执行时间:{}", jobId, nextFireTime);
...@@ -156,40 +149,77 @@ public class DeviceTaskScheduler { ...@@ -156,40 +149,77 @@ public class DeviceTaskScheduler {
log.info("最终任务[{}]创建成功,执行时间:{}", jobId, nextFireTime); log.info("最终任务[{}]创建成功,执行时间:{}", jobId, nextFireTime);
} }
} }
/**
 * Checks whether a job with the given name is already registered in
 * {@code JOB_GROUP} (used to avoid creating the same task twice).
 *
 * <p>Asks the scheduler directly through {@code checkExists(JobKey)} rather
 * than loading every job key of the group and comparing names one by one.
 *
 * @param jobId job name to look up; {@code null} is reported as not existing
 * @return {@code true} if a job named {@code jobId} exists in {@code JOB_GROUP}
 * @throws SchedulerException if the scheduler cannot be queried
 */
private boolean isTaskExists(String jobId) throws SchedulerException {
    // JobKey rejects a null name; report "not present" instead of throwing.
    if (jobId == null) {
        return false;
    }
    return scheduler.checkExists(new JobKey(jobId, JOB_GROUP));
}
/**
 * Removes the communication job ({@code COMM_<id>}) and the final job
 * ({@code FINAL_<id>}) of a storey, so that a failed creation leaves no
 * leftovers in the scheduler.
 *
 * <p>Each job is deleted in its own try/catch: a {@link SchedulerException}
 * while removing one job is logged and does not prevent the attempt on the
 * other. Failures are never propagated to the caller.
 *
 * @param fStoreyId storey id used to build both job names
 */
private void cleanInvalidTask(Long fStoreyId) {
    // Pairs of {job name, success log message}; the messages stay distinct per job type.
    String[][] targets = {
        {"COMM_" + fStoreyId, "清理无效通信任务:fStoreyId={}"},
        {"FINAL_" + fStoreyId, "清理无效最终任务:fStoreyId={}"}
    };
    for (String[] target : targets) {
        try {
            // deleteJob returns true only when the job was found and removed, so a
            // separate checkExists call (and the gap between check and delete) is unnecessary.
            if (scheduler.deleteJob(new JobKey(target[0], JOB_GROUP))) {
                log.info(target[1], fStoreyId);
            }
        } catch (SchedulerException e) {
            log.error("清理无效任务失败:fStoreyId={}", fStoreyId, e);
        }
    }
}
/**
* 检查任务是否有效(优化逻辑:避免误删有效任务)
*/ */
private boolean isJobValid(JobKey jobKey, TriggerKey triggerKey) throws SchedulerException { private boolean isJobValid(JobKey jobKey, TriggerKey triggerKey) throws SchedulerException {
// 1. 检查Job和Trigger是否存在
if (!scheduler.checkExists(jobKey) || !scheduler.checkExists(triggerKey)) { if (!scheduler.checkExists(jobKey) || !scheduler.checkExists(triggerKey)) {
log.debug("任务[{}]或触发器[{}]不存在", jobKey.getName(), triggerKey.getName()); log.debug("任务[{}]或触发器[{}]不存在", jobKey.getName(), triggerKey.getName());
return false; return false;
} }
// 2. 检查触发器是否有下次执行时间
Trigger trigger = scheduler.getTrigger(triggerKey); Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger == null || trigger.getNextFireTime() == null) { Trigger.TriggerState state = scheduler.getTriggerState(triggerKey);
log.debug("触发器[{}]无效(无下次执行时间)", triggerKey.getName()); // 触发器状态为NORMAL且有下次执行时间,才视为有效
// 清理无效触发器 if (trigger == null || trigger.getNextFireTime() == null || state != Trigger.TriggerState.NORMAL) {
log.debug("触发器[{}]无效:状态={}, 下次执行时间={}",
triggerKey.getName(), state, trigger != null ? trigger.getNextFireTime() : "null");
// 清理无效触发器(保留Job,避免重建)
scheduler.unscheduleJob(triggerKey); scheduler.unscheduleJob(triggerKey);
return false; return false;
} }
return true; return true;
} }
/** /**
* 验证任务状态(调试用,打印关键信息 * 验证任务状态(增强日志详情
*/ */
private void checkTaskStatus(Long fStoreyId) throws SchedulerException { private void checkTaskStatus(Long fStoreyId) throws SchedulerException {
// 检查通信任务 TriggerKey commTriggerKey = new TriggerKey("COMM_" + fStoreyId + "_TRIGGER", TRIGGER_GROUP);
Trigger commTrigger = scheduler.getTrigger(new TriggerKey("COMM_" + fStoreyId + "_TRIGGER", TRIGGER_GROUP)); TriggerKey finalTriggerKey = new TriggerKey("FINAL_" + fStoreyId + "_TRIGGER", TRIGGER_GROUP);
// 检查最终任务
Trigger finalTrigger = scheduler.getTrigger(new TriggerKey("FINAL_" + fStoreyId + "_TRIGGER", TRIGGER_GROUP)); Trigger commTrigger = scheduler.getTrigger(commTriggerKey);
Trigger finalTrigger = scheduler.getTrigger(finalTriggerKey);
log.info("=== 任务状态验证:fStoreyId={} ===", fStoreyId); log.info("=== 任务状态验证:fStoreyId={} ===", fStoreyId);
log.info("通信任务下次执行时间:{}", commTrigger != null ? commTrigger.getNextFireTime() : "不存在"); log.info("通信任务:状态={}, 下次执行={}, 过期时间={}",
log.info("最终任务执行时间:{}", finalTrigger != null ? finalTrigger.getNextFireTime() : "不存在"); commTrigger != null ? scheduler.getTriggerState(commTriggerKey) : "不存在",
commTrigger != null ? commTrigger.getNextFireTime() : "不存在",
commTrigger != null ? commTrigger.getEndTime() : "不存在");
log.info("最终任务:状态={}, 执行时间={}, 过期时间={}",
finalTrigger != null ? scheduler.getTriggerState(finalTriggerKey) : "不存在",
finalTrigger != null ? finalTrigger.getNextFireTime() : "不存在",
finalTrigger != null ? finalTrigger.getEndTime() : "不存在");
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment