Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
Z
zhmes-agecal
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
耿迪迪
zhmes-agecal
Commits
985bc9d8
Commit
985bc9d8
authored
Sep 24, 2025
by
wanghao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1 指令指令完成,但是 没有检测到机械臂完成。
parent
95befd32
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
249 additions
and
40 deletions
+249
-40
DeviceCommunicationJob.java
...n/java/com/zehong/system/task/DeviceCommunicationJob.java
+249
-40
No files found.
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommunicationJob.java
View file @
985bc9d8
package
com
.
zehong
.
system
.
task
;
package
com
.
zehong
.
system
.
task
;
import
com.serotonin.modbus4j.ModbusFactory
;
import
com.serotonin.modbus4j.ModbusMaster
;
import
com.serotonin.modbus4j.exception.ModbusInitException
;
import
com.serotonin.modbus4j.exception.ModbusTransportException
;
import
com.serotonin.modbus4j.ip.IpParameters
;
import
com.serotonin.modbus4j.ip.tcp.TcpMaster
;
import
com.serotonin.modbus4j.msg.ModbusResponse
;
import
com.serotonin.modbus4j.msg.ReadHoldingRegistersRequest
;
import
com.serotonin.modbus4j.msg.ReadHoldingRegistersResponse
;
import
com.zehong.system.domain.TEquipmentAlarmData
;
import
com.zehong.system.domain.TEquipmentAlarmData
;
import
com.zehong.system.domain.TStoreyInfo
;
import
com.zehong.system.domain.TStoreyInfo
;
import
com.zehong.system.mapper.TStoreyInfoMapper
;
import
com.zehong.system.mapper.TStoreyInfoMapper
;
import
com.zehong.system.modbus.business.DeviceStatusReaderAndTimeSetter
;
import
com.zehong.system.modbus.business.DeviceStatusReaderAndTimeSetter
;
import
com.zehong.system.modbus.handler.ModbusResultHandler
;
import
com.zehong.system.modbus.handler.ModbusResultHandler
;
import
com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto
;
import
com.zehong.system.modbus.util.Modbus4jUtils
;
import
com.zehong.system.service.ITEquipmentAlarmDataService
;
import
com.zehong.system.service.ITEquipmentAlarmDataService
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.quartz.*
;
import
org.quartz.*
;
...
@@ -14,12 +25,14 @@ import org.springframework.beans.factory.annotation.Autowired;
...
@@ -14,12 +25,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
javax.annotation.Resource
;
import
java.lang.reflect.Field
;
import
java.net.Socket
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.Date
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
import
java.util.concurrent.
CompletableFuture
;
import
java.util.concurrent.
*
;
import
java.util.concurrent.
TimeUnit
;
import
java.util.concurrent.
atomic.AtomicInteger
;
import
java.util.
concurrent.TimeoutException
;
import
java.util.
function.Consumer
;
import
java.util.function.Predicate
;
import
java.util.function.Predicate
;
/**
/**
...
@@ -35,7 +48,16 @@ public class DeviceCommunicationJob implements Job {
...
@@ -35,7 +48,16 @@ public class DeviceCommunicationJob implements Job {
// 超时控制(确保在Cron周期内完成)
// 超时控制(确保在Cron周期内完成)
private
static
final
int
TOTAL_TIMEOUT_SECONDS
=
120
;
// 2分钟总超时
private
static
final
int
TOTAL_TIMEOUT_SECONDS
=
120
;
// 2分钟总超时
private
static
final
int
PORT_TIMEOUT_SECONDS
=
30
;
// 单个端口30秒超时
private
static
final
int
PORT_TIMEOUT_SECONDS
=
30
;
// 单个端口30秒超时
public
static
final
int
RETRY_DELAY
=
200
;
// 重试间隔200ms(缩短等待)
// 简化线程池配置
private
final
ExecutorService
deviceExecutor
=
Executors
.
newFixedThreadPool
(
4
);
// 工厂(单例)
private
static
final
ModbusFactory
modbusFactory
=
new
ModbusFactory
();
public
static
final
int
MAX_RETRIES
=
1
;
// 仅1次重试(减少总耗时)
// 常量配置(优化重试,减少耗时)
public
static
final
int
START_ADDRESS
=
0
;
public
static
final
int
REGISTER_COUNT
=
10
;
@Resource
@Resource
private
ITEquipmentAlarmDataService
alarmDataService
;
private
ITEquipmentAlarmDataService
alarmDataService
;
@Resource
@Resource
...
@@ -144,40 +166,6 @@ public class DeviceCommunicationJob implements Job {
...
@@ -144,40 +166,6 @@ public class DeviceCommunicationJob implements Job {
return
"unknown"
;
return
"unknown"
;
}
}
}
}
private
void
executeInternal
(
JobExecutionContext
context
)
{
long
startTime
=
System
.
currentTimeMillis
();
String
fStoreyIdStr
=
null
;
try
{
// 参数提取和验证
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
fStoreyIdStr
=
data
.
getString
(
"fStoreyId"
);
if
(
StringUtils
.
isBlank
(
fStoreyIdStr
))
{
log
.
warn
(
"fStoreyId参数为空,跳过执行"
);
return
;
}
Long
fStoreyId
=
Long
.
parseLong
(
fStoreyIdStr
);
TStoreyInfo
tStoreyInfo
=
tStoreyInfoMapper
.
selectTStoreyInfoById
(
fStoreyId
);
if
(
tStoreyInfo
==
null
||
StringUtils
.
isBlank
(
tStoreyInfo
.
getfIp
()))
{
log
.
warn
(
"设备信息无效,跳过执行: fStoreyId={}"
,
fStoreyId
);
return
;
}
// 核心通信逻辑(简化版)
executeDeviceCommunication
(
tStoreyInfo
,
fStoreyId
);
long
costTime
=
System
.
currentTimeMillis
()
-
startTime
;
log
.
info
(
"设备通信任务完成: fStoreyId={}, 耗时={}ms"
,
fStoreyId
,
costTime
);
}
catch
(
Exception
e
)
{
log
.
error
(
"设备通信任务执行异常: fStoreyIdStr={}"
,
fStoreyIdStr
,
e
);
// 仅记录日志,不抛出异常
}
}
private
void
executeDeviceCommunication
(
TStoreyInfo
tStoreyInfo
,
Long
fStoreyId
)
{
private
void
executeDeviceCommunication
(
TStoreyInfo
tStoreyInfo
,
Long
fStoreyId
)
{
String
ip
=
tStoreyInfo
.
getfIp
();
String
ip
=
tStoreyInfo
.
getfIp
();
...
@@ -209,9 +197,7 @@ public class DeviceCommunicationJob implements Job {
...
@@ -209,9 +197,7 @@ public class DeviceCommunicationJob implements Job {
private
void
executeSinglePort
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
Long
fStoreyId
,
TStoreyInfo
tStoreyInfo
)
{
private
void
executeSinglePort
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
Long
fStoreyId
,
TStoreyInfo
tStoreyInfo
)
{
try
{
try
{
Predicate
<
int
[]>
stopCondition
=
ModbusResultHandler
.
createDefaultStopCondition
();
Predicate
<
int
[]>
stopCondition
=
ModbusResultHandler
.
createDefaultStopCondition
();
deviceStatusReaderAndTimeSetter
.
startMultiDeviceMonitoring
(
startMultiDeviceMonitoring
(
ip
,
port
,
deviceIds
,
resultHandler
,
stopCondition
,
PORT_TIMEOUT_SECONDS
);
ip
,
port
,
deviceIds
,
resultHandler
,
stopCondition
,
PORT_TIMEOUT_SECONDS
);
log
.
debug
(
"端口{}通信完成: fStoreyId={}"
,
port
,
fStoreyId
);
log
.
debug
(
"端口{}通信完成: fStoreyId={}"
,
port
,
fStoreyId
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"端口{}通信异常: fStoreyId={}"
,
port
,
fStoreyId
,
e
);
log
.
error
(
"端口{}通信异常: fStoreyId={}"
,
port
,
fStoreyId
,
e
);
...
@@ -219,6 +205,229 @@ public class DeviceCommunicationJob implements Job {
...
@@ -219,6 +205,229 @@ public class DeviceCommunicationJob implements Job {
}
}
}
}
/**
* 启动多设备监控(核心方法:修正threadMaster调用)
* @param ip 设备IP
* @param port 端口(501/502/503)
* @param deviceIds 设备ID列表
* @param resultHandler 结果处理器
* @param stopCondition 停止条件
* @param portTimeout 端口总超时(秒)
*/
public
void
startMultiDeviceMonitoring
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
Consumer
<
DeviceStatusReaderDto
>
resultHandler
,
Predicate
<
int
[]>
stopCondition
,
int
portTimeout
)
{
final
CountDownLatch
latch
=
new
CountDownLatch
(
deviceIds
.
size
());
final
AtomicInteger
errorCount
=
new
AtomicInteger
(
0
);
for
(
int
deviceId
:
deviceIds
)
{
final
int
devId
=
deviceId
;
deviceExecutor
.
submit
(()
->
{
boolean
success
=
false
;
try
{
// 执行设备通信
executeSingleDevice
(
ip
,
port
,
devId
,
resultHandler
,
stopCondition
);
success
=
true
;
}
catch
(
Exception
e
)
{
errorCount
.
incrementAndGet
();
log
.
error
(
"设备通信失败: ip={}, port={}, deviceId={}"
,
ip
,
port
,
devId
,
e
);
}
finally
{
latch
.
countDown
();
log
.
debug
(
"设备{}处理完成: 成功={}"
,
devId
,
success
);
}
});
}
try
{
boolean
completed
=
latch
.
await
(
portTimeout
,
TimeUnit
.
SECONDS
);
if
(!
completed
)
{
log
.
warn
(
"端口通信超时: ip={}, port={}, 完成数={}/{}"
,
ip
,
port
,
deviceIds
.
size
()
-
latch
.
getCount
(),
deviceIds
.
size
());
}
// 如果有设备通信失败,抛出异常让上层感知
if
(
errorCount
.
get
()
>
0
)
{
throw
new
RuntimeException
(
String
.
format
(
"端口%d有%d个设备通信失败"
,
port
,
errorCount
.
get
()));
}
}
catch
(
InterruptedException
e
)
{
Thread
.
currentThread
().
interrupt
();
throw
new
RuntimeException
(
"端口通信被中断"
,
e
);
}
}
private
void
executeSingleDevice
(
String
ip
,
int
port
,
int
deviceId
,
Consumer
<
DeviceStatusReaderDto
>
resultHandler
,
Predicate
<
int
[]>
stopCondition
)
{
ModbusMaster
master
=
null
;
try
{
master
=
createModbusMaster
(
ip
,
port
);
int
[]
result
=
readWithConditionalRetry
(
master
,
ip
,
port
,
deviceId
,
stopCondition
);
if
(
resultHandler
!=
null
)
{
resultHandler
.
accept
(
new
DeviceStatusReaderDto
(
ip
,
port
,
deviceId
,
result
));
}
log
.
debug
(
"设备{}通信完成: ip={}, port={}"
,
deviceId
,
ip
,
port
);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"设备通信异常"
,
e
);
}
finally
{
destroyModbusMaster
(
master
,
deviceId
);
}
}
/**
* 销毁Modbus连接(先关Socket,再销毁Master,彻底释放)
*/
private
void
destroyModbusMaster
(
ModbusMaster
master
,
int
deviceId
)
{
if
(
master
==
null
)
return
;
try
{
// 1. 直接获取并关闭TcpMaster的socket字段
Socket
socket
=
getUnderlyingSocket
(
master
);
if
(
socket
!=
null
&&
!
socket
.
isClosed
())
{
socket
.
close
();
log
.
debug
(
"设备{}: 底层Socket已强制关闭"
,
deviceId
);
}
}
catch
(
Exception
e
)
{
log
.
error
(
"设备{}: 关闭Socket异常"
,
deviceId
,
e
);
}
finally
{
// 2. 调用自带的destroy()方法,双重保障
try
{
master
.
destroy
();
log
.
debug
(
"设备{}: ModbusMaster已销毁"
,
deviceId
);
}
catch
(
Exception
e
)
{
log
.
error
(
"设备{}: 销毁ModbusMaster异常"
,
deviceId
,
e
);
}
}
}
/**
* 修正:直接从TcpMaster获取socket字段(适配Modbus4j 3.0.3版本)
*/
private
Socket
getUnderlyingSocket
(
ModbusMaster
master
)
{
if
(!(
master
instanceof
TcpMaster
))
{
log
.
error
(
"ModbusMaster不是TcpMaster类型,无法获取Socket"
);
return
null
;
}
TcpMaster
tcpMaster
=
(
TcpMaster
)
master
;
try
{
// 直接获取TcpMaster的private字段 "socket"(你的版本中直接存在该字段)
Field
socketField
=
TcpMaster
.
class
.
getDeclaredField
(
"socket"
);
socketField
.
setAccessible
(
true
);
// 取消私有字段访问限制
Socket
socket
=
(
Socket
)
socketField
.
get
(
tcpMaster
);
if
(
socket
!=
null
)
{
log
.
debug
(
"成功获取Socket:IP={}, 端口={}"
,
socket
.
getInetAddress
(),
socket
.
getPort
());
}
else
{
log
.
debug
(
"TcpMaster的socket字段为null(未建立连接)"
);
}
return
socket
;
}
catch
(
NoSuchFieldException
e
)
{
log
.
error
(
"=== 字段未找到!===\n"
+
"原因:TcpMaster中不存在socket字段(版本不匹配)\n"
+
"解决方案:检查你的TcpMaster源码,确认Socket字段名(当前版本应为'socket')"
,
e
);
}
catch
(
IllegalAccessException
e
)
{
log
.
error
(
"反射访问权限失败(可能被安全管理器拦截)"
,
e
);
}
catch
(
Exception
e
)
{
log
.
error
(
"获取Socket异常"
,
e
);
}
return
null
;
}
/**
* 带自定义条件的重试读取(线程安全版)
*/
private
int
[]
readWithConditionalRetry
(
ModbusMaster
threadMaster
,
String
ip
,
int
port
,
int
deviceId
,
Predicate
<
int
[]>
conditionChecker
)
throws
InterruptedException
{
TEquipmentAlarmData
alarmData
;
int
[]
result
=
null
;
int
attempt
=
0
;
boolean
conditionMet
=
false
;
if
(
threadMaster
!=
null
)
{
while
(
attempt
<=
MAX_RETRIES
&&
!
conditionMet
)
{
attempt
++;
try
{
log
.
info
(
"当前 - 尝试第:{}次 读取设备ip:{} port:{} device:{}的数据"
,
attempt
,
ip
,
port
,
deviceId
);
// 读取数据
result
=
readDeviceRegisters
(
threadMaster
,
deviceId
);
// 使用自定义条件检查
if
(
conditionChecker
.
test
(
result
))
{
log
.
info
(
"当前 设备ip:{} port:{} device:{}的数据条件满足"
,
ip
,
port
,
deviceId
);
conditionMet
=
true
;
}
else
if
(
attempt
<=
MAX_RETRIES
)
{
log
.
info
(
"当前 设备ip:{} port:{} device:{}的数据条件未满足,等待重试..."
,
ip
,
port
,
deviceId
);
Thread
.
sleep
(
RETRY_DELAY
);
}
}
catch
(
Throwable
e
)
{
log
.
error
(
"设备{}: 第{}次读取异常"
,
deviceId
,
attempt
,
e
);
if
(
attempt
<=
MAX_RETRIES
)
{
Thread
.
sleep
(
RETRY_DELAY
);
}
}
}
// 如果达到最大重试次数仍未满足条件
if
(!
conditionMet
)
{
log
.
info
(
"当前 设备ip:{} port:{} device:{}的尝试次数达到最大:{}"
,
ip
,
port
,
deviceId
,
MAX_RETRIES
);
}
}
return
result
!=
null
?
result
:
new
int
[
REGISTER_COUNT
];
}
/**
* 读取设备寄存器(线程安全版)
*/
private
int
[]
readDeviceRegisters
(
ModbusMaster
master
,
int
deviceId
)
throws
ModbusTransportException
{
// 创建读取请求
ReadHoldingRegistersRequest
request
=
Modbus4jUtils
.
getReadHoldingRegistersRequest
(
deviceId
,
START_ADDRESS
,
REGISTER_COUNT
);
// 发送请求并获取响应
ModbusResponse
response
=
master
.
send
(
request
);
// 检查响应类型
if
(!(
response
instanceof
ReadHoldingRegistersResponse
))
{
throw
new
IllegalArgumentException
(
"Invalid response type: "
+
response
.
getClass
().
getName
());
}
ReadHoldingRegistersResponse
regResponse
=
(
ReadHoldingRegistersResponse
)
response
;
short
[]
signedValues
=
regResponse
.
getShortData
();
// 转换为无符号整数
int
[]
unsignedValues
=
new
int
[
signedValues
.
length
];
for
(
int
i
=
0
;
i
<
signedValues
.
length
;
i
++)
{
unsignedValues
[
i
]
=
signedValues
[
i
]
&
0xFFFF
;
}
return
unsignedValues
;
}
/**
* 创建新的Modbus连接
*/
private
ModbusMaster
createModbusMaster
(
String
ip
,
int
port
)
throws
ModbusInitException
{
IpParameters
params
=
new
IpParameters
();
params
.
setHost
(
ip
);
params
.
setPort
(
port
);
// 显式声明为TcpMaster,避免后续反射类型转换问题
TcpMaster
master
=
(
TcpMaster
)
modbusFactory
.
createTcpMaster
(
params
,
true
);
master
.
setTimeout
(
3000
);
// 3秒连接超时
master
.
setRetries
(
1
);
// 1次重试
master
.
init
();
log
.
debug
(
"Modbus连接创建成功:IP={}, 端口={}, Master={}"
,
ip
,
port
,
master
);
return
master
;
}
private
void
recordTimeoutAlarm
(
JobExecutionContext
context
)
{
private
void
recordTimeoutAlarm
(
JobExecutionContext
context
)
{
try
{
try
{
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment