2024/11/24 23:03:38
- 用户4个小时以内没有进行任何操作,就自动清除用户会话。
- 每天晚上凌晨自动拉取另一个业务系统的某部分数据。
- 每隔15分钟,自动执行一段逻辑,更新某部分数据。
- 循环判定时间
- Sleep
- Timer
- ScheduledExecutorService
- Spring Scheduling
本文这五种方案的讲解顺序是有考究的,从简单到复杂,从底层到上层。循环判定时间、Sleep旨在摆脱所有组件或者框架带来的复杂度干扰,从最本质上理解定时任务的实现思路。Timer是JDK早先对定时任务的实现,相对来说是比较简单的。ScheduledExecutorService是对Timer的优化、而Spring Scheduling则是基于ScheduledExecutorService实现的。
public class LoopScheduler { public static void main(String[] avg) { long nowTime = System.currentTimeMillis(); long nextTime = nowTime + 15000; while (true) { if (System.currentTimeMillis() >= nextTime) { nowTime = System.currentTimeMillis(); nextTime = nowTime + 15000; System.out.println(nowTime + ":触发一次"); service(); } } } public static void service() { System.out.println("自定义逻辑执行"); } }
public class SleepScheduler { public static void main(String[] avg) { Thread thread = new Thread(new Runnable() { @Override public void run() { while (true) { try { Thread.sleep(15000); System.out.println(System.currentTimeMillis() + “:触发一次”); service(); } catch (InterruptedException e) { e.printStackTrace(); } } } public void service() { System.out.println("自定义逻辑执行"); } }); thread.start(); } }
以上代码,先定义了一个线程,然后启动。线程的执行逻辑是,不断循环,每次循环里面先sleep 15s。然后再执行指定的逻辑。基本上可以实现跟上面一样的效果,每15s执行一次service逻辑。
/** * @param task task to be scheduled. * @param delay delay in milliseconds before task is to be executed. * @param period time in milliseconds between successive task executions. */ public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException(“Negative delay.”); if (period <= 0) throw new IllegalArgumentException(“Non-positive period.”); sched(task, System.currentTimeMillis()+delay, -period); }
public class TimerScheduler {public static void main(String[] args) { // 创建SimpleDateFormat对象,定义格式化样式 SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); // 创建Timer对象 Timer timer = new Timer(); // 单次执行任务示例 TimerTask singleTask = new TimerTask() { @Override public void run() { // 将当前时间的毫秒数转换为格式化后的时间字符串 String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + "单次执行任务:定时任务执行了"); } }; // 延迟3000毫秒(3秒)后执行单次任务 timer.scheduleAtFixedRate(singleTask, 3000, 15000); String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + "单次执行任务:定时任务已启动"); } }
- 下图1和2:需要传入period的schedule(),提交的是周期任务,period是周期。不过这里的周期是上一次任务成功结束和下一次任务开始执行的间隔。可以认为是按照固定的延迟去重复。
- 下图3和4:不需要传入period的schedule(),提交的是单次任务。
- 下图5和6:scheduleAtFixedRate(),提交的是周期任务,period是执行频率,每次任务的计划执行时间在提交的那一刻就可以确定,跟上一次任务什么时候结束没有关系。
public abstract class TimerTask implements Runnable { /** * This object is used to control access to the TimerTask internals. */ final Object lock = new Object();/** * The state of this task, chosen from the constants below. */ int state = VIRGIN; /**This task has not yet been scheduled. */ static final int VIRGIN = 0;/**This task is scheduled for execution. If it is a non-repeating task,it has not yet been executed. */ static final int SCHEDULED = 1;/**This non-repeating task has already executed (or is currentlyexecuting) and has not been cancelled. */ static final int EXECUTED = 2;/**This task has been cancelled (with a call to TimerTask.cancel). */ static final int CANCELLED = 3;/**Next execution time for this task in the format returned bySystem.currentTimeMillis, assuming this task is scheduled for execution.For repeating tasks, this field is updated prior to each task execution. */ long nextExecutionTime;/**Period in milliseconds for repeating tasks. A positive value indicatesfixed-rate execution. A negative value indicates fixed-delay execution.A value of 0 indicates a non-repeating task. */ long period = 0; }
public class Timer { /** * The timer task queue. This data structure is shared with the timer * thread. The timer produces tasks, via its various schedule calls, * and the timer thread consumes, executing timer tasks as appropriate, * and removing them from the queue when they’re obsolete. */ private final TaskQueue queue = new TaskQueue();/** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); public Timer(String name) { thread.setName(name); thread.start(); } }
class TaskQueue { /** * Priority queue represented as a balanced binary heap: the two children * of queue[n] are queue[2n] and queue[2n+1]. The priority queue is * ordered on the nextExecutionTime field: The TimerTask with the lowest * nextExecutionTime is in queue[1] (assuming the queue is nonempty). For * each node n in the heap, and each descendant of n, d, * n.nextExecutionTime <= d.nextExecutionTime. */ private TimerTask[] queue = new TimerTask[128];/** * The number of tasks in the priority queue. (The tasks are stored in * queue[1] up to queue[size]). */ private int size = 0; }
class TimerThread extends Thread {/** * Our Timer's queue. We store this reference in preference to * a reference to the Timer so the reference graph remains acyclic. * Otherwise, the Timer would never be garbage-collected and this * thread would never go away. */ private TaskQueue queue; TimerThread(TaskQueue queue) { this.queue = queue; }public void run() { try { mainLoop(); } finally { // Someone killed this Thread, behave as if Timer cancelled synchronized(queue) { newTasksMayBeScheduled = false; queue.clear(); // Eliminate obsolete references } } } private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first event and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime&lt;=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period&lt;0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks; } catch(InterruptedException e) { } } } }
- 主体是一个while循环,进入循环后,会先看任务队列是否有任务,如果没有任务则执行quere.wait()。等待其他线程(一般是主线程在添加任务后)执行queue.notify()方法后,TimerThread会再次醒来。
- 如果队列不为空,成功获取到堆顶元素后,会判断任务是否已经被取消,如果取消的话直接移除,然后进入下一次循环。
- 如果任务没有被取消,则看任务的执行时间是否已经到了,也就是该任务是否应该被触发了。如果已经触发了,那么应该执行该任务,同时如果该任务是周期任务,还应该计算下次任务的执行时间,然后触发堆的下滤操作(重新找一个下次执行时间最近的任务到堆顶)。
- 如果任务还没有到执行时间,那么就计算还剩多少时间应该执行,然后等待这个时间。重新进入下一次循环。
负数的话,任务的下次执行时间是:currentTime - task.period(当前任务结束的时间 - 周期)
正数的话,任务的下次执行时间是:executionTime + task.period(当前任务的执行时间 + 周期)
public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException(“Negative delay.”); if (period <= 0) throw new IllegalArgumentException(“Non-positive period.”); sched(task, System.currentTimeMillis()+delay, -period); }public void scheduleAtFixedRate(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException(“Negative delay.”); if (period <= 0) throw new IllegalArgumentException(“Non-positive period.”); sched(task, System.currentTimeMillis()+delay, period); }
- Timer是基于Object.wait()方法来实现时间等待的。在本质上跟我们用Thread.sleep()实现没有太大的区别,底层都是通过操作系统内核让线程进入睡眠状态。
- Timer是单线程模式,通过上面的分析可以看到,Timer类持有一个TimerThread实例,它就是一个线程实例。这种模式的问题在于,当我们在同一个Timer中提交了很多调度任务之后,并且有的任务时间过长的话,就可能会导致其他任务的调度延后。
- 通过上面的源码分析可以看到,TimerThread执行的时候,没有进行异常处理,所以当使用Timer的时候,业务逻辑一定要进行异常处理,否则如果一旦抛出异常将导致TimerThread线程挂掉。所有调度任务就失效了。
public class ScheduldExecutorServiceScheduler { public static void main(String[] args) { // 创建SimpleDateFormat对象,定义格式化样式 SimpleDateFormat sdf = new SimpleDateFormat(“HH:mm:ss.SSS”); // 创建ScheduledExecutorService对象 ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); Runnable runnable = new Runnable() { @Override public void run() { // 将当前时间的毫秒数转换为格式化后的时间字符串 String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + “单次执行任务:定时任务执行了”); } }; // 延迟3000毫秒(3秒)后执行单次任务,每15s开始执行一次任务 executor.scheduleAtFixedRate(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS); // 延迟3000毫秒(3秒)后执行单次任务,每次任务结束后15s执行下一次任务 //executor.scheduleWithFixedDelay(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS); // 延迟3000毫秒(3秒)后执行单次任务 //executor.schedule(runnable, 3000, java.util.concurrent.TimeUnit.MILLISECONDS); String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + “单次执行任务:定时任务已启动”); }}
public static void main(String[] args) { // 创建SimpleDateFormat对象,定义格式化样式 SimpleDateFormat sdf = new SimpleDateFormat(“HH:mm:ss.SSS”); // 创建ScheduledExecutorService对象 //ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); Runnable runnable = new Runnable() { @Override public void run() { // 将当前时间的毫秒数转换为格式化后的时间字符串 String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + “单次执行任务:定时任务执行了”); } }; // 延迟3000毫秒(3秒)后执行单次任务,每15s开始执行一次任务 executor.scheduleAtFixedRate(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS); // 延迟3000毫秒(3秒)后执行单次任务,每次任务结束后15s执行下一次任务 //executor.scheduleWithFixedDelay(runnable, 3000, 15000, java.util.concurrent.TimeUnit.MILLISECONDS); // 延迟3000毫秒(3秒)后执行单次任务 //executor.schedule(runnable, 3000, java.util.concurrent.TimeUnit.MILLISECONDS); String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + “单次执行任务:定时任务已启动”); }
可以看到的是,ScheduledFutureTask跟前文的Timer一样,有任务的周期、下次执行时间等属性。不同的是ScheduledFutureTask有自己的run方法。 在Timer中,TimerTask的run方法是我们自己重写的,就是业务逻辑代码。在ScheduledExecutorService中,我们把Runnable提交给ScheduledExecutorService之后,则是先调用ScheduledFutureTask的构造方法,在ScheduledFutureTask的构造方法中,又调用父类(FutureTask)的构造方法,并传入Runnable实例。
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** The time the task is enabled to execute in nanoTime units */ private long time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ private final long period; /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Overrides FutureTask version so as to reset/requeue if periodic. */ public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } }</code></pre><p style="">这里run方法的逻辑是:</p><p style=""><img src="的run方法流程.png" width="100%" height="100%" style="display: inline-block"></p><ol><li><p style="">判断任务是否能执行,因为存在已经将调度器shutdown了的情况。</p></li><li><p style="">如果是非周期任务,那么执行即可</p></li><li><p style="">如果是周期任务,除了要执行之外,还需要重新安排下一次调度。</p></li></ol><h4 style="" id="scheduledexecutorservice-1">ScheduledExecutorService</h4><p style="">前面我们已经提过,我们通过ScheduledExecutorService暴露的api提交任务,然后由ScheduledExecutorService持有的阻塞队列实例去存储提交上来的任务。另外就是通过线程池去执行任务,而不是单线程。本文就暂时不展开阻塞队列了,因为跟本文的主题已经有点远了。我们直接介绍线程池这部分。</p><p style=""><img src="" width="100%" height="100%" style="display: inline-block"></p><p style="">ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以其本身也是线程池实例。线程池中的线程是随我们任务的提交,不断创建的。以scheduleAtFixedRate为例:提交任务后,先是调用delayedExecute方法。</p><pre><code class="language-java">// public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }private void delayedExecute(RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } }
// void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null, true); else if (wc == 0) addWorker(null, false); }
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); }</code></pre><p style="">可以看到,run方法只有一行代码,就是将自己的引用作为入参调用runWorker方法。runWorker是ThreadPoolExecutor类的方法。</p><pre><code class="language-java">final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &amp;&amp; runStateAtLeast(ctl.get(), STOP))) &amp;&amp; !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try {; } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }</code></pre><ol><li><p style="">主体不出意料的是个while循环,如果获取不到任务,就不进入循环体,而是直接退出,然后走processWorkerExit,移除worker。</p></li><li><p style="">如果获取到任务的话,进入循环体,先判断线程池是否已经要被关闭。如果状态正常,就执行任务,运行方法,也就是我们前面说的ScheduledFutureTask的run方法。</p></li></ol><h3 style="" id="%E7%BB%BC%E5%90%88%E5%88%86%E6%9E%90-1">综合分析</h3><ol><li><p style="">ScheduledThreadPoolExecutor在实现的复杂度上比Timer大了不少,用到了阻塞队列、线程池。从功能和性能角度都有很大的优化空间。</p></li><li><p style="">在异常处理层面,我们可以看到也很完善,不用再担心因为忘记处理的异常导致整个线程挂掉而影响后续的任务执行。</p></li></ol><h2 style="" id="spring-scheduling">Spring Scheduling</h2><p style="">Spring Scheduling是Spring提供的定时任务工具,与ScheduledThreadPoolExecutor相比,易用性上有了质的飞跃。</p><ol><li><p style="">只需要写好业务方法,然后在方法上添加上@Scheduled注解,配置好定时策略,所有工作就完成了。</p></li><li><p style="">提供更复杂的调度策略,比如cron表达式。</p></li></ol><h3 style="" id="%E5%A6%82%E4%BD%95%E4%BD%BF%E7%94%A8-2">如何使用</h3><ol><li><p style="">在SpringBoot的主程序上添加@EnableScheduling注解。</p></li></ol><pre><code>@SpringBootApplication @EnableScheduling public class SpringbootsrcApplication {public static void main(String[] args) {, args); } }
- 在业务方法上添加@Scheduled注解。
@Component public class SpringScheduler {// 创建SimpleDateFormat对象,定义格式化样式 private SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss.SSS"); @Scheduled(cron = "0/15 * * * * ? ") public void service() throws InterruptedException { // 将当前时间的毫秒数转换为格式化后的时间字符串 String formattedTime = sdf.format(System.currentTimeMillis()); System.out.println(formattedTime + "单次执行任务:定时任务执行了"); } }
public @interface Scheduled {String cron() default ""; String zone() default "";long fixedDelay() default -1;String fixedDelayString() default "";long fixedRate() default -1;String fixedRateString() default "";long initialDelay() default -1;String initialDelayString() default "";TimeUnit timeUnit() default TimeUnit.MILLISECONDS; }
Spring根据我们@Scheduled上的配置,自动为我们管理好了调度任务。可以想到的是,它必然要收集到这些信息才行。这就引出了Spring Scheduling的第一个核心类。
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; }Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
- 首先基于获取到的bean、method,利用反射技术,封装为一个Runnable实例。
- 根据@Schedule的不同参数配置,识别出不同类型的任务:cron表达式任务、fixedDelay、fixedRate任务。然后通过registry的不同api,提交这些任务。
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = “Exactly one of the ‘cron’, ‘fixedDelay(String)’, or ‘fixedRate(String)’ attributes is required”; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // Determine initial delay long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit()); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay &lt; 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit()); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone =; if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay &lt; 0) { initialDelay = 0; } // Check fixed delay long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit()); if (fixedDelay &gt;= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit()); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // Check fixed rate long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit()); if (fixedRate &gt;= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit()); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { Set&lt;ScheduledTask&gt; regTasks = this.scheduledTasks.computeIfAbsent(bean, key -&gt; new LinkedHashSet&lt;&gt;(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }</code></pre><p style="">通过bean的后置处理器,ScheduledAnnotationBeanPostProcessor完成了所有注解方法的扫描工作。而通过监听ContextRefreshedEvent事件,会在所有bean初始化完成之后,进行调度任务的最后阶段工作。完成整个调度任务注册流程。</p><blockquote><p style="">ContextRefreshedEvent会在ApplicationContext初始化完成之后或者刷新的时候发起。</p></blockquote><pre><code class="language-java">@Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext() == this.applicationContext) { // Running in an ApplicationContext -> register tasks this late… // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch’s job registration). finishRegistration(); } }</code></pre><p style="">可以看到主要是调用了finishRegistration方法,而finishRegistration方法的主体逻辑是调用registrar的afterPropertiesSet方法。</p><pre><code>private void finishRegistration() { if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } this.registrar.afterPropertiesSet(); }
@Override public void afterPropertiesSet() { scheduleTasks(); }/** * Schedule all registered tasks against the underlying * {@linkplain #setTaskScheduler(TaskScheduler) task scheduler}. */ @SuppressWarnings("deprecation") protected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }</code></pre><p style="">可以看到,afterPropertiesSet的主体就是调用scheduleTasks。而scheduleTasks方法的核心逻辑是:</p><ol><li><p style="">如果taskScheduler为空,则初始化localExecutor和taskScheduler。</p></li><li><p style="">对四种不同类型的任务,循环加入调度。多的一种TriggerTask,是自定义任务。</p></li></ol><p style="">我们先来看下比较熟悉的scheduleFixedRateTask和scheduleFixedDelayTask。这两个逻辑基本一致,所以我们以scheduleFixedRateTask为例。</p><pre><code class="language-java">@Deprecated @Nullable public ScheduledTask scheduleFixedRateTask(IntervalTask task) { FixedRateTask taskToUse = (task instanceof FixedRateTask ? (FixedRateTask) task : new FixedRateTask(task.getRunnable(), task.getInterval(), task.getInitialDelay())); return scheduleFixedRateTask(taskToUse); } /**Schedule the specified fixed-rate task, either right away if possibleor on initialization of the scheduler.@return a handle to the scheduled task, allowing to cancel it(or {@code null} if processing a previously registered task)@since 5.0.2 */ @Nullable public ScheduledTask scheduleFixedRateTask(FixedRateTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); if (this.taskScheduler != null) { if (task.getInitialDelay() > 0) { Date startTime = new Date(this.taskScheduler.getClock().millis() + task.getInitialDelay()); scheduledTask.future = this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval()); } else { scheduledTask.future = this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval()); } } else { addFixedRateTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }</code></pre><ol><li><p style="">对任务进行封装,然后调用同名方法scheduleFixedRateTask</p></li><li><p style="">然后根据任务的不同配置,是否有InitialDelay,调用taskScheduler的不同方法去提交任务。</p></li></ol><h4 style="" id="threadpooltaskscheduler">ThreadPoolTaskScheduler</h4><p style="">ThreadPoolTaskScheduler是TaskScheduler的一个实现类。是Spring Scheduling对JDK ScheduledExecutorService的又一层封装。</p><p style="">上面我们看到,在ScheduledTaskRegistrar中提交固定频率的任务后,最终会调用this.taskScheduler.scheduleAtFixedRate方法。而在taskScheduler.scheduleAtFixedRate中,又最终会调用ScheduledExecutorService的scheduleAtFixedRate方法。</p><pre><code class="language-java">@Override public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period) { ScheduledExecutorService executor = getScheduledExecutor(); long initialDelay = startTime.getTime() - this.clock.millis(); try { return executor.scheduleAtFixedRate(errorHandlingTask(task, true), initialDelay, period, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } }</code></pre><h4 style="" id="cron%E8%A1%A8%E8%BE%BE%E5%BC%8F%E4%BB%BB%E5%8A%A1%E6%98%AF%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%E7%9A%84">Cron表达式任务是如何实现的</h4><p style="">我们知道Spring Scheduling是对ScheduledExecutorService的进一步封装。ScheduledExecutorService只支持固定延迟、固定频率、单次任务这三种任务,而Spring Scheduling还支持cron表达式任务,这个是怎么实现的呢?</p><p style="">我们要先回到ScheduledTaskRegistrar的scheduleCronTask方法。</p><pre><code class="language-java">@Nullable public ScheduledTask scheduleCronTask(CronTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } if (this.taskScheduler != null) { scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }</code></pre><p style="">调用的是taskScheduler的schedule方法。入参是Runnable实例和1个Trigger。</p><blockquote><p style="">Trigger是触发器,一般是与任务关联的,用于计算任务的下次执行时间。而CronTask的触发器就是CronTrigger。</p></blockquote><pre><code class="language-java">@Override @Nullable public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { ScheduledExecutorService executor = getScheduledExecutor(); try { ErrorHandler errorHandler = this.errorHandler; if (errorHandler == null) { errorHandler = TaskUtils.getDefaultErrorHandler(true); } return new ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule(); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } }</code></pre><p style="">仍然是通过ScheduledExecutorService来实现的,但是在提交之前,有些额外的处理:</p><ol><li><p style="">Runnable任务又被封装了一层,类型是ReschedulingRunnable。Rescheduling是“重新调度“的意思。</p></li><li><p style="">调用新生成的ReschedulingRunnable实例的schedule方法。</p></li></ol><pre><code class="language-java">public ReschedulingRunnable(Runnable delegate, Trigger trigger, Clock clock, ScheduledExecutorService executor, ErrorHandler errorHandler) {super(delegate, errorHandler); this.trigger = trigger; this.triggerContext = new SimpleTriggerContext(clock); this.executor = executor; }@Nullable public ScheduledFuture<?> schedule() { synchronized (this.triggerContextMonitor) { this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } } @Override public void run() { Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());; Date completionTime = new Date(this.triggerContext.getClock().millis()); synchronized (this.triggerContextMonitor) { Assert.state(this.scheduledExecutionTime != null, “No scheduled execution”); this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); if (!obtainCurrentFuture().isCancelled()) { schedule(); } } }
- Timer、ScheduledExecutorService、Spring Scheduling中涉及的线程安全问题的讨论,锁的应用等
- 阻塞队列
- 异步编程相关的知识,如Future。
不用任何框架,Java 就能实现定时任务的 3 种方法!
所有博客内容,在我的个人博客网站可见,欢迎访问: TwoFish
本文由博客一文多发平台 OpenWrite 发布!
- 2024-11-24Java中定时任务实现方式及源码剖析
- 2024-11-24鸿蒙原生开发手记:03-元服务开发全流程(开发元服务,只需要看这一篇文章)
- 2024-11-24细说敏捷:敏捷四会之每日站会
- 2024-11-23Springboot应用的多环境打包入门
- 2024-11-23Springboot应用的生产发布入门教程
- 2024-11-23Python编程入门指南
- 2024-11-23Java创业入门:从零开始的编程之旅
- 2024-11-23Java创业入门:新手必读的Java编程与创业指南
- 2024-11-23Java对接阿里云智能语音服务入门详解
- 2024-11-23Java对接阿里云智能语音服务入门教程