Commit bc577bd4 authored by wanghao's avatar wanghao

1 测试 上电后通信 和 最终完成 定时任务功能

parent ad2e73e8
......@@ -15,6 +15,7 @@ import com.zehong.system.service.ITEquipmentAlarmDataService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Consumer;
......@@ -35,18 +36,31 @@ public class DeviceStatusReaderAndTimeSetter {
public static final int MAX_RETRIES = 3;
public static final int RETRY_DELAY = 500;
public static final int TIMEOUT_MINUTES = 5;
public static final int TIMEOUT_SECONDS = 30; // 总超时30秒
// 工厂
private static final ModbusFactory modbusFactory = new ModbusFactory();
// 1. 关键优化:固定线程池大小(不随设备数变化,根据CPU核数设置,如10)
// 线程池优化:添加线程工厂,便于问题追踪
private static final ExecutorService FIXED_THREAD_POOL = new ThreadPoolExecutor(
5, // 核心线程数(常驻线程)
10, // 最大线程数(峰值线程)
60, // 空闲线程存活时间(60秒)
5,
10,
60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // 任务队列(缓冲100个任务,避免任务堆积)
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时,由调用线程执行(避免任务丢失)
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
    // Atomic counter: newThread() runs on whichever thread submits work to the pool,
    // so a plain int++ could hand the same index to two workers under concurrent submits.
    private final java.util.concurrent.atomic.AtomicInteger count =
            new java.util.concurrent.atomic.AtomicInteger();

    /**
     * Creates a daemon worker thread named "modbus-pool-N" so pool threads are easy
     * to identify in logs and thread dumps.
     *
     * @param r the task the new thread will run
     * @return a new, not yet started daemon thread
     */
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, "modbus-pool-" + count.getAndIncrement());
        thread.setDaemon(true); // daemon thread, so the pool does not block JVM exit
        return thread;
    }
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
@Resource
private ITEquipmentAlarmDataService alarmDataService;
......@@ -152,52 +166,75 @@ public class DeviceStatusReaderAndTimeSetter {
Consumer<DeviceStatusReaderDto> resultHandler,
Predicate<int[]> stopCondition ) {
if (deviceIds == null || deviceIds.isEmpty()) {
System.out.println("⚠️ 警告: 设备ID列表为空,不执行监控");
log.warn("⚠️ 设备ID列表为空,不执行监控");
return;
}
final CountDownLatch latch = new CountDownLatch(deviceIds.size());
log.info("开始监控设备:IP={}, 端口={}, 设备数={}(线程池活跃线程:{})",
ip, port, deviceIds.size(), ((ThreadPoolExecutor) FIXED_THREAD_POOL).getActiveCount());
for (int deviceId : deviceIds) {
final int devId = deviceId;
FIXED_THREAD_POOL.submit(() -> {
ModbusMaster threadMaster = null;
Socket underlyingSocket = null; // 新增:跟踪底层Socket
try {
// 1 获取 线程专有的Modbus连接
// 1. 创建Modbus连接并获取底层Socket(反射方式,根据Modbus4j版本调整)
threadMaster = createModbusMaster(ip, port);
// 注意:以下反射代码依赖Modbus4j内部实现,不同版本可能需要调整
// 目的是获取底层Socket,确保关闭
Object transport = threadMaster.getClass().getDeclaredField("transport").get(threadMaster);
underlyingSocket = (Socket) transport.getClass().getDeclaredField("socket").get(transport);
// 2 初始化线程专有的Modbus连接 并尝试读取数据
// 为什么传了 master 还传 ip 和 port ,因为 master 要读完后才释放,而 ip 和 port 是为了log用
int[] result = readWithConditionalRetry(threadMaster,ip, port, devId, stopCondition);
// 2. 读取数据(带超时控制)
int[] result = readWithConditionalRetry(threadMaster, ip, port, devId, stopCondition);
// 3. 处理结果(判空避免NPE)
if (resultHandler != null) {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, devId, result));
}
// 创建包含完整信息的结果对象
DeviceStatusReaderDto deviceStatusReaderDto = new DeviceStatusReaderDto(ip, port, devId, result);
// 3 设置回调数据
resultHandler.accept(deviceStatusReaderDto);
} catch (Throwable e) {
log.error("设备{}: 监控任务异常", devId, e);
// 记录告警(区分Error类型)
String alarmMsg = e instanceof OutOfMemoryError ? "内存溢出:" + e.getMessage() : "监控任务异常:" + e.getMessage();
recordAlarm("03", "ip:" + ip + ",port:" + port + ",deviceId:" + devId, alarmMsg);
} finally {
if (threadMaster != null) threadMaster.destroy();
// 关键修复:优先关闭底层Socket,再销毁Master
if (underlyingSocket != null) {
try {
if (!underlyingSocket.isClosed()) {
underlyingSocket.close();
log.debug("设备{}: 底层Socket已关闭", devId);
}
} catch (Exception e) {
log.error("设备{}: Socket关闭失败", devId, e);
}
}
if (threadMaster != null) {
try {
threadMaster.destroy();
log.debug("设备{}: ModbusMaster已销毁", devId);
} catch (Exception e) {
log.error("设备{}: ModbusMaster销毁失败", devId, e);
}
}
latch.countDown();
log.info("当前 - 设备ip:{} port:{} device:{}的监控任务完成", ip,port,deviceId);
log.info("设备{}: 监控任务完成(剩余:{})", devId, latch.getCount());
}
});
}
// 8. 等待任务完成(优化超时时间,避免长期阻塞)
// 等待任务完成,超时后中断未完成任务
try {
// 超时时间设为30秒(根据设备数调整,确保不超过Job执行周期)
if (!latch.await(30, TimeUnit.SECONDS)) {
log.warn("IP{}: 部分设备监控超时(未完成设备数:{})", ip, latch.getCount());
recordAlarm("03", "ip:" + ip + ",port:" + port, "部分设备监控超时");
if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
log.warn("IP{}: 部分设备监控超时(未完成:{}),强制中断", ip, latch.getCount());
recordAlarm("03", "ip:" + ip + ",port:" + port, "部分设备监控超时(未完成:" + latch.getCount() + ")");
// 注意:线程池任务无法直接中断,需通过任务内部的中断机制(如Thread.interrupted())
}
} catch (InterruptedException e) {
log.error("IP{}: 监控任务被中断", ip, e);
Thread.currentThread().interrupt(); // 恢复中断状态
Thread.currentThread().interrupt();
}
}
/**
......
......@@ -11,7 +11,6 @@ import com.zehong.system.modbus.util.Modbus4jUtils;
import com.zehong.system.service.ITEquipmentAlarmDataService;
import com.zehong.system.service.ITStoreyInfoService;
import com.zehong.system.service.ITTrayInfoService;
import org.quartz.SchedulerException;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
......
......@@ -19,6 +19,7 @@ import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
* @author lenovo
......@@ -38,10 +39,9 @@ public class DeviceCommunicationJob implements Job {
@Autowired
private ModbusResultHandler resultHandler;
@Override
public void execute(JobExecutionContext context) {
// 1. 初始化所有变量,避免空指针
long startTime = System.currentTimeMillis();
JobDataMap data = null;
String fStoreyIdStr = null;
Long fStoreyId = null;
......@@ -49,7 +49,10 @@ public class DeviceCommunicationJob implements Job {
String ip = null;
try {
// 2. 提取参数(每一步都校验,避免空指针)
log.info("=== DeviceCommunicationJob 开始执行:fStoreyIdStr={}(当前线程:{}) ===",
fStoreyIdStr, Thread.currentThread().getName());
// 提取并校验参数
data = context.getJobDetail().getJobDataMap();
if (data == null) {
log.error("JobDataMap为空,终止执行");
......@@ -61,7 +64,7 @@ public class DeviceCommunicationJob implements Job {
return;
}
// 3. 转换参数(处理NumberFormatException)
// 转换参数
try {
fStoreyId = Long.parseLong(fStoreyIdStr);
} catch (NumberFormatException e) {
......@@ -69,65 +72,112 @@ public class DeviceCommunicationJob implements Job {
return;
}
// 4. 查询设备信息(校验DB查询结果)
// 查询设备信息
tStoreyInfo = tStoreyInfoMapper.selectTStoreyInfoById(fStoreyId);
if (tStoreyInfo == null) {
log.error("未查询到设备信息:fStoreyId={},终止执行", fStoreyId);
log.error("未查询到设备信息:fStoreyId={},终止执行并清理任务", fStoreyId);
context.getScheduler().deleteJob(context.getJobDetail().getKey());
return;
}
ip = tStoreyInfo.getfIp();
if (StringUtils.isBlank(ip)) {
log.error("设备IP为空:fStoreyId={},终止执行", fStoreyId);
recordAlarm(tStoreyInfo, "设备IP为空,无法执行通信任务");
return;
}
// 校验resultHandler
if (resultHandler == null) {
log.error("ModbusResultHandler未初始化,终止执行");
recordAlarm(tStoreyInfo, "ModbusResultHandler未初始化");
return;
}
// 5. 执行Modbus通信(业务逻辑,每个调用都加try-catch)
// 将变量声明为final,供匿名内部类使用
final String finalIp = ip;
final Long finalFStoreyId = fStoreyId;
final TStoreyInfo finalTStoreyInfo = tStoreyInfo;
// 设备ID列表
List<Integer> offsets1 = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27);
List<Integer> offsets2 = Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54);
List<Integer> offsets3 = Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72);
// 每个Modbus调用单独捕获异常,避免一个设备失败导致整个Job报错
try {
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, 501, offsets1, resultHandler, ModbusResultHandler.createDefaultStopCondition());
log.info("Modbus 501端口通信完成:fStoreyId={}", fStoreyId);
} catch (Exception e) {
log.error("Modbus 501端口通信异常:fStoreyId={}", fStoreyId, e);
// 仅记录日志,不抛出异常
}
try {
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, 502, offsets2, resultHandler, ModbusResultHandler.createDefaultStopCondition());
log.info("Modbus 502端口通信完成:fStoreyId={}", fStoreyId);
} catch (Exception e) {
log.error("Modbus 502端口通信异常:fStoreyId={}", fStoreyId, e);
}
try {
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, 503, offsets3, resultHandler, ModbusResultHandler.createDefaultStopCondition());
log.info("Modbus 503端口通信完成:fStoreyId={}", fStoreyId);
} catch (Exception e) {
log.error("Modbus 503端口通信异常:fStoreyId={}", fStoreyId, e);
// 使用final变量
executeWithTimeout(() -> {
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(finalIp, 501, offsets1, resultHandler, ModbusResultHandler.createDefaultStopCondition());
log.info("Modbus 501端口通信完成:fStoreyId={}", finalFStoreyId);
}, 10, TimeUnit.SECONDS, finalTStoreyInfo, "501端口通信超时", finalIp, finalFStoreyId);
executeWithTimeout(() -> {
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(finalIp, 502, offsets2, resultHandler, ModbusResultHandler.createDefaultStopCondition());
log.info("Modbus 502端口通信完成:fStoreyId={}", finalFStoreyId);
}, 10, TimeUnit.SECONDS, finalTStoreyInfo, "502端口通信超时", finalIp, finalFStoreyId);
executeWithTimeout(() -> {
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(finalIp, 503, offsets3, resultHandler, ModbusResultHandler.createDefaultStopCondition());
log.info("Modbus 503端口通信完成:fStoreyId={}", finalFStoreyId);
}, 10, TimeUnit.SECONDS, finalTStoreyInfo, "503端口通信超时", finalIp, finalFStoreyId);
// 校验执行时间
long costTime = System.currentTimeMillis() - startTime;
if (costTime > 110000) {
log.warn("任务执行时间过长:{}ms(接近Cron周期),可能导致任务叠加", costTime);
recordAlarm(finalTStoreyInfo, "任务执行时间过长:" + costTime + "ms");
}
log.info("=== DeviceCommunicationJob 执行成功:fStoreyId={} ===", fStoreyId);
log.info("=== DeviceCommunicationJob 执行成功:fStoreyId={}(耗时:{}ms) ===", finalFStoreyId, costTime);
} catch (Throwable e) {
// 6. 捕获所有异常(包括Error,如NoClassDefFoundError)
log.error("=== DeviceCommunicationJob 执行致命异常:fStoreyId={} ===", fStoreyIdStr, e);
// 记录告警(内部异常也不抛出,确保Job不传播任何错误)
log.error("=== DeviceCommunicationJob 执行致命异常:fStoreyIdStr={} ===", fStoreyIdStr, e);
try {
if (tStoreyInfo != null && StringUtils.isNotBlank(tStoreyInfo.getfStoreyCode())) {
recordAlarm(tStoreyInfo, "通信任务异常:" + e.getMessage());
} else {
TEquipmentAlarmData alarm = new TEquipmentAlarmData();
alarm.setfAlarmType("03");
alarm.setfEquipmentCode(tStoreyInfo.getfStoreyCode());
alarm.setfAlarmData("通信任务异常:" + e.getMessage());
alarm.setfEquipmentCode(fStoreyIdStr);
alarm.setfAlarmData("通信任务异常(设备信息缺失):" + e.getMessage());
alarm.setfCreateTime(new Date());
alarmDataService.insertTEquipmentAlarmData(alarm);
}
} catch (Exception alarmEx) {
log.error("=== 告警记录失败(不影响触发器) ===", alarmEx);
}
// 绝对禁止抛出任何异常!!!
} finally {
log.info("=== DeviceCommunicationJob 执行结束:fStoreyId={} ===", fStoreyIdStr);
long costTime = System.currentTimeMillis() - startTime;
log.info("=== DeviceCommunicationJob 执行结束:fStoreyIdStr={}(总耗时:{}ms) ===", fStoreyIdStr, costTime);
}
}
/**
 * Runs {@code task} on a dedicated single-thread executor and waits at most
 * {@code timeout} for it to finish. This method never propagates an exception:
 * a timeout or any failure is logged and recorded as an alarm instead.
 *
 * <p>The executor is always shut down in {@code finally}. On timeout or failure
 * the pending task is cancelled with interruption; whether it actually stops
 * depends on the task reacting to the interrupt.
 *
 * @param task        the work to execute
 * @param timeout     maximum time to wait for {@code task}
 * @param unit        unit of {@code timeout}
 * @param tStoreyInfo device record passed to {@code recordAlarm} when an alarm is written
 * @param timeoutMsg  message prefix used in the log line and the alarm text
 * @param ip          device IP, used only for logging and the alarm text
 * @param fStoreyId   device id, used only for logging and the alarm text
 */
private void executeWithTimeout(Runnable task, long timeout, TimeUnit unit,
                                TStoreyInfo tStoreyInfo, String timeoutMsg,
                                String ip, Long fStoreyId) {
    ExecutorService executor = null;
    Future<?> future = null;
    try {
        executor = Executors.newSingleThreadExecutor();
        future = executor.submit(task);
        future.get(timeout, unit);
    } catch (TimeoutException e) {
        // Stop waiting AND ask the still-running task to stop; otherwise it keeps running unobserved.
        if (future != null) {
            future.cancel(true);
        }
        log.error("{}:fStoreyId={}, ip={}", timeoutMsg, fStoreyId, ip, e);
        recordAlarm(tStoreyInfo, timeoutMsg + "(IP:" + ip + ",设备ID:" + fStoreyId + ")");
    } catch (Exception e) {
        // No-op if the task already completed (ExecutionException); cancels it if we were interrupted.
        if (future != null) {
            future.cancel(true);
        }
        log.error("{}执行异常:fStoreyId={}, ip={}", timeoutMsg, fStoreyId, ip, e);
        recordAlarm(tStoreyInfo, timeoutMsg + "执行异常(IP:" + ip + ",设备ID:" + fStoreyId + "):" + e.getMessage());
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag last, so the alarm write above is not affected by it.
            Thread.currentThread().interrupt();
        }
    } finally {
        // Always release the worker thread; previously it leaked on every timeout or failure.
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
/**
 * Writes an alarm record of type "03" for the given device through
 * {@code alarmDataService}. The equipment code is taken from
 * {@code tStoreyInfo.getfStoreyCode()} and the creation time is "now".
 *
 * <p>Best effort: any exception raised while building or inserting the record
 * (including a null {@code tStoreyInfo}) is logged and not rethrown.
 *
 * @param tStoreyInfo device record supplying the equipment code
 * @param alarmData   alarm description to store
 */
private void recordAlarm(TStoreyInfo tStoreyInfo, String alarmData) {
    try {
        final TEquipmentAlarmData record = new TEquipmentAlarmData();
        record.setfAlarmType("03");
        record.setfEquipmentCode(tStoreyInfo.getfStoreyCode());
        record.setfAlarmData(alarmData);
        record.setfCreateTime(new Date());
        alarmDataService.insertTEquipmentAlarmData(record);
    } catch (Exception ex) {
        // Alarm persistence must never break the caller; log and continue.
        log.error("记录告警失败:{}", alarmData, ex);
    }
}
}
......@@ -27,6 +27,8 @@ public class DeviceTaskScheduler {
// 触发器组名(统一固定,与原有逻辑保持一致)
private static final String TRIGGER_GROUP = "DEVICE_TRIGGERS";
// 新增:每个设备任务的Quartz线程隔离(避免任务间干扰)
private static final String THREAD_GROUP = "DEVICE_THREAD_GROUP";
/**
* 创建设备监控任务(入口方法)
*/
......@@ -45,6 +47,12 @@ public class DeviceTaskScheduler {
scheduler.start();
}
// 关键:添加调度器状态监控,确保线程池可用
SchedulerMetaData metaData = scheduler.getMetaData();
log.info("Quartz线程池状态:核心线程={} 任务总数={}",
metaData.getThreadPoolSize(),
metaData.getNumberOfJobsExecuted());
// 2. 创建两个核心任务
createHourlyCommunicationJob(fStoreyId);
createFinalExecutionJob(fStoreyId, fPowerOutageIp, fPowerOutagePort);
......@@ -79,6 +87,7 @@ public class DeviceTaskScheduler {
.withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString()) // 传递参数(String类型避免溢出)
.storeDurably(false) // 若依建议:非持久化(触发器删除后Job自动失效)
.withDescription("设备" + fStoreyId + "每2分钟通信任务")
.requestRecovery(true) // 服务重启后恢复未完成的任务
.build();
......@@ -86,22 +95,20 @@ public class DeviceTaskScheduler {
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.forJob(jobKey) // 显式绑定Job(避免触发器游离)
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/2 * * * ?")
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/5 * * * ?")
// 错过触发时:立即执行一次,后续按原计划(关键!)
.withMisfireHandlingInstructionFireAndProceed())
.startNow() // 立即生效(无需等待第一个周期)
.build();
// 3. 原子操作:创建/更新任务(若依框架推荐用scheduleJob
// 3. 原子操作:强制替换旧任务(避免残留状态
if (scheduler.checkExists(jobKey)) {
// 任务已存在:更新触发器
Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
log.info("通信任务[{}]更新触发器成功,下次执行时间:{}", jobId, nextFireTime);
} else {
// 任务不存在:创建Job+Trigger(原子操作,避免分步异常)
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("通信任务[{}]创建成功,下次执行时间:{}", jobId, nextFireTime);
// 先删除旧任务,再创建新任务(彻底清理状态)
scheduler.deleteJob(jobKey);
log.info("通信任务[{}]旧任务已删除,准备创建新任务", jobId);
}
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("通信任务[{}]创建成功,下次执行时间:{}", jobId, nextFireTime);
}
/**
......@@ -129,7 +136,7 @@ public class DeviceTaskScheduler {
.build();
// 2. 构建SimpleTrigger(5分钟后执行,仅一次)
Date executeTime = Date.from(Instant.now().plus(5, ChronoUnit.MINUTES));
Date executeTime = Date.from(Instant.now().plus(15, ChronoUnit.MINUTES));
SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.forJob(jobKey) // 显式绑定Job
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment