大闹天庭篇玄武大帝
发布时间:06/21 11:01:51
大闹天庭篇玄武大帝:从零构建高可用分布式任务调度系统
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-data-jpa
mysql
mysql-connector-java
8.0.33
org.quartz-scheduler
quartz
2.3.2
com.alibaba.cloud
spring-cloud-starter-alibaba-nacos-discovery
2022.0.0.0
```
getJobStats() {
Map stats = new HashMap<>();
stats.put("activeJobs", getActiveJobCount());
stats.put("failedJobs", getFailedJobCountLastHour());
stats.put("avgExecutionTime", getAverageExecutionTime());
return stats;
}
}
```
1. 系统架构设计
本系统采用分层架构,核心组件包括调度中心、执行器、注册中心和监控告警模块。调度中心负责任务的分发与调度,执行器负责具体任务执行,注册中心实现服务发现,监控告警确保系统稳定性。
1.1 技术选型
- 调度中心:Spring Boot 2.7.12 + Quartz 2.3.2
- 执行器:Java 11 + 自定义任务执行框架
- 注册中心:Nacos 2.2.3
- 数据库:MySQL 8.0.33
- 监控:Prometheus 2.45.0 + Grafana 9.5.2
2. 环境准备
2.1 基础环境安装
安装Java开发环境:
```bash 下载JDK 11 wget https://download.oracle.com/java/11/latest/jdk-11_linux-x64_bin.tar.gz 解压并配置环境变量 tar -zxvf jdk-11_linux-x64_bin.tar.gz sudo mv jdk-11.0.20 /usr/local/ echo 'export JAVA_HOME=/usr/local/jdk-11.0.20' >> ~/.bashrc echo 'export PATH=$JAVA_HOME/bin:$PATH' >> ~/.bashrc source ~/.bashrc ```2.2 数据库初始化
创建数据库和表结构:
```sql CREATE DATABASE IF NOT EXISTS task_scheduler DEFAULT CHARSET utf8mb4; USE task_scheduler; CREATE TABLE `job_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `job_name` varchar(100) NOT NULL COMMENT '任务名称', `job_group` varchar(100) NOT NULL COMMENT '任务分组', `cron_expression` varchar(50) NOT NULL COMMENT 'cron表达式', `job_class` varchar(200) NOT NULL COMMENT '任务类全限定名', `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '状态:0-停用 1-启用', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP, `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `uk_job_name_group` (`job_name`,`job_group`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; CREATE TABLE `job_execution_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `job_id` bigint(20) NOT NULL, `execution_time` datetime NOT NULL, `status` tinyint(4) NOT NULL COMMENT '0-执行中 1-成功 2-失败', `result_message` text, `cost_time` int(11) DEFAULT NULL COMMENT '执行耗时(ms)', PRIMARY KEY (`id`), KEY `idx_job_id_time` (`job_id`,`execution_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; ```3. 调度中心实现
3.1 项目初始化
创建Spring Boot项目并添加依赖:
```xml3.2 Quartz配置
配置application.yml:
```yaml spring: datasource: url: jdbc:mysql://localhost:3306/task_scheduler?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai username: root password: your_password driver-class-name: com.mysql.cj.jdbc.Driver jpa: hibernate: ddl-auto: update show-sql: true quartz: job-store-type: jdbc jdbc: initialize-schema: never properties: org.quartz.scheduler.instanceName: TaskScheduler org.quartz.scheduler.instanceId: AUTO org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate org.quartz.jobStore.tablePrefix: QRTZ_ org.quartz.jobStore.isClustered: true org.quartz.jobStore.clusterCheckinInterval: 20000 org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool org.quartz.threadPool.threadCount: 10 nacos: discovery: server-addr: localhost:8848 ```3.3 调度服务核心代码
创建JobSchedulerService:
```java @Service public class JobSchedulerService { @Autowired private Scheduler scheduler; @Autowired private JobInfoRepository jobInfoRepository; public void addJob(JobInfo jobInfo) throws SchedulerException { JobDetail jobDetail = JobBuilder.newJob(getJobClass(jobInfo.getJobClass())) .withIdentity(jobInfo.getJobName(), jobInfo.getJobGroup()) .storeDurably() .build(); CronTrigger trigger = TriggerBuilder.newTrigger() .withIdentity(jobInfo.getJobName() + "_trigger", jobInfo.getJobGroup()) .withSchedule(CronScheduleBuilder.cronSchedule(jobInfo.getCronExpression())) .build(); scheduler.scheduleJob(jobDetail, trigger); if (jobInfo.getStatus() == 0) { scheduler.pauseJob(new JobKey(jobInfo.getJobName(), jobInfo.getJobGroup())); } } private Class<? extends Job> getJobClass(String className) { try { return (Class<? extends Job>) Class.forName(className); } catch (ClassNotFoundException e) { throw new RuntimeException("Job class not found: " + className); } } public void triggerJob(String jobName, String jobGroup) throws SchedulerException { scheduler.triggerJob(new JobKey(jobName, jobGroup)); } public void pauseJob(String jobName, String jobGroup) throws SchedulerException { scheduler.pauseJob(new JobKey(jobName, jobGroup)); } public void resumeJob(String jobName, String jobGroup) throws SchedulerException { scheduler.resumeJob(new JobKey(jobName, jobGroup)); } } ```4. 执行器实现
4.1 执行器框架设计
创建任务执行器基类:
```java public abstract class BaseJobExecutor implements Job { private static final Logger logger = LoggerFactory.getLogger(BaseJobExecutor.class); @Override public void execute(JobExecutionContext context) throws JobExecutionException { long startTime = System.currentTimeMillis(); String jobId = context.getJobDetail().getKey().getName(); try { logger.info("Job {} start executing", jobId); executeInternal(context); logger.info("Job {} executed successfully, cost {}ms", jobId, System.currentTimeMillis() - startTime); saveExecutionLog(jobId, 1, "Success", startTime); } catch (Exception e) { logger.error("Job {} execution failed", jobId, e); saveExecutionLog(jobId, 2, e.getMessage(), startTime); throw new JobExecutionException(e); } } protected abstract void executeInternal(JobExecutionContext context) throws Exception; private void saveExecutionLog(String jobId, int status, String message, long startTime) { // 保存执行日志到数据库 JobExecutionLog log = new JobExecutionLog(); log.setJobId(Long.parseLong(jobId)); log.setExecutionTime(new Date(startTime)); log.setStatus(status); log.setResultMessage(message); log.setCostTime((int)(System.currentTimeMillis() - startTime)); // 调用日志服务保存 } } ```4.2 具体任务实现示例
创建数据同步任务:
```java @Component public class DataSyncJob extends BaseJobExecutor { @Autowired private DataSyncService dataSyncService; @Override protected void executeInternal(JobExecutionContext context) throws Exception { // 从上下文获取参数 JobDataMap dataMap = context.getJobDetail().getJobDataMap(); String source = dataMap.getString("source"); String target = dataMap.getString("target"); // 执行数据同步 dataSyncService.syncData(source, target); } } ```5. 服务注册与发现
5.1 Nacos安装与配置
安装Nacos服务端:
```bash 下载Nacos wget https://github.com/alibaba/nacos/releases/download/2.2.3/nacos-server-2.2.3.tar.gz 解压 tar -zxvf nacos-server-2.2.3.tar.gz cd nacos/bin 单机模式启动 sh startup.sh -m standalone ```5.2 执行器注册到Nacos
配置执行器的application.yml:
```yaml spring: application: name: task-executor cloud: nacos: discovery: server-addr: localhost:8848 namespace: public group: DEFAULT_GROUP server: port: 8081 task: executor: name: executor-1 max-threads: 20 queue-capacity: 1000 ```6. 监控告警配置
6.1 Prometheus指标暴露
在调度中心添加监控端点:
```java @RestController @RequestMapping("/metrics") public class MetricsController { private final MeterRegistry meterRegistry; public MetricsController(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; } @PostConstruct public void initMetrics() { // 注册自定义指标 Counter.builder("job.execution.total") .description("Total job executions") .register(meterRegistry); Gauge.builder("job.queue.size", () -> getQueueSize()) .description("Current job queue size") .register(meterRegistry); } @GetMapping("/job/stats") public Map6.2 Grafana仪表板配置
创建Prometheus数据源:
- 登录Grafana(默认地址:http://localhost:3000)
- 进入Configuration → Data Sources
- 点击Add data source选择Prometheus
- URL填写:http://localhost:9090
- 点击Save & Test验证连接
导入任务调度监控仪表板:
- 进入Dashboard → Import
- 输入仪表板ID:7362(任务调度系统通用模板)
- 选择Prometheus数据源
- 点击Import完成导入
7. 高可用部署
7.1 调度中心集群配置
修改集群配置:
```yaml application-cluster.yml quartz: properties: org.quartz.jobStore.isClustered: true org.quartz.jobStore.clusterCheckinInterval: 20000 org.quartz.jobStore.acquireTriggersWithinLock: true org.quartz.scheduler.instanceId: AUTO org.quartz.scheduler.instanceName: TaskSchedulerCluster ```7.2 负载均衡配置
使用Nginx配置调度中心负载均衡:
```nginx upstream scheduler_servers { server 192.168.1.101:8080 weight=3; server 192.168.1.102:8080 weight=3; server 192.168.1.103:8080 weight=4; ip_hash; } server { listen 80; server_name scheduler.example.com; location / { proxy_pass http://scheduler_servers; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } } ```8. 故障排查与维护
8.1 常见问题解决
- 任务不执行:检查Quartz表QRTZ_TRIGGERS的TRIGGER_STATE字段,正常应为WAITING
- 执行器失联:检查Nacos服务列表,执行器实例状态应为UP
- 数据库连接失败:验证数据库连接池配置,检查maxActive和maxWait参数
- 内存溢出:监控JVM堆内存使用,调整-Xmx参数,检查任务执行是否有内存泄漏
8.2 日常维护命令
```bash 查看调度中心日志 tail -f /opt/scheduler/logs/application.log 检查数据库连接 mysql -h localhost -u root -p task_scheduler -e "SELECT COUNT() FROM QRTZ_JOB_DETAILS;" 重启调度中心服务 systemctl restart task-scheduler 清理历史执行日志(保留30天) DELETE FROM job_execution_log WHERE execution_time < DATE_SUB(NOW(), INTERVAL 30 DAY); ```版权保护: 本文由 741卡盟 原创,转载请保留链接: http://741ka.com/gamenews/23001.html
- 上一篇:《魔法对抗物极必反》获得法术位置
- 下一篇:武林群侠传攻略和秘籍
