Commit daa59df2 authored by wanghao's avatar wanghao

1 测试 上电后通信 和 最终完成 定时任务功能

parent 3a8765a2
......@@ -55,6 +55,46 @@ spring:
wall:
config:
multi-statement-allow: true
# Quartz持久化配置:cite[1]:cite[5]
quartz:
# 持久化到数据库方式
job-store-type: jdbc
# 初始化数据库架构
jdbc:
initialize-schema: always
# Quartz调度程序属性:cite[1]
properties:
org:
quartz:
scheduler:
# 调度任务实例名称
instanceName: ZhMESDeviceScheduler
# 实例ID,AUTO自动生成
instanceId: AUTO
jobStore:
# JobStore实现类
class: org.quartz.impl.jdbcjobstore.JobStoreTX
# 驱动程序代理类
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 表名前缀
tablePrefix: QRTZ_
# 是否使用属性文件存储JobDataMaps
useProperties: true
# 错过触发阈值(毫秒)
misfireThreshold: 60000
# 是否启用集群功能
isClustered: false
# 集群检查间隔(毫秒)
clusterCheckinInterval: 15000
# 配置线程池:cite[1]
threadPool:
class: org.quartz.simpl.SimpleThreadPool
# 线程数
threadCount: 10
# 线程优先级
threadPriority: 5
# 线程名称前缀
threadNamePrefix: ZhMESQuartzWorker_
# redis 配置
redis:
# 地址
......
......@@ -55,6 +55,46 @@ spring:
wall:
config:
multi-statement-allow: true
# Quartz持久化配置:cite[1]:cite[5]
quartz:
# 持久化到数据库方式
job-store-type: jdbc
# 初始化数据库架构
jdbc:
initialize-schema: always
# Quartz调度程序属性:cite[1]
properties:
org:
quartz:
scheduler:
# 调度任务实例名称
instanceName: ZhMESDeviceScheduler
# 实例ID,AUTO自动生成
instanceId: AUTO
jobStore:
# JobStore实现类
class: org.quartz.impl.jdbcjobstore.JobStoreTX
# 驱动程序代理类
driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
# 表名前缀
tablePrefix: QRTZ_
# 是否使用属性文件存储JobDataMaps
useProperties: true
# 错过触发阈值(毫秒)
misfireThreshold: 60000
# 是否启用集群功能
isClustered: false
# 集群检查间隔(毫秒)
clusterCheckinInterval: 15000
# 配置线程池:cite[1]
threadPool:
class: org.quartz.simpl.SimpleThreadPool
# 线程数
threadCount: 10
# 线程优先级
threadPriority: 5
# 线程名称前缀
threadNamePrefix: ZhMESQuartzWorker_
# redis 配置
redis:
host: 127.0.0.1
......
package com.zehong.framework.config.properties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.Properties;
/**
* @author lenovo
* @date 2025/9/23
* @description TODO
*/
@Configuration
public class QuartzConfig {
@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setDataSource(dataSource);
factory.setAutoStartup(true);
factory.setWaitForJobsToCompleteOnShutdown(true);
factory.setOverwriteExistingJobs(true);
Properties props = new Properties();
props.put("org.quartz.scheduler.instanceName", "DeviceScheduler");
props.put("org.quartz.scheduler.instanceId", "AUTO");
props.put("org.quartz.jobStore.misfireThreshold", "60000");
factory.setQuartzProperties(props);
return factory;
}
}
package com.zehong.framework.config.properties;
import com.zehong.system.task.DeviceTaskScheduler;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* @author lenovo
* @date 2025/9/23
* @description TODO
*/
@Component
public class QuartzStartupListener implements ApplicationListener<ApplicationReadyEvent> {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(QuartzStartupListener.class);
@Resource
private DeviceTaskScheduler deviceTaskScheduler;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("应用启动完成,检查Quartz任务状态...");
// 这里可以添加需要自动恢复的任务检查逻辑
}
}
package com.zehong.system.task;
import org.quartz.*;
import org.quartz.spi.OperableTrigger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
......@@ -10,7 +9,6 @@ import javax.annotation.Resource;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.List;
/**
* @author lenovo
......@@ -29,107 +27,161 @@ public class DeviceTaskScheduler {
// 触发器组名(统一固定,与原有逻辑保持一致)
private static final String TRIGGER_GROUP = "DEVICE_TRIGGERS";
/**
* 创建设备监控任务
* @param fStoreyId 设备ID
* 增强的创建设备监控任务方法
*/
public void scheduleDeviceMonitoring(Long fStoreyId,String fPowerOutageIp,Integer fPowerOutagePort) throws SchedulerException {
public void scheduleDeviceMonitoring(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) {
try {
log.info("开始创建设备监控任务:{}", fStoreyId);
// 检查调度器状态
if (!scheduler.isStarted()) {
log.warn("调度器未启动,正在启动...");
scheduler.start();
}
log.info("创建设备监控任务:{}", fStoreyId);
// 2. 创建每小时通信任务
// 创建任务
createHourlyCommunicationJob(fStoreyId);
createFinalExecutionJob(fStoreyId, fPowerOutageIp, fPowerOutagePort);
log.info("设备监控任务创建完成:{}", fStoreyId);
// 3. 创建71小时后执行任务
createFinalExecutionJob(fStoreyId,fPowerOutageIp,fPowerOutagePort);
} catch (SchedulerException e) {
log.error("创建设备监控任务失败:{}", fStoreyId, e);
throw new RuntimeException("任务调度失败", e);
}
}
/**
* 创建每小时通信任务
* 增强的每小时通信任务创建
*/
private void createHourlyCommunicationJob(Long fStoreyId) throws SchedulerException {
// 1. 构建任务唯一标识(JobKey = jobId + 任务组名)
log.info("创建每小时通信任务:{}", fStoreyId);
String jobId = "COMM_" + fStoreyId;
JobKey jobKey = new JobKey(jobId, JOB_GROUP);
// 构建触发器唯一标识(TriggerKey = jobId + "_TRIGGER" + 触发器组名)
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 2. 准备JobDetail(与原有逻辑一致,仅初始化不提交)
String fStoreyIdStr = fStoreyId.toString();
// 检查任务是否已存在且有效
if (isJobExistsAndValid(jobKey, triggerKey)) {
log.info("每小时通信任务[{}]已存在且有效,跳过创建", jobId);
return;
}
JobDetail job = JobBuilder.newJob(DeviceCommunicationJob.class)
.withIdentity(jobKey) // 直接用构建好的JobKey,避免重复编码
.usingJobData("fStoreyId", fStoreyIdStr)
.storeDurably() // 保留原有持久化配置
.withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString())
.storeDurably()
.requestRecovery(true) // 添加任务恢复能力:cite[4]
.build();
// 3. 准备新触发器(Cron调度,与原有逻辑一致)
// 明确创建CronTrigger,并配置错过执行的策略
CronTrigger newTrigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/2 * * * ?")
.withMisfireHandlingInstructionFireAndProceed()) // 处理错过执行的策略
.withMisfireHandlingInstructionFireAndProceed())
.build();
// 4. 分场景处理:存在则更新,不存在则创建
if (scheduler.checkExists(jobKey)) {
// 先删除旧触发器再添加新触发器
// 删除旧触发器,避免冲突
if (scheduler.checkExists(triggerKey)) {
scheduler.unscheduleJob(triggerKey);
}
scheduler.scheduleJob(newTrigger);
log.info("每小时通信任务[{}]已存在,成功更新触发器", jobId);
} else {
// 任务不存在:创建JobDetail和触发器
scheduler.scheduleJob(job, newTrigger);
log.info("每小时通信任务[{}]不存在,成功创建任务及触发器", jobId);
}
// 使用事务性操作
scheduleJobWithRecovery(job, newTrigger, jobKey, triggerKey, "每小时通信任务");
}
/**
* 创建71小时后执行任务
* 增强的最终执行任务创建
*/
private void createFinalExecutionJob(Long fStoreyId,String fPowerOutageIp,Integer fPowerOutagePort) throws SchedulerException {
log.info("创建71小时后执行任务:{}", fStoreyId);
private void createFinalExecutionJob(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort)
throws SchedulerException {
String jobId = "FINAL_" + fStoreyId;
JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 检查任务是否已存在且有效
if (isJobExistsAndValid(jobKey, triggerKey)) {
log.info("最终执行任务[{}]已存在且有效,跳过创建", jobId);
return;
}
JobDetail job = JobBuilder.newJob(FinalExecutionJob.class)
.withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString())
.usingJobData("fPowerOutageIp", fPowerOutageIp)
.usingJobData("fPowerOutagePort", fPowerOutagePort.toString())
.storeDurably()
.requestRecovery(true) // 添加任务恢复能力
.build();
// 修复触发器配置:明确使用一次性触发器
Date executeTime = Date.from(Instant.now().plus(5, ChronoUnit.MINUTES));
SimpleTrigger newTrigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startAt(executeTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionFireNow()) // 添加错过执行的处理策略
.withMisfireHandlingInstructionFireNow())
.build();
// 修复更新逻辑:先删除再添加,避免冲突
scheduleJobWithRecovery(job, newTrigger, jobKey, triggerKey, "最终执行任务");
}
/**
* 检查任务是否存在且有效
*/
private boolean isJobExistsAndValid(JobKey jobKey, TriggerKey triggerKey) throws SchedulerException {
if (!scheduler.checkExists(jobKey) || !scheduler.checkExists(triggerKey)) {
return false;
}
Trigger trigger = scheduler.getTrigger(triggerKey);
return trigger != null && trigger.getNextFireTime() != null;
}
/**
* 带恢复机制的任务调度方法
*/
private void scheduleJobWithRecovery(JobDetail job, Trigger trigger, JobKey jobKey,
TriggerKey triggerKey, String taskType) throws SchedulerException {
String jobId = jobKey.getName();
try {
if (scheduler.checkExists(jobKey)) {
// 删除旧触发器,避免冲突
// 更新现有任务
if (scheduler.checkExists(triggerKey)) {
scheduler.unscheduleJob(triggerKey);
}
scheduler.scheduleJob(newTrigger);
log.info("71小时后执行任务[{}]已存在,成功更新触发器,新执行时间:{}", jobId, executeTime);
scheduler.addJob(job, true); // true 表示替换现有任务
scheduler.scheduleJob(trigger);
log.info("{}[{}]更新成功,下次执行时间:{}", taskType, jobId, trigger.getNextFireTime());
} else {
// 创建新任务
scheduler.scheduleJob(job, trigger);
log.info("{}[{}]创建成功,下次执行时间:{}", taskType, jobId, trigger.getNextFireTime());
}
} catch (ObjectAlreadyExistsException e) {
log.warn("{}[{}]已存在,尝试恢复调度", taskType, jobId);
// 任务已存在,尝试恢复触发器
if (scheduler.checkExists(triggerKey)) {
scheduler.rescheduleJob(triggerKey, trigger);
} else {
scheduler.scheduleJob(job, newTrigger);
log.info("71小时后执行任务[{}]不存在,成功创建任务及触发器,执行时间:{}", jobId, executeTime);
scheduler.scheduleJob(trigger);
}
}
}
/**
* 检查任务状态的方法
*/
public void checkTaskStatus(Long fStoreyId) throws SchedulerException {
String commJobId = "COMM_" + fStoreyId;
String finalJobId = "FINAL_" + fStoreyId;
// 验证触发器状态
checkTriggerStatus(commJobId, "通信任务");
checkTriggerStatus(finalJobId, "最终任务");
}
private void checkTriggerStatus(String jobId, String taskType) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger != null) {
log.info("最终任务触发器[{}]下次执行时间: {}", triggerKey, trigger.getNextFireTime());
log.info("{}[{}]状态 - 下次执行: {}, 最后执行: {}",
taskType, jobId, trigger.getNextFireTime(), trigger.getPreviousFireTime());
} else {
log.warn("{}[{}]触发器不存在", taskType, jobId);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment