Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
Z
zhmes-agecal
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
耿迪迪
zhmes-agecal
Commits
20ea5ebf
Commit
20ea5ebf
authored
Sep 25, 2025
by
wanghao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1 指令指令完成,但是 没有检测到机械臂完成。
parent
b09d5a05
Hide whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
314 additions
and
98 deletions
+314
-98
application-test.yml
zhmes-agecal-admin/src/main/resources/application-test.yml
+5
-1
ModbusMasterPool.java
.../java/com/zehong/system/modbus/util/ModbusMasterPool.java
+235
-0
BaseDeviceCommJob.java
...m/zehong/system/task/DeviceCommJob/BaseDeviceCommJob.java
+54
-17
DeviceComm501Device1Job.java
...ng/system/task/DeviceCommJob/DeviceComm501Device1Job.java
+0
-5
DeviceComm501Device2Job.java
...ng/system/task/DeviceCommJob/DeviceComm501Device2Job.java
+0
-4
DeviceComm501Device3Job.java
...ng/system/task/DeviceCommJob/DeviceComm501Device3Job.java
+0
-4
DeviceTaskScheduler.java
...main/java/com/zehong/system/task/DeviceTaskScheduler.java
+20
-67
No files found.
zhmes-agecal-admin/src/main/resources/application-test.yml
View file @
20ea5ebf
...
...
@@ -91,7 +91,11 @@ spring:
threadPool
:
class
:
org.quartz.simpl.SimpleThreadPool
# 线程数
threadCount
:
10
threadCount
:
15
# 设为守护线程,避免项目关闭时线程残留
makeThreadsDaemons
:
true
# 保持默认1分钟,72个Job执行时间≤10秒,不会触发misfire
misfireThreshold
:
60000
# 线程优先级
threadPriority
:
5
# 线程名称前缀
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/modbus/util/ModbusMasterPool.java
0 → 100644
View file @
20ea5ebf
package
com
.
zehong
.
system
.
modbus
.
util
;
import
org.springframework.stereotype.Component
;
import
com.serotonin.modbus4j.ModbusMaster
;
import
com.serotonin.modbus4j.exception.ModbusInitException
;
import
com.serotonin.modbus4j.ip.IpParameters
;
import
com.serotonin.modbus4j.ip.tcp.TcpMaster
;
import
org.apache.commons.pool2.BasePooledObjectFactory
;
import
org.apache.commons.pool2.PooledObject
;
import
org.apache.commons.pool2.impl.DefaultPooledObject
;
import
org.apache.commons.pool2.impl.GenericObjectPool
;
import
org.apache.commons.pool2.impl.GenericObjectPoolConfig
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.PostConstruct
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.TimeUnit
;
/**
* @author lenovo
* @date 2025/9/25
* @description TODO
*/
@Component
public
class
ModbusMasterPool
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
ModbusMasterPool
.
class
);
// 连接池配置:可在application.yml中配置,这里先硬编码,后续可优化为配置类
private
static
final
int
MAX_TOTAL
=
10
;
// 每个池最大连接数(3个端口共30个,足够72个Job)
private
static
final
int
MAX_IDLE
=
5
;
// 每个池最大空闲连接数
private
static
final
int
MIN_IDLE
=
2
;
// 每个池最小空闲连接数
private
static
final
long
MAX_WAIT_MS
=
3000
;
// 借连接超时时间(3秒)
private
static
final
long
TIME_BETWEEN_EVICTION_RUNS_MS
=
60000
;
// 空闲连接检测间隔(1分钟)
private
static
final
boolean
TEST_ON_BORROW
=
true
;
// 借连接时验证有效性
private
static
final
boolean
TEST_ON_RETURN
=
false
;
// 还连接时不验证(减少开销)
// 存储「ip:port -> 连接池」的映射,线程安全
private
final
Map
<
String
,
GenericObjectPool
<
ModbusMaster
>>
poolMap
=
new
ConcurrentHashMap
<>();
// 单例Modbus工厂(复用,避免重复创建)
private
static
final
com
.
serotonin
.
modbus4j
.
ModbusFactory
MODBUS_FACTORY
=
new
com
.
serotonin
.
modbus4j
.
ModbusFactory
();
/**
* 借连接:根据ip和port获取连接池,再从池中借连接
* @param ip 设备IP
* @param port 端口(501/502/503)
* @return 可用的ModbusMaster
* @throws Exception 借连接超时或连接无效
*/
public
ModbusMaster
borrowMaster
(
String
ip
,
int
port
)
throws
Exception
{
String
poolKey
=
getPoolKey
(
ip
,
port
);
// 懒加载创建连接池:第一次使用时才创建对应ip:port的池
GenericObjectPool
<
ModbusMaster
>
pool
=
poolMap
.
computeIfAbsent
(
poolKey
,
this
::
createPool
);
// 借连接(超时抛异常)
ModbusMaster
master
=
pool
.
borrowObject
(
MAX_WAIT_MS
);
log
.
debug
(
"借Modbus连接成功:{},当前池空闲数={},活跃数={}"
,
poolKey
,
pool
.
getNumIdle
(),
pool
.
getNumActive
());
return
master
;
}
/**
* 还连接:将连接归还给对应池(无论连接是否有效,池会自动处理)
* @param ip 设备IP
* @param port 端口
* @param master 借到的ModbusMaster
*/
public
void
returnMaster
(
String
ip
,
int
port
,
ModbusMaster
master
)
{
if
(
master
==
null
)
return
;
String
poolKey
=
getPoolKey
(
ip
,
port
);
GenericObjectPool
<
ModbusMaster
>
pool
=
poolMap
.
get
(
poolKey
);
if
(
pool
==
null
)
{
log
.
warn
(
"还连接失败:连接池不存在(已被销毁),{}"
,
poolKey
);
destroyMaster
(
master
);
// 池不存在,直接销毁连接
return
;
}
try
{
pool
.
returnObject
(
master
);
log
.
debug
(
"还Modbus连接成功:{},当前池空闲数={},活跃数={}"
,
poolKey
,
pool
.
getNumIdle
(),
pool
.
getNumActive
());
}
catch
(
Exception
e
)
{
log
.
error
(
"还连接失败:{}"
,
poolKey
,
e
);
destroyMaster
(
master
);
// 归还失败,销毁连接
}
}
/**
* 销毁连接:强制关闭连接(池内部也会调用此方法)
*/
private
void
destroyMaster
(
ModbusMaster
master
)
{
if
(
master
==
null
)
return
;
try
{
// 1. 关闭底层Socket(反射)
if
(
master
instanceof
TcpMaster
)
{
TcpMaster
tcpMaster
=
(
TcpMaster
)
master
;
java
.
lang
.
reflect
.
Field
socketField
=
TcpMaster
.
class
.
getDeclaredField
(
"socket"
);
socketField
.
setAccessible
(
true
);
java
.
net
.
Socket
socket
=
(
java
.
net
.
Socket
)
socketField
.
get
(
tcpMaster
);
if
(
socket
!=
null
&&
!
socket
.
isClosed
())
{
socket
.
close
();
}
}
// 2. 销毁Master
master
.
destroy
();
log
.
debug
(
"销毁Modbus连接成功"
);
}
catch
(
Exception
e
)
{
log
.
error
(
"销毁Modbus连接失败"
,
e
);
}
}
/**
* 创建单个连接池(按ip:port)
*/
private
GenericObjectPool
<
ModbusMaster
>
createPool
(
String
poolKey
)
{
// 1. 配置池参数
GenericObjectPoolConfig
<
ModbusMaster
>
poolConfig
=
new
GenericObjectPoolConfig
<>();
poolConfig
.
setMaxTotal
(
MAX_TOTAL
);
poolConfig
.
setMaxIdle
(
MAX_IDLE
);
poolConfig
.
setMinIdle
(
MIN_IDLE
);
poolConfig
.
setTimeBetweenEvictionRunsMillis
(
TIME_BETWEEN_EVICTION_RUNS_MS
);
poolConfig
.
setTestOnBorrow
(
TEST_ON_BORROW
);
poolConfig
.
setTestOnReturn
(
TEST_ON_RETURN
);
poolConfig
.
setEvictionPolicyClassName
(
"org.apache.commons.pool2.impl.DefaultEvictionPolicy"
);
// 2. 创建池(传入连接工厂)
GenericObjectPool
<
ModbusMaster
>
pool
=
new
GenericObjectPool
<>(
new
ModbusMasterFactory
(
poolKey
),
poolConfig
);
log
.
info
(
"创建Modbus连接池成功:{},配置={}"
,
poolKey
,
poolConfig
);
return
pool
;
}
/**
* 生成连接池Key:ip:port(如192.168.2.1:501)
*/
private
String
getPoolKey
(
String
ip
,
int
port
)
{
return
ip
+
":"
+
port
;
}
/**
* 连接工厂:负责创建、验证、销毁ModbusMaster(Pool2核心接口)
*/
private
class
ModbusMasterFactory
extends
BasePooledObjectFactory
<
ModbusMaster
>
{
private
final
String
poolKey
;
private
final
String
ip
;
private
final
int
port
;
public
ModbusMasterFactory
(
String
poolKey
)
{
this
.
poolKey
=
poolKey
;
String
[]
parts
=
poolKey
.
split
(
":"
);
this
.
ip
=
parts
[
0
];
this
.
port
=
Integer
.
parseInt
(
parts
[
1
]);
}
/**
* 创建新连接(池无空闲连接时调用)
*/
@Override
public
ModbusMaster
create
()
throws
Exception
{
IpParameters
params
=
new
IpParameters
();
params
.
setHost
(
ip
);
params
.
setPort
(
port
);
TcpMaster
master
=
(
TcpMaster
)
MODBUS_FACTORY
.
createTcpMaster
(
params
,
true
);
master
.
setTimeout
(
3000
);
// 连接超时3秒
master
.
setRetries
(
0
);
// 禁用内置重试(业务层控制重试)
master
.
init
();
// 初始化连接(关键:不初始化会导致后续操作失败)
log
.
debug
(
"创建新Modbus连接:{}"
,
poolKey
);
return
master
;
}
/**
* 包装连接为池化对象(Pool2要求)
*/
@Override
public
PooledObject
<
ModbusMaster
>
wrap
(
ModbusMaster
master
)
{
return
new
DefaultPooledObject
<>(
master
);
}
/**
* 验证连接有效性(借连接时调用,TEST_ON_BORROW=true)
*/
@Override
public
boolean
validateObject
(
PooledObject
<
ModbusMaster
>
p
)
{
try
{
ModbusMaster
master
=
p
.
getObject
();
// 简单验证:检查连接是否初始化(master.init()后isInitialized为true)
// 复杂验证:可发送一个空请求(如读0个寄存器),但会增加开销,这里用简单验证
if
(
master
instanceof
TcpMaster
)
{
TcpMaster
tcpMaster
=
(
TcpMaster
)
master
;
java
.
lang
.
reflect
.
Field
initializedField
=
TcpMaster
.
class
.
getDeclaredField
(
"initialized"
);
initializedField
.
setAccessible
(
true
);
return
(
boolean
)
initializedField
.
get
(
tcpMaster
);
}
return
false
;
}
catch
(
Exception
e
)
{
log
.
warn
(
"验证Modbus连接无效:{}"
,
poolKey
,
e
);
return
false
;
}
}
/**
* 销毁无效连接(验证失败或池清理时调用)
*/
@Override
public
void
destroyObject
(
PooledObject
<
ModbusMaster
>
p
)
{
destroyMaster
(
p
.
getObject
());
}
}
/**
* 容器关闭时销毁所有连接池(避免资源泄漏)
*/
@PostConstruct
public
void
init
()
{
// JVM关闭钩子:确保容器关闭时销毁所有池
Runtime
.
getRuntime
().
addShutdownHook
(
new
Thread
(()
->
{
log
.
info
(
"开始销毁所有Modbus连接池"
);
poolMap
.
forEach
((
key
,
pool
)
->
{
try
{
pool
.
close
();
log
.
info
(
"销毁连接池成功:{}"
,
key
);
}
catch
(
Exception
e
)
{
log
.
error
(
"销毁连接池失败:{}"
,
key
,
e
);
}
});
poolMap
.
clear
();
}));
}
}
zhmes-agecal-system/src/main/java/com/zehong/system/task/BaseDeviceCommJob.java
→
zhmes-agecal-system/src/main/java/com/zehong/system/task/
DeviceCommJob/
BaseDeviceCommJob.java
View file @
20ea5ebf
package
com
.
zehong
.
system
.
task
;
package
com
.
zehong
.
system
.
task
.
DeviceCommJob
;
import
com.serotonin.modbus4j.ModbusFactory
;
import
com.serotonin.modbus4j.ModbusMaster
;
...
...
@@ -16,21 +16,18 @@ import com.zehong.system.mapper.TStoreyInfoMapper;
import
com.zehong.system.modbus.handler.ModbusResultHandler
;
import
com.zehong.system.modbus.handler.dto.DeviceStatusReaderDto
;
import
com.zehong.system.modbus.util.Modbus4jUtils
;
import
com.zehong.system.modbus.util.ModbusMasterPool
;
import
com.zehong.system.service.ITEquipmentAlarmDataService
;
import
org.apache.commons.lang3.StringUtils
;
import
org.quartz.Job
;
import
org.quartz.JobExecutionContext
;
import
org.quartz.JobExecutionException
;
import
org.quartz.*
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
java.lang.reflect.Field
;
import
java.net.Socket
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.Future
;
...
...
@@ -63,9 +60,15 @@ public abstract class BaseDeviceCommJob implements Job {
protected
ITEquipmentAlarmDataService
alarmDataService
;
@Resource
protected
TStoreyInfoMapper
tStoreyInfoMapper
;
@
Autowired
@
Resource
protected
ModbusResultHandler
resultHandler
;
@Resource
private
ModbusMasterPool
modbusMasterPool
;
@Resource
private
Scheduler
scheduler
;
@Resource
private
PalletDeviceBindingMapper
palletDeviceBindingMapper
;
...
...
@@ -88,6 +91,11 @@ public abstract class BaseDeviceCommJob implements Job {
// 单设备场景:直接获取唯一deviceId(子类返回的List仅1个元素)
Integer
deviceId
=
getSingleDeviceId
();
TStoreyInfo
storeyInfo
=
null
;
// 保存JobKey和TriggerKey,用于后续清理
JobKey
jobKey
=
context
.
getJobDetail
().
getKey
();
TriggerKey
triggerKey
=
context
.
getTrigger
().
getKey
();
try
{
log
.
info
(
"单设备任务开始:port={}, deviceId={}, storeyId={}"
,
getFixedPort
(),
deviceId
,
storeyIdStr
);
...
...
@@ -124,9 +132,42 @@ public abstract class BaseDeviceCommJob implements Job {
updateDeviceError
(
storeyInfo
.
getfIp
());
}
recordAlarm
(
storeyInfo
,
storeyIdStr
,
errMsg
+
":"
+
e
.
getMessage
());
}
finally
{
// 核心:任务执行完成后,清理Job和Trigger(无论成功/失败)
cleanJobAndTrigger
(
jobKey
,
triggerKey
);
}
}
/**
* 清理Job和Trigger,避免元数据残留
*/
private
void
cleanJobAndTrigger
(
JobKey
jobKey
,
TriggerKey
triggerKey
)
{
if
(
jobKey
==
null
||
triggerKey
==
null
)
{
log
.
warn
(
"清理任务失败:JobKey或TriggerKey为空"
);
return
;
}
try
{
// 1. 先解绑Trigger(避免删除Job时失败)
if
(
scheduler
.
checkExists
(
triggerKey
))
{
boolean
unscheduleSuccess
=
scheduler
.
unscheduleJob
(
triggerKey
);
log
.
debug
(
"{}:Trigger解绑{},key={}"
,
unscheduleSuccess
?
"成功"
:
"失败"
,
unscheduleSuccess
?
"成功"
:
"失败"
,
triggerKey
);
}
// 2. 再删除Job(删除Job会自动删除关联的Trigger,但保险起见先解绑)
if
(
scheduler
.
checkExists
(
jobKey
))
{
boolean
deleteSuccess
=
scheduler
.
deleteJob
(
jobKey
);
log
.
debug
(
"{}:Job删除{},key={}"
,
deleteSuccess
?
"成功"
:
"失败"
,
deleteSuccess
?
"成功"
:
"失败"
,
jobKey
);
}
}
catch
(
SchedulerException
e
)
{
log
.
error
(
"清理任务元数据失败:jobKey={}, triggerKey={}"
,
jobKey
,
triggerKey
,
e
);
}
}
// -------------------------- 精简后的核心工具方法 --------------------------
/**
* 提取单个deviceId(子类返回的List仅1个元素,避免集合遍历)
...
...
@@ -191,11 +232,10 @@ public abstract class BaseDeviceCommJob implements Job {
* Modbus读取(带重试,连接用完即时销毁)
*/
private
int
[]
readModbusWithRetry
(
String
ip
,
int
port
,
int
deviceId
)
throws
Exception
{
ModbusMaster
master
=
null
;
// 1. 从连接池借连接(超时3秒)
ModbusMaster
master
=
modbusMasterPool
.
borrowMaster
(
ip
,
port
);
for
(
int
retry
=
0
;
retry
<=
CUSTOM_RETRY_TIMES
;
retry
++)
{
try
{
// 1. 创建Modbus连接(禁用内置重试,避免冲突)
master
=
createModbusMaster
(
ip
,
port
);
// 2. 读取寄存器
int
[]
result
=
readRegisters
(
master
,
deviceId
);
// 3. 校验结果(满足停止条件则返回,否则重试)
...
...
@@ -207,16 +247,13 @@ public abstract class BaseDeviceCommJob implements Job {
}
catch
(
Exception
e
)
{
// 重试耗尽才抛异常
if
(
retry
>=
CUSTOM_RETRY_TIMES
)
{
throw
new
RuntimeException
(
String
.
format
(
"Modbus重试耗尽(%d次):ip=%s, port=%d, deviceId=%d"
,
CUSTOM_RETRY_TIMES
,
ip
,
port
,
deviceId
),
e
);
log
.
error
(
"Modbus重试耗尽({}次):ip={}, port={}, deviceId={}"
,
CUSTOM_RETRY_TIMES
,
ip
,
port
,
deviceId
);
}
log
.
info
(
"Modbus读取异常,重试:retry={}, port={}, deviceId={}"
,
retry
,
port
,
deviceId
,
e
);
}
finally
{
// 每次重试后销毁连接(关键:避免单设备场景下的连接泄漏)
destroyModbusMaster
(
master
,
deviceId
);
// 重试间隔(最后一次重试无需等)
if
(
retry
<
CUSTOM_RETRY_TIMES
)
{
Thread
.
sleep
(
RETRY_DELAY_MS
);
// 3. 无论成功/失败,归还连接到池(关键:避免连接泄漏)
if
(
master
!=
null
)
{
modbusMasterPool
.
returnMaster
(
ip
,
port
,
master
);
}
}
}
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommJob/DeviceComm501Device1Job.java
View file @
20ea5ebf
package
com
.
zehong
.
system
.
task
.
DeviceCommJob
;
import
com.zehong.system.task.BaseDeviceCommJob
;
import
org.springframework.stereotype.Component
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.List
;
/**
* @author lenovo
* @date 2025/9/25
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommJob/DeviceComm501Device2Job.java
View file @
20ea5ebf
package
com
.
zehong
.
system
.
task
.
DeviceCommJob
;
import
com.zehong.system.task.BaseDeviceCommJob
;
import
org.springframework.stereotype.Component
;
import
java.util.Collections
;
import
java.util.List
;
/**
* @author lenovo
* @date 2025/9/25
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommJob/DeviceComm501Device3Job.java
View file @
20ea5ebf
package
com
.
zehong
.
system
.
task
.
DeviceCommJob
;
import
com.zehong.system.task.BaseDeviceCommJob
;
import
org.springframework.stereotype.Component
;
import
java.util.Collections
;
import
java.util.List
;
/**
* @author lenovo
* @date 2025/9/25
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceTaskScheduler.java
View file @
20ea5ebf
...
...
@@ -6,6 +6,7 @@ import org.quartz.impl.matchers.GroupMatcher;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Service
;
import
org.terracotta.quartz.wrappers.TriggerWrapper
;
import
javax.annotation.PostConstruct
;
import
javax.annotation.Resource
;
...
...
@@ -88,44 +89,48 @@ public class DeviceTaskScheduler {
* @param delayMin 延迟执行时间(分钟)
*/
private
void
createPortCommJob
(
Long
fStoreyId
,
int
port
,
int
deviceId
,
Class
<?
extends
Job
>
jobClass
,
int
delayMin
)
throws
SchedulerException
{
String
jobId
=
"COMM_"
+
port
+
deviceId
+
"_"
+
fStoreyId
;
// JobID:COMM_501_123(端口+设备ID,确保唯一)
String
triggerId
=
"TRIGGER_"
+
port
+
deviceId
+
"_"
+
fStoreyId
;
// TriggerID:TRIGGER_501_123
String
jobId
=
"COMM_"
+
port
+
"_"
+
deviceId
+
"_"
+
fStoreyId
;
// 修正命名:port_deviceId_storeyId(避免冲突)
String
triggerId
=
"TRIGGER_"
+
port
+
"_"
+
deviceId
+
"_"
+
fStoreyId
;
JobKey
jobKey
=
new
JobKey
(
jobId
,
JOB_GROUP
);
TriggerKey
triggerKey
=
new
TriggerKey
(
triggerId
,
TRIGGER_GROUP
);
//
1. 去重:先删除旧任务/触发器
//
增强去重逻辑:先检查Trigger状态
if
(
scheduler
.
checkExists
(
triggerKey
))
{
Trigger
.
TriggerState
triggerState
=
scheduler
.
getTriggerState
(
triggerKey
);
// 有效状态:WAITING(等待执行)、ACQUIRED(已获取,待执行)
// 无效状态:ERROR/COMPLETE/PAUSED,删除重建
log
.
warn
(
"端口[{}]设备[{}]:Trigger状态无效,删除重建,state={}, triggerKey={}"
,
port
,
deviceId
,
triggerState
,
triggerKey
);
scheduler
.
unscheduleJob
(
triggerKey
);
log
.
info
(
"端口[{}]旧触发器已删除:{}"
,
port
,
triggerId
);
}
// 检查Job是否存在(若Trigger已删除,Job可能残留)
if
(
scheduler
.
checkExists
(
jobKey
))
{
log
.
info
(
"端口[{}]设备[{}]:Job已存在,删除重建,jobKey={}"
,
port
,
deviceId
,
jobKey
);
scheduler
.
deleteJob
(
jobKey
);
log
.
info
(
"端口[{}]旧任务已删除:{}"
,
port
,
jobId
);
}
//
2. 创建JobDetail
//
原有创建JobDetail和Trigger的逻辑...
JobDetail
job
=
JobBuilder
.
newJob
(
jobClass
)
.
withIdentity
(
jobKey
)
.
usingJobData
(
"fStoreyId"
,
fStoreyId
.
toString
())
// 传递设备ID
.
usingJobData
(
"fStoreyId"
,
fStoreyId
.
toString
())
.
storeDurably
(
false
)
.
build
();
// 3. 创建一次性SimpleTrigger(延迟delayMin分钟,仅执行1次)
Date
triggerTime
=
Date
.
from
(
Instant
.
now
().
plus
(
delayMin
,
ChronoUnit
.
MINUTES
));
SimpleTrigger
trigger
=
TriggerBuilder
.
newTrigger
()
.
withIdentity
(
triggerKey
)
.
forJob
(
jobKey
)
.
startAt
(
triggerTime
)
// 延迟执行时间
.
startAt
(
triggerTime
)
.
withSchedule
(
SimpleScheduleBuilder
.
simpleSchedule
()
.
withRepeatCount
(
0
)
// 仅执行1次
.
withMisfireHandlingInstructionFireNow
())
// 错过则立即执行
.
withRepeatCount
(
0
)
.
withMisfireHandlingInstructionFireNow
())
.
build
();
// 4. 提交调度
Date
nextFireTime
=
scheduler
.
scheduleJob
(
job
,
trigger
);
log
.
info
(
"端口[{}]任务创建成功:jobId={},延迟{}分钟,下次执行:{}"
,
port
,
jobId
,
delayMin
,
nextFireTime
);
log
.
info
(
"端口[{}]
设备[{}]
任务创建成功:jobId={},延迟{}分钟,下次执行:{}"
,
port
,
deviceId
,
jobId
,
delayMin
,
nextFireTime
);
}
/**
* 单个端口Job和Trigger创建(通用方法)
...
...
@@ -310,27 +315,6 @@ public class DeviceTaskScheduler {
log
.
error
(
"清理无效任务失败:fStoreyId={}"
,
fStoreyId
,
e
);
}
}
/**
* 检查任务是否有效(优化逻辑:避免误删有效任务)
*/
private
boolean
isJobValid
(
JobKey
jobKey
,
TriggerKey
triggerKey
)
throws
SchedulerException
{
if
(!
scheduler
.
checkExists
(
jobKey
)
||
!
scheduler
.
checkExists
(
triggerKey
))
{
log
.
debug
(
"任务[{}]或触发器[{}]不存在"
,
jobKey
.
getName
(),
triggerKey
.
getName
());
return
false
;
}
Trigger
trigger
=
scheduler
.
getTrigger
(
triggerKey
);
Trigger
.
TriggerState
state
=
scheduler
.
getTriggerState
(
triggerKey
);
// 触发器状态为NORMAL且有下次执行时间,才视为有效
if
(
trigger
==
null
||
trigger
.
getNextFireTime
()
==
null
||
state
!=
Trigger
.
TriggerState
.
NORMAL
)
{
log
.
debug
(
"触发器[{}]无效:状态={}, 下次执行时间={}"
,
triggerKey
.
getName
(),
state
,
trigger
!=
null
?
trigger
.
getNextFireTime
()
:
"null"
);
// 清理无效触发器(保留Job,避免重建)
scheduler
.
unscheduleJob
(
triggerKey
);
return
false
;
}
return
true
;
}
/**
* 验证任务状态(增强日志详情)
*/
...
...
@@ -351,35 +335,4 @@ public class DeviceTaskScheduler {
finalTrigger
!=
null
?
finalTrigger
.
getNextFireTime
()
:
"不存在"
,
finalTrigger
!=
null
?
finalTrigger
.
getEndTime
()
:
"不存在"
);
}
/**
* 监控并修复ERROR状态的触发器
*/
public
void
monitorAndRepairTriggers
()
throws
SchedulerException
{
for
(
String
groupName
:
scheduler
.
getTriggerGroupNames
())
{
for
(
TriggerKey
triggerKey
:
scheduler
.
getTriggerKeys
(
GroupMatcher
.
triggerGroupEquals
(
groupName
)))
{
Trigger
.
TriggerState
state
=
scheduler
.
getTriggerState
(
triggerKey
);
if
(
state
==
Trigger
.
TriggerState
.
ERROR
)
{
log
.
warn
(
"发现ERROR状态触发器: {}"
,
triggerKey
);
repairErrorTrigger
(
triggerKey
);
}
}
}
}
private
void
repairErrorTrigger
(
TriggerKey
triggerKey
)
throws
SchedulerException
{
Trigger
oldTrigger
=
scheduler
.
getTrigger
(
triggerKey
);
JobKey
jobKey
=
oldTrigger
.
getJobKey
();
// 重新创建触发器
Trigger
newTrigger
=
TriggerBuilder
.
newTrigger
()
.
withIdentity
(
triggerKey
)
.
forJob
(
jobKey
)
.
withSchedule
(
CronScheduleBuilder
.
cronSchedule
(
"0 0/3 * * * ?"
)
.
withMisfireHandlingInstructionDoNothing
())
.
startAt
(
new
Date
())
// 关键:使用当前时间
.
build
();
scheduler
.
rescheduleJob
(
triggerKey
,
newTrigger
);
log
.
info
(
"触发器修复完成: {}"
,
triggerKey
);
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment