Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Contribute to GitLab
Sign in
Toggle navigation
Z
zhmes-agecal
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
耿迪迪
zhmes-agecal
Commits
6cc339fe
Commit
6cc339fe
authored
Oct 15, 2025
by
wanghao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
1 任务调度 定时 测试;
parent
a255955e
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
364 additions
and
202 deletions
+364
-202
EquipmentDataCollection.java
...ong/web/controller/equipment/EquipmentDataCollection.java
+8
-0
application-test.yml
zhmes-agecal-admin/src/main/resources/application-test.yml
+82
-136
QuartzTaskMonitor.java
...main/java/com/zehong/quartz/config/QuartzTaskMonitor.java
+167
-0
ScheduleConfig.java
...rc/main/java/com/zehong/quartz/config/ScheduleConfig.java
+22
-22
BaseDeviceCommJob.java
...m/zehong/system/task/DeviceCommJob/BaseDeviceCommJob.java
+30
-17
DeviceTaskScheduler.java
...main/java/com/zehong/system/task/DeviceTaskScheduler.java
+55
-27
No files found.
zhmes-agecal-admin/src/main/java/com/zehong/web/controller/equipment/EquipmentDataCollection.java
View file @
6cc339fe
...
@@ -2,6 +2,7 @@ package com.zehong.web.controller.equipment;
...
@@ -2,6 +2,7 @@ package com.zehong.web.controller.equipment;
import
com.zehong.common.core.domain.AjaxResult
;
import
com.zehong.common.core.domain.AjaxResult
;
import
com.zehong.common.utils.TCP.TCPClient
;
import
com.zehong.common.utils.TCP.TCPClient
;
import
com.zehong.quartz.config.QuartzTaskMonitor
;
import
com.zehong.system.domain.TEquipmentAlarmData
;
import
com.zehong.system.domain.TEquipmentAlarmData
;
import
com.zehong.system.domain.TEquipmentInfo
;
import
com.zehong.system.domain.TEquipmentInfo
;
import
com.zehong.system.domain.TStoreyInfo
;
import
com.zehong.system.domain.TStoreyInfo
;
...
@@ -35,6 +36,10 @@ public class EquipmentDataCollection {
...
@@ -35,6 +36,10 @@ public class EquipmentDataCollection {
@Autowired
@Autowired
private
DeviceTaskScheduler
deviceTaskScheduler
;
private
DeviceTaskScheduler
deviceTaskScheduler
;
// 新增:注入监控器
@Autowired
private
QuartzTaskMonitor
quartzTaskMonitor
;
/**
/**
* 触发设备监控任务
* 触发设备监控任务
* @return 执行结果
* @return 执行结果
...
@@ -57,6 +62,9 @@ public class EquipmentDataCollection {
...
@@ -57,6 +62,9 @@ public class EquipmentDataCollection {
// 调用调度器执行任务
// 调用调度器执行任务
deviceTaskScheduler
.
scheduleDeviceMonitoring
(
storeyId
,
DEFAULT_TEST_IP
,
port
);
deviceTaskScheduler
.
scheduleDeviceMonitoring
(
storeyId
,
DEFAULT_TEST_IP
,
port
);
// 新增:立即监控任务状态
quartzTaskMonitor
.
monitorTaskStatus
(
storeyId
);
log
.
info
(
"测试设备任务已提交:storeyId={}"
,
storeyId
);
log
.
info
(
"测试设备任务已提交:storeyId={}"
,
storeyId
);
return
AjaxResult
.
success
(
"设备任务已成功提交"
,
return
AjaxResult
.
success
(
"设备任务已成功提交"
,
String
.
format
(
"任务参数:storeyId=%d, ip=%s, port=%d"
,
String
.
format
(
"任务参数:storeyId=%d, ip=%s, port=%d"
,
...
...
zhmes-agecal-admin/src/main/resources/application-test.yml
View file @
6cc339fe
...
@@ -16,142 +16,88 @@ spring:
...
@@ -16,142 +16,88 @@ spring:
url
:
url
:
username
:
username
:
password
:
password
:
# 1. 基础连接数调整(匹配Quartz线程池需求)
initialSize
:
10
# 初始化连接数从5→10,避免启动时连接不足
minIdle
:
10
# 最小空闲连接数保持10(与initialSize一致,稳定连接池)
maxActive
:
30
# 最大连接数从20→30,应对72个Job并发(Quartz线程15,预留冗余)
# 2. 空闲连接回收优化(解决“长时间空闲被丢弃”警告)
# ==================== 连接池核心优化配置 ====================
maxWait
:
3000
# 获取连接超时从60秒→3秒,避免Quartz等待连接过久导致Trigger超时
# 1. 连接池大小优化(针对Quartz高频短任务特点)
timeBetweenEvictionRunsMillis
:
60000
# 检测间隔保持1分钟(60秒),定期清理无效连接
initialSize
:
15
# 初始化连接数增加到15,应对Quartz启动时并发需求
minEvictableIdleTimeMillis
:
600000
# 最小空闲时间从5分钟→10分钟(与MySQL默认wait_timeout匹配)
minIdle
:
15
# 最小空闲连接保持15,避免连接频繁创建销毁
maxEvictableIdleTimeMillis
:
1800000
# 最大空闲时间从15分钟→30分钟(给长周期任务留缓冲)
maxActive
:
50
# 最大连接数增加到50,考虑72个Job并发 + 业务连接
# 3. 连接有效性检测强化(确保拿到的连接可用)
# 2. 连接获取优化(解决Quartz获取连接超时问题)
validationQuery
:
SELECT 1 FROM DUAL
# 保持MySQL通用检测SQL
maxWait
:
5000
# 获取连接最大等待时间5秒,避免Quartz线程阻塞
testWhileIdle
:
true
# 空闲时检测(核心!主动发现无效连接)
testOnBorrow
:
false
# 借用时不检测(减少性能损耗)
testOnReturn
:
false
# 归还时不检测(减少性能损耗)
validationQueryTimeout
:
3000
# 新增:检测超时3秒,避免检测卡住
# 4. 连接泄露检测(新增配置,排查连接未释放问题
)
# 3. 连接保活核心配置(解决"discard long time none received connection"警告
)
removeAbandoned
:
true
# 开启连接泄露回收
timeBetweenEvictionRunsMillis
:
30000
# 检测间隔30秒,更频繁检查连接有效性
removeAbandonedTimeout
:
300
# 5分钟未释放则视为泄露
minEvictableIdleTimeMillis
:
300000
# 连接最小生存时间5分钟
logAbandoned
:
true
# 记录泄露连接的堆栈日志(调试用)
maxEvictableIdleTimeMillis
:
600000
# 连接最大生存时间10分钟
# 4. 连接有效性检测强化(关键修复)
validationQuery
:
SELECT 1
# 连接有效性检测SQL
testWhileIdle
:
true
# 空闲时检测连接有效性(必须开启)
testOnBorrow
:
true
# 借用时检测连接有效性(关键:确保Quartz拿到有效连接)
testOnReturn
:
false
# 归还时不检测(性能考虑)
validationQueryTimeout
:
2000
# 验证查询超时2秒
# 5. 连接保活机制(新增)
keepAlive
:
true
# 开启连接保活
keepAliveBetweenTimeMillis
:
30000
# 保活执行间隔30秒
phyTimeoutMillis
:
1200000
# 物理连接超时20分钟
# 6. 连接泄露检测(保留原有配置)
removeAbandoned
:
true
removeAbandonedTimeout
:
300
logAbandoned
:
true
# 7. 过滤器配置
webStatFilter
:
webStatFilter
:
enabled
:
true
enabled
:
true
statViewServlet
:
statViewServlet
:
enabled
:
true
enabled
:
true
# 设置白名单,不填则允许所有访问
allow
:
allow
:
url-pattern
:
/druid/*
url-pattern
:
/druid/*
# 控制台管理用户名和密码
login-username
:
ruoyi
login-username
:
ruoyi
login-password
:
123456
login-password
:
123456
filter
:
filter
:
stat
:
stat
:
enabled
:
true
enabled
:
true
# 慢SQL记录
log-slow-sql
:
true
log-slow-sql
:
true
slow-sql-millis
:
1000
slow-sql-millis
:
1000
merge-sql
:
true
merge-sql
:
true
wall
:
wall
:
config
:
config
:
multi-statement-allow
:
true
multi-statement-allow
:
true
# Quartz持久化配置:cite[1]:cite[5]
quartz
:
# redis 配置(保持不变)
# 持久化到数据库方式
job-store-type
:
jdbc
# 初始化数据库架构
jdbc
:
initialize-schema
:
always
# Quartz调度程序属性:cite[1]
properties
:
org
:
quartz
:
scheduler
:
# 调度任务实例名称
instanceName
:
ZhMESDeviceScheduler
# 实例ID,AUTO自动生成
instanceId
:
AUTO
jobStore
:
# JobStore实现类
class
:
org.quartz.impl.jdbcjobstore.JobStoreTX
# 驱动程序代理类
driverDelegateClass
:
org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 表名前缀
tablePrefix
:
QRTZ_
# 是否使用属性文件存储JobDataMaps
useProperties
:
true
# 错过触发阈值(毫秒)
misfireThreshold
:
300000
tolerateJobFailure
:
true
# 是否启用集群功能
isClustered
:
false
# 集群检查间隔(毫秒)
clusterCheckinInterval
:
15000
# 配置线程池:cite[1]
threadPool
:
class
:
org.quartz.simpl.SimpleThreadPool
# 线程数
threadCount
:
15
# 设为守护线程,避免项目关闭时线程残留
makeThreadsDaemons
:
true
# 保持默认1分钟,72个Job执行时间≤10秒,不会触发misfire
misfireThreshold
:
60000
# 线程优先级
threadPriority
:
5
# 线程名称前缀
threadNamePrefix
:
ZhMESQuartzWorker_
# redis 配置
redis
:
redis
:
host
:
127.0.0.1
host
:
127.0.0.1
port
:
6379
port
:
6379
# 数据库索引
database
:
0
database
:
0
# 密码
password
:
password
:
# 连接超时时间
timeout
:
10s
timeout
:
10s
lettuce
:
lettuce
:
pool
:
pool
:
# 连接池中的最小空闲连接
min-idle
:
0
min-idle
:
0
# 连接池中的最大空闲连接
max-idle
:
8
max-idle
:
8
# 连接池的最大数据库连接数
max-active
:
8
max-active
:
8
# #连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait
:
-1ms
max-wait
:
-1ms
# 其他配置保持不变...
# 项目相关配置
zehong
:
zehong
:
# 名称
name
:
Zehong
name
:
Zehong
# 版本
version
:
3.5.0
version
:
3.5.0
# 版权年份
copyrightYear
:
2021
copyrightYear
:
2021
# 实例演示开关
demoEnabled
:
true
demoEnabled
:
true
# 文件路径 示例( Windows配置D:/zehong/uploadPath,Linux配置 /home/zehong/uploadPath)
profile
:
D:/zhmes-agecal/uploadPath
profile
:
D:/zhmes-agecal/uploadPath
# 获取ip地址开关
addressEnabled
:
false
addressEnabled
:
false
# 验证码类型 math 数组计算 char 字符验证
captchaType
:
math
captchaType
:
math
# Netty配置
netty
:
port
:
6001
# 服务端口
boss-group-thread-count
:
1
# 主线程数
worker-group-thread-count
:
8
# 工作线程数
max-frame-length
:
65535
# 最大帧长度
heartbeat-timeout
:
10
# 心跳超时时间(秒)
netty
:
port
:
6001
boss-group-thread-count
:
1
worker-group-thread-count
:
8
max-frame-length
:
65535
heartbeat-timeout
:
10
# 机械臂UDP配置
robot
:
robot
:
arm
:
arm
:
udp
:
udp
:
...
...
zhmes-agecal-quartz/src/main/java/com/zehong/quartz/config/QuartzTaskMonitor.java
0 → 100644
View file @
6cc339fe
package
com
.
zehong
.
quartz
.
config
;
import
org.quartz.*
;
import
org.quartz.impl.matchers.GroupMatcher
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
java.util.*
;
/**
* @author lenovo
* @date 2025/10/15
* @description TODO
*/
@Component
public
class
QuartzTaskMonitor
{
private
static
final
Logger
log
=
LoggerFactory
.
getLogger
(
QuartzTaskMonitor
.
class
);
@Resource
private
Scheduler
scheduler
;
/**
* 监控指定storeyId的任务状态
*/
public
void
monitorTaskStatus
(
Long
fStoreyId
)
{
try
{
String
JOB_GROUP
=
"DEVICE_TASKS"
;
String
TRIGGER_GROUP
=
"DEVICE_TRIGGERS"
;
Set
<
JobKey
>
jobKeys
=
scheduler
.
getJobKeys
(
GroupMatcher
.
jobGroupEquals
(
JOB_GROUP
));
Set
<
TriggerKey
>
triggerKeys
=
scheduler
.
getTriggerKeys
(
GroupMatcher
.
triggerGroupEquals
(
TRIGGER_GROUP
));
log
.
info
(
"=== 任务状态详细监控:storeyId={} ==="
,
fStoreyId
);
log
.
info
(
"总任务数: {}, 总触发器数: {}"
,
jobKeys
.
size
(),
triggerKeys
.
size
());
int
relatedJobs
=
0
;
int
relatedTriggers
=
0
;
// 监控相关任务
for
(
JobKey
jobKey
:
jobKeys
)
{
if
(
jobKey
.
getName
().
contains
(
fStoreyId
.
toString
()))
{
relatedJobs
++;
List
<?
extends
Trigger
>
triggers
=
scheduler
.
getTriggersOfJob
(
jobKey
);
for
(
Trigger
trigger
:
triggers
)
{
Trigger
.
TriggerState
state
=
scheduler
.
getTriggerState
(
trigger
.
getKey
());
Date
nextFireTime
=
trigger
.
getNextFireTime
();
Date
previousFireTime
=
trigger
.
getPreviousFireTime
();
log
.
info
(
"任务[{}]:"
,
jobKey
.
getName
());
log
.
info
(
" - 状态: {}"
,
state
);
log
.
info
(
" - 下次执行: {}"
,
nextFireTime
);
log
.
info
(
" - 上次执行: {}"
,
previousFireTime
);
log
.
info
(
" - 触发器: {}"
,
trigger
.
getKey
().
getName
());
if
(
trigger
instanceof
SimpleTrigger
)
{
SimpleTrigger
simpleTrigger
=
(
SimpleTrigger
)
trigger
;
log
.
info
(
" - 重复次数: {}/{}"
,
simpleTrigger
.
getTimesTriggered
(),
simpleTrigger
.
getRepeatCount
()
+
1
);
}
}
}
}
// 监控相关触发器
for
(
TriggerKey
triggerKey
:
triggerKeys
)
{
if
(
triggerKey
.
getName
().
contains
(
fStoreyId
.
toString
()))
{
relatedTriggers
++;
Trigger
trigger
=
scheduler
.
getTrigger
(
triggerKey
);
if
(
trigger
!=
null
)
{
Trigger
.
TriggerState
state
=
scheduler
.
getTriggerState
(
triggerKey
);
log
.
info
(
"独立触发器[{}]: 状态={}, 下次执行={}"
,
triggerKey
.
getName
(),
state
,
trigger
.
getNextFireTime
());
}
}
}
log
.
info
(
"相关任务数: {}, 相关触发器数: {}"
,
relatedJobs
,
relatedTriggers
);
// 调度器整体状态
SchedulerMetaData
metaData
=
scheduler
.
getMetaData
();
log
.
info
(
"调度器整体状态:"
);
log
.
info
(
" - 已启动: {}"
,
scheduler
.
isStarted
());
log
.
info
(
" - 待机模式: {}"
,
scheduler
.
isInStandbyMode
());
log
.
info
(
" - 已关闭: {}"
,
scheduler
.
isShutdown
());
log
.
info
(
" - 执行任务总数: {}"
,
metaData
.
getNumberOfJobsExecuted
());
log
.
info
(
" - 运行中任务: {}"
,
scheduler
.
getCurrentlyExecutingJobs
().
size
());
}
catch
(
SchedulerException
e
)
{
log
.
error
(
"任务状态监控失败:storeyId={}"
,
fStoreyId
,
e
);
}
}
/**
* 监控所有任务状态
*/
public
void
monitorAllTasksStatus
()
{
try
{
String
JOB_GROUP
=
"DEVICE_TASKS"
;
String
TRIGGER_GROUP
=
"DEVICE_TRIGGERS"
;
Set
<
JobKey
>
jobKeys
=
scheduler
.
getJobKeys
(
GroupMatcher
.
jobGroupEquals
(
JOB_GROUP
));
Set
<
TriggerKey
>
triggerKeys
=
scheduler
.
getTriggerKeys
(
GroupMatcher
.
triggerGroupEquals
(
TRIGGER_GROUP
));
log
.
info
(
"=== 所有任务状态监控 ==="
);
log
.
info
(
"总任务数: {}, 总触发器数: {}"
,
jobKeys
.
size
(),
triggerKeys
.
size
());
// 按storeyId分组显示
Map
<
String
,
List
<
JobKey
>>
jobsByStorey
=
new
HashMap
<>();
for
(
JobKey
jobKey
:
jobKeys
)
{
String
jobName
=
jobKey
.
getName
();
// 解析storeyId:COMM_501_1_123 中的 123
String
[]
parts
=
jobName
.
split
(
"_"
);
if
(
parts
.
length
>=
4
)
{
String
storeyId
=
parts
[
3
];
jobsByStorey
.
computeIfAbsent
(
storeyId
,
k
->
new
ArrayList
<>()).
add
(
jobKey
);
}
}
for
(
Map
.
Entry
<
String
,
List
<
JobKey
>>
entry
:
jobsByStorey
.
entrySet
())
{
log
.
info
(
"StoreyId {} 的任务数: {}"
,
entry
.
getKey
(),
entry
.
getValue
().
size
());
for
(
JobKey
jobKey
:
entry
.
getValue
())
{
List
<?
extends
Trigger
>
triggers
=
scheduler
.
getTriggersOfJob
(
jobKey
);
for
(
Trigger
trigger
:
triggers
)
{
Trigger
.
TriggerState
state
=
scheduler
.
getTriggerState
(
trigger
.
getKey
());
log
.
info
(
" - {}: 状态={}, 下次执行={}"
,
jobKey
.
getName
(),
state
,
trigger
.
getNextFireTime
());
}
}
}
}
catch
(
SchedulerException
e
)
{
log
.
error
(
"所有任务状态监控失败"
,
e
);
}
}
/**
* 检查调度器健康状态
*/
public
void
checkSchedulerHealth
()
{
try
{
SchedulerMetaData
metaData
=
scheduler
.
getMetaData
();
log
.
info
(
"=== 调度器健康检查 ==="
);
log
.
info
(
"调度器名称: {}"
,
metaData
.
getSchedulerName
());
log
.
info
(
"调度器实例ID: {}"
,
metaData
.
getSchedulerInstanceId
());
log
.
info
(
"已启动: {}"
,
scheduler
.
isStarted
());
log
.
info
(
"待机模式: {}"
,
scheduler
.
isInStandbyMode
());
log
.
info
(
"运行中任务: {}"
,
scheduler
.
getCurrentlyExecutingJobs
().
size
());
log
.
info
(
"线程池大小: {}"
,
metaData
.
getThreadPoolSize
());
log
.
info
(
"执行任务总数: {}"
,
metaData
.
getNumberOfJobsExecuted
());
// 健康状态判断
if
(!
scheduler
.
isStarted
())
{
log
.
error
(
"❌ 调度器未启动!"
);
}
else
if
(
scheduler
.
isInStandbyMode
())
{
log
.
warn
(
"⚠️ 调度器处于待机模式"
);
}
else
{
log
.
info
(
"✅ 调度器运行正常"
);
}
}
catch
(
SchedulerException
e
)
{
log
.
error
(
"调度器健康检查失败"
,
e
);
}
}
}
zhmes-agecal-quartz/src/main/java/com/zehong/quartz/config/ScheduleConfig.java
View file @
6cc339fe
package
com
.
zehong
.
quartz
.
config
;
package
com
.
zehong
.
quartz
.
config
;
import
org.quartz.spi.TriggerFiredBundle
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.config.AutowireCapableBeanFactory
;
import
org.springframework.context.ApplicationContext
;
import
org.springframework.context.ApplicationContext
;
import
org.springframework.context.ApplicationContextAware
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.scheduling.quartz.SchedulerFactoryBean
;
import
org.springframework.scheduling.quartz.SchedulerFactoryBean
;
...
@@ -34,38 +37,35 @@ public class ScheduleConfig
...
@@ -34,38 +37,35 @@ public class ScheduleConfig
factory
.
setJobFactory
(
springBeanJobFactory
());
factory
.
setJobFactory
(
springBeanJobFactory
());
factory
.
setDataSource
(
dataSource
);
factory
.
setDataSource
(
dataSource
);
// quartz参数
Properties
prop
=
new
Properties
();
Properties
prop
=
new
Properties
();
prop
.
put
(
"org.quartz.scheduler.instanceName"
,
"
Ruoyi
Scheduler"
);
prop
.
put
(
"org.quartz.scheduler.instanceName"
,
"
ZhMESDevice
Scheduler"
);
prop
.
put
(
"org.quartz.scheduler.instanceId"
,
"AUTO"
);
prop
.
put
(
"org.quartz.scheduler.instanceId"
,
"AUTO"
);
// 线程池配置
// 线程池配置
prop
.
put
(
"org.quartz.threadPool.class"
,
"org.quartz.simpl.SimpleThreadPool"
);
prop
.
put
(
"org.quartz.threadPool.class"
,
"org.quartz.simpl.SimpleThreadPool"
);
prop
.
put
(
"org.quartz.threadPool.threadCount"
,
"
20
"
);
prop
.
put
(
"org.quartz.threadPool.threadCount"
,
"
15
"
);
prop
.
put
(
"org.quartz.threadPool.threadPriority"
,
"5"
);
prop
.
put
(
"org.quartz.threadPool.threadPriority"
,
"5"
);
// JobStore配置
prop
.
put
(
"org.quartz.jobStore.class"
,
"org.quartz.impl.jdbcjobstore.JobStoreTX"
);
// 集群配置
prop
.
put
(
"org.quartz.jobStore.isClustered"
,
"true"
);
prop
.
put
(
"org.quartz.jobStore.clusterCheckinInterval"
,
"15000"
);
prop
.
put
(
"org.quartz.jobStore.maxMisfiresToHandleAtATime"
,
"1"
);
prop
.
put
(
"org.quartz.jobStore.txIsolationLevelSerializable"
,
"true"
);
//
sqlserver 启用
//
JobStore配置 - 关键修复:关闭集群模式
// prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?
");
prop
.
put
(
"org.quartz.jobStore.class"
,
"org.quartz.impl.jdbcjobstore.JobStoreTX
"
);
prop
.
put
(
"org.quartz.jobStore.
misfireThreshold"
,
"12000
"
);
prop
.
put
(
"org.quartz.jobStore.
driverDelegateClass"
,
"org.quartz.impl.jdbcjobstore.StdJDBCDelegate
"
);
prop
.
put
(
"org.quartz.jobStore.tablePrefix"
,
"QRTZ_"
);
prop
.
put
(
"org.quartz.jobStore.tablePrefix"
,
"QRTZ_"
);
factory
.
setQuartzProperties
(
prop
);
prop
.
put
(
"org.quartz.jobStore.isClustered"
,
"false"
);
// 单机部署关闭集群
prop
.
put
(
"org.quartz.jobStore.useProperties"
,
"false"
);
// 使用类型安全的方式
factory
.
setSchedulerName
(
"RuoyiScheduler"
);
// 其他优化配置
// 延时启动
prop
.
put
(
"org.quartz.jobStore.misfireThreshold"
,
"60000"
);
factory
.
setStartupDelay
(
1
);
prop
.
put
(
"org.quartz.scheduler.skipUpdateCheck"
,
"true"
);
factory
.
setApplicationContextSchedulerContextKey
(
"applicationContextKey"
);
// 可选,QuartzScheduler
factory
.
setQuartzProperties
(
prop
);
// 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了
factory
.
setSchedulerName
(
"ZhMESDeviceScheduler"
);
factory
.
setStartupDelay
(
5
);
// 增加启动延迟
factory
.
setApplicationContextSchedulerContextKey
(
"applicationContext"
);
factory
.
setOverwriteExistingJobs
(
true
);
factory
.
setOverwriteExistingJobs
(
true
);
// 设置自动启动,默认为true
factory
.
setAutoStartup
(
true
);
factory
.
setAutoStartup
(
true
);
factory
.
setWaitForJobsToCompleteOnShutdown
(
true
);
return
factory
;
return
factory
;
}
}
}
}
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceCommJob/BaseDeviceCommJob.java
View file @
6cc339fe
...
@@ -96,16 +96,29 @@ public abstract class BaseDeviceCommJob implements Job {
...
@@ -96,16 +96,29 @@ public abstract class BaseDeviceCommJob implements Job {
log
.
info
(
"tStoreyInfoMapper: {}"
,
tStoreyInfoMapper
==
null
?
"NULL"
:
"OK"
);
log
.
info
(
"tStoreyInfoMapper: {}"
,
tStoreyInfoMapper
==
null
?
"NULL"
:
"OK"
);
log
.
info
(
"resultHandler: {}"
,
resultHandler
==
null
?
"NULL"
:
"OK"
);
log
.
info
(
"resultHandler: {}"
,
resultHandler
==
null
?
"NULL"
:
"OK"
);
long
taskStartTime
=
System
.
currentTimeMillis
();
Long
fStoreyId
=
null
;
String
storeyIdStr
=
getStoreyIdFromContext
(
context
);
Integer
port
=
null
;
// 单设备场景:直接获取唯一deviceId(子类返回的List仅1个元素)
Integer
deviceId
=
null
;
Integer
deviceId
=
getSingleDeviceId
();
TStoreyInfo
storeyInfo
=
null
;
// 保存JobKey和TriggerKey,用于后续清理
try
{
JobKey
jobKey
=
context
.
getJobDetail
().
getKey
();
// 1. 正确获取参数
TriggerKey
triggerKey
=
context
.
getTrigger
().
getKey
();
JobDataMap
jobDataMap
=
context
.
getJobDetail
().
getJobDataMap
();
log
.
info
(
"单设备任务开始:port={}, deviceId={}"
,
getFixedPort
(),
deviceId
);
// 关键修复:直接获取,不需要转换
fStoreyId
=
jobDataMap
.
getLong
(
"fStoreyId"
);
port
=
jobDataMap
.
getInt
(
"port"
);
deviceId
=
jobDataMap
.
getInt
(
"deviceId"
);
log
.
info
(
"=== 开始执行设备任务:storeyId={}, port={}, deviceId={} ==="
,
fStoreyId
,
port
,
deviceId
);
log
.
info
(
"=== 设备任务执行完成:storeyId={}, port={}, deviceId={} ==="
,
fStoreyId
,
port
,
deviceId
);
}
catch
(
Exception
e
)
{
log
.
error
(
"设备任务执行失败:storeyId={}, port={}, deviceId={}, 错误: {}"
,
fStoreyId
,
port
,
deviceId
,
e
.
getMessage
(),
e
);
}
// try {
// try {
// log.info("单设备任务开始:port={}, deviceId={}, storeyId={}",
// log.info("单设备任务开始:port={}, deviceId={}, storeyId={}",
...
@@ -355,28 +368,28 @@ public abstract class BaseDeviceCommJob implements Job {
...
@@ -355,28 +368,28 @@ public abstract class BaseDeviceCommJob implements Job {
/**
/**
* 从JobContext获取storeyId(精简异常处理,失败返回unknown)
* 从JobContext获取storeyId(精简异常处理,失败返回unknown)
*/
*/
private
Stri
ng
getStoreyIdFromContext
(
JobExecutionContext
context
)
{
private
Lo
ng
getStoreyIdFromContext
(
JobExecutionContext
context
)
{
try
{
try
{
if
(
context
==
null
)
{
if
(
context
==
null
)
{
log
.
error
(
"getStoreyIdFromContext:context为null"
);
log
.
error
(
"getStoreyIdFromContext:context为null"
);
return
"unknown"
;
return
0L
;
}
}
JobDetail
jobDetail
=
context
.
getJobDetail
();
JobDetail
jobDetail
=
context
.
getJobDetail
();
if
(
jobDetail
==
null
)
{
if
(
jobDetail
==
null
)
{
log
.
error
(
"getStoreyIdFromContext:jobDetail为null"
);
log
.
error
(
"getStoreyIdFromContext:jobDetail为null"
);
return
"unknown"
;
return
0L
;
}
}
JobDataMap
dataMap
=
jobDetail
.
getJobDataMap
();
JobDataMap
dataMap
=
jobDetail
.
getJobDataMap
();
if
(
dataMap
==
null
)
{
if
(
dataMap
==
null
)
{
log
.
error
(
"getStoreyIdFromContext:dataMap为null"
);
log
.
error
(
"getStoreyIdFromContext:dataMap为null"
);
return
"unknown"
;
return
0L
;
}
}
String
storeyId
=
dataMap
.
getStri
ng
(
"fStoreyId"
);
long
fStoreyId
=
dataMap
.
getLo
ng
(
"fStoreyId"
);
log
.
info
(
"getStoreyIdFromContext:获取到storeyId={}"
,
s
toreyId
);
log
.
info
(
"getStoreyIdFromContext:获取到storeyId={}"
,
fS
toreyId
);
return
s
toreyId
;
return
fS
toreyId
;
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"getStoreyIdFromContext异常:"
,
e
);
log
.
error
(
"getStoreyIdFromContext异常:"
,
e
);
return
"unknown"
;
return
0L
;
}
}
}
}
...
...
zhmes-agecal-system/src/main/java/com/zehong/system/task/DeviceTaskScheduler.java
View file @
6cc339fe
package
com
.
zehong
.
system
.
task
;
package
com
.
zehong
.
system
.
task
;
import
com.zehong.quartz.config.QuartzTaskMonitor
;
import
com.zehong.system.task.DeviceCommJob.*
;
import
com.zehong.system.task.DeviceCommJob.*
;
import
org.quartz.*
;
import
org.quartz.*
;
import
org.quartz.impl.matchers.GroupMatcher
;
import
org.quartz.impl.matchers.GroupMatcher
;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
org.terracotta.quartz.wrappers.TriggerWrapper
;
import
org.terracotta.quartz.wrappers.TriggerWrapper
;
...
@@ -31,6 +33,9 @@ public class DeviceTaskScheduler {
...
@@ -31,6 +33,9 @@ public class DeviceTaskScheduler {
private
static
final
String
JOB_GROUP
=
"DEVICE_TASKS"
;
private
static
final
String
JOB_GROUP
=
"DEVICE_TASKS"
;
// 触发器组名(统一固定,与原有逻辑保持一致)
// 触发器组名(统一固定,与原有逻辑保持一致)
private
static
final
String
TRIGGER_GROUP
=
"DEVICE_TRIGGERS"
;
private
static
final
String
TRIGGER_GROUP
=
"DEVICE_TRIGGERS"
;
// 新增:注入监控器
@Autowired
private
QuartzTaskMonitor
quartzTaskMonitor
;
// @PostConstruct
// @PostConstruct
// public void init() throws SchedulerException {
// public void init() throws SchedulerException {
// scheduler.getListenerManager().addJobListener(new QuartzJobListener());
// scheduler.getListenerManager().addJobListener(new QuartzJobListener());
...
@@ -51,6 +56,11 @@ public class DeviceTaskScheduler {
...
@@ -51,6 +56,11 @@ public class DeviceTaskScheduler {
// 1. 调度器健康检查(确保线程池可用)
// 1. 调度器健康检查(确保线程池可用)
checkSchedulerHealth
();
checkSchedulerHealth
();
// 2. 创建任务前的状态监控
quartzTaskMonitor
.
monitorTaskStatus
(
fStoreyId
);
// 2. 清理可能的残留任务
cleanExistingTasks
(
fStoreyId
);
// 3. 创建核心任务
// 3. 创建核心任务
// 1. 创建3个端口专属任务(501:5分钟后,502:10分钟后,503:15分钟后)
// 1. 创建3个端口专属任务(501:5分钟后,502:10分钟后,503:15分钟后)
createPortSpecificCommJobs
(
fStoreyId
);
createPortSpecificCommJobs
(
fStoreyId
);
...
@@ -66,6 +76,29 @@ public class DeviceTaskScheduler {
...
@@ -66,6 +76,29 @@ public class DeviceTaskScheduler {
throw
new
RuntimeException
(
"Quartz任务调度失败"
,
e
);
throw
new
RuntimeException
(
"Quartz任务调度失败"
,
e
);
}
}
}
}
/**
* 清理已存在的任务 - 修复版本
*/
private
void
cleanExistingTasks
(
Long
fStoreyId
)
throws
SchedulerException
{
// 清理所有与这个fStoreyId相关的任务
Set
<
JobKey
>
jobKeys
=
scheduler
.
getJobKeys
(
GroupMatcher
.
jobGroupEquals
(
JOB_GROUP
));
for
(
JobKey
jobKey
:
jobKeys
)
{
if
(
jobKey
.
getName
().
contains
(
fStoreyId
.
toString
()))
{
scheduler
.
deleteJob
(
jobKey
);
log
.
info
(
"清理现有任务: {}"
,
jobKey
.
getName
());
}
}
// 清理触发器
Set
<
TriggerKey
>
triggerKeys
=
scheduler
.
getTriggerKeys
(
GroupMatcher
.
triggerGroupEquals
(
TRIGGER_GROUP
));
for
(
TriggerKey
triggerKey
:
triggerKeys
)
{
if
(
triggerKey
.
getName
().
contains
(
fStoreyId
.
toString
()))
{
scheduler
.
unscheduleJob
(
triggerKey
);
log
.
info
(
"清理现有触发器: {}"
,
triggerKey
.
getName
());
}
}
}
// -------------------------- 新增:创建3个端口专属任务 --------------------------
// -------------------------- 新增:创建3个端口专属任务 --------------------------
/**
/**
* 为3个端口Job创建一次性SimpleTrigger:
* 为3个端口Job创建一次性SimpleTrigger:
...
@@ -78,7 +111,7 @@ public class DeviceTaskScheduler {
...
@@ -78,7 +111,7 @@ public class DeviceTaskScheduler {
createPortCommJob
(
fStoreyId
,
501
,
1
,
DeviceComm501Device1Job
.
class
,
2
);
createPortCommJob
(
fStoreyId
,
501
,
1
,
DeviceComm501Device1Job
.
class
,
2
);
// 2. 端口502:10分钟后执行
// 2. 端口502:10分钟后执行
createPortCommJob
(
fStoreyId
,
501
,
2
,
DeviceComm501Device2Job
.
class
,
4
);
createPortCommJob
(
fStoreyId
,
501
,
2
,
DeviceComm501Device2Job
.
class
,
4
);
// 3. 端口503:15分钟后执行
//
// 3. 端口503:15分钟后执行
createPortCommJob
(
fStoreyId
,
501
,
3
,
DeviceComm501Device3Job
.
class
,
6
);
createPortCommJob
(
fStoreyId
,
501
,
3
,
DeviceComm501Device3Job
.
class
,
6
);
}
}
/**
/**
...
@@ -89,48 +122,46 @@ public class DeviceTaskScheduler {
...
@@ -89,48 +122,46 @@ public class DeviceTaskScheduler {
* @param delayMin 延迟执行时间(分钟)
* @param delayMin 延迟执行时间(分钟)
*/
*/
private
void
createPortCommJob
(
Long
fStoreyId
,
int
port
,
int
deviceId
,
Class
<?
extends
Job
>
jobClass
,
int
delayMin
)
throws
SchedulerException
{
private
void
createPortCommJob
(
Long
fStoreyId
,
int
port
,
int
deviceId
,
Class
<?
extends
Job
>
jobClass
,
int
delayMin
)
throws
SchedulerException
{
String
jobId
=
"COMM_"
+
port
+
"_"
+
deviceId
+
"_"
+
fStoreyId
;
// 修正命名:port_deviceId_storeyId(避免冲突)
String
jobId
=
String
.
format
(
"COMM_%d_%d_%d"
,
port
,
deviceId
,
fStoreyId
);
String
triggerId
=
"TRIGGER_"
+
port
+
"_"
+
deviceId
+
"_"
+
fStoreyId
;
String
triggerId
=
String
.
format
(
"TRIGGER_%d_%d_%d"
,
port
,
deviceId
,
fStoreyId
);
JobKey
jobKey
=
new
JobKey
(
jobId
,
JOB_GROUP
);
JobKey
jobKey
=
new
JobKey
(
jobId
,
JOB_GROUP
);
TriggerKey
triggerKey
=
new
TriggerKey
(
triggerId
,
TRIGGER_GROUP
);
TriggerKey
triggerKey
=
new
TriggerKey
(
triggerId
,
TRIGGER_GROUP
);
//
增强去重逻辑:先检查Trigger状态
//
清理现有任务
if
(
scheduler
.
checkExists
(
triggerKey
))
{
if
(
scheduler
.
checkExists
(
triggerKey
))
{
Trigger
.
TriggerState
triggerState
=
scheduler
.
getTriggerState
(
triggerKey
);
// 有效状态:WAITING(等待执行)、ACQUIRED(已获取,待执行)
// 无效状态:ERROR/COMPLETE/PAUSED,删除重建
log
.
warn
(
"端口[{}]设备[{}]:Trigger状态无效,删除重建,state={}, triggerKey={}"
,
port
,
deviceId
,
triggerState
,
triggerKey
);
scheduler
.
unscheduleJob
(
triggerKey
);
scheduler
.
unscheduleJob
(
triggerKey
);
}
}
// 检查Job是否存在(若Trigger已删除,Job可能残留)
if
(
scheduler
.
checkExists
(
jobKey
))
{
if
(
scheduler
.
checkExists
(
jobKey
))
{
log
.
info
(
"端口[{}]设备[{}]:Job已存在,删除重建,jobKey={}"
,
port
,
deviceId
,
jobKey
);
scheduler
.
deleteJob
(
jobKey
);
scheduler
.
deleteJob
(
jobKey
);
}
}
// 原有创建JobDetail和Trigger的逻辑...
// 关键修复:使用正确的数据类型传递参数
JobDataMap
jobDataMap
=
new
JobDataMap
();
jobDataMap
.
put
(
"fStoreyId"
,
fStoreyId
);
// 直接传递Long
jobDataMap
.
put
(
"port"
,
port
);
// 直接传递int
jobDataMap
.
put
(
"deviceId"
,
deviceId
);
// 直接传递int
JobDetail
job
=
JobBuilder
.
newJob
(
jobClass
)
JobDetail
job
=
JobBuilder
.
newJob
(
jobClass
)
.
withIdentity
(
jobKey
)
.
withIdentity
(
jobKey
)
.
usingJobData
(
"fStoreyId"
,
fStoreyId
.
toString
())
.
usingJobData
(
jobDataMap
)
// 使用JobDataMap而不是逐个设置
.
storeDurably
(
false
)
.
storeDurably
(
false
)
.
build
();
.
build
();
Date
triggerTime
=
Date
.
from
(
Instant
.
now
().
plus
(
delayMin
,
ChronoUnit
.
MINUTES
));
Date
triggerTime
=
Date
.
from
(
Instant
.
now
().
plus
(
delayMin
,
ChronoUnit
.
MINUTES
));
SimpleTrigger
trigger
=
TriggerBuilder
.
newTrigger
()
SimpleTrigger
trigger
=
TriggerBuilder
.
newTrigger
()
.
withIdentity
(
triggerKey
)
.
withIdentity
(
triggerKey
)
.
forJob
(
jobKey
)
.
forJob
(
jobKey
)
.
startAt
(
triggerTime
)
.
startAt
(
triggerTime
)
.
withSchedule
(
SimpleScheduleBuilder
.
simpleSchedule
()
.
withSchedule
(
SimpleScheduleBuilder
.
simpleSchedule
()
.
withRepeatCount
(
0
)
.
withRepeatCount
(
0
)
// 只执行一次
.
withMisfireHandlingInstructionFireNow
())
.
withMisfireHandlingInstructionFireNow
())
.
build
();
.
build
();
Date
nextFireTime
=
scheduler
.
scheduleJob
(
job
,
trigger
);
Date
nextFireTime
=
scheduler
.
scheduleJob
(
job
,
trigger
);
log
.
info
(
"端口[{}]设备[{}]任务创建成功:
jobId={},延迟{}分钟,下次执行
:{}"
,
log
.
info
(
"端口[{}]设备[{}]任务创建成功:
延迟{}分钟,执行时间
:{}"
,
port
,
deviceId
,
jobId
,
delayMin
,
nextFireTime
);
port
,
deviceId
,
delayMin
,
nextFireTime
);
}
}
/**
/**
* 单个端口Job和Trigger创建(通用方法)
* 单个端口Job和Trigger创建(通用方法)
...
@@ -262,16 +293,13 @@ public class DeviceTaskScheduler {
...
@@ -262,16 +293,13 @@ public class DeviceTaskScheduler {
scheduler
.
start
();
scheduler
.
start
();
}
}
SchedulerMetaData
metaData
=
scheduler
.
getMetaData
();
if
(
scheduler
.
isInStandbyMode
())
{
// 低版本兼容:获取线程池大小和已执行任务数(替代活跃线程数)
log
.
warn
(
"调度器处于待机模式,启动..."
);
int
poolSize
=
metaData
.
getThreadPoolSize
();
scheduler
.
start
();
long
executedJobs
=
metaData
.
getNumberOfJobsExecuted
();
log
.
info
(
"Quartz健康状态:线程池大小={}, 已执行任务数={}"
,
poolSize
,
executedJobs
);
// 线程池大小预警(根据实际需求调整阈值)
if
(
poolSize
<
5
)
{
log
.
warn
(
"Quartz线程池过小(当前={}),可能导致任务延迟"
,
poolSize
);
}
}
log
.
info
(
"调度器状态: 已启动={}, 待机模式={}"
,
scheduler
.
isStarted
(),
scheduler
.
isInStandbyMode
());
}
}
/**
/**
* 检查触发器是否为ERROR状态
* 检查触发器是否为ERROR状态
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment