Commit 6a332201 authored by wanghao's avatar wanghao

1 指令指令完成,但是 没有检测到机械臂完成。

parent 2511f9c5
package com.zehong.system.task;
import org.quartz.Job;
import org.springframework.stereotype.Component;
import com.serotonin.modbus4j.ModbusFactory;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.serotonin.modbus4j.ip.IpParameters;
import com.serotonin.modbus4j.ip.tcp.TcpMaster;
import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse;
import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.domain.TStoreyInfo;
import com.zehong.system.mapper.TStoreyInfoMapper;
import com.zehong.system.modbus.handler.ModbusResultHandler;
import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto;
import com.zehong.system.modbus.util.Modbus4jUtils;
import com.zehong.system.service.ITEquipmentAlarmDataService;
import org.apache.commons.lang3.StringUtils;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.net.Socket;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
/**
* @author lenovo
* @date 2025/9/25
* @description TODO
*/
@Component
public abstract class BaseDeviceCommJob implements Job {
private static final Logger log = LoggerFactory.getLogger(BaseDeviceCommJob.class);
// -------------------------- 公共常量配置 --------------------------
private static final int TOTAL_TASK_TIMEOUT_SEC = 240; // 任务总超时:4分钟
private static final int SINGLE_DEVICE_TIMEOUT_SEC = 10; // 单个设备超时:10秒
private static final int MODBUS_CONN_TIMEOUT_MS = 3000; // 连接超时:3秒
private static final int CUSTOM_RETRY_TIMES = 1; // 自定义重试次数:1次
private static final int RETRY_DELAY_MS = 200; // 重试间隔:200ms
private static final int REG_START_ADDR = 0;
private static final int REG_READ_COUNT = 10;
private static final ModbusFactory modbusFactory = new ModbusFactory(); // 单例工厂
// -------------------------- 公共依赖注入 --------------------------
@Resource
protected ITEquipmentAlarmDataService alarmDataService;
@Resource
protected TStoreyInfoMapper tStoreyInfoMapper;
@Autowired
protected ModbusResultHandler resultHandler;
// -------------------------- 抽象方法:子类实现差异化 --------------------------
/**
* 获取当前Job的固定端口号(501/502/503)
*/
protected abstract int getFixedPort();
/**
* 获取当前端口对应的设备ID列表
*/
protected abstract List<Integer> getDeviceIds();
// -------------------------- 核心执行逻辑(公共) --------------------------
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
long taskStartTime = System.currentTimeMillis();
String storeyIdStr = getStoreyIdFromContext(context);
ExecutorService deviceExecutor = null;
try {
// 1. 初始化线程池(单个端口设备数较少,核心线程数设为5)
deviceExecutor = new ThreadPoolExecutor(
5, 5,
0, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(30),
r -> new Thread(r, "modbus-port-" + getFixedPort() + "-" + storeyIdStr),
new ThreadPoolExecutor.AbortPolicy()
);
// 2. 执行当前端口的设备通信(带总超时)
executePortDevices(context, deviceExecutor, taskStartTime, storeyIdStr);
log.info("端口[{}]通信任务执行成功:fStoreyId={},总耗时={}ms",
getFixedPort(), storeyIdStr, System.currentTimeMillis() - taskStartTime);
} catch (Throwable e) {
String errMsg = "端口[" + getFixedPort() + "]通信任务异常:fStoreyId=" + storeyIdStr;
log.error(errMsg, e);
recordAlarm(null, storeyIdStr, errMsg + ":" + e.getMessage());
throw new JobExecutionException(errMsg, e); // 抛出让Quartz感知,不隐藏异常
} finally {
// 3. 强制关闭线程池,避免资源泄漏
if (deviceExecutor != null && !deviceExecutor.isShutdown()) {
deviceExecutor.shutdownNow();
try {
if (!deviceExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("端口[{}]线程池关闭超时:fStoreyId={}", getFixedPort(), storeyIdStr);
}
} catch (InterruptedException e) {
log.warn("端口[{}]线程池关闭被中断:fStoreyId={}", getFixedPort(), storeyIdStr);
}
}
}
}
/**
* 执行当前端口的所有设备通信(带总超时)
*/
private void executePortDevices(JobExecutionContext context, ExecutorService deviceExecutor,
long taskStartTime, String storeyIdStr) throws Exception {
// 校验设备信息
TStoreyInfo storeyInfo = validateAndGetStoreyInfo(storeyIdStr);
String ip = storeyInfo.getfIp();
int port = getFixedPort();
List<Integer> deviceIds = getDeviceIds();
// 设备通信计数器
CountDownLatch deviceLatch = new CountDownLatch(deviceIds.size());
AtomicInteger deviceErrorCount = new AtomicInteger(0);
// 逐个设备提交任务
for (int deviceId : deviceIds) {
deviceExecutor.submit(() -> {
try {
executeSingleDeviceWithTimeout(ip, port, deviceId, storeyIdStr);
} catch (Exception e) {
deviceErrorCount.incrementAndGet();
log.error("端口[{}]设备[{}]通信异常:fStoreyId={}",
port, deviceId, storeyIdStr, e);
recordAlarm(storeyInfo, "设备" + deviceId + "通信异常:" + e.getMessage());
} finally {
deviceLatch.countDown();
}
});
}
// 等待所有设备完成(带总超时)
boolean allCompleted = deviceLatch.await(TOTAL_TASK_TIMEOUT_SEC, TimeUnit.SECONDS);
long totalCost = System.currentTimeMillis() - taskStartTime;
// 超时校验
if (totalCost > TOTAL_TASK_TIMEOUT_SEC * 1000) {
String errMsg = "端口[" + port + "]任务总超时:fStoreyId=" + storeyIdStr + ",耗时=" + totalCost + "ms";
log.warn(errMsg);
recordAlarm(storeyInfo, errMsg);
}
// 错误统计
if (deviceErrorCount.get() > 0) {
String errMsg = "端口[" + port + "]" + deviceErrorCount.get() + "个设备通信失败:fStoreyId=" + storeyIdStr;
log.error(errMsg);
recordAlarm(storeyInfo, errMsg);
}
}
// -------------------------- 以下为原DeviceCommunicationJob的公共工具方法 --------------------------
protected TStoreyInfo validateAndGetStoreyInfo(String storeyIdStr) {
if (StringUtils.isBlank(storeyIdStr)) {
String errMsg = "fStoreyId为空:端口[" + getFixedPort() + "]";
log.error(errMsg);
recordAlarm(null, "unknown", errMsg);
throw new RuntimeException(errMsg);
}
Long storeyId;
try {
storeyId = Long.parseLong(storeyIdStr);
} catch (NumberFormatException e) {
String errMsg = "fStoreyId格式错误:" + storeyIdStr + ",端口[" + getFixedPort() + "]";
log.error(errMsg);
recordAlarm(null, storeyIdStr, errMsg);
throw new RuntimeException(errMsg);
}
TStoreyInfo storeyInfo = tStoreyInfoMapper.selectTStoreyInfoById(storeyId);
if (storeyInfo == null || StringUtils.isBlank(storeyInfo.getfIp())) {
String errMsg = "设备信息无效:fStoreyId=" + storeyId + ",端口[" + getFixedPort() + "]";
log.error(errMsg);
recordAlarm(null, storeyIdStr, errMsg);
throw new RuntimeException(errMsg);
}
return storeyInfo;
}
protected void executeSingleDeviceWithTimeout(String ip, int port, int deviceId, String storeyIdStr) {
Future<?> deviceFuture = Executors.newSingleThreadExecutor().submit(() -> {
try {
int[] result = readDeviceWithRetry(ip, port, deviceId, storeyIdStr);
if (resultHandler != null) {
resultHandler.accept(new DeviceStatusReaderDto(ip, port, deviceId, result));
}
} catch (Exception e) {
throw new RuntimeException("设备[" + deviceId + "]读取失败", e);
}
});
try {
deviceFuture.get(SINGLE_DEVICE_TIMEOUT_SEC, TimeUnit.SECONDS);
} catch (TimeoutException e) {
deviceFuture.cancel(true);
throw new RuntimeException("设备[" + deviceId + "]通信超时(>" + SINGLE_DEVICE_TIMEOUT_SEC + "秒)");
} catch (Exception e) {
throw new RuntimeException("设备[" + deviceId + "]通信异常", e.getCause());
}
}
protected int[] readDeviceWithRetry(String ip, int port, int deviceId, String storeyIdStr) throws Exception {
ModbusMaster master = null;
int[] result = null;
for (int retry = 0; retry <= CUSTOM_RETRY_TIMES; retry++) {
try {
master = createModbusMaster(ip, port);
result = readDeviceRegisters(master, deviceId);
Predicate<int[]> stopCondition = ModbusResultHandler.createDefaultStopCondition();
if (stopCondition.test(result)) {
log.debug("端口[{}]设备[{}]读取成功:重试={},fStoreyId={}",
port, deviceId, retry, storeyIdStr);
break;
} else if (retry < CUSTOM_RETRY_TIMES) {
Thread.sleep(RETRY_DELAY_MS);
}
} catch (Exception e) {
if (retry >= CUSTOM_RETRY_TIMES) {
log.error("端口[{}]设备[{}]重试耗尽({}次):fStoreyId={}",
port, deviceId, CUSTOM_RETRY_TIMES, storeyIdStr, e);
throw e;
}
Thread.sleep(RETRY_DELAY_MS);
} finally {
destroyModbusMaster(master, deviceId);
}
}
return result;
}
protected ModbusMaster createModbusMaster(String ip, int port) throws ModbusInitException {
IpParameters params = new IpParameters();
params.setHost(ip);
params.setPort(port);
TcpMaster master = (TcpMaster) modbusFactory.createTcpMaster(params, true);
master.setTimeout(MODBUS_CONN_TIMEOUT_MS);
master.setRetries(0); // 禁用内置重试,用自定义重试
master.init();
return master;
}
protected int[] readDeviceRegisters(ModbusMaster master, int deviceId) throws ModbusTransportException {
ReadHoldingRegistersRequest request = Modbus4jUtils.getReadHoldingRegistersRequest(
deviceId, REG_START_ADDR, REG_READ_COUNT);
ModbusResponse response = master.send(request);
if (!(response instanceof ReadHoldingRegistersResponse)) {
throw new RuntimeException("无效Modbus响应:设备[" + deviceId + "]");
}
short[] signedVals = ((ReadHoldingRegistersResponse) response).getShortData();
int[] unsignedVals = new int[signedVals.length];
for (int i = 0; i < signedVals.length; i++) {
unsignedVals[i] = signedVals[i] & 0xFFFF; // 转无符号整数
}
return unsignedVals;
}
protected void destroyModbusMaster(ModbusMaster master, int deviceId) {
if (master == null) return;
try {
if (master instanceof TcpMaster) {
Socket socket = getUnderlyingSocket((TcpMaster) master);
if (socket != null && !socket.isClosed()) {
socket.close();
}
}
} catch (Exception e) {
log.error("设备[{}]Socket销毁异常", deviceId, e);
} finally {
try {
master.destroy();
} catch (Exception e) {
log.error("设备[{}]ModbusMaster销毁异常", deviceId, e);
}
}
}
private Socket getUnderlyingSocket(TcpMaster tcpMaster) {
try {
Field socketField = TcpMaster.class.getDeclaredField("socket");
socketField.setAccessible(true);
return (Socket) socketField.get(tcpMaster);
} catch (Exception e) {
log.warn("反射获取Socket失败:TcpMaster版本不匹配", e);
return null;
}
}
protected String getStoreyIdFromContext(JobExecutionContext context) {
try {
return context.getJobDetail().getJobDataMap().getString("fStoreyId");
} catch (Exception e) {
log.error("获取fStoreyId失败", e);
return "unknown";
}
}
protected void recordAlarm(TStoreyInfo storeyInfo, String equipmentCode, String alarmData) {
try {
TEquipmentAlarmData alarm = new TEquipmentAlarmData();
alarm.setfAlarmType("03");
alarm.setfEquipmentCode(storeyInfo != null ? storeyInfo.getfStoreyCode() : equipmentCode);
alarm.setfAlarmData(alarmData);
alarm.setfCreateTime(new Date());
alarmDataService.insertTEquipmentAlarmData(alarm);
} catch (Exception e) {
log.error("告警记录失败:{}", alarmData, e);
}
}
protected void recordAlarm(TStoreyInfo storeyInfo, String alarmData) {
recordAlarm(storeyInfo, storeyInfo != null ? storeyInfo.getfStoreyCode() : "unknown", alarmData);
}
}
package com.zehong.system.task;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* @author lenovo
* @date 2025/9/25
* @description TODO
*/
@Component
public class DeviceComm501Job extends BaseDeviceCommJob {
@Override
protected int getFixedPort() {
return 501; // 固定端口501
}
@Override
protected List<Integer> getDeviceIds() {
// 端口501对应的设备ID列表
return Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27);
}
}
package com.zehong.system.task;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* @author lenovo
* @date 2025/9/25
* @description TODO
*/
@Component
public class DeviceComm502Job extends BaseDeviceCommJob {
@Override
protected int getFixedPort() {
return 502; // 固定端口502
}
@Override
protected List<Integer> getDeviceIds() {
// 端口502对应的设备ID列表
return Arrays.asList(28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54);
}
}
package com.zehong.system.task;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* @author lenovo
* @date 2025/9/25
* @description TODO
*/
@Component
public class DeviceComm503Job extends BaseDeviceCommJob{
@Override
protected int getFixedPort() {
return 503; // 固定端口503
}
@Override
protected List<Integer> getDeviceIds() {
// 端口503对应的设备ID列表
return Arrays.asList(55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72);
}
}
......@@ -29,14 +29,6 @@ public class DeviceTaskScheduler {
private static final String JOB_GROUP = "DEVICE_TASKS";
// 触发器组名(统一固定,与原有逻辑保持一致)
private static final String TRIGGER_GROUP = "DEVICE_TRIGGERS";
// 新增:每个设备任务的Quartz线程隔离(避免任务间干扰)
private static final String THREAD_GROUP = "DEVICE_THREAD_GROUP";
// 常量:Cron周期(3分钟)、任务有效期(7天)
private static final String CRON_EXPRESSION = "0 0/5 * * * ?";
private static final int TASK_VALID_DAYS = 7;
@PostConstruct
public void init() throws SchedulerException {
scheduler.getListenerManager().addJobListener(new QuartzJobListener());
......@@ -57,19 +49,10 @@ public class DeviceTaskScheduler {
// 1. 调度器健康检查(确保线程池可用)
checkSchedulerHealth();
// 2. 任务去重(避免重复创建导致资源竞争)
if (isTaskExists("COMM_" + fStoreyId)) {
log.info("通信任务[COMM_{}]已存在,无需重复创建", fStoreyId);
// 检查现有触发器状态,若为ERROR则重建
if (isTriggerError("COMM_" + fStoreyId)) {
log.warn("通信任务[COMM_{}]触发器状态为ERROR,重建触发器", fStoreyId);
createHourlyCommunicationJob(fStoreyId);
}
return;
}
// 3. 创建核心任务
createHourlyCommunicationJob(fStoreyId);
// 1. 创建3个端口专属任务(501:5分钟后,502:10分钟后,503:15分钟后)
createPortSpecificCommJobs(fStoreyId);
createFinalExecutionJob(fStoreyId, fPowerOutageIp, fPowerOutagePort);
checkTaskStatus(fStoreyId);
......@@ -81,6 +64,68 @@ public class DeviceTaskScheduler {
throw new RuntimeException("Quartz任务调度失败", e);
}
}
// -------------------------- 新增:创建3个端口专属任务 --------------------------
/**
* 为3个端口Job创建一次性SimpleTrigger:
* - 501Job:当前时间+5分钟
* - 502Job:当前时间+10分钟
* - 503Job:当前时间+15分钟
*/
private void createPortSpecificCommJobs(Long fStoreyId) throws SchedulerException {
// 1. 端口501:5分钟后执行
createPortCommJob(fStoreyId, 501, DeviceComm501Job.class, 5);
// 2. 端口502:10分钟后执行
createPortCommJob(fStoreyId, 502, DeviceComm502Job.class, 10);
// 3. 端口503:15分钟后执行
createPortCommJob(fStoreyId, 503, DeviceComm503Job.class, 15);
}
/**
* 单个端口Job和Trigger创建(通用方法)
* @param fStoreyId 设备ID
* @param port 端口号
* @param jobClass Job类
* @param delayMin 延迟执行时间(分钟)
*/
private void createPortCommJob(Long fStoreyId, int port, Class<? extends Job> jobClass, int delayMin) throws SchedulerException {
String jobId = "COMM_" + port + "_" + fStoreyId; // JobID:COMM_501_123(端口+设备ID,确保唯一)
String triggerId = "TRIGGER_" + port + "_" + fStoreyId; // TriggerID:TRIGGER_501_123
JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(triggerId, TRIGGER_GROUP);
// 1. 去重:先删除旧任务/触发器
if (scheduler.checkExists(triggerKey)) {
scheduler.unscheduleJob(triggerKey);
log.info("端口[{}]旧触发器已删除:{}", port, triggerId);
}
if (scheduler.checkExists(jobKey)) {
scheduler.deleteJob(jobKey);
log.info("端口[{}]旧任务已删除:{}", port, jobId);
}
// 2. 创建JobDetail
JobDetail job = JobBuilder.newJob(jobClass)
.withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString()) // 传递设备ID
.storeDurably(false)
.build();
// 3. 创建一次性SimpleTrigger(延迟delayMin分钟,仅执行1次)
Date triggerTime = Date.from(Instant.now().plus(delayMin, ChronoUnit.MINUTES));
SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.forJob(jobKey)
.startAt(triggerTime) // 延迟执行时间
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withRepeatCount(0) // 仅执行1次
.withMisfireHandlingInstructionFireNow()) // 错过则立即执行
.build();
// 4. 提交调度
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("端口[{}]任务创建成功:jobId={},延迟{}分钟,下次执行:{}",
port, jobId, delayMin, nextFireTime);
}
/**
* 1. 创建每5分钟执行的通信任务(核心优化:简化调度逻辑、调整Misfire策略)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment