Commit 5bf63fbd authored by wanghao's avatar wanghao

1 使用 modbus4j + juc 实现 读取层 72个点位的 设备数据测试中。

parent 804418ce
......@@ -4,14 +4,12 @@ import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ErrorResponseException;
import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.zehong.framework.modbus4j.Modbus4jUtils;
import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.domain.TEquipmentInfo;
import com.zehong.system.domain.TStoreyInfo;
import com.zehong.system.domain.modbus.ModbusDeviceData;
import com.zehong.system.modbus.util.Modbus4jUtils;
import com.zehong.system.service.ITEquipmentAlarmDataService;
import com.zehong.system.service.ITEquipmentInfoService;
import com.zehong.system.service.ITStoreyInfoService;
import com.zehong.web.task.AgingCabinetInspectionAndPowerCheckTask;
import com.zehong.web.task.CheckPowerOnCommandEvent;
import com.zehong.web.task.PowerOffCommandEvent;
......@@ -73,21 +71,6 @@ public class TestTaskController {
* 五分钟一次
* 1.老化柜、标定柜巡查
* 2.老化层断电
* 这种方式先注释掉
* // for (TEquipmentInfo equipmentInfo : equipmentInfos) {
* // Future<Map<Integer, Object>> future = executor.submit(new ModbusTcpTask(equipmentInfo, registerOffset));
* // futures.add(future);
* // }
* // List<ModbusDeviceData> results = new ArrayList<>();
* //
* // for (int i = 0; i < futures.size(); i++) {
* // Map<Integer, Object> data = futures.get(i).get();
* // ModbusDeviceData deviceData = new ModbusDeviceData();
* // deviceData.setDeviceId(equipmentInfos.get(i).getfEquipmentId().toString());
* // deviceData.setRegisterValues(data.entrySet().stream()
* // .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
* // results.add(deviceData);
* // }
*/
@GetMapping("/getAgingCabinetAndPowerCheck")
public DeferredResult<List<ModbusDeviceData>> getAgingCabinetAndPowerCheck() {
......
package com.zehong.web.task;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ErrorResponseException;
import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.zehong.framework.modbus4j.Modbus4jUtils;
import com.zehong.system.domain.TEquipmentInfo;
import com.zehong.system.domain.modbus.ModbusDeviceData;
import com.zehong.system.service.ITEquipmentInfoService;
import com.zehong.web.controller.equipment.EquipmentDataCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.async.DeferredResult;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
* @author lenovo
......
package com.zehong.web.task;
import com.serotonin.modbus4j.ModbusMaster;
import com.zehong.framework.modbus4j.Modbus4jUtils;
import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.domain.TStoreyInfo;
import com.zehong.system.mapper.TStoreyInfoMapper;
import com.zehong.system.modbus.business.DeviceStatusReaderAndTimeSetter;
import com.zehong.system.modbus.handler.ModbusResultHandler;
import com.zehong.system.service.ITEquipmentAlarmDataService;
import com.zehong.system.service.ITStoreyInfoService;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
* @author lenovo
* @date 2025/6/25
* @description TODO
* @description 上电以后 两分钟执行一次的逻辑
*/
@Component
public class DeviceCommunicationJob implements Job {
......@@ -34,7 +27,7 @@ public class DeviceCommunicationJob implements Job {
@Resource
private ITEquipmentAlarmDataService alarmDataService;
@Resource
private ITStoreyInfoService tStoreyInfoService;
private DeviceStatusReaderAndTimeSetter deviceStatusReaderAndTimeSetter;
@Resource
private TStoreyInfoMapper tStoreyInfoMapper;
......@@ -49,35 +42,22 @@ public class DeviceCommunicationJob implements Job {
// 1. 执行Modbus通信
String s = tStoreyInfo.getfPort();
String ip = tStoreyInfo.getfIp();
// 三个端口号,对应三组Modbus通信
String[] split = s.split(",");
List<Integer> registerOffsetsOne = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26);
log.info("registerOffsetsOne.ip:{}, port{}", ip, Integer.parseInt(split[0]));
Map<Integer, Object> integerObjectMapOne = Modbus4jUtils.batchReadAgingCabinetStatus(Modbus4jUtils.getMaster(ip, Integer.parseInt(split[0])), registerOffsetsOne);
for (Map.Entry<Integer, Object> integerObjectEntry : integerObjectMapOne.entrySet()) {
log.info("integerObjectMapOne 的 key是:{} value: {}", integerObjectEntry.getKey(), integerObjectEntry.getValue());
}
List<Integer> registerOffsetsTwo = Arrays.asList(27,28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53);
log.info("integerObjectMapTwo.ip:{}, port{}", ip, Integer.parseInt(split[1]));
Map<Integer, Object> integerObjectMapTwo = Modbus4jUtils.batchReadAgingCabinetStatus(Modbus4jUtils.getMaster(ip, Integer.parseInt(split[1])), registerOffsetsTwo);
for (Map.Entry<Integer, Object> integerObjectEntry : integerObjectMapTwo.entrySet()) {
log.info("integerObjectMapTwo 的 key是:{} value: {}", integerObjectEntry.getKey(), integerObjectEntry.getValue());
}
List<Integer> registerOffsetsThree = Arrays.asList(54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71);
Map<Integer, Object> integerObjectMapThree = Modbus4jUtils.batchReadAgingCabinetStatus(Modbus4jUtils.getMaster(ip, Integer.parseInt(split[2])), registerOffsetsThree);
log.info("integerObjectMapThree.ip:{}, port{}", ip, Integer.parseInt(split[2]));
for (Map.Entry<Integer, Object> integerObjectEntry : integerObjectMapThree.entrySet()) {
log.info("integerObjectMapThree 的 key是:{} value: {}", integerObjectEntry.getKey(), integerObjectEntry.getValue());
}
ModbusResultHandler resultHandler = new ModbusResultHandler();
// 501 的 27个 设备id
List<Integer> registerOffsetsOne = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27);
// 501 对应 27个设备读取状态 并设置时间
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip,501, registerOffsetsOne, resultHandler, ModbusResultHandler.createDefaultStopCondition());
// 502 端口号的 27个 设备id
List<Integer> registerOffsetsTwo = Arrays.asList(28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53,54);
// 502 对应 27个设备读取状态 并设置时间
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, 502, registerOffsetsTwo, resultHandler, ModbusResultHandler.createDefaultStopCondition());
// 503 端口号的 剩下的设备
List<Integer> registerOffsetsThree = Arrays.asList(55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71,72);
// 503 对应 剩下设备读取状态 并设置时间
deviceStatusReaderAndTimeSetter.startMultiDeviceMonitoring(ip, 503, registerOffsetsThree, resultHandler, ModbusResultHandler.createDefaultStopCondition());
// 2. 检查是否到达特殊时间点
//checkSpecialTimePoints(tStoreyInfo);
} catch (Exception e) {
// 记录异常
TEquipmentAlarmData alarmData = new TEquipmentAlarmData();
......@@ -87,24 +67,4 @@ public class DeviceCommunicationJob implements Job {
alarmDataService.insertTEquipmentAlarmData(alarmData);
}
}
private void checkSpecialTimePoints(TStoreyInfo tStoreyInfo) {
if (tStoreyInfo == null || "COMPLETED".equals(tStoreyInfo.getfStatus())) return;
// 计算已运行小时数
long hours = Duration.between(
tStoreyInfo.getfAgingStartTime().toInstant(), Instant.now()).toHours();
// 特殊时间点处理 (24小时和48小时)
if (hours == 24 || hours == 48) {
log.info("设备[{}]到达特殊时间点: {}小时", tStoreyInfo.getfStoreyCode(), hours);
// 执行特殊操作
performSpecialOperation(tStoreyInfo);
}
}
// ... 其他辅助方法
private void performSpecialOperation(TStoreyInfo tStoreyInfo) {
}
}
......@@ -4,10 +4,10 @@ import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.zehong.common.utils.StringUtils;
import com.zehong.framework.modbus4j.Modbus4jUtils;
import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.domain.TStoreyInfo;
import com.zehong.system.mapper.TStoreyInfoMapper;
import com.zehong.system.modbus.util.Modbus4jUtils;
import com.zehong.system.service.ITEquipmentAlarmDataService;
import com.zehong.system.service.ITStoreyInfoService;
import org.quartz.*;
......@@ -17,9 +17,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* @author lenovo
......
package com.zehong.system.modbus.business;
import com.serotonin.modbus4j.ModbusFactory;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ModbusInitException;
import com.serotonin.modbus4j.exception.ModbusTransportException;
import com.serotonin.modbus4j.ip.IpParameters;
import com.serotonin.modbus4j.msg.ModbusResponse;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest;
import com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse;
import com.zehong.system.domain.TEquipmentAlarmData;
import com.zehong.system.modbus.dto.DeviceStatusReaderDto;
import com.zehong.system.modbus.util.Modbus4jUtils;
import com.zehong.system.service.ITEquipmentAlarmDataService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* @author lenovo
* @date 2025/6/27
* @description modbus4j 单独提取出来的 读设备状态并设置时间
*/
@Component
public class DeviceStatusReaderAndTimeSetter {
// 常量改为public以便外部访问
public static final int START_ADDRESS = 0;
public static final int REGISTER_COUNT = 10;
public static final int TARGET_VALUE = 1;
public static final int MAX_RETRIES = 3;
public static final int RETRY_DELAY = 500;
public static final int TIMEOUT_MINUTES = 5;
// 工厂
private static final ModbusFactory modbusFactory = new ModbusFactory();
@Resource
private ITEquipmentAlarmDataService alarmDataService;
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(DeviceStatusReaderAndTimeSetter.class);
/**
* 读取设备寄存器(线程安全版)
*/
private int[] readDeviceRegisters(ModbusMaster master, int deviceId )
throws ModbusTransportException {
// 创建读取请求
ReadHoldingRegistersRequest request = Modbus4jUtils.getReadHoldingRegistersRequest(deviceId,START_ADDRESS, REGISTER_COUNT);
// 发送请求并获取响应
ModbusResponse response = master.send(request);
// 检查响应类型
if (!(response instanceof ReadHoldingRegistersResponse)) {
throw new IllegalArgumentException("Invalid response type: " + response.getClass().getName());
}
ReadHoldingRegistersResponse regResponse = (ReadHoldingRegistersResponse) response;
short[] signedValues = regResponse.getShortData();
// 转换为无符号整数
int[] unsignedValues = new int[signedValues.length];
for (int i = 0; i < signedValues.length; i++) {
unsignedValues[i] = signedValues[i] & 0xFFFF;
}
return unsignedValues;
}
/**
* 带自定义条件的重试读取(线程安全版)
*/
private int[] readWithConditionalRetry(ModbusMaster threadMaster,String ip, int port, int deviceId,
Predicate<int[]> conditionChecker)
throws InterruptedException {
TEquipmentAlarmData alarmData;
int[] result = null;
int attempt = 0;
boolean conditionMet = false;
if(threadMaster != null) {
while (attempt <= MAX_RETRIES && !conditionMet) {
attempt++;
try {
log.info("当前 - 尝试第:{}次 读取设备ip:{} port:{} device:{}的数据", attempt, ip,port,deviceId);
// 读取数据
result = readDeviceRegisters(threadMaster, deviceId);
// 使用自定义条件检查
if (conditionChecker.test(result)) {
log.info("当前 设备ip:{} port:{} device:{}的数据条件满足", ip,port,deviceId);
conditionMet = true;
} else if (attempt <= MAX_RETRIES) {
log.info("当前 设备ip:{} port:{} device:{}的数据条件未满足,等待重试...", ip,port,deviceId);
Thread.sleep(RETRY_DELAY);
}
} catch (Exception e) {
alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("04"); //01.老化柜 02.机械臂 03.老化层 04.点位
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port + ",deviceId:" + deviceId);
alarmData.setfAlarmData("读取失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
if (attempt <= MAX_RETRIES) {
Thread.sleep(RETRY_DELAY);
}
}
}
// 如果达到最大重试次数仍未满足条件
if (!conditionMet) {
log.info("当前 设备ip:{} port:{} device:{}的尝试次数达到最大:{}", ip,port,deviceId,MAX_RETRIES);
}
}
return result != null ? result : new int[REGISTER_COUNT];
}
/**
* 创建新的Modbus连接
*/
private static ModbusMaster createModbusMaster(String ip, int port) throws ModbusInitException {
IpParameters params = new IpParameters();
params.setHost(ip);
params.setPort(port);
ModbusMaster master = modbusFactory.createTcpMaster(params, true);
master.setTimeout(3000);
master.setRetries(1);
master.init();
return master;
}
/**
* 启动多设备监控(核心方法)
*/
public void startMultiDeviceMonitoring(
String ip,
int port,
List<Integer> deviceIds,
Consumer<DeviceStatusReaderDto> resultHandler,
Predicate<int[]> stopCondition ) {
if (deviceIds == null || deviceIds.isEmpty()) {
System.out.println("⚠️ 警告: 设备ID列表为空,不执行监控");
return;
}
final CountDownLatch latch = new CountDownLatch(deviceIds.size());
ExecutorService executor = Executors.newFixedThreadPool(deviceIds.size());
for (int deviceId : deviceIds) {
final int devId = deviceId;
executor.submit(() -> {
ModbusMaster threadMaster = null;
try {
// 1 获取 线程专有的Modbus连接
threadMaster = createModbusMaster(ip, port);
// 2 初始化线程专有的Modbus连接 并尝试读取数据
// 为什么传了 master 还传 ip 和 port ,因为 master 要读完后才释放,而 ip 和 port 是为了log用
int[] result = readWithConditionalRetry(threadMaster,ip, port, devId, stopCondition);
// 创建包含完整信息的结果对象
DeviceStatusReaderDto deviceStatusReaderDto = new DeviceStatusReaderDto(ip, port, devId, result);
// 3 设置回调数据
resultHandler.accept(deviceStatusReaderDto);
} catch (ModbusInitException e) {
TEquipmentAlarmData alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("03"); //01.老化柜 02.机械臂 03.老化层 04.点位
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port);
alarmData.setfAlarmData("定时任务巡检:Modbus初始化失败");
alarmDataService.insertTEquipmentAlarmData(alarmData);
} catch (Exception e) {
TEquipmentAlarmData alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("03"); //01.老化柜 02.机械臂 03.老化层 04.点位
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port);
alarmData.setfAlarmData("监控任务异常:" + e.getMessage());
alarmDataService.insertTEquipmentAlarmData(alarmData);
} finally {
if (threadMaster != null) threadMaster.destroy();
latch.countDown();
log.info("当前 - 设备ip:{} port:{} device:{}的监控任务完成", ip,port,deviceId);
}
});
}
executor.shutdown();
try {
if (!executor.awaitTermination(TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
TEquipmentAlarmData alarmData = new TEquipmentAlarmData();
alarmData.setfAlarmType("03"); //01.老化柜 02.机械臂 03.老化层 04.点位
alarmData.setfEquipmentCode("ip:" + ip + ",port:" + port);
alarmData.setfAlarmData("警告: 部分设备监控任务超时未完成");
alarmDataService.insertTEquipmentAlarmData(alarmData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
package com.zehong.system.modbus.dto;
import java.util.Arrays;
/**
* @author lenovo
* @date 2025/6/27
* @description TODO
*/
public class DeviceStatusReaderDto {
private String ip;
private int port;
private int deviceId;
private int[] registerData;
public DeviceStatusReaderDto(String ip, int port, int deviceId, int[] registerData) {
this.ip = ip;
this.port = port;
this.deviceId = deviceId;
this.registerData = registerData;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getDeviceId() {
return deviceId;
}
public void setDeviceId(int deviceId) {
this.deviceId = deviceId;
}
public int[] getRegisterData() {
return registerData;
}
public void setRegisterData(int[] registerData) {
this.registerData = registerData;
}
@Override
public String toString() {
return "ModbusResult{" +
"ip='" + ip + '\'' +
", port=" + port +
", deviceId=" + deviceId +
", registerData=" + Arrays.toString(registerData) +
'}';
}
}
package com.zehong.system.modbus.handler;
import com.zehong.system.modbus.business.DeviceStatusReaderAndTimeSetter;
import com.zehong.system.modbus.dto.DeviceStatusReaderDto;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* @author lenovo
* @date 2025/6/27
* @description TODO
*/
public class ModbusResultHandler implements Consumer<DeviceStatusReaderDto> {
@Override
public void accept(DeviceStatusReaderDto deviceStatusReaderDto) {
int deviceId = deviceStatusReaderDto.getDeviceId();
int[] data = deviceStatusReaderDto.getRegisterData();
String ip = deviceStatusReaderDto.getIp();
int port = deviceStatusReaderDto.getPort();
System.out.println(">>> 回调处理: 接收到新数据");
System.out.println(" 数据: " + Arrays.toString(data));
if (data.length >= 2 && data[1] == DeviceStatusReaderAndTimeSetter.TARGET_VALUE) {
System.out.println(" >>> 注意: 第二个寄存器值为1!");
}
}
// 创建通用的停止条件(可选)
public static Predicate<int[]> createDefaultStopCondition() {
return values -> values.length >= 2 && values[1] == DeviceStatusReaderAndTimeSetter.TARGET_VALUE;
}
}
package com.zehong.framework.modbus4j;
package com.zehong.system.modbus.util;
import com.serotonin.modbus4j.ModbusMaster;
import com.serotonin.modbus4j.exception.ModbusInitException;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment