实现一款去中心化分布式调度

数据智能相依偎 2024-09-14 02:40:27

内容目录

一、背景概述二、原理分析三、具体实现四、使用方式五、总结六、参考

一、背景概述

前边我们介绍了spring自带的scheduler结合Shedlock实现去中心化分布式调度,但是使用起来相对麻烦一些,要分别开启scheduler和Shedlock能力,然后在任务方法上添加任务和Shedlock注解,还要根据项目自身业务特点考虑添加Shedlock的锁定协调器支持,引入各种依赖,添加各种配置,相对比较麻烦,那有没有什么方式,既保留又有的能力,又能方便使用?

我们本篇文章本着保留scheduler和Shedlock核心能力的前提下,基于scheduler和Shedlock原理重新写一个融合了二者能力的组件,来简化去中心化分布式调度的使用。

二、原理分析

我们简单汇总分析一下scheduler和shedlock的核心能力,以及我们融合的想法。

1.spring scheduler核心能力

有介绍 的核心能力是由ScheduledAnnotationBeanPostProcessor实现,借用它一张图:

在spring容器启动实例化bean之后会调用其postProcessAfterInitialization方法将带有Scheduled注解的方法封装成Task然后交给调度线程池来启动执行调度,核心实现方法是processScheduled:

protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); //检查添加cron任务 String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); //省略... } //省略... } } catch (IllegalArgumentException ex) {}}

从原理分析我们可以得知,将带注解的任务方法封装成任务,是静态一次性的,我们可以在这里做文章,将shedlock的能力嵌入到任务中,而不是在任务触发的节点每次动动态添加shedlock的锁定能力。

2.shedlock核心能力

这篇文章有介绍,shedlock的核心能力由MethodProxyScheduledLockAdvisor实现,是在bean实例化之前调用AOP能力,将shedlock锁能力织入到目标方法代理实现中去。

scheduler调度封装的任务方法是已经被shedlock增强过的,我们也可以考虑同样的做法,但是换个角度思考,是不是可以在将目标方法封装成任务的时候,手动将锁定能力嵌入进去,这样在一个地方就能处理两种逻辑能力了。

3.定制化合二为一

通过前边的scheduler和shedlock的分析,我们可以考虑改造原生scheduler的调度能力,放弃shedlock的AOP能力,在将带有Scheduled注解的方案封装成任务的时候,手动嵌入shedlock相关能力。

三、具体实现

1.注解

模仿spring scheduler添加开启调度能力注解、调度任务注解。

EnableDcsScheduling注解,指定Provider类型和默认最大锁定时间,以及导入一个Selector。

@Target(ElementType.TYPE)@Retention(RetentionPolicy.RUNTIME)@Import(DcsSchedulerConfigurationSelector.class)@Documentedpublic @interface EnableDcsScheduling { ProviderModel providerModel(); String defaultLockAtMostFor(); int order() default Ordered.LOWEST_PRECEDENCE;}

DcsScheduled注解持有Scheduled所有能力以及SchedulerLock部分能力。

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@Repeatable(DcsSchedules.class)public @interface DcsScheduled { String cron() default ""; String zone() default ""; long fixedDelay() default -1; String fixedDelayString() default ""; long fixedRate() default -1; String fixedRateString() default ""; long initialDelay() default -1; String initialDelayString() default ""; TimeUnit timeUnit() default TimeUnit.MILLISECONDS; String name() default ""; String lockAtMostFor() default ""; String lockAtLeastFor() default "";}

2.支撑类

DcsSchedulerConfigurationSelector根据Provider类型注册不能类型的辅助bean。目前改造现状暂时支持mysql、redis和zookeeper三种类型的Provider。

public DcsSchedulerConfigurationSelector implements ImportSelector { @Override public String[] selectImports(AnnotationMetadata metadata) { AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(EnableDcsScheduling.class.getName(), false)); EnableDcsScheduling.ProviderModel mode = attributes.getEnum("providerModel"); if(mode == EnableDcsScheduling.ProviderModel.DB) { return new String[] { AutoProxyRegistrar.class.getName(), LockConfigurationExtractorConfiguration.class.getName(), SchedulerConfig.class.getName(), DBProviderConfig.class.getName(), DcsSchedulingConfiguration.class.getName() }; } else if(mode == EnableDcsScheduling.ProviderModel.REDIS) { return new String[] { AutoProxyRegistrar.class.getName(), LockConfigurationExtractorConfiguration.class.getName(), SchedulerConfig.class.getName(), RedisProviderConfig.class.getName(), DcsSchedulingConfiguration.class.getName() }; } else if(mode == EnableDcsScheduling.ProviderModel.ZOOKEEPER) { return new String[] { AutoProxyRegistrar.class.getName(), LockConfigurationExtractorConfiguration.class.getName(), SchedulerConfig.class.getName(), ZkProviderConfig.class.getName(), DcsSchedulingConfiguration.class.getName() }; } else { throw new UnsupportedOperationException("Unknown provider mode " + mode); } }}

LockConfigurationExtractorConfiguration是从shedlock拷贝过来的配置类,该类提供SchedulerLock属性解析能力,这里引用它也是为了解析相关属性。

@Configurationclass LockConfigurationExtractorConfiguration extends AbstractLockConfiguration implements EmbeddedValueResolverAware, BeanFactoryAware { private final StringToDurationConverter durationConverter = StringToDurationConverter.INSTANCE; @Nullable private StringValueResolver resolver; @Nullable private BeanFactory beanFactory; @Bean ExtendedLockConfigurationExtractor lockConfigurationExtractor() { return new SpringLockConfigurationExtractor( defaultLockAtMostForDuration(), defaultLockAtLeastForDuration(), resolver, durationConverter, beanFactory); } //省略... }

SchedulerConfig是新增的配置类,提供任务调度相关能力配置和相关bean的定义。使用ThreadPoolTaskScheduler替换默认调度器的目的是使用cpu的并发能力以及一些自定义配置。

public SchedulerConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.setThreadNamePrefix("task-scheduler"); taskScheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); taskScheduler.setErrorHandler(TaskUtils.getDefaultErrorHandler(true)); taskScheduler.initialize(); return taskScheduler; }}

DcsSchedulingConfiguration定义了DcsScheduledAnnotationBeanPostProcessor类型的bean和DefaultLockingTaskExecutor类型的bean。

@Configuration@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public DcsSchedulingConfiguration {@Bean(name = "internalDcsScheduledAnnotationProcessor")@Role(BeanDefinition.ROLE_INFRASTRUCTURE)public DcsScheduledAnnotationBeanPostProcessor dcsScheduledAnnotationProcessor(@Lazy ExtendedLockConfigurationExtractor extendedLockConfigurationExtractor,@Lazy LockingTaskExecutor lockingTaskExecutor) {return new DcsScheduledAnnotationBeanPostProcessor(extendedLockConfigurationExtractor,lockingTaskExecutor);}@Beanpublic LockingTaskExecutor lockingTaskExecutor(@Lazy LockProvider lockProvider) {return new DefaultLockingTaskExecutor(lockProvider);}}

同样DcsScheduledAnnotationBeanPostProcessor是参考scheduler实现的BeanPostProcessor,目的是在封装任务的时候嵌入所能力,DefaultLockingTaskExecutor则是基于LockProvider封装的锁执行器。

DefaultLockingTaskExecutor与shedlock的实现没什么区别,我们重点看一下DcsScheduledAnnotationBeanPostProcessor的processScheduled:

protected void processScheduled(DcsScheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method,scheduled); //省略... // Check cron expression String cron = scheduled.cron(); tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); //省略... } catch (IllegalArgumentException ex) { }}

重点在createRunnable方法:

protected Runnable createRunnable(Object target, Method method,DcsScheduled scheduled) { String name = scheduled.name(); Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @DcsScheduled"); Assert.isTrue(null != name && name.length() > 0, "parameter name can't be null with @DcsScheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass()); return new DcsScheduledMethodRunnable(target, invocableMethod,scheduled,extendedLockConfigurationExtractor,lockingTaskExecutor);}

最终封装成了DcsScheduledMethodRunnable,他是一个Runnable,直接看一下run方法实现:

@Overridepublic void run() { try { Class<?> returnType = this.method.getReturnType(); if (returnType.isPrimitive() && !void.class.equals(returnType)) { throw new LockingNotSupportedException("Can not lock method returning primitive value"); } ReflectionUtils.makeAccessible(this.method); Optional<LockConfiguration> lockConfigurationOptional = lockConfigurationExtractor.getLockConfiguration(scheduled,this.method,this.method.getParameters()); LockConfiguration lockConfiguration = lockConfigurationOptional.get(); lockingTaskExecutor.executeWithLock((LockingTaskExecutor.Task) () -> method.invoke(target), lockConfiguration); } catch (InvocationTargetException ex) { ReflectionUtils.rethrowRuntimeException(ex.getTargetException()); } catch (IllegalAccessException ex) { throw new UndeclaredThrowableException(ex); } catch (Throwable e) { ReflectionUtils.rethrowRuntimeException(e); }}

这里就是我们将shedlock锁定能力嵌入到任务的核心实现。

至于DBProviderConfig、RedisProviderConfig和ZkProviderConfig的实现这里不再展开,里边都是对应类型的Provider定义。

3.开源项目

项目我已经创建放到github上,完全开源https://github.com/ScorpioAeolus/dcs-scheduler,并且我已经把release版本jar发布到maven中央仓库了,如何发布参考,最新版本1.0.3.RELEASE。

四、使用方式

经过封装后的持有scheduler和shedlock能力的开源组件dcs-scheduler使用非常方便,这里介绍一下使用方式。

1.引入依赖

引入核心依赖

<dependency> <groupId>io.github.scorpioaeolus</groupId> <artifactId>dcs-scheduler</artifactId> <version>{version}</version></dependency>

如果Provider是DB,保证已经用用或者引入依赖:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId></dependency>

如果Provider是Redis,那么保证拥有spring-redis依赖或者引入:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId></dependency>

如果Provider类型是zookeeper,那么要引入curator-framework依赖:

<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId></dependency>

2.开启调度能力

在项目启动类型添加EnableDcsScheduling注解:

@SpringBootApplication@EnableDcsScheduling(providerModel=DB,defaultLockAtMostFor = "10m")public XxxApplication { public static void main(String[] args) { SpringApplication.run(XxxApplication.class, args); }}

3.添加任务注解

在任务方法上添加DcsScheduled注解,配置任务执行频率,以及锁的名字:

@DcsScheduled(fixedRate = 5000l,name = "executeHandler")public void executeHandler3() { log.info("executeHandler3 exec,thread-id={}",Thread.currentThread().getId());}

4.Provider准备

如果类型是DB,那么在mysql的数据库中添加shedlock表:

CREATE TABLE shedlock(name VARCHAR(64) NOT NULL, lock_until TIMESTAMP(3) NOT NULL, locked_at TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), locked_by VARCHAR(255) NOT NULL, PRIMARY KEY (name));

并且保证应用会暴露出来DataSource类型的bean。

如果类型是Redis,那么需要保证应用会暴露出来RedisConnectionFactory类型的bean。

如果类型是Zookeeper,那么需要保证暴露出来CuratorFramework类型的bean。

5.测试验证

启动应用后,观察一段时间,从日志中可以看到我们的去中心化分布式调度已经生效并且正常执行。

把Provider改成Redis,并且把lockAtLeastFor改长一些(过短不方便观察),可以观察到redis中已经存在锁定的key以及锁定有效时间:

然后把Provider类型改成DB,运行后可以在数据库中看到一条所记录:

至于Zookeeper类型的Provider,有条件并且方便的伙伴可以自己试一下。

五、总结

本篇文章我们结合scheduler和shedlock编写了一个简化的去中心化分布式任务调度,也比较粗糙的介绍了其实现原理,当然世界上没有完美的东西,借鸡生蛋,鸡未必是完美的鸡,蛋也可能压根就是个坏蛋,我们也简单分析一下dcs-scheduler的优缺点。

优点

1.去中心化调度能力,任务不再由调度平台统一出发,而是分散到每台业务服务自行处理,通过shedlock提供任务触发的唯一性,这样把调度能力化整为零,降低了项目的复杂度,并且能够解决调度平台宕机的问题。

2.简单易用,Spring Boot Scheduler是Spring框架的一部分,易于集成和使用。

3.轻量级,适合简单的调度任务,不需要引入额外的依赖。

4.代码、业务侵入性小,无缝集成到Spring应用中,利用Spring的依赖注入和AOP等特性。

5.灵活,可以根据需求扩展和定制

6.节省成本,省去了调度平台机器成本

缺点

1.缺乏监控和管理,相比专门的调度框架,Spring Boot Scheduler+ShedLock对任务的监控和管理支持较弱。

2.适用范围有限:对于复杂的调度需求和大规模分布式环境可能需要额外的开发和配置。

3.资源消耗转移: 相比于专门的调度框架,调度能力和分布式处理都在调度平台处理,通过回调的方式出发任务的执行,而使用Scheduler+ShedLock方式实现分布式调度,调度算力和分布式处理并没有凭空消失,而是从调度平台转移到业务服务中进行,那么势必会占用业务服务器一部分资源和性能。

对于生产环境如果想尝试使用dcs-scheduler能力的话,可以和我联系,我会协助一起排查问题解决问题,帮你做比较早吃螃蟹的人哈哈。

六、参考

https://github.com/ScorpioAeolus/dcs-scheduler

https://github.com/lukas-krecan/ShedLock

https://spring.io/guides/gs/scheduling-tasks

https://github.com/spring-guides/gs-scheduling-tasks

0 阅读:0

数据智能相依偎

简介:感谢大家的关注