Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
Z
zhmes-agecal
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
耿迪迪
zhmes-agecal
Commits
db7ad245
Commit
db7ad245
authored
Sep 24, 2025
by
wanghao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1 指令指令完成,但是 没有检测到机械臂完成。
parent
f13c55b9
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
399 additions
and
180 deletions
+399
-180
DeviceCommunicationJob.java
...n/java/com/zehong/system/task/DeviceCommunicationJob.java
+399
-179
QuartzJobListener.java
...c/main/java/com/zehong/system/task/QuartzJobListener.java
+0
-1
No files found.
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommunicationJob.java
View file @
db7ad245
...
@@ -17,275 +17,495 @@ import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto;
...
@@ -17,275 +17,495 @@ import com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto;
import
com.zehong.system.modbus.util.Modbus4jUtils
;
import
com.zehong.system.modbus.util.Modbus4jUtils
;
import
com.zehong.system.service.ITEquipmentAlarmDataService
;
import
com.zehong.system.service.ITEquipmentAlarmDataService
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.quartz.DisallowConcurrentExecution
;
import
org.quartz.*
;
import
org.quartz.Job
;
import
org.quartz.JobDataMap
;
import
org.quartz.JobExecutionContext
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
javax.annotation.Resource
;
import
java.lang.reflect.Field
;
import
java.net.Socket
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.Date
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
import
java.util.concurrent.*
;
import
java.util.concurrent.*
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.function.Predicate
;
/**
/**
* @author lenovo
* 设备通信Job(修复版:显式暴露错误,避免Trigger变ERROR)
* @date 2025/6/25
* @description 上电以后 两分钟执行一次的逻辑
*/
*/
@Component
@Component
@DisallowConcurrentExecution
@DisallowConcurrentExecution
// 禁止同一任务并行执行(必须保留)
public
class
DeviceCommunicationJob
implements
Job
{
public
class
DeviceCommunicationJob
implements
Job
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
DeviceCommunicationJob
.
class
);
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
DeviceCommunicationJob
.
class
);
// -------------------------- 常量配置(统一管理,避免魔法值)--------------------------
// 超时控制:必须小于Cron周期(假设Cron为5分钟,这里留1分钟缓冲)
private
static
final
int
TOTAL_TASK_TIMEOUT_SEC
=
240
;
// 任务总超时:4分钟
private
static
final
int
SINGLE_PORT_TIMEOUT_SEC
=
60
;
// 单个端口超时:1分钟
private
static
final
int
SINGLE_DEVICE_TIMEOUT_SEC
=
10
;
// 单个设备超时:10秒
// Modbus配置:取消内置重试,统一用自定义重试
private
static
final
int
MODBUS_CONN_TIMEOUT_MS
=
3000
;
// 连接超时:3秒
private
static
final
int
CUSTOM_RETRY_TIMES
=
1
;
// 自定义重试次数:1次
private
static
final
int
RETRY_DELAY_MS
=
200
;
// 重试间隔:200ms
// Modbus寄存器配置
private
static
final
int
REG_START_ADDR
=
0
;
private
static
final
int
REG_READ_COUNT
=
10
;
// 工厂(单例)
// 工厂(单例)
private
static
final
ModbusFactory
modbusFactory
=
new
ModbusFactory
();
private
static
final
ModbusFactory
modbusFactory
=
new
ModbusFactory
();
public
static
final
int
MAX_RETRIES
=
1
;
// 仅1次重试(减少总耗时)
// -------------------------- 依赖注入 --------------------------
// 常量配置(优化重试,减少耗时)
public
static
final
int
START_ADDRESS
=
0
;
public
static
final
int
REGISTER_COUNT
=
10
;
@Resource
@Resource
private
ITEquipmentAlarmDataService
alarmDataService
;
private
ITEquipmentAlarmDataService
alarmDataService
;
// 移除静态线程池,使用单线程顺序执行
private
final
ExecutorService
sequentialExecutor
=
Executors
.
newSingleThreadExecutor
();
@Resource
@Resource
private
TStoreyInfoMapper
tStoreyInfoMapper
;
private
TStoreyInfoMapper
tStoreyInfoMapper
;
@Autowired
@Autowired
private
ModbusResultHandler
resultHandler
;
private
ModbusResultHandler
resultHandler
;
// -------------------------- 核心执行逻辑 --------------------------
@Override
@Override
public
void
execute
(
JobExecutionContext
context
)
{
public
void
execute
(
JobExecutionContext
context
)
throws
JobExecutionException
{
// 绝对确保任何异常都不会传播到Quartz
// 关键:不隐藏任何致命异常,让Quartz感知错误(但捕获后包装为JobExecutionException)
try
{
long
taskStartTime
=
System
.
currentTimeMillis
();
log
.
info
(
"=== DeviceCommunicationJob 开始执行 ==="
);
String
storeyIdStr
=
getStoreyIdFromContext
(
context
);
executeWithFullProtection
(
context
);
ExecutorService
deviceExecutor
=
null
;
// 线程池:每次任务新建,结束后关闭
log
.
info
(
"=== DeviceCommunicationJob 执行完成 ==="
);
}
catch
(
Throwable
e
)
{
// 捕获Throwable而不是Exception
// 关键:记录错误但不传播到Quartz
log
.
error
(
"=== 任务执行异常(已完全捕获,不影响触发器) ==="
,
e
);
safeRecordAlarm
(
context
,
"任务异常已捕获: "
+
e
.
getClass
().
getSimpleName
());
}
}
private
void
safeRecordAlarm
(
JobExecutionContext
context
,
String
message
)
{
try
{
try
{
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
// 1. 初始化线程池(每次任务新建,避免泄漏)
String
fStoreyIdStr
=
data
!=
null
?
data
.
getString
(
"fStoreyId"
)
:
"unknown"
;
deviceExecutor
=
new
ThreadPoolExecutor
(
3
,
3
,
// 核心=最大=3(对应3个端口,1:1映射)
0
,
TimeUnit
.
SECONDS
,
new
LinkedBlockingQueue
<>(
20
),
// 队列20个(足够72个设备)
r
->
new
Thread
(
r
,
"modbus-port-pool-"
+
storeyIdStr
),
// 线程名含设备ID,便于追踪
new
ThreadPoolExecutor
.
AbortPolicy
()
// 队列满时抛异常,显式暴露问题
);
// 2. 执行核心逻辑(带总超时)
boolean
executeSuccess
=
executeWithTotalTimeout
(
context
,
deviceExecutor
,
taskStartTime
);
if
(!
executeSuccess
)
{
// 任务超时:主动抛出异常,让Quartz记录但不标记Trigger为ERROR(需配合Misfire策略)
throw
new
JobExecutionException
(
"DeviceCommunicationJob执行超时(>"
+
TOTAL_TASK_TIMEOUT_SEC
+
"秒)"
,
false
// 第二个参数为false:不立即暂停Trigger
);
}
TEquipmentAlarmData
alarm
=
new
TEquipmentAlarmData
();
log
.
info
(
"DeviceCommunicationJob执行成功:fStoreyId={},总耗时={}ms"
,
alarm
.
setfAlarmType
(
"03"
);
storeyIdStr
,
System
.
currentTimeMillis
()
-
taskStartTime
);
alarm
.
setfEquipmentCode
(
fStoreyIdStr
);
alarm
.
setfAlarmData
(
message
);
}
catch
(
JobExecutionException
e
)
{
alarm
.
setfCreateTime
(
new
Date
());
// 主动抛出的任务异常:Quartz会记录日志,但Trigger状态仍为NORMAL(需配置Misfire)
alarmDataService
.
insertTEquipmentAlarmData
(
alarm
);
log
.
error
(
"DeviceCommunicationJob执行异常(已主动抛出,避免Trigger变ERROR):fStoreyId={}"
,
}
catch
(
Exception
e
)
{
storeyIdStr
,
e
);
log
.
error
(
"记录告警失败"
,
e
);
throw
e
;
// 必须抛出,让Quartz感知,但通过第二个参数控制不暂停Trigger
}
catch
(
Throwable
e
)
{
// 未预期的致命异常:包装为JobExecutionException,显式暴露
String
errMsg
=
"DeviceCommunicationJob发生未预期致命异常:fStoreyId="
+
storeyIdStr
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
null
,
storeyIdStr
,
errMsg
+
":"
+
e
.
getMessage
());
// 抛出时第二个参数设为false,避免Trigger直接变ERROR
throw
new
JobExecutionException
(
errMsg
,
e
,
false
);
}
finally
{
// 关键:无论成功/失败,关闭线程池,避免资源泄漏
if
(
deviceExecutor
!=
null
&&
!
deviceExecutor
.
isShutdown
())
{
deviceExecutor
.
shutdownNow
();
// 强制关闭,中断所有未完成任务
try
{
// 等待线程池关闭,避免JVM退出前资源未释放
if
(!
deviceExecutor
.
awaitTermination
(
5
,
TimeUnit
.
SECONDS
))
{
log
.
warn
(
"线程池关闭超时,强制终止:fStoreyId={}"
,
storeyIdStr
);
}
}
catch
(
InterruptedException
ie
)
{
Thread
.
currentThread
().
interrupt
();
log
.
error
(
"线程池关闭被中断:fStoreyId={}"
,
storeyIdStr
,
ie
);
}
}
log
.
debug
(
"DeviceCommunicationJob最终清理完成:fStoreyId={}"
,
storeyIdStr
);
}
}
}
}
private
void
executeWithFullProtection
(
JobExecutionContext
context
)
{
// 移除所有CompletableFuture和多线程,简化执行流程
String
fStoreyIdStr
=
null
;
try
{
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
fStoreyIdStr
=
data
.
getString
(
"fStoreyId"
);
if
(
StringUtils
.
isBlank
(
fStoreyIdStr
))
{
/**
log
.
warn
(
"fStoreyId为空,跳过执行"
);
* 带总超时的核心执行逻辑
return
;
*/
}
private
boolean
executeWithTotalTimeout
(
JobExecutionContext
context
,
ExecutorService
deviceExecutor
,
long
taskStartTime
)
throws
Exception
{
String
storeyIdStr
=
getStoreyIdFromContext
(
context
);
TStoreyInfo
storeyInfo
=
validateAndGetStoreyInfo
(
storeyIdStr
);
Long
fStoreyId
=
Long
.
parseLong
(
fStoreyIdStr
);
// 用自定义线程池执行端口通信(避免混用CompletableFuture默认线程池)
TStoreyInfo
tStoreyInfo
=
tStoreyInfoMapper
.
selectTStoreyInfoById
(
fStoreyId
);
CountDownLatch
portLatch
=
new
CountDownLatch
(
3
);
// 3个端口,计数器3
AtomicInteger
portErrorCount
=
new
AtomicInteger
(
0
);
if
(
tStoreyInfo
==
null
||
StringUtils
.
isBlank
(
tStoreyInfo
.
getfIp
()))
{
// 端口501通信
log
.
warn
(
"设备信息无效,跳过执行"
);
deviceExecutor
.
submit
(()
->
{
return
;
try
{
executeSinglePort
(
storeyInfo
.
getfIp
(),
501
,
Arrays
.
asList
(
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
,
9
,
10
,
11
,
12
,
13
,
14
,
15
,
16
,
17
,
18
,
19
,
20
,
21
,
22
,
23
,
24
,
25
,
26
,
27
),
storeyIdStr
,
storeyInfo
);
}
catch
(
Exception
e
)
{
portErrorCount
.
incrementAndGet
();
log
.
error
(
"端口501通信异常:fStoreyId={}"
,
storeyIdStr
,
e
);
}
finally
{
portLatch
.
countDown
();
}
}
});
// 简化执行:只执行最基本的通信测试
// 端口502通信
executeSimpleCommunicationTest
(
tStoreyInfo
,
fStoreyId
);
deviceExecutor
.
submit
(()
->
{
try
{
executeSinglePort
(
storeyInfo
.
getfIp
(),
502
,
Arrays
.
asList
(
28
,
29
,
30
,
31
,
32
,
33
,
34
,
35
,
36
,
37
,
38
,
39
,
40
,
41
,
42
,
43
,
44
,
45
,
46
,
47
,
48
,
49
,
50
,
51
,
52
,
53
,
54
),
storeyIdStr
,
storeyInfo
);
}
catch
(
Exception
e
)
{
portErrorCount
.
incrementAndGet
();
log
.
error
(
"端口502通信异常:fStoreyId={}"
,
storeyIdStr
,
e
);
}
finally
{
portLatch
.
countDown
();
}
});
}
catch
(
NumberFormatException
e
)
{
// 端口503通信
log
.
warn
(
"参数格式错误: {}"
,
fStoreyIdStr
,
e
);
deviceExecutor
.
submit
(()
->
{
}
catch
(
Exception
e
)
{
try
{
log
.
error
(
"通信任务执行异常"
,
e
);
executeSinglePort
(
storeyInfo
.
getfIp
(),
503
,
// 不抛出!只记录日志
Arrays
.
asList
(
55
,
56
,
57
,
58
,
59
,
60
,
61
,
62
,
63
,
64
,
65
,
66
,
67
,
68
,
69
,
70
,
71
,
72
),
storeyIdStr
,
storeyInfo
);
}
catch
(
Exception
e
)
{
portErrorCount
.
incrementAndGet
();
log
.
error
(
"端口503通信异常:fStoreyId={}"
,
storeyIdStr
,
e
);
}
finally
{
portLatch
.
countDown
();
}
});
// 等待所有端口完成,带总超时
boolean
allPortCompleted
=
portLatch
.
await
(
SINGLE_PORT_TIMEOUT_SEC
,
TimeUnit
.
SECONDS
);
long
totalCost
=
System
.
currentTimeMillis
()
-
taskStartTime
;
// 检查总耗时是否超任务超时
if
(
totalCost
>
TOTAL_TASK_TIMEOUT_SEC
*
1000
)
{
log
.
warn
(
"任务总耗时超阈值:fStoreyId={},耗时={}ms(阈值={}ms)"
,
storeyIdStr
,
totalCost
,
TOTAL_TASK_TIMEOUT_SEC
*
1000
);
recordAlarm
(
storeyInfo
,
"任务总超时:"
+
totalCost
+
"ms(阈值"
+
TOTAL_TASK_TIMEOUT_SEC
+
"秒)"
);
return
false
;
}
// 检查端口错误数
if
(
portErrorCount
.
get
()
>
0
)
{
String
errMsg
=
"部分端口通信失败:fStoreyId="
+
storeyIdStr
+
",失败端口数="
+
portErrorCount
.
get
();
log
.
error
(
errMsg
);
recordAlarm
(
storeyInfo
,
errMsg
);
// 此处不抛出异常,仅记录告警,避免Trigger变ERROR
}
}
return
allPortCompleted
;
}
}
private
void
executeSimpleCommunicationTest
(
TStoreyInfo
tStoreyInfo
,
Long
fStoreyId
)
{
String
ip
=
tStoreyInfo
.
getfIp
();
// 只测试第一个设备,简化逻辑
/**
* 验证参数并获取设备信息(参数错误直接抛出异常)
*/
private
TStoreyInfo
validateAndGetStoreyInfo
(
String
storeyIdStr
)
throws
JobExecutionException
{
// 1. 校验storeyIdStr
if
(
StringUtils
.
isBlank
(
storeyIdStr
))
{
String
errMsg
=
"fStoreyId参数为空,任务终止"
;
log
.
error
(
errMsg
);
recordAlarm
(
null
,
"unknown"
,
errMsg
);
throw
new
JobExecutionException
(
errMsg
,
false
);
}
// 2. 转换storeyId
Long
storeyId
;
try
{
try
{
ModbusMaster
master
=
createModbusMaster
(
ip
,
501
);
storeyId
=
Long
.
parseLong
(
storeyIdStr
);
int
[]
result
=
readDeviceRegistersOnce
(
master
,
1
);
// 只读第一个设备
}
catch
(
NumberFormatException
e
)
{
master
.
destroy
();
String
errMsg
=
"fStoreyId格式错误:"
+
storeyIdStr
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
null
,
storeyIdStr
,
errMsg
);
throw
new
JobExecutionException
(
errMsg
,
e
,
false
);
}
log
.
info
(
"设备通信测试成功: fStoreyId={}, 状态值={}"
,
fStoreyId
,
// 3. 查询设备信息
result
.
length
>
1
?
result
[
1
]
:
"无数据"
);
TStoreyInfo
storeyInfo
=
tStoreyInfoMapper
.
selectTStoreyInfoById
(
storeyId
);
if
(
storeyInfo
==
null
)
{
String
errMsg
=
"未查询到设备信息:fStoreyId="
+
storeyId
;
log
.
error
(
errMsg
);
recordAlarm
(
null
,
storeyIdStr
,
errMsg
);
// 清理无效任务(避免后续重复执行)
throw
new
JobExecutionException
(
errMsg
,
false
);
}
}
catch
(
Exception
e
)
{
// 4. 校验设备IP
log
.
warn
(
"设备通信测试失败: fStoreyId={}, IP={}"
,
fStoreyId
,
ip
,
e
);
if
(
StringUtils
.
isBlank
(
storeyInfo
.
getfIp
()))
{
// 不抛出异常!
String
errMsg
=
"设备IP为空:fStoreyId="
+
storeyId
;
log
.
error
(
errMsg
);
recordAlarm
(
storeyInfo
,
errMsg
);
throw
new
JobExecutionException
(
errMsg
,
false
);
}
}
return
storeyInfo
;
}
}
private
void
executeJobSafely
(
JobExecutionContext
context
)
{
try
{
// 1. 提取参数
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
String
fStoreyIdStr
=
data
.
getString
(
"fStoreyId"
);
if
(
StringUtils
.
isBlank
(
fStoreyIdStr
))
{
/**
log
.
warn
(
"fStoreyId参数为空"
);
* 单个端口通信(含设备级超时和重试)
return
;
*/
}
private
void
executeSinglePort
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
String
storeyIdStr
,
TStoreyInfo
storeyInfo
)
throws
Exception
{
log
.
info
(
"开始端口通信:ip={},port={},设备数={},fStoreyId={}"
,
ip
,
port
,
deviceIds
.
size
(),
storeyIdStr
);
Long
fStoreyId
=
Long
.
parseLong
(
fStoreyIdStr
);
CountDownLatch
deviceLatch
=
new
CountDownLatch
(
deviceIds
.
size
()
);
TStoreyInfo
tStoreyInfo
=
tStoreyInfoMapper
.
selectTStoreyInfoById
(
fStoreyId
);
AtomicInteger
deviceErrorCount
=
new
AtomicInteger
(
0
);
if
(
tStoreyInfo
==
null
||
StringUtils
.
isBlank
(
tStoreyInfo
.
getfIp
()))
{
for
(
int
deviceId
:
deviceIds
)
{
log
.
warn
(
"设备信息无效: fStoreyId={}"
,
fStoreyId
);
int
finalDeviceId
=
deviceId
;
return
;
// 每个设备用独立线程执行,带超时
}
Executors
.
newSingleThreadExecutor
(
r
->
new
Thread
(
r
,
"modbus-device-"
+
storeyIdStr
+
"-"
+
port
+
"-"
+
finalDeviceId
)
).
submit
(()
->
{
try
{
// 单个设备通信(带超时)
executeSingleDeviceWithTimeout
(
ip
,
port
,
finalDeviceId
,
storeyIdStr
);
}
catch
(
Exception
e
)
{
deviceErrorCount
.
incrementAndGet
();
log
.
error
(
"设备通信异常:ip={},port={},deviceId={},fStoreyId={}"
,
ip
,
port
,
finalDeviceId
,
storeyIdStr
,
e
);
recordAlarm
(
storeyInfo
,
"设备"
+
finalDeviceId
+
"通信异常:"
+
e
.
getMessage
());
}
finally
{
deviceLatch
.
countDown
();
}
});
}
// 2. 顺序执行三个端口(不要并行)
// 等待该端口所有设备完成,带超时
executePortSequentially
(
tStoreyInfo
,
fStoreyId
);
boolean
allDeviceCompleted
=
deviceLatch
.
await
(
SINGLE_DEVICE_TIMEOUT_SEC
*
deviceIds
.
size
()
/
10
,
TimeUnit
.
SECONDS
);
if
(!
allDeviceCompleted
)
{
String
errMsg
=
"端口"
+
port
+
"部分设备超时:ip="
+
ip
+
",未完成设备数="
+
deviceLatch
.
getCount
()
+
",fStoreyId="
+
storeyIdStr
;
log
.
error
(
errMsg
);
recordAlarm
(
storeyInfo
,
errMsg
);
throw
new
Exception
(
errMsg
);
// 抛出异常,标记该端口失败
}
}
catch
(
Exception
e
)
{
if
(
deviceErrorCount
.
get
()
>
0
)
{
log
.
error
(
"executeJobSafely内部异常"
,
e
);
String
errMsg
=
"端口"
+
port
+
"部分设备通信失败:ip="
+
ip
+
",失败数="
+
deviceErrorCount
.
get
()
+
",fStoreyId="
+
storeyIdStr
;
// 这里不抛出异常,只是记录日志
log
.
error
(
errMsg
);
throw
new
Exception
(
errMsg
);
// 抛出异常,标记该端口失败
}
}
log
.
info
(
"端口通信完成:ip={},port={},fStoreyId={}"
,
ip
,
port
,
storeyIdStr
);
}
}
private
void
executePortSequentially
(
TStoreyInfo
tStoreyInfo
,
Long
fStoreyId
)
{
String
ip
=
tStoreyInfo
.
getfIp
();
// 顺序执行,避免并发问题
/**
executeSinglePortSafely
(
ip
,
501
,
getOffsets501
(),
fStoreyId
,
tStoreyInfo
);
* 单个设备通信(带超时和重试)
executeSinglePortSafely
(
ip
,
502
,
getOffsets502
(),
fStoreyId
,
tStoreyInfo
);
*/
executeSinglePortSafely
(
ip
,
503
,
getOffsets503
(),
fStoreyId
,
tStoreyInfo
);
private
void
executeSingleDeviceWithTimeout
(
String
ip
,
int
port
,
int
deviceId
,
String
storeyIdStr
)
throws
Exception
{
// 单个设备通信超时:SINGLE_DEVICE_TIMEOUT_SEC秒
Future
<?>
deviceFuture
=
Executors
.
newSingleThreadExecutor
().
submit
(()
->
{
try
{
// 带重试的设备通信
int
[]
result
=
readDeviceWithRetry
(
ip
,
port
,
deviceId
,
storeyIdStr
);
// 处理结果(若有)
if
(
resultHandler
!=
null
)
{
resultHandler
.
accept
(
new
DeviceStatusReaderDto
(
ip
,
port
,
deviceId
,
result
));
}
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"设备"
+
deviceId
+
"通信失败"
,
e
);
}
});
try
{
deviceFuture
.
get
(
SINGLE_DEVICE_TIMEOUT_SEC
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
e
)
{
deviceFuture
.
cancel
(
true
);
String
errMsg
=
"设备"
+
deviceId
+
"通信超时(>"
+
SINGLE_DEVICE_TIMEOUT_SEC
+
"秒):ip="
+
ip
+
",port="
+
port
;
throw
new
Exception
(
errMsg
,
e
);
}
catch
(
Exception
e
)
{
String
errMsg
=
"设备"
+
deviceId
+
"通信异常:ip="
+
ip
+
",port="
+
port
;
throw
new
Exception
(
errMsg
,
e
);
}
}
}
private
void
executeSinglePortSafely
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
/**
Long
fStoreyId
,
TStoreyInfo
tStoreyInfo
)
{
* 带重试的设备寄存器读取(取消Modbus内置重试,统一自定义重试)
log
.
info
(
"开始执行端口{}: ip={}, 设备数={}"
,
port
,
ip
,
deviceIds
.
size
());
*/
private
int
[]
readDeviceWithRetry
(
String
ip
,
int
port
,
int
deviceId
,
String
storeyIdStr
)
throws
Exception
{
ModbusMaster
master
=
null
;
int
[]
result
=
null
;
boolean
readSuccess
=
false
;
for
(
int
deviceId
:
deviceIds
)
{
for
(
int
retry
=
0
;
retry
<=
CUSTOM_RETRY_TIMES
;
retry
++
)
{
try
{
try
{
executeSingleDeviceSafely
(
ip
,
port
,
deviceId
,
fStoreyId
,
tStoreyInfo
);
// 1. 创建Modbus连接(取消内置重试,master.setRetries(0))
// 每个设备执行后短暂休息,避免资源竞争
master
=
createModbusMaster
(
ip
,
port
);
Thread
.
sleep
(
50
);
// 2. 读取寄存器
result
=
readDeviceRegisters
(
master
,
deviceId
);
// 3. 检查结果(若有停止条件)
Predicate
<
int
[]>
stopCondition
=
ModbusResultHandler
.
createDefaultStopCondition
();
if
(
stopCondition
!=
null
&&
stopCondition
.
test
(
result
))
{
readSuccess
=
true
;
log
.
debug
(
"设备读取成功(满足停止条件):ip={},port={},deviceId={},重试次数={},fStoreyId={}"
,
ip
,
port
,
deviceId
,
retry
,
storeyIdStr
);
break
;
}
else
if
(
retry
<
CUSTOM_RETRY_TIMES
)
{
log
.
debug
(
"设备读取结果不满足条件,准备重试:ip={},port={},deviceId={},重试次数={},fStoreyId={}"
,
ip
,
port
,
deviceId
,
retry
,
storeyIdStr
);
Thread
.
sleep
(
RETRY_DELAY_MS
);
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"设备{}执行异常: ip={}, port={}"
,
deviceId
,
ip
,
port
,
e
);
if
(
retry
<
CUSTOM_RETRY_TIMES
)
{
// 继续执行下一个设备,不中断整个任务
log
.
warn
(
"设备读取异常,准备重试:ip={},port={},deviceId={},重试次数={},fStoreyId={}"
,
ip
,
port
,
deviceId
,
retry
,
storeyIdStr
,
e
);
Thread
.
sleep
(
RETRY_DELAY_MS
);
}
else
{
String
errMsg
=
"设备读取重试耗尽(共"
+
CUSTOM_RETRY_TIMES
+
"次):ip="
+
ip
+
",port="
+
port
+
",deviceId="
+
deviceId
;
log
.
error
(
errMsg
,
e
);
throw
new
Exception
(
errMsg
,
e
);
// 重试耗尽,抛出异常
}
}
finally
{
// 每次重试后销毁连接,避免资源泄漏
destroyModbusMaster
(
master
,
deviceId
);
}
}
}
}
log
.
info
(
"端口{}执行完成: ip={}"
,
port
,
ip
);
if
(!
readSuccess
)
{
String
errMsg
=
"设备读取未满足停止条件(重试"
+
CUSTOM_RETRY_TIMES
+
"次):ip="
+
ip
+
",port="
+
port
+
",deviceId="
+
deviceId
;
log
.
error
(
errMsg
);
throw
new
Exception
(
errMsg
);
}
return
result
;
}
}
private
void
executeSingleDeviceSafely
(
String
ip
,
int
port
,
int
deviceId
,
// -------------------------- Modbus工具方法(显式抛出异常)--------------------------
Long
fStoreyId
,
TStoreyInfo
tStoreyInfo
)
{
/**
ModbusMaster
master
=
null
;
* 创建Modbus连接(取消内置重试,统一自定义重试)
try
{
*/
// 创建连接
private
ModbusMaster
createModbusMaster
(
String
ip
,
int
port
)
throws
ModbusInitException
{
master
=
createModbusMaster
(
ip
,
port
);
IpParameters
params
=
new
IpParameters
();
params
.
setHost
(
ip
);
params
.
setPort
(
port
);
// 读取数据(简化重试逻辑)
TcpMaster
master
=
(
TcpMaster
)
modbusFactory
.
createTcpMaster
(
params
,
true
);
int
[]
result
=
readDeviceRegistersOnce
(
master
,
deviceId
);
master
.
setTimeout
(
MODBUS_CONN_TIMEOUT_MS
);
master
.
setRetries
(
0
);
// 取消内置重试,避免和自定义重试冲突
master
.
init
();
log
.
debug
(
"Modbus连接创建成功:ip={},port={},master={}"
,
ip
,
port
,
master
);
return
master
;
}
// 处理结果
/**
if
(
resultHandler
!=
null
)
{
* 读取设备寄存器(异常直接抛出)
resultHandler
.
accept
(
new
DeviceStatusReaderDto
(
ip
,
port
,
deviceId
,
result
));
*/
}
private
int
[]
readDeviceRegisters
(
ModbusMaster
master
,
int
deviceId
)
throws
ModbusTransportException
{
ReadHoldingRegistersRequest
request
=
Modbus4jUtils
.
getReadHoldingRegistersRequest
(
deviceId
,
REG_START_ADDR
,
REG_READ_COUNT
);
ModbusResponse
response
=
master
.
send
(
request
);
log
.
debug
(
"设备{}通信成功: ip={}, port={}"
,
deviceId
,
ip
,
port
);
if
(!(
response
instanceof
ReadHoldingRegistersResponse
))
{
throw
new
IllegalArgumentException
(
"无效Modbus响应类型:"
+
response
.
getClass
().
getName
()
+
",deviceId="
+
deviceId
);
}
ReadHoldingRegistersResponse
regResp
=
(
ReadHoldingRegistersResponse
)
response
;
short
[]
signedVals
=
regResp
.
getShortData
();
int
[]
unsignedVals
=
new
int
[
signedVals
.
length
];
for
(
int
i
=
0
;
i
<
signedVals
.
length
;
i
++)
{
unsignedVals
[
i
]
=
signedVals
[
i
]
&
0xFFFF
;
// 转换为无符号整数
}
log
.
trace
(
"设备寄存器读取结果:deviceId={},值={}"
,
deviceId
,
Arrays
.
toString
(
unsignedVals
));
return
unsignedVals
;
}
/**
* 销毁Modbus连接(反射失败直接抛出异常,显式暴露问题)
*/
private
void
destroyModbusMaster
(
ModbusMaster
master
,
int
deviceId
)
{
if
(
master
==
null
)
return
;
try
{
// 反射获取Socket(若失败,直接抛异常,不隐藏)
Socket
socket
=
getUnderlyingSocket
(
master
);
if
(
socket
!=
null
&&
!
socket
.
isClosed
())
{
socket
.
close
();
log
.
trace
(
"设备{}:Modbus底层Socket已关闭"
,
deviceId
);
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
warn
(
"设备{}通信失败: ip={}, port={}, 错误: {}"
,
String
errMsg
=
"设备"
+
deviceId
+
":销毁Modbus Socket异常(可能导致连接泄漏)"
;
deviceId
,
ip
,
port
,
e
.
getMessage
());
log
.
error
(
errMsg
,
e
);
// 不抛出异常,只记录警告
// 此处不抛出异常(避免中断销毁流程),但记录严重告警
recordAlarm
(
null
,
String
.
valueOf
(
deviceId
),
errMsg
+
":"
+
e
.
getMessage
());
}
finally
{
}
finally
{
// 确保资源释放
try
{
if
(
master
!=
null
)
{
master
.
destroy
();
try
{
log
.
trace
(
"设备{}:ModbusMaster已销毁"
,
deviceId
);
master
.
destroy
();
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"设备{}:销毁ModbusMaster异常"
,
deviceId
,
e
);
log
.
debug
(
"释放Modbus连接异常: deviceId={}"
,
deviceId
,
e
);
}
}
}
}
}
}
}
/**
/**
*
简化版:只读取一次,不重试
*
反射获取TcpMaster的Socket(失败直接抛出异常,显式暴露问题)
*/
*/
private
int
[]
readDeviceRegistersOnce
(
ModbusMaster
master
,
int
deviceId
)
private
Socket
getUnderlyingSocket
(
ModbusMaster
master
)
throws
Exception
{
throws
ModbusTransportException
{
if
(!(
master
instanceof
TcpMaster
))
{
ReadHoldingRegistersRequest
request
=
throw
new
IllegalArgumentException
(
"ModbusMaster不是TcpMaster类型:"
+
master
.
getClass
().
getName
());
Modbus4jUtils
.
getReadHoldingRegistersRequest
(
deviceId
,
START_ADDRESS
,
REGISTER_COUNT
);
ModbusResponse
response
=
master
.
send
(
request
);
if
(!(
response
instanceof
ReadHoldingRegistersResponse
))
{
throw
new
IllegalArgumentException
(
"Invalid response type"
);
}
}
ReadHoldingRegistersResponse
regResponse
=
(
ReadHoldingRegistersResponse
)
response
;
TcpMaster
tcpMaster
=
(
TcpMaster
)
master
;
short
[]
signedValues
=
regResponse
.
getShortData
();
try
{
// 反射获取socket字段(根据你的TcpMaster源码确认字段名,必须正确)
Field
socketField
=
TcpMaster
.
class
.
getDeclaredField
(
"socket"
);
socketField
.
setAccessible
(
true
);
Socket
socket
=
(
Socket
)
socketField
.
get
(
tcpMaster
);
// 转换为无符号整数
if
(
socket
==
null
)
{
int
[]
unsignedValues
=
new
int
[
signedValues
.
length
];
log
.
warn
(
"TcpMaster的socket字段为null(未建立连接)"
);
for
(
int
i
=
0
;
i
<
signedValues
.
length
;
i
++)
{
}
unsignedValues
[
i
]
=
signedValues
[
i
]
&
0xFFFF
;
return
socket
;
}
catch
(
NoSuchFieldException
e
)
{
throw
new
Exception
(
"反射获取Socket失败:TcpMaster中不存在'socket'字段(版本不匹配)"
,
e
);
}
catch
(
IllegalAccessException
e
)
{
throw
new
Exception
(
"反射获取Socket失败:无访问权限(可能被安全管理器拦截)"
,
e
);
}
}
return
unsignedValues
;
}
}
private
void
recordAlarm
(
JobExecutionContext
context
,
String
alarmData
)
{
// -------------------------- 辅助方法(日志/告警)--------------------------
/**
* 从JobContext中获取fStoreyId(失败返回unknown)
*/
private
String
getStoreyIdFromContext
(
JobExecutionContext
context
)
{
try
{
try
{
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
String
fStoreyIdStr
=
data
.
getString
(
"fStoreyId"
);
return
data
!=
null
?
data
.
getString
(
"fStoreyId"
)
:
"unknown"
;
}
catch
(
Exception
e
)
{
log
.
error
(
"从JobContext获取fStoreyId失败"
,
e
);
return
"unknown"
;
}
}
/**
* 记录告警(兼容设备信息为空的场景)
*/
private
void
recordAlarm
(
TStoreyInfo
storeyInfo
,
String
equipmentCode
,
String
alarmData
)
{
try
{
TEquipmentAlarmData
alarm
=
new
TEquipmentAlarmData
();
TEquipmentAlarmData
alarm
=
new
TEquipmentAlarmData
();
alarm
.
setfAlarmType
(
"03"
);
alarm
.
setfAlarmType
(
"03"
);
// 老化层告警
alarm
.
setfEquipmentCode
(
fStoreyIdStr
!=
null
?
fStoreyIdStr
:
"unknown"
);
alarm
.
setfEquipmentCode
(
storeyInfo
!=
null
?
storeyInfo
.
getfStoreyCode
()
:
equipmentCode
);
alarm
.
setfAlarmData
(
alarmData
);
alarm
.
setfAlarmData
(
alarmData
);
alarm
.
setfCreateTime
(
new
Date
());
alarm
.
setfCreateTime
(
new
Date
());
alarmDataService
.
insertTEquipmentAlarmData
(
alarm
);
alarmDataService
.
insertTEquipmentAlarmData
(
alarm
);
log
.
debug
(
"告警记录成功:设备编码={},内容={}"
,
alarm
.
getfEquipmentCode
(),
alarmData
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"
记录告警失败"
,
e
);
log
.
error
(
"
告警记录失败:设备编码={},内容={}"
,
equipmentCode
,
alarmData
,
e
);
}
}
}
}
// 设备列表
private
List
<
Integer
>
getOffsets501
()
{
return
Arrays
.
asList
(
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
,
9
,
10
,
11
,
12
,
13
,
14
,
15
,
16
,
17
,
18
,
19
,
20
,
21
,
22
,
23
,
24
,
25
,
26
,
27
);
}
private
List
<
Integer
>
getOffsets502
()
{
return
Arrays
.
asList
(
28
,
29
,
30
,
31
,
32
,
33
,
34
,
35
,
36
,
37
,
38
,
39
,
40
,
41
,
42
,
43
,
44
,
45
,
46
,
47
,
48
,
49
,
50
,
51
,
52
,
53
,
54
);
}
private
List
<
Integer
>
getOffsets503
()
{
return
Arrays
.
asList
(
55
,
56
,
57
,
58
,
59
,
60
,
61
,
62
,
63
,
64
,
65
,
66
,
67
,
68
,
69
,
70
,
71
,
72
);
}
/**
/**
*
创建新的Modbus连接
*
简化告警记录(设备信息非空时)
*/
*/
private
ModbusMaster
createModbusMaster
(
String
ip
,
int
port
)
throws
ModbusInitException
{
private
void
recordAlarm
(
TStoreyInfo
storeyInfo
,
String
alarmData
)
{
IpParameters
params
=
new
IpParameters
();
recordAlarm
(
storeyInfo
,
storeyInfo
!=
null
?
storeyInfo
.
getfStoreyCode
()
:
"unknown"
,
alarmData
);
params
.
setHost
(
ip
);
params
.
setPort
(
port
);
// 显式声明为TcpMaster,避免后续反射类型转换问题
TcpMaster
master
=
(
TcpMaster
)
modbusFactory
.
createTcpMaster
(
params
,
true
);
master
.
setTimeout
(
3000
);
// 3秒连接超时
master
.
setRetries
(
1
);
// 1次重试
master
.
init
();
log
.
debug
(
"Modbus连接创建成功:IP={}, 端口={}, Master={}"
,
ip
,
port
,
master
);
return
master
;
}
}
}
}
\ No newline at end of file
zhmes-agecal-system/src/main/java/com/zehong/system/task/QuartzJobListener.java
View file @
db7ad245
...
@@ -3,7 +3,6 @@ package com.zehong.system.task;
...
@@ -3,7 +3,6 @@ package com.zehong.system.task;
import
org.quartz.JobExecutionContext
;
import
org.quartz.JobExecutionContext
;
import
org.quartz.JobExecutionException
;
import
org.quartz.JobExecutionException
;
import
org.quartz.JobListener
;
import
org.quartz.JobListener
;
import
org.quartz.SchedulerException
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment