Commit bec4920b authored by wanghao's avatar wanghao

1 测试 上电后通信 和 最终完成 定时任务功能

parent ef6631ac
......@@ -26,162 +26,163 @@ public class DeviceTaskScheduler {
private static final String JOB_GROUP = "DEVICE_TASKS";
// 触发器组名(统一固定,与原有逻辑保持一致)
private static final String TRIGGER_GROUP = "DEVICE_TRIGGERS";
/**
* 增强的创建设备监控任务方法
* 创建设备监控任务(入口方法)
*/
public void scheduleDeviceMonitoring(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) {
if (fStoreyId == null || fPowerOutageIp == null || fPowerOutagePort == null) {
log.error("任务参数为空:fStoreyId={}, ip={}, port={}", fStoreyId, fPowerOutageIp, fPowerOutagePort);
throw new RuntimeException("任务参数不可为空");
}
try {
log.info("开始创建设备监控任务:{}", fStoreyId);
log.info("=== 开始创建设备监控任务:fStoreyId={} ===", fStoreyId);
// 检查调度器状态
// 1. 确保调度器已启动(若依框架可能延迟启动)
if (!scheduler.isStarted()) {
log.warn("调度器未启动,正在启动...");
log.warn("调度器未启动,手动启动...");
scheduler.start();
}
// 创建任务
// 2. 创建两个核心任务
createHourlyCommunicationJob(fStoreyId);
createFinalExecutionJob(fStoreyId, fPowerOutageIp, fPowerOutagePort);
log.info("设备监控任务创建完成:{}", fStoreyId);
// 3. 验证任务是否创建成功(关键:打印触发器状态)
checkTaskStatus(fStoreyId);
log.info("=== 设备监控任务创建完成:fStoreyId={} ===", fStoreyId);
} catch (SchedulerException e) {
log.error("创建设备监控任务失败:{}", fStoreyId, e);
throw new RuntimeException("任务调度失败", e);
log.error("=== 创建设备监控任务失败:fStoreyId={} ===", fStoreyId, e);
throw new RuntimeException("Quartz任务调度失败", e);
}
}
/**
* 增强的每小时通信任务创建
* 1. 创建每2分钟执行的通信任务(CronTrigger)
*/
private void createHourlyCommunicationJob(Long fStoreyId) throws SchedulerException {
String jobId = "COMM_" + fStoreyId;
JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 检查任务是否已存在且有效
if (isJobExistsAndValid(jobKey, triggerKey)) {
log.info("每小时通信任务[{}]已存在且有效,跳过创建", jobId);
// 若任务已存在且有效,跳过创建
if (isJobValid(jobKey, triggerKey)) {
log.info("通信任务[{}]已存在且有效,跳过创建", jobId);
return;
}
// 1. 构建JobDetail(若依要求:Job类必须是Spring Bean,且无参构造)
JobDetail job = JobBuilder.newJob(DeviceCommunicationJob.class)
.withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString())
.storeDurably()
.requestRecovery(true) // 添加任务恢复能力:cite[4]
.usingJobData("fStoreyId", fStoreyId.toString()) // 传递参数(String类型避免溢出)
.storeDurably(false) // 若依建议:非持久化(触发器删除后Job自动失效)
.requestRecovery(true) // 服务重启后恢复未完成的任务
.build();
CronTrigger newTrigger = TriggerBuilder.newTrigger()
// 2. 构建CronTrigger(每2分钟执行,兼容若依时间解析)
CronTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.forJob(jobKey) // 显式绑定Job(避免触发器游离)
.withSchedule(CronScheduleBuilder.cronSchedule("0 0/2 * * * ?")
// 错过触发时:立即执行一次,后续按原计划(关键!)
.withMisfireHandlingInstructionFireAndProceed())
.startNow() // 立即生效(无需等待第一个周期)
.build();
// 使用事务性操作
scheduleJobWithRecovery(job, newTrigger, jobKey, triggerKey, "每小时通信任务");
// 3. 原子操作:创建/更新任务(若依框架推荐用scheduleJob)
if (scheduler.checkExists(jobKey)) {
// 任务已存在:更新触发器
Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
log.info("通信任务[{}]更新触发器成功,下次执行时间:{}", jobId, nextFireTime);
} else {
// 任务不存在:创建Job+Trigger(原子操作,避免分步异常)
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("通信任务[{}]创建成功,下次执行时间:{}", jobId, nextFireTime);
}
}
/**
* 增强的最终执行任务创建
* 2. 创建5分钟后执行的最终任务(SimpleTrigger,仅执行一次)
*/
private void createFinalExecutionJob(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort)
throws SchedulerException {
private void createFinalExecutionJob(Long fStoreyId, String fPowerOutageIp, Integer fPowerOutagePort) throws SchedulerException {
String jobId = "FINAL_" + fStoreyId;
JobKey jobKey = new JobKey(jobId, JOB_GROUP);
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
// 检查任务是否已存在且有效
if (isJobExistsAndValid(jobKey, triggerKey)) {
log.info("最终执行任务[{}]已存在且有效,跳过创建", jobId);
// 若任务已存在且有效,跳过创建
if (isJobValid(jobKey, triggerKey)) {
log.info("最终任务[{}]已存在且有效,跳过创建", jobId);
return;
}
// 1. 构建JobDetail
JobDetail job = JobBuilder.newJob(FinalExecutionJob.class)
.withIdentity(jobKey)
.usingJobData("fStoreyId", fStoreyId.toString())
.usingJobData("fPowerOutageIp", fPowerOutageIp)
.usingJobData("fPowerOutagePort", fPowerOutagePort.toString())
.storeDurably()
.requestRecovery(true) // 添加任务恢复能力
.storeDurably(false)
.requestRecovery(true)
.build();
// 2. 构建SimpleTrigger(5分钟后执行,仅一次)
Date executeTime = Date.from(Instant.now().plus(5, ChronoUnit.MINUTES));
SimpleTrigger newTrigger = TriggerBuilder.newTrigger()
SimpleTrigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startAt(executeTime)
.forJob(jobKey) // 显式绑定Job
.startAt(executeTime) // 精确执行时间
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionFireNow())
// 错过触发时:立即执行(避免任务失效)
.withMisfireHandlingInstructionFireNow()
.withRepeatCount(0)) // 仅执行一次(关键!)
.build();
scheduleJobWithRecovery(job, newTrigger, jobKey, triggerKey, "最终执行任务");
// 3. 原子操作:创建/更新任务
if (scheduler.checkExists(jobKey)) {
Date nextFireTime = scheduler.rescheduleJob(triggerKey, trigger);
log.info("最终任务[{}]更新触发器成功,执行时间:{}", jobId, nextFireTime);
} else {
Date nextFireTime = scheduler.scheduleJob(job, trigger);
log.info("最终任务[{}]创建成功,执行时间:{}", jobId, nextFireTime);
}
}
/**
* 检查任务是否存在且有效
* 检查任务是否存在且有效(触发器有下次执行时间)
*/
private boolean isJobExistsAndValid(JobKey jobKey, TriggerKey triggerKey) throws SchedulerException {
private boolean isJobValid(JobKey jobKey, TriggerKey triggerKey) throws SchedulerException {
// 1. 检查Job和Trigger是否存在
if (!scheduler.checkExists(jobKey) || !scheduler.checkExists(triggerKey)) {
log.debug("任务[{}]或触发器[{}]不存在", jobKey.getName(), triggerKey.getName());
return false;
}
// 2. 检查触发器是否有下次执行时间
Trigger trigger = scheduler.getTrigger(triggerKey);
return trigger != null && trigger.getNextFireTime() != null;
}
/**
* 带恢复机制的任务调度方法
*/
private void scheduleJobWithRecovery(JobDetail job, Trigger trigger, JobKey jobKey,
TriggerKey triggerKey, String taskType) throws SchedulerException {
String jobId = jobKey.getName();
try {
if (scheduler.checkExists(jobKey)) {
// 更新现有任务
if (scheduler.checkExists(triggerKey)) {
if (trigger == null || trigger.getNextFireTime() == null) {
log.debug("触发器[{}]无效(无下次执行时间)", triggerKey.getName());
// 清理无效触发器
scheduler.unscheduleJob(triggerKey);
return false;
}
scheduler.addJob(job, true); // true 表示替换现有任务
scheduler.scheduleJob(trigger);
log.info("{}[{}]更新成功,下次执行时间:{}", taskType, jobId, trigger.getNextFireTime());
} else {
// 创建新任务
scheduler.scheduleJob(job, trigger);
log.info("{}[{}]创建成功,下次执行时间:{}", taskType, jobId, trigger.getNextFireTime());
}
} catch (ObjectAlreadyExistsException e) {
log.warn("{}[{}]已存在,尝试恢复调度", taskType, jobId);
// 任务已存在,尝试恢复触发器
if (scheduler.checkExists(triggerKey)) {
scheduler.rescheduleJob(triggerKey, trigger);
} else {
scheduler.scheduleJob(trigger);
}
}
return true;
}
/**
* 检查任务状态的方法
* 验证任务状态(调试用,打印关键信息)
*/
public void checkTaskStatus(Long fStoreyId) throws SchedulerException {
String commJobId = "COMM_" + fStoreyId;
String finalJobId = "FINAL_" + fStoreyId;
checkTriggerStatus(commJobId, "通信任务");
checkTriggerStatus(finalJobId, "最终任务");
}
private void checkTriggerStatus(String jobId, String taskType) throws SchedulerException {
TriggerKey triggerKey = new TriggerKey(jobId + "_TRIGGER", TRIGGER_GROUP);
Trigger trigger = scheduler.getTrigger(triggerKey);
if (trigger != null) {
log.info("{}[{}]状态 - 下次执行: {}, 最后执行: {}",
taskType, jobId, trigger.getNextFireTime(), trigger.getPreviousFireTime());
} else {
log.warn("{}[{}]触发器不存在", taskType, jobId);
}
private void checkTaskStatus(Long fStoreyId) throws SchedulerException {
// 检查通信任务
Trigger commTrigger = scheduler.getTrigger(new TriggerKey("COMM_" + fStoreyId + "_TRIGGER", TRIGGER_GROUP));
// 检查最终任务
Trigger finalTrigger = scheduler.getTrigger(new TriggerKey("FINAL_" + fStoreyId + "_TRIGGER", TRIGGER_GROUP));
log.info("=== 任务状态验证:fStoreyId={} ===", fStoreyId);
log.info("通信任务下次执行时间:{}", commTrigger != null ? commTrigger.getNextFireTime() : "不存在");
log.info("最终任务执行时间:{}", finalTrigger != null ? finalTrigger.getNextFireTime() : "不存在");
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment