Springboot+Quartz+Dynamic-datasource
在使用dynamic-datasource多数据源切换场景下,实现Quartz任务持久化配置和API动态调度
1. pom依赖
暂未找到版本对应关系,若有版本不一致异常,请自行尝试升降版本。
<dependencies>
<!-- Dynamic multi-datasource starter (com.baomidou dynamic-datasource) -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>dynamic-datasource-spring-boot-starter</artifactId>
<version>3.5.0</version>
</dependency>
<!-- Spring Boot Quartz starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.5.4</version>
</dependency>
</dependencies>
2. 数据源和Quartz参数配置
点击查看代码
# Primary data source, registered under the name "main" and selected as the
# default via spring.datasource.dynamic.primary
spring.datasource.dynamic.primary=main
db.primary.host=127.0.0.1
db.primary.username=root
db.primary.password=123456
db.primary.database=main
spring.datasource.dynamic.datasource.main.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.main.url=jdbc:mysql://${db.primary.host}/${db.primary.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=GMT%2B8
spring.datasource.dynamic.datasource.main.username=${db.primary.username}
spring.datasource.dynamic.datasource.main.password=${db.primary.password}
# Secondary data source, registered under the name "slave"
db.slave.host=127.0.0.1
db.slave.username=root
db.slave.password=123456
db.slave.database=slave
spring.datasource.dynamic.datasource.slave.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.slave.url=jdbc:mysql://${db.slave.host}/${db.slave.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&autoReconnect=true&serverTimezone=GMT%2B8
spring.datasource.dynamic.datasource.slave.username=${db.slave.username}
spring.datasource.dynamic.datasource.slave.password=${db.slave.password}
# Data source used for Quartz persistence, registered under the name "quartz"
db.quartz.host=127.0.0.1
db.quartz.username=root
db.quartz.password=123456
db.quartz.database=quartz
spring.datasource.dynamic.datasource.quartz.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.dynamic.datasource.quartz.url=jdbc:mysql://${db.quartz.host}/${db.quartz.database}?useUnicode=true&characterEncoding=utf8&useSSL=false&autoReconnect=true&serverTimezone=GMT%2B8
spring.datasource.dynamic.datasource.quartz.username=${db.quartz.username}
spring.datasource.dynamic.datasource.quartz.password=${db.quartz.password}
# Quartz configuration
# The four spring.quartz.properties.* keys below are custom values injected
# into JobServiceImpl through @Value (time zone, scheduler name, name of the
# dynamic data source to use, scheduler-context key).
spring.quartz.properties.timeZoneId = Asia/Shanghai
spring.quartz.properties.schedulerName=demoScheduler
spring.quartz.properties.quartzDataSourceName=quartz
spring.quartz.properties.schedulerContextKey=SERVICEX-APPLICATION-CONTEXT-KEY
spring.quartz.job-store-type=jdbc
spring.quartz.jdbc.initialize-schema=never
spring.quartz.auto-startup=false
spring.quartz.startup-delay=0
spring.quartz.overwrite-existing-jobs=true
spring.quartz.wait-for-jobs-to-complete-on-shutdown=true
# Native org.quartz.* keys. JobServiceImpl loads this whole file with a
# PropertiesFactoryBean and passes it to SchedulerFactoryBean.setQuartzProperties.
org.quartz.scheduler.instanceName=demo
org.quartz.scheduler.instanceId=AUTO
org.quartz.scheduler.instanceIdGenerator.class=org.quartz.simpl.SimpleInstanceIdGenerator
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=false
org.quartz.jobStore.clusterCheckinInterval=10000
org.quartz.jobStore.useProperties=false
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=10
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
3. Job信息实体类
点击查看代码
package com.xx.xx.model;
import lombok.Data;
/**
 * Data carrier describing a single scheduled job, used both as the request
 * body when adding/updating a job and as the element type when listing jobs.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class JobInfo {
// Job name; combined with jobGroup to form the job key and the trigger key.
private String jobName;
private String jobClassPath;// Fully-qualified name of the Job class to execute, e.g. com.xx.xx.DemoJob
// Job group; combined with jobName to form the job key and the trigger key.
private String jobGroup;
// Free-text description stored with the job detail.
private String description;
// Cron expression that drives the job's trigger.
private String cron;
private String timeZoneId;// Author's intent: default to Asia/Shanghai and hide from front-end callers. NOTE(review): no default is assigned in this class - confirm where it is set.
private String status;// Meant to be hidden from front-end input; only echoed back (trigger state) when jobs are queried
}
4. JobService服务层接口
点击查看代码
package com.xx.xx.service;
import com.xx.xx.model.JobInfo;
import org.quartz.SchedulerException;
import org.springframework.web.bind.annotation.RequestBody;
import java.text.ParseException;
import java.util.List;
/**
 * Service contract covering the life cycle of scheduled jobs.
 *
 * <p>Scheduler level: start, standby (pause) and shut down the scheduler.
 * Job level: list, add, update, remove, pause, resume, pause all, resume all
 * and trigger a job immediately (mainly for run-once scenarios).
 *
 * <p>NOTE(review): the original author's note stated that
 * {@code standbyScheduler} pauses every job including ones added later, while
 * {@code pauseAllJobs} only pauses jobs that already exist. Quartz documents
 * {@code Scheduler.pauseAll()} as also pausing triggers added afterwards until
 * {@code resumeAll()} is called - confirm against the Quartz version in use.
 *
 * @author : Mao
 */
public interface JobService {
    /**
     * Starts the scheduler.
     *
     * @throws SchedulerException if the scheduler cannot be started
     */
    void startScheduler() throws SchedulerException;

    /**
     * Puts the scheduler into standby (paused) mode.
     *
     * @throws SchedulerException if the scheduler cannot be paused
     */
    void standbyScheduler() throws SchedulerException;

    /**
     * Shuts the scheduler down.
     *
     * @throws SchedulerException if the shutdown fails
     */
    void shutdownScheduler() throws SchedulerException;

    /**
     * Lists the jobs known to the scheduler.
     *
     * @return job information, one entry per job/trigger pair
     * @throws SchedulerException if the scheduler cannot be queried
     */
    List<JobInfo> getAllJobs() throws SchedulerException;

    /**
     * Adds a new job.
     *
     * @param jobInfo job definition (name, group, job class, cron, description)
     * @throws SchedulerException     if the job already exists or cannot be scheduled
     * @throws ClassNotFoundException if the job class cannot be loaded or is not a Quartz job
     * @throws ParseException         declared for cron parsing failures
     */
    // @RequestBody was removed from the parameter: it is a web-layer binding
    // annotation and has no effect on a service interface.
    void addJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException, ParseException;

    /**
     * Updates an existing job.
     *
     * @param jobInfo job definition identifying the job by name and group
     * @throws SchedulerException     if the job does not exist or cannot be rescheduled
     * @throws ParseException         declared for cron parsing failures
     * @throws ClassNotFoundException if the job class cannot be loaded or is not a Quartz job
     */
    void updateJob(JobInfo jobInfo) throws SchedulerException, ParseException, ClassNotFoundException;

    /**
     * Removes a job.
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @throws SchedulerException if the job cannot be removed
     */
    void removeJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * Pauses a job.
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @throws SchedulerException if the job cannot be paused
     */
    void pauseJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * Resumes a paused job.
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @throws SchedulerException if the job cannot be resumed
     */
    void resumeJob(String jobName, String jobGroup) throws SchedulerException;

    /**
     * Pauses all jobs.
     *
     * @throws SchedulerException if the jobs cannot be paused
     */
    void pauseAllJobs() throws SchedulerException;

    /**
     * Resumes all jobs.
     *
     * @throws SchedulerException if the jobs cannot be resumed
     */
    void resumeAllJobs() throws SchedulerException;

    /**
     * Triggers a job immediately (mainly for run-once scenarios).
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @throws SchedulerException if the job cannot be triggered
     */
    void execJob(String jobName, String jobGroup) throws SchedulerException;
}
5. JobServiceImpl实现类
点击查看代码
package com.xx.xx.service.impl;
import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import com.xx.xx.model.JobInfo;
import com.xx.xx.service.JobService;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.PropertiesFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.ParseException;
import java.util.*;
/**
 * Quartz-backed implementation of {@link JobService}.
 *
 * <p>Besides the service operations, this class also declares the beans that
 * build the Quartz {@link Scheduler} on top of one named data source taken
 * from the {@link DynamicRoutingDataSource}.
 */
@Service
public class JobServiceImpl implements JobService {

    /** Scheduler used by all operations; assigned when the {@code scheduler} bean method runs. */
    Scheduler scheduler;

    @Value("${spring.quartz.properties.schedulerName}")
    private String schedulerName;

    /** Name of the dynamic data source that holds the Quartz tables. */
    @Value("${spring.quartz.properties.quartzDataSourceName}")
    private String quartzDataSourceName;

    @Value("${spring.quartz.properties.schedulerContextKey}")
    private String schedulerContextKey;

    /** Time zone applied to every cron schedule built by this service. */
    @Value("${spring.quartz.properties.timeZoneId}")
    private String timeZoneId;

    /**
     * Loads {@code application.properties} from the classpath as a plain
     * {@link Properties} object to be handed to Quartz.
     *
     * @return the loaded properties
     * @throws IOException if the resource cannot be read
     */
    @Bean
    public Properties quartzProperties() throws IOException {
        PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean();
        propertiesFactoryBean.setLocation(new ClassPathResource("application.properties"));
        propertiesFactoryBean.afterPropertiesSet();
        return propertiesFactoryBean.getObject();
    }

    /**
     * Key bean: configures the Quartz properties and the data source. The
     * concrete data source is looked up by name from the
     * {@link DynamicRoutingDataSource} rather than using the routing data
     * source itself.
     *
     * @param quartzProperties properties loaded by {@link #quartzProperties()}
     * @param dataSource       the dynamic routing data source
     * @return the configured factory bean
     * @throws IOException declared by the original signature
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(Properties quartzProperties, DynamicRoutingDataSource dataSource) throws IOException {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        // Settings coming from the properties file.
        factory.setQuartzProperties(quartzProperties);
        // Other custom settings have to be applied manually here.
        factory.setSchedulerName(schedulerName);
        factory.setDataSource(dataSource.getDataSource(quartzDataSourceName));
        factory.setApplicationContextSchedulerContextKey(schedulerContextKey);
        return factory;
    }

    /**
     * Exposes the initialised scheduler as a bean and keeps a reference to it
     * for the service methods of this class.
     *
     * @param schedulerFactoryBean the factory that created the scheduler
     * @return the scheduler instance
     * @throws IOException        declared by the original signature
     * @throws SchedulerException declared by the original signature
     */
    @Bean
    public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) throws IOException, SchedulerException {
        scheduler = schedulerFactoryBean.getScheduler();
        return scheduler;
    }

    /** {@inheritDoc} */
    @Override
    public void startScheduler() throws SchedulerException {
        scheduler.start();
    }

    /** {@inheritDoc} */
    @Override
    public void standbyScheduler() throws SchedulerException {
        scheduler.standby();
    }

    /** {@inheritDoc} */
    @Override
    public void shutdownScheduler() throws SchedulerException {
        scheduler.shutdown();
    }

    /**
     * Lists every job in every job group, producing one {@link JobInfo} per
     * trigger attached to the job. Jobs without triggers yield no entry.
     *
     * @return job information for all groups
     * @throws SchedulerException if the scheduler cannot be queried
     */
    @Override
    public List<JobInfo> getAllJobs() throws SchedulerException {
        List<JobInfo> jobs = new ArrayList<>();
        // Walk all groups; the previous implementation overwrote the key set on
        // each iteration and therefore only reported the last group's jobs.
        for (String jobGroup : scheduler.getJobGroupNames()) {
            for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.jobGroupEquals(jobGroup))) {
                JobDetail jobDetail = scheduler.getJobDetail(jobKey);
                if (jobDetail == null) {
                    // Job disappeared between listing and lookup; nothing to report.
                    continue;
                }
                for (Trigger trigger : scheduler.getTriggersOfJob(jobKey)) {
                    jobs.add(toJobInfo(jobKey, jobDetail, trigger));
                }
            }
        }
        return jobs;
    }

    /**
     * Adds a new cron-scheduled job.
     *
     * <p>The schedule always uses the configured time zone; the
     * {@code timeZoneId} carried by {@code jobInfo} is not consulted.
     *
     * @param jobInfo job definition
     * @throws SchedulerException     if a job with the same name and group already exists,
     *                                or scheduling fails
     * @throws ClassNotFoundException if the job class cannot be loaded or is not a {@link Job}
     * @throws ParseException         declared by the interface
     */
    @Override
    public void addJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException, ParseException {
        // The job key uniquely identifies a job.
        JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
        if (scheduler.checkExists(jobKey)) {
            throw new SchedulerException("Job already exist.");
        }
        JobDetail jobDetail = buildJobDetail(jobInfo, jobKey);
        CronTrigger trigger = buildCronTrigger(jobInfo);
        scheduler.scheduleJob(jobDetail, trigger);
    }

    /**
     * Updates an existing job: its job class, description and cron schedule.
     *
     * <p>The job detail and its trigger are replaced together. Previously the
     * rebuilt job detail was discarded and only the trigger was rescheduled,
     * so class and description changes were silently ignored.
     *
     * @param jobInfo job definition identifying the job by name and group
     * @throws SchedulerException     if the cron expression is invalid, the job does not
     *                                exist, or rescheduling fails
     * @throws ClassNotFoundException if the job class cannot be loaded or is not a {@link Job}
     */
    @Override
    public void updateJob(JobInfo jobInfo) throws SchedulerException, ClassNotFoundException {
        JobKey jobKey = JobKey.jobKey(jobInfo.getJobName(), jobInfo.getJobGroup());
        // Report the two failure causes separately instead of a single
        // misleading "Job Not Exist" message.
        if (!CronExpression.isValidExpression(jobInfo.getCron())) {
            throw new JobExecutionException("Invalid cron expression: " + jobInfo.getCron());
        }
        if (!scheduler.checkExists(jobKey)) {
            throw new JobExecutionException("Job Not Exist");
        }
        JobDetail jobDetail = buildJobDetail(jobInfo, jobKey);
        CronTrigger trigger = buildCronTrigger(jobInfo);
        // replace = true overwrites the stored job detail and the trigger that
        // shares its name/group with the job.
        scheduler.scheduleJob(jobDetail, Collections.singleton(trigger), true);
    }

    /**
     * Removes a job together with the trigger that shares its name and group.
     *
     * @param jobName  job name
     * @param jobGroup job group
     * @throws SchedulerException if removal fails
     */
    @Override
    public void removeJob(String jobName, String jobGroup) throws SchedulerException {
        TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
        // Stop the trigger, detach it, then delete the job itself.
        scheduler.pauseTrigger(triggerKey);
        scheduler.unscheduleJob(triggerKey);
        scheduler.deleteJob(JobKey.jobKey(jobName, jobGroup));
    }

    /** {@inheritDoc} */
    @Override
    public void resumeJob(String jobName, String jobGroup) throws SchedulerException {
        scheduler.resumeJob(JobKey.jobKey(jobName, jobGroup));
    }

    /** {@inheritDoc} */
    @Override
    public void pauseAllJobs() throws SchedulerException {
        scheduler.pauseAll();
    }

    /** {@inheritDoc} */
    @Override
    public void resumeAllJobs() throws SchedulerException {
        scheduler.resumeAll();
    }

    /** {@inheritDoc} */
    @Override
    public void pauseJob(String jobName, String jobGroup) throws SchedulerException {
        scheduler.pauseJob(JobKey.jobKey(jobName, jobGroup));
    }

    /** {@inheritDoc} */
    @Override
    public void execJob(String jobName, String jobGroup) throws SchedulerException {
        scheduler.triggerJob(JobKey.jobKey(jobName, jobGroup));
    }

    /** Converts one job/trigger pair into the {@link JobInfo} returned to callers. */
    private JobInfo toJobInfo(JobKey jobKey, JobDetail jobDetail, Trigger trigger) throws SchedulerException {
        JobInfo jobInfo = new JobInfo();
        jobInfo.setJobName(jobKey.getName());
        jobInfo.setJobGroup(jobKey.getGroup());
        jobInfo.setJobClassPath(jobDetail.getJobClass().getName());
        jobInfo.setDescription(jobDetail.getDescription());
        jobInfo.setStatus(scheduler.getTriggerState(trigger.getKey()).name());
        if (trigger instanceof CronTrigger) {
            CronTrigger cronTrigger = (CronTrigger) trigger;
            jobInfo.setCron(cronTrigger.getCronExpression());
            jobInfo.setTimeZoneId(cronTrigger.getTimeZone().getID());
        }
        return jobInfo;
    }

    /** Builds the job detail for the given key from the class path and description in {@code jobInfo}. */
    private static JobDetail buildJobDetail(JobInfo jobInfo, JobKey jobKey) throws ClassNotFoundException {
        return JobBuilder.newJob(resolveJobClass(jobInfo.getJobClassPath()))
                .withDescription(jobInfo.getDescription())
                .withIdentity(jobKey)
                .build();
    }

    /** Builds a cron trigger named after the job, using the configured time zone. */
    private CronTrigger buildCronTrigger(JobInfo jobInfo) {
        CronScheduleBuilder schedule = CronScheduleBuilder.cronSchedule(jobInfo.getCron())
                .inTimeZone(TimeZone.getTimeZone(timeZoneId));
        return TriggerBuilder.newTrigger()
                .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup())
                .startNow()
                .withSchedule(schedule)
                .build();
    }

    /** Loads the named class once and verifies that it is a Quartz {@link Job}. */
    private static Class<? extends Job> resolveJobClass(String jobClassPath) throws ClassNotFoundException {
        Class<?> candidate = Class.forName(jobClassPath);
        if (!Job.class.isAssignableFrom(candidate)) {
            throw new ClassNotFoundException("Class is not a Job.");
        }
        return candidate.asSubclass(Job.class);
    }
}
6. DemoJob执行具体业务的JobBean类
点击查看代码
package com.xx.xx.jobs;
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.List;
import java.util.StringJoiner;
/**
 * Sample job that only logs a line each time it fires.
 *
 * <p>Author's note: to use other services inside a job, fetch them from the
 * Spring context (for example through an application-context helper) instead
 * of relying on {@code @Autowired}; otherwise the field is null, because the
 * job instance is not managed by Spring in this setup.
 */
@Slf4j
@DisallowConcurrentExecution
public class DemoJob extends QuartzJobBean {

    /**
     * Logs a marker, the firing trigger's name, a greeting and the current time.
     *
     * @param jobExecutionContext execution context supplied by Quartz
     * @throws JobExecutionException declared by the overridden method
     */
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        StringJoiner joiner = new StringJoiner(" | ")
                .add("---Executed Job---")
                .add(jobExecutionContext.getTrigger().getKey().getName())
                .add("Hello, the first demo job.")
                // StringJoiner.add accepts a CharSequence, so the date must be
                // converted to text; fully qualified because java.util.Date is
                // not among this file's imports.
                .add(new java.util.Date().toString());
        log.info(joiner.toString());
    }
}
7. JobController动态管理任务
点击查看代码
package com.xx.xx.api;
import com.xx.xx.model.JobInfo;
import com.xx.power.service.JobService;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * REST endpoints under {@code /job} that expose the {@link JobService}
 * operations for managing the scheduler and its jobs at runtime.
 */
@RestController
@RequestMapping("/job")
public class JobController {

    /** Response body returned by every endpoint that does not return data. */
    private static final String SUCCESS = "Operation Succeeded.";

    @Autowired
    JobService jobService;

    /** Starts the scheduler. */
    @GetMapping("/startScheduler")
    public String startScheduler() throws SchedulerException {
        jobService.startScheduler();
        return SUCCESS;
    }

    /** Puts the scheduler into standby. */
    @GetMapping("/standbyScheduler")
    public String standbyScheduler() throws SchedulerException {
        jobService.standbyScheduler();
        return SUCCESS;
    }

    /** Shuts the scheduler down. */
    @GetMapping("/shutdownScheduler")
    public String shutdownScheduler() throws SchedulerException {
        jobService.shutdownScheduler();
        return SUCCESS;
    }

    /** Returns the job list provided by the service. */
    @GetMapping("/getAllJobs")
    public List<JobInfo> getAllJobs() throws SchedulerException {
        List<JobInfo> allJobs = jobService.getAllJobs();
        return allJobs;
    }

    /** Adds the job described by the request body. */
    @PostMapping("/addJob")
    public String addJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ClassNotFoundException, ParseException {
        jobService.addJob(jobInfo);
        return SUCCESS;
    }

    /** Updates the job described by the request body. */
    @PostMapping("/updateJob")
    public String updateJob(@RequestBody JobInfo jobInfo) throws SchedulerException, ParseException, ClassNotFoundException {
        jobService.updateJob(jobInfo);
        return SUCCESS;
    }

    /** Removes the job identified by name and group. */
    @PostMapping("/removeJob")
    public String removeJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.removeJob(jobName, jobGroup);
        return SUCCESS;
    }

    /** Pauses the job identified by name and group. */
    @PostMapping("/pauseJob")
    public String pauseJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.pauseJob(jobName, jobGroup);
        return SUCCESS;
    }

    /** Resumes the job identified by name and group. */
    @PostMapping("/resumeJob")
    public String resumeJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.resumeJob(jobName, jobGroup);
        return SUCCESS;
    }

    /** Pauses all jobs. */
    @GetMapping("/pauseAllJobs")
    public String pauseAllJobs() throws SchedulerException {
        jobService.pauseAllJobs();
        return SUCCESS;
    }

    /** Resumes all jobs. */
    @GetMapping("/resumeAllJobs")
    public String resumeAllJobs() throws SchedulerException {
        jobService.resumeAllJobs();
        return SUCCESS;
    }

    /** Triggers the job identified by name and group immediately. */
    @PostMapping("/execJob")
    public String execJob(String jobName, String jobGroup) throws SchedulerException {
        jobService.execJob(jobName, jobGroup);
        return SUCCESS;
    }
}
8. 启动后访问API: /job/addJob路径,RequestBody输入JobInfo参数
{
"jobName":"testJob",
"jobClassPath":"com.xx.xx.jobs.DemoJob",
"jobGroup":"testJobGroup",
"description":"This is a test job.",
"cron":"0/5 * * * * ? "
}
9. 查询任务状态,调用API:/job/getAllJobs,或自行从日志文件和日志表中查询
作者:YancyMauno
出处:http://www.cnblogs.com/Mauno/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接,否则保留追究法律责任的权利。