Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
Z
zhmes-agecal
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
耿迪迪
zhmes-agecal
Commits
3214dfa3
Commit
3214dfa3
authored
Sep 24, 2025
by
wanghao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Revert "1 测试 上电后通信 和 最终完成 定时任务功能"
parent
8f2416a0
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
139 additions
and
233 deletions
+139
-233
DeviceStatusReaderAndTimeSetter.java
...stem/modbus/business/DeviceStatusReaderAndTimeSetter.java
+34
-75
DeviceCommunicationJob.java
...n/java/com/zehong/system/task/DeviceCommunicationJob.java
+91
-139
DeviceTaskScheduler.java
...main/java/com/zehong/system/task/DeviceTaskScheduler.java
+14
-19
No files found.
zhmes-agecal-system/src/main/java/com/zehong/system/modbus/business/DeviceStatusReaderAndTimeSetter.java
View file @
3214dfa3
...
...
@@ -20,8 +20,11 @@ import javax.annotation.Resource;
import
java.lang.reflect.Field
;
import
java.net.Socket
;
import
java.util.List
;
import
java.util.concurrent.*
;
import
java.util.concurrent.atomic.AtomicReference
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.function.Consumer
;
import
java.util.function.Predicate
;
...
...
@@ -39,29 +42,9 @@ public class DeviceStatusReaderAndTimeSetter {
public
static
final
int
TARGET_VALUE
=
1
;
public
static
final
int
MAX_RETRIES
=
1
;
// 仅1次重试(减少总耗时)
public
static
final
int
RETRY_DELAY
=
200
;
// 重试间隔200ms(缩短等待)
// 线程池:Spring管理,非静态(避免资源泄漏)
private
final
ExecutorService
deviceExecutor
;
// 构造函数:初始化线程池(核心线程=设备数/10,避免过多)
public
DeviceStatusReaderAndTimeSetter
()
{
this
.
deviceExecutor
=
new
ThreadPoolExecutor
(
4
,
// 核心线程4个(72个设备,4线程,每线程处理18个,效率适中)
8
,
// 最大线程8个(峰值扩容)
60
,
TimeUnit
.
SECONDS
,
new
LinkedBlockingQueue
<>(
30
),
// 队列30个,避免任务堆积
new
ThreadFactory
()
{
private
int
count
=
0
;
@Override
public
Thread
newThread
(
Runnable
r
)
{
Thread
thread
=
new
Thread
(
r
,
"modbus-device-pool-"
+
(
count
++));
thread
.
setDaemon
(
true
);
// 守护线程,不阻塞JVM退出
return
thread
;
}
},
new
ThreadPoolExecutor
.
CallerRunsPolicy
()
// 队列满时调用线程执行(避免任务丢失)
);
}
// 简化线程池配置
private
final
ExecutorService
deviceExecutor
=
Executors
.
newFixedThreadPool
(
4
);
// 工厂(单例)
private
static
final
ModbusFactory
modbusFactory
=
new
ModbusFactory
();
...
...
@@ -234,76 +217,52 @@ public class DeviceStatusReaderAndTimeSetter {
Consumer
<
DeviceStatusReaderDto
>
resultHandler
,
Predicate
<
int
[]>
stopCondition
,
int
portTimeout
)
{
if
(
deviceIds
==
null
||
deviceIds
.
isEmpty
())
{
log
.
warn
(
"设备ID列表为空,不执行监控:IP={}, 端口={}"
,
ip
,
port
);
return
;
}
if
(
deviceIds
==
null
||
deviceIds
.
isEmpty
())
return
;
final
CountDownLatch
latch
=
new
CountDownLatch
(
deviceIds
.
size
());
log
.
info
(
"启动端口{}监控:IP={},设备数={},超时={}s"
,
port
,
ip
,
deviceIds
.
size
(),
portTimeout
);
CountDownLatch
latch
=
new
CountDownLatch
(
deviceIds
.
size
());
AtomicInteger
successCount
=
new
AtomicInteger
(
0
);
for
(
int
deviceId
:
deviceIds
)
{
final
int
devId
=
deviceId
;
deviceExecutor
.
submit
(()
->
{
// 关键:threadMaster声明为普通ModbusMaster,无需包装
final
ModbusMaster
[]
threadMaster
=
{
null
};
ExecutorService
devExecutor
=
null
;
// 临时线程池,避免泄漏
try
{
// 单个设备超时(端口总超时/设备数,确保不超总时间)
int
devTimeout
=
Math
.
max
(
1
,
portTimeout
*
1000
/
deviceIds
.
size
());
// 创建临时线程池(处理单个设备超时)
devExecutor
=
Executors
.
newSingleThreadExecutor
();
Future
<?>
devFuture
=
devExecutor
.
submit
(()
->
{
try
{
// 1. 创建独立连接(直接赋值,无需get())
threadMaster
[
0
]
=
createModbusMaster
(
ip
,
port
);
// 2. 读取数据(直接传threadMaster,无需get())
int
[]
result
=
readWithConditionalRetry
(
threadMaster
[
0
],
ip
,
port
,
devId
,
stopCondition
);
// 3. 回调结果
if
(
resultHandler
!=
null
)
{
resultHandler
.
accept
(
new
DeviceStatusReaderDto
(
ip
,
port
,
devId
,
result
));
}
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"设备"
+
devId
+
"执行异常"
,
e
);
}
});
// 单个设备超时等待
devFuture
.
get
(
devTimeout
,
TimeUnit
.
MILLISECONDS
);
}
catch
(
TimeoutException
e
)
{
String
errMsg
=
"设备"
+
devId
+
"超时:IP="
+
ip
+
", 端口="
+
port
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
"03"
,
"ip:"
+
ip
+
",port:"
+
port
+
",deviceId:"
+
devId
,
errMsg
);
// 简化设备通信逻辑
executeSingleDevice
(
ip
,
port
,
deviceId
,
resultHandler
,
stopCondition
);
successCount
.
incrementAndGet
();
}
catch
(
Exception
e
)
{
String
errMsg
=
"设备"
+
devId
+
"执行异常:IP="
+
ip
+
", 端口="
+
port
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
"03"
,
"ip:"
+
ip
+
",port:"
+
port
+
",deviceId:"
+
devId
,
errMsg
+
":"
+
e
.
getMessage
());
log
.
error
(
"设备通信失败: ip={}, port={}, deviceId={}"
,
ip
,
port
,
deviceId
,
e
);
}
finally
{
// 1. 销毁Modbus连接(直接传threadMaster,无需get())
destroyModbusMaster
(
threadMaster
[
0
],
devId
);
// 2. 关闭临时线程池(避免资源泄漏)
if
(
devExecutor
!=
null
&&
!
devExecutor
.
isShutdown
())
{
devExecutor
.
shutdownNow
();
}
// 3. 计数减1
latch
.
countDown
();
log
.
debug
(
"设备{}完成:IP={}, 端口={},剩余任务={}"
,
devId
,
ip
,
port
,
latch
.
getCount
());
}
});
}
// 端口总超时等待(确保不阻塞上层Job)
try
{
// 等待所有设备完成或超时
if
(!
latch
.
await
(
portTimeout
,
TimeUnit
.
SECONDS
))
{
String
errMsg
=
"端口"
+
port
+
"部分设备超时:IP="
+
ip
+
",未完成="
+
latch
.
getCount
();
log
.
warn
(
errMsg
);
recordAlarm
(
"03"
,
"ip:"
+
ip
+
",port:"
+
port
,
errMsg
);
log
.
warn
(
"端口通信超时: ip={}, port={}, 完成数={}/{}"
,
ip
,
port
,
successCount
.
get
(),
deviceIds
.
size
());
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"端口{}监控被中断:IP={}"
,
port
,
ip
,
e
);
Thread
.
currentThread
().
interrupt
();
}
}
private
void
executeSingleDevice
(
String
ip
,
int
port
,
int
deviceId
,
Consumer
<
DeviceStatusReaderDto
>
resultHandler
,
Predicate
<
int
[]>
stopCondition
)
{
ModbusMaster
master
=
null
;
try
{
master
=
createModbusMaster
(
ip
,
port
);
int
[]
result
=
readWithConditionalRetry
(
master
,
ip
,
port
,
deviceId
,
stopCondition
);
if
(
resultHandler
!=
null
)
{
resultHandler
.
accept
(
new
DeviceStatusReaderDto
(
ip
,
port
,
deviceId
,
result
));
}
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"设备通信异常"
,
e
);
}
finally
{
destroyModbusMaster
(
master
,
deviceId
);
}
}
/**
* 统一告警记录(抽离避免重复代码)
*/
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommunicationJob.java
View file @
3214dfa3
...
...
@@ -30,13 +30,9 @@ import java.util.function.Predicate;
public
class
DeviceCommunicationJob
implements
Job
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
DeviceCommunicationJob
.
class
);
// 常量:超时控制(确保总耗时 < 5分钟Cron周期)
private
static
final
int
SINGLE_PORT_TIMEOUT
=
5
;
// 单个端口通信超时(秒)
private
static
final
int
JOB_TOTAL_TIMEOUT
=
15
;
// Job总超时(秒)
// 设备ID列表(拆分:按端口分组,避免单次任务过多)
private
static
final
List
<
Integer
>
OFFSETS_501
=
Arrays
.
asList
(
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
,
9
,
10
,
11
,
12
,
13
,
14
,
15
,
16
,
17
,
18
,
19
,
20
,
21
,
22
,
23
,
24
,
25
,
26
,
27
);
private
static
final
List
<
Integer
>
OFFSETS_502
=
Arrays
.
asList
(
28
,
29
,
30
,
31
,
32
,
33
,
34
,
35
,
36
,
37
,
38
,
39
,
40
,
41
,
42
,
43
,
44
,
45
,
46
,
47
,
48
,
49
,
50
,
51
,
52
,
53
,
54
);
private
static
final
List
<
Integer
>
OFFSETS_503
=
Arrays
.
asList
(
55
,
56
,
57
,
58
,
59
,
60
,
61
,
62
,
63
,
64
,
65
,
66
,
67
,
68
,
69
,
70
,
71
,
72
);
// 超时控制(确保在Cron周期内完成)
private
static
final
int
TOTAL_TIMEOUT_SECONDS
=
120
;
// 2分钟总超时
private
static
final
int
PORT_TIMEOUT_SECONDS
=
30
;
// 单个端口30秒超时
@Resource
private
ITEquipmentAlarmDataService
alarmDataService
;
...
...
@@ -48,176 +44,132 @@ public class DeviceCommunicationJob implements Job {
@Autowired
private
ModbusResultHandler
resultHandler
;
// 单个端口通信超时时间(确保总时间 < Cron周期)
private
static
final
int
PORT_TIMEOUT_SECONDS
=
8
;
// 临时线程池(避免静态线程池关闭问题)
private
final
ExecutorService
portExecutor
=
Executors
.
newFixedThreadPool
(
3
);
// 3个端口,3个线程
@Override
public
void
execute
(
JobExecutionContext
context
)
{
CompletableFuture
<
Void
>
taskFuture
=
null
;
try
{
// 1. 包装整个任务在CompletableFuture中,便于超时控制
taskFuture
=
CompletableFuture
.
runAsync
(()
->
{
executeInternal
(
context
);
});
// 2. 设置总超时,避免任务无限期执行
taskFuture
.
get
(
TOTAL_TIMEOUT_SECONDS
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
e
)
{
log
.
error
(
"任务执行超时,强制取消"
);
if
(
taskFuture
!=
null
)
{
taskFuture
.
cancel
(
true
);
}
// 记录超时告警,但不抛出异常到Quartz
recordTimeoutAlarm
(
context
);
}
catch
(
Exception
e
)
{
log
.
error
(
"任务执行异常"
,
e
);
// 记录异常,但不传播到Quartz
}
}
private
void
executeInternal
(
JobExecutionContext
context
)
{
long
startTime
=
System
.
currentTimeMillis
();
String
fStoreyIdStr
=
null
;
Long
fStoreyId
=
null
;
TStoreyInfo
tStoreyInfo
=
null
;
String
ip
=
null
;
try
{
log
.
info
(
"=== DeviceCommunicationJob 启动:线程={},开始时间={} ==="
,
Thread
.
currentThread
().
getName
(),
new
Date
(
startTime
));
// 1. 提取参数(严格校验,避免后续异常)
// 参数提取和验证
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
if
(
data
==
null
)
{
String
errMsg
=
"JobDataMap为空,终止执行"
;
log
.
error
(
errMsg
);
recordAlarm
(
null
,
"fStoreyId未知"
,
errMsg
);
return
;
}
fStoreyIdStr
=
data
.
getString
(
"fStoreyId"
);
if
(
StringUtils
.
isBlank
(
fStoreyIdStr
))
{
String
errMsg
=
"fStoreyId参数为空,终止执行"
;
log
.
error
(
errMsg
);
recordAlarm
(
null
,
"fStoreyId为空"
,
errMsg
);
log
.
warn
(
"fStoreyId参数为空,跳过执行"
);
return
;
}
// 2. 转换参数(处理格式错误)
try
{
fStoreyId
=
Long
.
parseLong
(
fStoreyIdStr
);
}
catch
(
NumberFormatException
e
)
{
String
errMsg
=
"fStoreyId格式错误:"
+
fStoreyIdStr
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
null
,
fStoreyIdStr
,
errMsg
+
":"
+
e
.
getMessage
());
return
;
}
Long
fStoreyId
=
Long
.
parseLong
(
fStoreyIdStr
);
TStoreyInfo
tStoreyInfo
=
tStoreyInfoMapper
.
selectTStoreyInfoById
(
fStoreyId
);
// 3. 查询设备信息(捕获数据库异常,避免穿透)
try
{
tStoreyInfo
=
tStoreyInfoMapper
.
selectTStoreyInfoById
(
fStoreyId
);
}
catch
(
Exception
e
)
{
String
errMsg
=
"查询设备信息异常:fStoreyId="
+
fStoreyId
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
null
,
fStoreyIdStr
,
errMsg
+
":"
+
e
.
getMessage
());
return
;
}
if
(
tStoreyInfo
==
null
)
{
String
errMsg
=
"未查询到设备信息:fStoreyId="
+
fStoreyId
;
log
.
error
(
errMsg
);
recordAlarm
(
null
,
fStoreyIdStr
,
errMsg
);
// 清理无效任务(避免后续重复执行)
try
{
context
.
getScheduler
().
deleteJob
(
context
.
getJobDetail
().
getKey
());
log
.
info
(
"清理无效任务:fStoreyId={}"
,
fStoreyId
);
}
catch
(
SchedulerException
e
)
{
log
.
error
(
"清理无效任务失败:fStoreyId={}"
,
fStoreyId
,
e
);
}
return
;
}
ip
=
tStoreyInfo
.
getfIp
();
if
(
StringUtils
.
isBlank
(
ip
))
{
String
errMsg
=
"设备IP为空:fStoreyId="
+
fStoreyId
;
log
.
error
(
errMsg
);
recordAlarm
(
tStoreyInfo
,
errMsg
,
errMsg
);
if
(
tStoreyInfo
==
null
||
StringUtils
.
isBlank
(
tStoreyInfo
.
getfIp
()))
{
log
.
warn
(
"设备信息无效,跳过执行: fStoreyId={}"
,
fStoreyId
);
return
;
}
// 4. 校验依赖组件(避免空指针)
if
(
resultHandler
==
null
)
{
String
errMsg
=
"ModbusResultHandler未初始化:fStoreyId="
+
fStoreyId
;
log
.
error
(
errMsg
);
recordAlarm
(
tStoreyInfo
,
errMsg
,
errMsg
);
return
;
// 核心通信逻辑(简化版)
executeDeviceCommunication
(
tStoreyInfo
,
fStoreyId
);
long
costTime
=
System
.
currentTimeMillis
()
-
startTime
;
log
.
info
(
"设备通信任务完成: fStoreyId={}, 耗时={}ms"
,
fStoreyId
,
costTime
);
}
catch
(
Exception
e
)
{
log
.
error
(
"设备通信任务执行异常: fStoreyIdStr={}"
,
fStoreyIdStr
,
e
);
// 仅记录日志,不抛出异常
}
Predicate
<
int
[]>
stopCondition
=
ModbusResultHandler
.
createDefaultStopCondition
();
if
(
stopCondition
==
null
)
{
String
errMsg
=
"Modbus停止条件未初始化:fStoreyId="
+
fStoreyId
;
log
.
error
(
errMsg
);
recordAlarm
(
tStoreyInfo
,
errMsg
,
errMsg
);
return
;
}
// 5. 核心逻辑:按端口顺序执行通信(避免并行线程竞争,简化时序)
log
.
info
(
"开始设备通信:fStoreyId={},IP={},端口501/502/503"
,
fStoreyId
,
ip
);
executePortCommunication
(
ip
,
501
,
OFFSETS_501
,
stopCondition
,
fStoreyId
,
tStoreyInfo
);
executePortCommunication
(
ip
,
502
,
OFFSETS_502
,
stopCondition
,
fStoreyId
,
tStoreyInfo
);
executePortCommunication
(
ip
,
503
,
OFFSETS_503
,
stopCondition
,
fStoreyId
,
tStoreyInfo
);
// 6. 校验总耗时(确保未超Quartz超时)
long
totalCost
=
System
.
currentTimeMillis
()
-
startTime
;
if
(
totalCost
>
JOB_TOTAL_TIMEOUT
*
1000
)
{
String
warnMsg
=
"任务执行超时:fStoreyId="
+
fStoreyId
+
",耗时="
+
totalCost
+
"ms(阈值="
+
JOB_TOTAL_TIMEOUT
+
"s)"
;
log
.
warn
(
warnMsg
);
recordAlarm
(
tStoreyInfo
,
"任务超时"
,
warnMsg
);
}
else
{
log
.
info
(
"=== DeviceCommunicationJob 成功:fStoreyId={},总耗时={}ms ==="
,
fStoreyId
,
totalCost
);
}
private
void
executeDeviceCommunication
(
TStoreyInfo
tStoreyInfo
,
Long
fStoreyId
)
{
String
ip
=
tStoreyInfo
.
getfIp
();
}
catch
(
Throwable
e
)
{
// 7. 捕获所有异常(确保不传播到Quartz,避免Trigger变ERROR)
String
errMsg
=
"DeviceCommunicationJob 致命异常:fStoreyIdStr="
+
fStoreyIdStr
;
log
.
error
(
errMsg
,
e
);
recordAlarm
(
tStoreyInfo
,
fStoreyIdStr
,
errMsg
+
":"
+
e
.
getMessage
());
// 使用并行流简化设备通信(按端口分组并行)
List
<
CompletableFuture
<
Void
>>
portFutures
=
Arrays
.
asList
(
CompletableFuture
.
runAsync
(()
->
executeSinglePort
(
ip
,
501
,
Arrays
.
asList
(
1
,
2
,
3
,
4
,
5
,
6
,
7
,
8
,
9
,
10
,
11
,
12
,
13
,
14
,
15
,
16
,
17
,
18
,
19
,
20
,
21
,
22
,
23
,
24
,
25
,
26
,
27
),
fStoreyId
,
tStoreyInfo
)
),
CompletableFuture
.
runAsync
(()
->
executeSinglePort
(
ip
,
502
,
Arrays
.
asList
(
28
,
29
,
30
,
31
,
32
,
33
,
34
,
35
,
36
,
37
,
38
,
39
,
40
,
41
,
42
,
43
,
44
,
45
,
46
,
47
,
48
,
49
,
50
,
51
,
52
,
53
,
54
),
fStoreyId
,
tStoreyInfo
)
),
CompletableFuture
.
runAsync
(()
->
executeSinglePort
(
ip
,
503
,
Arrays
.
asList
(
55
,
56
,
57
,
58
,
59
,
60
,
61
,
62
,
63
,
64
,
65
,
66
,
67
,
68
,
69
,
70
,
71
,
72
),
fStoreyId
,
tStoreyInfo
)
)
);
}
finally
{
long
totalCost
=
System
.
currentTimeMillis
()
-
startTime
;
log
.
info
(
"=== DeviceCommunicationJob 结束:fStoreyIdStr={},总耗时={}ms ==="
,
fStoreyIdStr
,
totalCost
);
// 等待所有端口完成(带超时)
try
{
CompletableFuture
.
allOf
(
portFutures
.
toArray
(
new
CompletableFuture
[
0
]))
.
get
(
PORT_TIMEOUT_SECONDS
,
TimeUnit
.
SECONDS
);
}
catch
(
TimeoutException
e
)
{
log
.
warn
(
"端口通信超时,取消未完成的任务"
);
portFutures
.
forEach
(
future
->
future
.
cancel
(
true
));
}
catch
(
Exception
e
)
{
log
.
error
(
"端口通信异常"
,
e
);
}
}
/**
* 单个端口通信(同步执行,简化时序,便于超时控制)
*/
private
void
executePortCommunication
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
Predicate
<
int
[]>
stopCondition
,
Long
fStoreyId
,
TStoreyInfo
tStoreyInfo
)
{
long
portStartTime
=
System
.
currentTimeMillis
();
log
.
info
(
"端口{}通信开始:fStoreyId={},设备数={},开始时间={}"
,
port
,
fStoreyId
,
deviceIds
.
size
(),
new
Date
(
portStartTime
));
private
void
executeSinglePort
(
String
ip
,
int
port
,
List
<
Integer
>
deviceIds
,
Long
fStoreyId
,
TStoreyInfo
tStoreyInfo
)
{
try
{
// 调用Modbus服务(同步执行,依赖内部超时控制)
Predicate
<
int
[]>
stopCondition
=
ModbusResultHandler
.
createDefaultStopCondition
();
deviceStatusReaderAndTimeSetter
.
startMultiDeviceMonitoring
(
ip
,
port
,
deviceIds
,
resultHandler
,
stopCondition
,
SINGLE_PORT_TIMEOUT
);
long
portCost
=
System
.
currentTimeMillis
()
-
portStartTime
;
log
.
info
(
"端口{}通信完成:fStoreyId={},耗时={}ms"
,
port
,
fStoreyId
,
portCost
);
ip
,
port
,
deviceIds
,
resultHandler
,
stopCondition
,
PORT_TIMEOUT_SECONDS
);
log
.
debug
(
"端口{}通信完成: fStoreyId={}"
,
port
,
fStoreyId
);
}
catch
(
Exception
e
)
{
long
portCost
=
System
.
currentTimeMillis
()
-
portStartTime
;
String
errMsg
=
"端口"
+
port
+
"通信异常:fStoreyId="
+
fStoreyId
;
log
.
error
(
errMsg
+
",耗时="
+
portCost
+
"ms"
,
e
);
recordAlarm
(
tStoreyInfo
,
"端口"
+
port
+
"异常"
,
errMsg
+
":"
+
e
.
getMessage
());
log
.
error
(
"端口{}通信异常: fStoreyId={}"
,
port
,
fStoreyId
,
e
);
recordAlarm
(
tStoreyInfo
,
"端口"
+
port
+
"通信异常: "
+
e
.
getMessage
());
}
}
// 简化方法签名,不需要传入ip和fStoreyId参数
private
void
executeWithTimeout
(
Runnable
task
,
long
timeout
,
TimeUnit
unit
,
TStoreyInfo
tStoreyInfo
,
String
timeoutMsg
,
String
ip
,
Long
fStoreyId
)
{
private
void
recordTimeoutAlarm
(
JobExecutionContext
context
)
{
try
{
ExecutorService
executor
=
Executors
.
newSingleThreadExecutor
();
Future
<?>
future
=
executor
.
submit
(
task
);
future
.
get
(
timeout
,
unit
);
executor
.
shutdown
();
}
catch
(
TimeoutException
e
)
{
log
.
error
(
"{}:fStoreyId={}, ip={}"
,
timeoutMsg
,
fStoreyId
,
ip
,
e
);
recordAlarm
(
tStoreyInfo
,
tStoreyInfo
.
getfEquipmentCode
(),
timeoutMsg
+
"(IP:"
+
ip
+
",设备ID:"
+
fStoreyId
+
")"
);
JobDataMap
data
=
context
.
getJobDetail
().
getJobDataMap
();
String
fStoreyIdStr
=
data
.
getString
(
"fStoreyId"
);
TEquipmentAlarmData
alarm
=
new
TEquipmentAlarmData
();
alarm
.
setfAlarmType
(
"03"
);
alarm
.
setfEquipmentCode
(
fStoreyIdStr
!=
null
?
fStoreyIdStr
:
"unknown"
);
alarm
.
setfAlarmData
(
"设备通信任务执行超时(>"
+
TOTAL_TIMEOUT_SECONDS
+
"秒)"
);
alarm
.
setfCreateTime
(
new
Date
());
alarmDataService
.
insertTEquipmentAlarmData
(
alarm
);
}
catch
(
Exception
e
)
{
log
.
error
(
"{}执行异常:fStoreyId={}, ip={}"
,
timeoutMsg
,
fStoreyId
,
ip
,
e
);
recordAlarm
(
tStoreyInfo
,
tStoreyInfo
.
getfEquipmentCode
(),
timeoutMsg
+
"执行异常(IP:"
+
ip
+
",设备ID:"
+
fStoreyId
+
"):"
+
e
.
getMessage
());
log
.
error
(
"记录超时告警失败"
,
e
);
}
}
/**
* 统一告警记录(兼容设备信息为空的场景)
*/
private
void
recordAlarm
(
TStoreyInfo
tStoreyInfo
,
String
equipmentCode
,
String
alarmData
)
{
private
void
recordAlarm
(
TStoreyInfo
tStoreyInfo
,
String
alarmData
)
{
try
{
TEquipmentAlarmData
alarm
=
new
TEquipmentAlarmData
();
alarm
.
setfAlarmType
(
"03"
);
// 老化层告警
alarm
.
setfEquipmentCode
(
tStoreyInfo
!=
null
?
tStoreyInfo
.
getfStoreyCode
()
:
equipmentCode
);
alarm
.
setfAlarmType
(
"03"
);
alarm
.
setfEquipmentCode
(
tStoreyInfo
!=
null
?
tStoreyInfo
.
getfStoreyCode
()
:
"unknown"
);
alarm
.
setfAlarmData
(
alarmData
);
alarm
.
setfCreateTime
(
new
Date
());
alarmDataService
.
insertTEquipmentAlarmData
(
alarm
);
}
catch
(
Exception
e
)
{
log
.
error
(
"
告警记录失败:{}"
,
alarmData
,
e
);
log
.
error
(
"
记录告警失败"
,
e
);
}
}
}
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceTaskScheduler.java
View file @
3214dfa3
...
...
@@ -81,39 +81,34 @@ public class DeviceTaskScheduler {
private
void
createHourlyCommunicationJob
(
Long
fStoreyId
)
throws
SchedulerException
{
String
jobId
=
"COMM_"
+
fStoreyId
;
JobKey
jobKey
=
new
JobKey
(
jobId
,
JOB_GROUP
);
TriggerKey
triggerKey
=
new
TriggerKey
(
jobId
+
"_TRIGGER"
,
TRIGGER_GROUP
);
// 1. 构建JobDetail(仅元数据,禁止并发执行通过注解控制)
JobDetail
job
=
JobBuilder
.
newJob
(
DeviceCommunicationJob
.
class
)
.
withIdentity
(
jobKey
)
.
withDescription
(
"设备"
+
fStoreyId
+
"每5分钟Modbus通信任务"
)
.
usingJobData
(
"fStoreyId"
,
fStoreyId
.
toString
())
// 传递参数(String避免类型问题)
.
storeDurably
(
false
)
// 触发器删除后Job自动失效
.
requestRecovery
(
true
)
// 服务重启后恢复未完成任务
.
usingJobData
(
"fStoreyId"
,
fStoreyId
.
toString
())
.
storeDurably
(
false
)
.
build
();
//
2. 构建CronTrigger(移除withMisfireThreshold,兼容低版本)
//
使用更宽松的misfire策略
CronTrigger
trigger
=
TriggerBuilder
.
newTrigger
()
.
withIdentity
(
triggerKey
)
.
withIdentity
(
jobId
+
"_TRIGGER"
,
TRIGGER_GROUP
)
.
forJob
(
jobKey
)
.
withDescription
(
"设备"
+
fStoreyId
+
"通信任务触发器(每5分钟)"
)
.
withSchedule
(
CronScheduleBuilder
.
cronSchedule
(
CRON_EXPRESSION
)
// Misfire策略:错过触发后立即执行,再按原计划(低版本支持)
.
withMisfireHandlingInstructionFireAndProceed
())
.
withMisfireHandlingInstructionDoNothing
())
// 错过就忽略
.
startNow
()
.
endAt
(
Date
.
from
(
Instant
.
now
().
plus
(
TASK_VALID_DAYS
,
ChronoUnit
.
DAYS
)))
.
build
();
//
3. 原子操作:创建/更新(优先更新,避免删除重建)
//
清理旧任务时更温和
if
(
scheduler
.
checkExists
(
jobKey
))
{
Date
nextFireTime
=
scheduler
.
rescheduleJob
(
triggerKey
,
trigger
);
log
.
info
(
"通信任务[{}]更新触发器成功,下次执行时间:{}"
,
jobId
,
nextFireTime
);
}
else
{
Date
nextFireTime
=
scheduler
.
scheduleJob
(
job
,
trigger
);
log
.
info
(
"通信任务[{}]创建成功,下次执行时间:{}"
,
jobId
,
nextFireTim
e
);
scheduler
.
deleteJob
(
jobKey
);
try
{
Thread
.
sleep
(
100
);
// 短暂等待确保清理完成
}
catch
(
InterruptedException
e
)
{
throw
new
RuntimeException
(
e
);
}
}
scheduler
.
scheduleJob
(
job
,
trigger
);
}
/**
* 2. 创建15分钟后执行的最终任务(保持原逻辑,优化超时)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment