Commit bdb2918e authored by wanghao's avatar wanghao

Revert "1 测试 上电后通信 和 最终完成 定时任务功能"

parent a80d6ec5
...@@ -13,6 +13,7 @@ import com.zehong.system.domain.TEquipmentAlarmData; ...@@ -13,6 +13,7 @@ import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto; import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto;
import com.zehong.system.modbus.util.Modbus4jUtils; import com.zehong.system.modbus.util.Modbus4jUtils;
import com.zehong.system.service.ITEquipmentAlarmDataService; import com.zehong.system.service.ITEquipmentAlarmDataService;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
...@@ -20,6 +21,7 @@ import java.lang.reflect.Field; ...@@ -20,6 +21,7 @@ import java.lang.reflect.Field;
import java.net.Socket; import java.net.Socket;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Predicate; import java.util.function.Predicate;
...@@ -30,42 +32,40 @@ import java.util.function.Predicate; ...@@ -30,42 +32,40 @@ import java.util.function.Predicate;
*/ */
@Component @Component
public class DeviceStatusReaderAndTimeSetter { public class DeviceStatusReaderAndTimeSetter {
// 常量配置(优化重试次数和超时,避免任务堆积 // 常量配置(优化重试,减少耗时
public static final int START_ADDRESS = 0; public static final int START_ADDRESS = 0;
public static final int REGISTER_COUNT = 10; public static final int REGISTER_COUNT = 10;
public static final int TARGET_VALUE = 1; public static final int MAX_RETRIES = 1; // 仅1次重试(减少总耗时)
public static final int MAX_RETRIES = 2; // 减少重试次数,缩短总耗时 public static final int RETRY_DELAY = 200; // 重试间隔200ms(缩短等待)
public static final int RETRY_DELAY = 300; // 重试间隔300ms // 线程池:Spring管理,非静态(避免资源泄漏)
public static final int TASK_TIMEOUT_SECONDS = 20; // 单设备任务总超时 private final ExecutorService deviceExecutor;
// 工厂
private static final ModbusFactory modbusFactory = new ModbusFactory(); // 构造函数:初始化线程池(核心线程=设备数/10,避免过多)
public DeviceStatusReaderAndTimeSetter() {
// 1. 关键优化:固定线程池大小(不随设备数变化,根据CPU核数设置,如10) this.deviceExecutor = new ThreadPoolExecutor(
4, // 核心线程4个(72个设备,4线程,每线程处理18个,效率适中)
// 设备通信线程池(Spring风格,避免静态线程池问题) 8, // 最大线程8个(峰值扩容)
private final ExecutorService deviceExecutor = new ThreadPoolExecutor(
8, // 核心线程8个(72个设备,8线程,每线程处理9个)
10, // 最大线程10个
60, 60,
TimeUnit.SECONDS, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(50), new LinkedBlockingQueue<>(30), // 队列30个,避免任务堆积
new ThreadFactory() { new ThreadFactory() {
private int count = 0; private int count = 0;
@Override @Override
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "modbus-device-pool-" + (count++)); Thread thread = new Thread(r, "modbus-device-pool-" + (count++));
thread.setDaemon(true); thread.setDaemon(true); // 守护线程,不阻塞JVM退出
return thread; return thread;
} }
}, },
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时调用线程执行,避免任务丢失 new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时调用线程执行(避免任务丢失)
); );
}
// 工厂(单例)
private static final ModbusFactory modbusFactory = new ModbusFactory();
@Resource @Resource
private ITEquipmentAlarmDataService alarmDataService; private ITEquipmentAlarmDataService alarmDataService;
private static final org.slf4j.Logger log = LoggerFactory.getLogger(DeviceStatusReaderAndTimeSetter.class);
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(DeviceStatusReaderAndTimeSetter.class);
/** /**
* 读取设备寄存器(线程安全版) * 读取设备寄存器(线程安全版)
*/ */
...@@ -219,67 +219,77 @@ public class DeviceStatusReaderAndTimeSetter { ...@@ -219,67 +219,77 @@ public class DeviceStatusReaderAndTimeSetter {
} }
} }
/** /**
* 启动多设备监控(优化版:资源管控、超时控制 * 启动多设备监控(新增portTimeout参数,统一超时
*/ */
public void startMultiDeviceMonitoring( public void startMultiDeviceMonitoring(
String ip, int port, List<Integer> deviceIds, String ip, int port, List<Integer> deviceIds,
Consumer<DeviceStatusReaderDto> resultHandler, Consumer<DeviceStatusReaderDto> resultHandler,
Predicate<int[]> stopCondition) { Predicate<int[]> stopCondition,
int portTimeout) { // 单个端口总超时(秒)
if (deviceIds == null || deviceIds.isEmpty()) { if (deviceIds == null || deviceIds.isEmpty()) {
log.warn("设备ID列表为空,不执行监控"); log.warn("设备ID列表为空,不执行监控:IP={}, 端口={}", ip, port);
return; return;
} }
final CountDownLatch latch = new CountDownLatch(deviceIds.size()); final CountDownLatch latch = new CountDownLatch(deviceIds.size());
log.info("启动多设备监控:IP={}, 端口={}, 设备数={},活跃线程={}", log.debug("启动端口{}监控:IP={},设备数={},超时={}s", ip, port, deviceIds.size(), portTimeout);
ip, port, deviceIds.size(), ((ThreadPoolExecutor) deviceExecutor).getActiveCount());
for (int deviceId : deviceIds) { for (int deviceId : deviceIds) {
final int devId = deviceId; final int devId = deviceId;
// 提交设备任务(带超时控制)
deviceExecutor.submit(() -> { deviceExecutor.submit(() -> {
ModbusMaster threadMaster = null; AtomicReference<ModbusMaster> threadMaster = null;
try { try {
// 创建独立连接(每个设备一个连接,避免复用泄漏) // 单个设备任务超时控制(portTimeout/设备数,避免整体超时)
threadMaster = createModbusMaster(ip, port); int devTimeout = Math.max(1, portTimeout * 1000 / deviceIds.size());
// 读取数据(带重试) ExecutorService devExecutor = Executors.newSingleThreadExecutor();
int[] result = readWithConditionalRetry(threadMaster, ip, port, devId, stopCondition); Future<?> devFuture = devExecutor.submit(() -> {
// 回调结果(判空避免NPE) try {
// 1. 创建独立连接(每个设备一个,避免复用泄漏)
threadMaster.set(createModbusMaster(ip, port));
// 2. 读取数据(带重试)
int[] result = readWithConditionalRetry(threadMaster.get(), ip, port, devId, stopCondition);
// 3. 回调结果
if (resultHandler != null) { if (resultHandler != null) {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result)); resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result));
} }
} catch (Exception e) {
throw new RuntimeException("设备" + devId + "执行异常", e);
}
});
} catch (ModbusInitException e) { // 单个设备任务超时等待
log.error("设备{}: 连接初始化失败", devId, e); devFuture.get(devTimeout, TimeUnit.MILLISECONDS);
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, devExecutor.shutdown();
"连接初始化失败:" + e.getMessage());
} catch (Throwable e) {
log.error("设备{}: 监控任务异常", devId, e);
String alarmMsg = e instanceof OutOfMemoryError ? "内存溢出:" + e.getMessage() : "监控任务异常:" + e.getMessage();
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, alarmMsg);
} catch (TimeoutException e) {
String errMsg = "设备" + devId + "超时:IP=" + ip + ", 端口=" + port;
log.error(errMsg, e);
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, errMsg);
} catch (Exception e) {
String errMsg = "设备" + devId + "执行异常:IP=" + ip + ", 端口=" + port;
log.error(errMsg, e);
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, errMsg + ":" + e.getMessage());
} finally { } finally {
// 强制销毁连接(无论成功/失败) // 强制释放资源(无论成功/失败)
destroyModbusMaster(threadMaster, devId); destroyModbusMaster(threadMaster.get(), devId);
latch.countDown(); latch.countDown();
log.debug("设备{}: 任务完成,剩余任务数:{}", devId, latch.getCount()); log.debug("设备{}完成:IP={}, 端口={},剩余任务={}", devId, ip, port, latch.getCount());
} }
}); });
} }
// 等待任务完成(超时后记录告警,不关闭线程池 // 端口总超时等待(确保不阻塞上层Job
try { try {
if (!latch.await(TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { if (!latch.await(portTimeout, TimeUnit.SECONDS)) {
log.warn("IP{}: 部分设备监控超时(未完成:{})", ip, latch.getCount()); String errMsg = "端口" + port + "部分设备超时:IP=" + ip + ",未完成=" + latch.getCount();
recordAlarm("03", "ip:" + ip + ",port:" + port, log.warn(errMsg);
"部分设备监控超时(未完成:" + latch.getCount() + ")"); recordAlarm("03", "ip:" + ip + ",port:" + port, errMsg);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("IP{}: 监控任务被中断", ip, e); log.error("端口{}监控被中断:IP={}", port, ip, e);
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
} }
/** /**
* 统一告警记录(抽离避免重复代码) * 统一告警记录(抽离避免重复代码)
*/ */
......
...@@ -342,7 +342,7 @@ public class TStoreyInfoServiceImpl implements ITStoreyInfoService ...@@ -342,7 +342,7 @@ public class TStoreyInfoServiceImpl implements ITStoreyInfoService
} else { } else {
registerOffset = Arrays.asList(55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72); registerOffset = Arrays.asList(55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72);
} }
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(split[0], Integer.parseInt(split[1]),registerOffset, modbusResultHandler, ModbusResultHandler.createDefaultStopCondition()); // deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(split[0], Integer.parseInt(split[1]),registerOffset, modbusResultHandler, ModbusResultHandler.createDefaultStopCondition());
return null; return null;
......
...@@ -30,6 +30,14 @@ import java.util.function.Predicate; ...@@ -30,6 +30,14 @@ import java.util.function.Predicate;
public class DeviceCommunicationJob implements Job { public class DeviceCommunicationJob implements Job {
private static final Logger log = LoggerFactory.getLogger(DeviceCommunicationJob.class); private static final Logger log = LoggerFactory.getLogger(DeviceCommunicationJob.class);
// 常量:超时控制(确保总耗时 < 5分钟Cron周期)
private static final int SINGLE_PORT_TIMEOUT = 5; // 单个端口通信超时(秒)
private static final int JOB_TOTAL_TIMEOUT = 15; // Job总超时(秒)
// 设备ID列表(拆分:按端口分组,避免单次任务过多)
private static final List<Integer> OFFSETS_501 = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27);
private static final List<Integer> OFFSETS_502 = Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54);
private static final List<Integer> OFFSETS_503 = Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72);
@Resource @Resource
private ITEquipmentAlarmDataService alarmDataService; private ITEquipmentAlarmDataService alarmDataService;
@Resource @Resource
...@@ -47,26 +55,28 @@ public class DeviceCommunicationJob implements Job { ...@@ -47,26 +55,28 @@ public class DeviceCommunicationJob implements Job {
@Override @Override
public void execute(JobExecutionContext context) { public void execute(JobExecutionContext context) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
JobDataMap data = null;
String fStoreyIdStr = null; String fStoreyIdStr = null;
Long fStoreyId = null; Long fStoreyId = null;
TStoreyInfo tStoreyInfo = null; TStoreyInfo tStoreyInfo = null;
String ip = null; String ip = null;
try { try {
log.info("=== DeviceCommunicationJob 启动:fStoreyIdStr={},线程={} ===", log.info("=== DeviceCommunicationJob 启动:线程={},开始时间={} ===",
fStoreyIdStr, Thread.currentThread().getName()); Thread.currentThread().getName(), new Date(startTime));
// 1. 提取参数(严格校验,避免后续异常) // 1. 提取参数(严格校验,避免后续异常)
data = context.getJobDetail().getJobDataMap(); JobDataMap data = context.getJobDetail().getJobDataMap();
if (data == null) { if (data == null) {
log.error("JobDataMap为空,终止执行"); String errMsg = "JobDataMap为空,终止执行";
log.error(errMsg);
recordAlarm(null, "fStoreyId未知", errMsg);
return; return;
} }
fStoreyIdStr = data.getString("fStoreyId"); fStoreyIdStr = data.getString("fStoreyId");
if (StringUtils.isBlank(fStoreyIdStr)) { if (StringUtils.isBlank(fStoreyIdStr)) {
log.error("fStoreyId参数为空,终止执行"); String errMsg = "fStoreyId参数为空,终止执行";
recordAlarm(null, "fStoreyId为空", "通信任务异常:fStoreyId参数为空"); log.error(errMsg);
recordAlarm(null, "fStoreyId为空", errMsg);
return; return;
} }
...@@ -74,104 +84,106 @@ public class DeviceCommunicationJob implements Job { ...@@ -74,104 +84,106 @@ public class DeviceCommunicationJob implements Job {
try { try {
fStoreyId = Long.parseLong(fStoreyIdStr); fStoreyId = Long.parseLong(fStoreyIdStr);
} catch (NumberFormatException e) { } catch (NumberFormatException e) {
log.error("fStoreyId格式错误:{},终止执行", fStoreyIdStr); String errMsg = "fStoreyId格式错误:" + fStoreyIdStr;
recordAlarm(null, fStoreyIdStr, "通信任务异常:fStoreyId格式错误:" + fStoreyIdStr); log.error(errMsg, e);
recordAlarm(null, fStoreyIdStr, errMsg + ":" + e.getMessage());
return; return;
} }
// 3. 查询设备信息(双重校验+异常捕获 // 3. 查询设备信息(捕获数据库异常,避免穿透
try { try {
tStoreyInfo = tStoreyInfoMapper.selectTStoreyInfoById(fStoreyId); tStoreyInfo = tStoreyInfoMapper.selectTStoreyInfoById(fStoreyId);
} catch (Exception e) { } catch (Exception e) {
log.error("查询设备信息异常:fStoreyId={}", fStoreyId, e); String errMsg = "查询设备信息异常:fStoreyId=" + fStoreyId;
recordAlarm(null, fStoreyIdStr, "通信任务异常:查询设备信息失败:" + e.getMessage()); log.error(errMsg, e);
recordAlarm(null, fStoreyIdStr, errMsg + ":" + e.getMessage());
return; return;
} }
if (tStoreyInfo == null) { if (tStoreyInfo == null) {
log.error("未查询到设备信息:fStoreyId={},清理无效任务", fStoreyId); String errMsg = "未查询到设备信息:fStoreyId=" + fStoreyId;
// 清理任务(单独捕获SchedulerException,避免异常穿透) log.error(errMsg);
recordAlarm(null, fStoreyIdStr, errMsg);
// 清理无效任务(避免后续重复执行)
try { try {
context.getScheduler().deleteJob(context.getJobDetail().getKey()); context.getScheduler().deleteJob(context.getJobDetail().getKey());
log.info("清理无效任务:fStoreyId={}", fStoreyId);
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error("清理无效任务失败:fStoreyId={}", fStoreyId, e); log.error("清理无效任务失败:fStoreyId={}", fStoreyId, e);
} }
recordAlarm(null, fStoreyIdStr, "通信任务异常:未查询到设备信息,已清理任务");
return; return;
} }
ip = tStoreyInfo.getfIp(); ip = tStoreyInfo.getfIp();
if (StringUtils.isBlank(ip)) { if (StringUtils.isBlank(ip)) {
log.error("设备IP为空:fStoreyId={},终止执行", fStoreyId); String errMsg = "设备IP为空:fStoreyId=" + fStoreyId;
recordAlarm(tStoreyInfo, "设备IP为空", "通信任务异常:设备IP为空"); log.error(errMsg);
recordAlarm(tStoreyInfo, errMsg, errMsg);
return; return;
} }
// 4. 校验依赖组件(避免空指针) // 4. 校验依赖组件(避免空指针)
if (resultHandler == null) { if (resultHandler == null) {
log.error("ModbusResultHandler未初始化,终止执行"); String errMsg = "ModbusResultHandler未初始化:fStoreyId=" + fStoreyId;
recordAlarm(tStoreyInfo, "ResultHandler未初始化", "通信任务异常:ModbusResultHandler未初始化"); log.error(errMsg);
recordAlarm(tStoreyInfo, errMsg, errMsg);
return; return;
} }
// 校验停止条件(避免NullPointerException)
Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition(); Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition();
if (stopCondition == null) {
String errMsg = "Modbus停止条件未初始化:fStoreyId=" + fStoreyId;
log.error(errMsg);
recordAlarm(tStoreyInfo, errMsg, errMsg);
return;
}
// 5. 核心逻辑:按端口顺序执行通信(避免并行线程竞争,简化时序)
log.info("开始设备通信:fStoreyId={},IP={},端口501/502/503", fStoreyId, ip);
executePortCommunication(ip, 501, OFFSETS_501, stopCondition, fStoreyId, tStoreyInfo);
executePortCommunication(ip, 502, OFFSETS_502, stopCondition, fStoreyId, tStoreyInfo);
executePortCommunication(ip, 503, OFFSETS_503, stopCondition, fStoreyId, tStoreyInfo);
// 5. 设备ID列表(拆分后便于管理) // 6. 校验总耗时(确保未超Quartz超时)
List<Integer> offsets1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27); long totalCost = System.currentTimeMillis() - startTime;
List<Integer> offsets2 = Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54); if (totalCost > JOB_TOTAL_TIMEOUT * 1000) {
List<Integer> offsets3 = Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72); String warnMsg = "任务执行超时:fStoreyId=" + fStoreyId + ",耗时=" + totalCost + "ms(阈值=" + JOB_TOTAL_TIMEOUT + "s)";
log.warn(warnMsg);
// 6. 多端口并行通信(使用临时线程池,避免静态线程池问题) recordAlarm(tStoreyInfo, "任务超时", warnMsg);
final String finalIp = ip; } else {
final Long finalFStoreyId = fStoreyId; log.info("=== DeviceCommunicationJob 成功:fStoreyId={},总耗时={}ms ===", fStoreyId, totalCost);
final TStoreyInfo finalTStoreyInfo = tStoreyInfo; }
Future<?> port1Future = portExecutor.submit(() -> {
executePortCommunication(finalIp, 501, offsets1, resultHandler, stopCondition, finalFStoreyId, finalTStoreyInfo);
});
Future<?> port2Future = portExecutor.submit(() -> {
executePortCommunication(finalIp, 502, offsets2, resultHandler, stopCondition, finalFStoreyId, finalTStoreyInfo);
});
Future<?> port3Future = portExecutor.submit(() -> {
executePortCommunication(finalIp, 503, offsets3, resultHandler, stopCondition, finalFStoreyId, finalTStoreyInfo);
});
// 等待所有端口完成(总超时 = 单个端口超时 * 1.2,避免无限等待)
port1Future.get(PORT_TIMEOUT_SECONDS * 1200, TimeUnit.MILLISECONDS);
port2Future.get(PORT_TIMEOUT_SECONDS * 1200, TimeUnit.MILLISECONDS);
port3Future.get(PORT_TIMEOUT_SECONDS * 1200, TimeUnit.MILLISECONDS);
// 7. 校验执行时间(确保未超过Cron周期)
long costTime = System.currentTimeMillis() - startTime;
if (costTime > 240000) { // 超过4分钟(Cron为5分钟,留1分钟缓冲)
log.warn("任务执行超时:{}ms(Cron周期5分钟),可能导致下一次任务延迟", costTime);
recordAlarm(tStoreyInfo, "任务超时", "通信任务警告:执行时间过长:" + costTime + "ms");
}
log.info("=== DeviceCommunicationJob 成功:fStoreyId={},耗时={}ms ===", finalFStoreyId, costTime);
} catch (Throwable e) { } catch (Throwable e) {
// 8. 捕获所有异常(确保不传播到Quartz) // 7. 捕获所有异常(确保不传播到Quartz,避免Trigger变ERROR)
log.error("=== DeviceCommunicationJob 致命异常:fStoreyIdStr={} ===", fStoreyIdStr, e); String errMsg = "DeviceCommunicationJob 致命异常:fStoreyIdStr=" + fStoreyIdStr;
recordAlarm(tStoreyInfo, fStoreyIdStr, "通信任务致命异常:" + e.getMessage()); log.error(errMsg, e);
recordAlarm(tStoreyInfo, fStoreyIdStr, errMsg + ":" + e.getMessage());
} finally { } finally {
// 9. 强制释放资源(避免内存泄漏) long totalCost = System.currentTimeMillis() - startTime;
portExecutor.shutdown(); // 关闭临时线程池 log.info("=== DeviceCommunicationJob 结束:fStoreyIdStr={},总耗时={}ms ===", fStoreyIdStr, totalCost);
long costTime = System.currentTimeMillis() - startTime;
log.info("=== DeviceCommunicationJob 结束:fStoreyIdStr={},总耗时={}ms ===", fStoreyIdStr, costTime);
} }
} }
/** /**
* 单个端口的Modbus通信(独立方法,便于异常管控 * 单个端口通信(同步执行,简化时序,便于超时控制
*/ */
private void executePortCommunication(String ip, int port, List<Integer> deviceIds, private void executePortCommunication(String ip, int port, List<Integer> deviceIds,
ModbusResultHandler resultHandler, Predicate<int[]> stopCondition, Predicate<int[]> stopCondition, Long fStoreyId, TStoreyInfo tStoreyInfo) {
Long fStoreyId, TStoreyInfo tStoreyInfo) { long portStartTime = System.currentTimeMillis();
log.info("端口{}通信开始:fStoreyId={},设备数={},开始时间={}",
port, fStoreyId, deviceIds.size(), new Date(portStartTime));
try { try {
log.info("开始端口{}通信:fStoreyId={},设备数={}", port, fStoreyId, deviceIds.size()); // 调用Modbus服务(同步执行,依赖内部超时控制)
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, port, deviceIds, resultHandler, stopCondition); deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(
log.info("端口{}通信完成:fStoreyId={}", port, fStoreyId); ip, port, deviceIds, resultHandler, stopCondition, SINGLE_PORT_TIMEOUT);
long portCost = System.currentTimeMillis() - portStartTime;
log.info("端口{}通信完成:fStoreyId={},耗时={}ms", port, fStoreyId, portCost);
} catch (Exception e) { } catch (Exception e) {
log.error("端口{}通信异常:fStoreyId={}", port, fStoreyId, e); long portCost = System.currentTimeMillis() - portStartTime;
recordAlarm(tStoreyInfo, "端口" + port + "异常", "Modbus通信异常:端口" + port + "失败:" + e.getMessage()); String errMsg = "端口" + port + "通信异常:fStoreyId=" + fStoreyId;
log.error(errMsg + ",耗时=" + portCost + "ms", e);
recordAlarm(tStoreyInfo, "端口" + port + "异常", errMsg + ":" + e.getMessage());
} }
} }
// 简化方法签名,不需要传入ip和fStoreyId参数 // 简化方法签名,不需要传入ip和fStoreyId参数
......
...@@ -31,8 +31,12 @@ public class DeviceTaskScheduler { ...@@ -31,8 +31,12 @@ public class DeviceTaskScheduler {
// 新增:每个设备任务的Quartz线程隔离(避免任务间干扰) // 新增:每个设备任务的Quartz线程隔离(避免任务间干扰)
private static final String THREAD_GROUP = "DEVICE_THREAD_GROUP"; private static final String THREAD_GROUP = "DEVICE_THREAD_GROUP";
// 常量:Cron周期(5分钟)、任务有效期(7天)
private static final String CRON_EXPRESSION = "0 0/5 * * * ?";
private static final int TASK_VALID_DAYS = 7;
/** /**
* 创建设备监控任务(入口方法 * 创建设备监控任务(入口:增加调度器健康检查、任务去重
*/ */
public void scheduleDeviceMonitoring(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) { public void scheduleDeviceMonitoring(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) {
if (fStoreyId == null || fPowerOutageIp == null || fPowerOutagePort == null) { if (fStoreyId == null || fPowerOutageIp == null || fPowerOutagePort == null) {
...@@ -43,63 +47,64 @@ public class DeviceTaskScheduler { ...@@ -43,63 +47,64 @@ public class DeviceTaskScheduler {
try { try {
log.info("=== 开始创建设备监控任务:fStoreyId={} ===", fStoreyId); log.info("=== 开始创建设备监控任务:fStoreyId={} ===", fStoreyId);
// 1. 确保调度器已启动(若依框架可能延迟启动) // 1. 调度器健康检查(确保线程池可用)
if (!scheduler.isStarted()) { checkSchedulerHealth();
log.warn("调度器未启动,手动启动...");
scheduler.start();
}
// 关键:添加调度器状态监控,确保线程池可用 // 2. 任务去重(避免重复创建导致资源竞争)
SchedulerMetaData metaData = scheduler.getMetaData(); if (isTaskExists("COMM_" + fStoreyId)) {
log.info("Quartz线程池状态:核心线程={} 任务总数={}", log.info("通信任务[COMM_{}]已存在,无需重复创建", fStoreyId);
metaData.getThreadPoolSize(), // 检查现有触发器状态,若为ERROR则重建
metaData.getNumberOfJobsExecuted()); if (isTriggerError("COMM_" + fStoreyId)) {
log.warn("通信任务[COMM_{}]触发器状态为ERROR,重建触发器", fStoreyId);
createHourlyCommunicationJob(fStoreyId);
}
return;
}
// 2. 创建两个核心任务 // 3. 创建核心任务
createHourlyCommunicationJob(fStoreyId); createHourlyCommunicationJob(fStoreyId);
createFinalExecutionJob(fStoreyId, fPowerOutageIp, fPowerOutagePort); createFinalExecutionJob(fStoreyId, fPowerOutageIp, fPowerOutagePort);
// 3. 验证任务是否创建成功(关键:打印触发器状态)
checkTaskStatus(fStoreyId); checkTaskStatus(fStoreyId);
log.info("=== 设备监控任务创建完成:fStoreyId={} ===", fStoreyId); log.info("=== 设备监控任务创建完成:fStoreyId={} ===", fStoreyId);
} catch (SchedulerException e) { } catch (SchedulerException e) {
log.error("=== 创建设备监控任务失败:fStoreyId={} ===", fStoreyId, e); log.error("=== 创建设备监控任务失败:fStoreyId={} ===", fStoreyId, e);
cleanInvalidTask(fStoreyId); // 失败时清理残留
throw new RuntimeException("Quartz任务调度失败", e); throw new RuntimeException("Quartz任务调度失败", e);
} }
} }
/** /**
* 1. 创建每2分钟执行的通信任务(CronTrigger * 1. 创建每5分钟执行的通信任务(核心优化:简化调度逻辑、调整Misfire策略
*/ */
private void createHourlyCommunicationJob(Long fStoreyId) throws SchedulerException { private void createHourlyCommunicationJob(Long fStoreyId) throws SchedulerException {
String jobId = "COMM_" + fStoreyId; String jobId = "COMM_" + fStoreyId;
JobKey jobKey = new JobKey(jobId, JOB_GROUP); JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP); TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 1. 构建JobDetail(仅定义任务元数据,不涉及调度规则 // 1. 构建JobDetail(仅元数据,禁止并发执行通过注解控制
JobDetail job = JobBuilder.newJob(DeviceCommunicationJob.class) JobDetail job = JobBuilder.newJob(DeviceCommunicationJob.class)
.withIdentity(jobKey) .withIdentity(jobKey)
.withDescription("设备" + fStoreyId + "每5分钟Modbus通信任务") .withDescription("设备" + fStoreyId + "每5分钟Modbus通信任务")
.usingJobData("fStoreyId", fStoreyId.toString()) .usingJobData("fStoreyId", fStoreyId.toString()) // 传递参数(String避免类型问题)
.storeDurably(false) .storeDurably(false) // 触发器删除后Job自动失效
.requestRecovery(true) // 服务重启后恢复未完成任务 .requestRecovery(true) // 服务重启后恢复未完成任务
.build(); // 移除错误的 withSchedule() 调用 .build();
// 2. 构建CronTrigger(调度规则在此配置 // 2. 构建CronTrigger(移除withMisfireThreshold,兼容低版本
CronTrigger trigger = TriggerBuilder.newTrigger() CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey) .withIdentity(triggerKey)
.forJob(jobKey) .forJob(jobKey)
.withDescription("设备" + fStoreyId + "通信任务触发器(每5分钟)") .withDescription("设备" + fStoreyId + "通信任务触发器(每5分钟)")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?") .withSchedule(CronScheduleBuilder.cronSchedule(CRON_EXPRESSION)
// 关键:Misfire策略(错过触发后忽略,避免集中执行 // Misfire策略:错过触发后立即执行,再按原计划(低版本支持
.withMisfireHandlingInstructionDoNothing()) .withMisfireHandlingInstructionFireAndProceed())
.startNow() .startNow()
.endAt(Date.from(Instant.now().plus(7, ChronoUnit.DAYS))) // 7天有效期 .endAt(Date.from(Instant.now().plus(TASK_VALID_DAYS, ChronoUnit.DAYS)))
.build(); .build();
// 3. 原子操作:创建/更新任务 // 3. 原子操作:创建/更新(优先更新,避免删除重建)
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger); Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
log.info("通信任务[{}]更新触发器成功,下次执行时间:{}", jobId, nextFireTime); log.info("通信任务[{}]更新触发器成功,下次执行时间:{}", jobId, nextFireTime);
...@@ -109,15 +114,15 @@ public class DeviceTaskScheduler { ...@@ -109,15 +114,15 @@ public class DeviceTaskScheduler {
} }
} }
/** /**
* 2. 创建5分钟后执行的最终任务(SimpleTrigger,仅执行一次 * 2. 创建15分钟后执行的最终任务(保持原逻辑,优化超时
*/ */
private void createFinalExecutionJob(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) throws SchedulerException { private void createFinalExecutionJob(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) throws SchedulerException {
String jobId = "FINAL_" + fStoreyId; String jobId = "FINAL_" + fStoreyId;
JobKey jobKey = new JobKey(jobId, JOB_GROUP); JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP); TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 构建JobDetail
JobDetail job = JobBuilder.newJob(FinalExecutionJob.class) JobDetail job = JobBuilder.newJob(FinalExecutionJob.class)
.withIdentity(jobKey) .withIdentity(jobKey)
.withDescription("设备" + fStoreyId + "最终执行任务(仅一次)") .withDescription("设备" + fStoreyId + "最终执行任务(仅一次)")
...@@ -128,7 +133,6 @@ public class DeviceTaskScheduler { ...@@ -128,7 +133,6 @@ public class DeviceTaskScheduler {
.requestRecovery(true) .requestRecovery(true)
.build(); .build();
// 构建SimpleTrigger
Date executeTime = Date.from(Instant.now().plus(15, ChronoUnit.MINUTES)); Date executeTime = Date.from(Instant.now().plus(15, ChronoUnit.MINUTES));
SimpleTrigger trigger = TriggerBuilder.newTrigger() SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey) .withIdentity(triggerKey)
...@@ -136,11 +140,10 @@ public class DeviceTaskScheduler { ...@@ -136,11 +140,10 @@ public class DeviceTaskScheduler {
.withDescription("设备" + fStoreyId + "最终任务触发器") .withDescription("设备" + fStoreyId + "最终任务触发器")
.startAt(executeTime) .startAt(executeTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule() .withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionFireNow() // 错过触发立即执行 .withMisfireHandlingInstructionFireNow() // 错过立即执行
.withRepeatCount(0)) // 仅执行一次 .withRepeatCount(0)) // 仅一次
.build(); .build();
// 原子操作:创建/更新
if (scheduler.checkExists(jobKey)) { if (scheduler.checkExists(jobKey)) {
Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger); Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
log.info("最终任务[{}]更新触发器成功,执行时间:{}", jobId, nextFireTime); log.info("最终任务[{}]更新触发器成功,执行时间:{}", jobId, nextFireTime);
...@@ -149,6 +152,38 @@ public class DeviceTaskScheduler { ...@@ -149,6 +152,38 @@ public class DeviceTaskScheduler {
log.info("最终任务[{}]创建成功,执行时间:{}", jobId, nextFireTime); log.info("最终任务[{}]创建成功,执行时间:{}", jobId, nextFireTime);
} }
} }
// ------------------------------ 工具方法:增强稳定性 ------------------------------
/**
* 检查Quartz调度器健康状态(避免线程池耗尽)
*/
private void checkSchedulerHealth() throws SchedulerException {
if (!scheduler.isStarted()) {
log.warn("调度器未启动,手动启动...");
scheduler.start();
}
SchedulerMetaData metaData = scheduler.getMetaData();
// 低版本兼容:获取线程池大小和已执行任务数(替代活跃线程数)
int poolSize = metaData.getThreadPoolSize();
long executedJobs = metaData.getNumberOfJobsExecuted();
log.info("Quartz健康状态:线程池大小={}, 已执行任务数={}", poolSize, executedJobs);
// 线程池大小预警(根据实际需求调整阈值)
if (poolSize < 5) {
log.warn("Quartz线程池过小(当前={}),可能导致任务延迟", poolSize);
}
}
/**
* 检查触发器是否为ERROR状态
*/
private boolean isTriggerError(String jobId) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
if (!scheduler.checkExists(triggerKey)) {
return false;
}
Trigger.TriggerState state = scheduler.getTriggerState(triggerKey);
return state == Trigger.TriggerState.ERROR;
}
/** /**
* 检查任务是否已存在(避免重复创建) * 检查任务是否已存在(避免重复创建)
*/ */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment