一文带你深入了解Java中延时任务的实现

   2023-02-09 学习力0
核心提示:目录概述JAVA DelayQueueDelayQueue的实现原理DelayQueue实现延时队列的优缺点时间轮算法时间轮的具体实现进阶优化版时间轮算法时间轮算法的应用小结redis延时队列mq延时队列rocketmq延时消息rocketmq的精准延时消息总结概述延时任务相信大家都不陌生,在现实

概述

延时任务相信大家都不陌生,在现实的业务中应用场景可以说是比比皆是。例如订单下单15分钟未支付直接取消,外卖超时自动赔付等等。这些情况下,我们该怎么设计我们的服务的实现呢?

笨一点的方法自然是定时任务去数据库进行轮询。但是当业务量较大,事件处理比较费时的时候,我们的系统和数据库往往会面临巨大的压力,如果采用这种方式或许会导致数据库和系统的崩溃。那么有什么好办法吗?今天我来为大家介绍几种实现延时任务的办法。

JAVA DelayQueue

你没看错,java内部有内置延时队列,位于java concurrent包内。

DelayQueue是一个jdk中自带的延时队列实现,他的实现依赖于可重入锁ReentrantLock以及条件锁Condition和优先队列PriorityQueue。而且本质上他也是一个阻塞队列。那么他是如何实现延时效果的呢。

DelayQueue的实现原理

首先DelayQueue队列中的元素必须继承一个接口叫做Delayed,我们找到这个类

public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    }

发现这个类内部定义了一个返回值为long的方法getDelay,这个方法用来定义队列中的元素的过期时间,所有需要放在队列中的元素,必须实现这个方法。

然后我们来看看延迟队列的队列是如何操作的,我们就拿最典型的offertake来看:

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

offer操作平平无奇,甚至直接调用到了优先队列的offer来将队列根据延时进行排序,只不过加了个锁,做了些数据的调整,没有什么深入的地方,但是take的实现看上去就很复杂了。(注意,Dalayed继承了Comparable方法,所以是可以直接用优先队列来排序的,只要你自己实现了compareTo方法)我尝试加了些注释让各位看得更明白些:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 自选操作
            for (;;) {
                // 获取队列第一个元素,如果队列为空
                // 阻塞住直到有新元素加入队列,offer等方法调用signal唤醒线程
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    // 如果队列中有元素
                    long delay = first.getDelay(NANOSECONDS);
                    // 判断延时时间,如果到时间了,直接取出数据并return
                    if (delay <= 0)
                        return q.poll();
                    first = null;
                    // 如果leader为空则阻塞
                    if (leader != null)
                        available.await();
                    else {
                        // 获取当前线程
                        Thread thisThread = Thread.currentThread();
                        // 设置leader为当前线程
                        leader = thisThread;
                        try {
                            // 阻塞延时时间
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

我们可以看到take的实现依靠了无限自旋,直到第一个队列元素过了超时时间后才会返回,否则等待他的只有被阻塞。

DelayQueue实现延时队列的优缺点

看了源码后,我们应该对DelayQueue的实现有了一个大致的了解,也对他的优缺点有了一定的理解。他的优点很明显:

  • java原生支持,不需要引入第三方工具
  • 线程安全,即插即用使用方便

但是他的缺点也是很明显的:

  • 不支持分布式,并且数据放在内存中,没有持久化的支持,服务宕机会丢失数据
  • 插入时使用的是优先队列的排序,时间复杂度较高,并且对于队列中的任务不能很好的管理

所以有没有更好的延时队列的实现呢,我们继续看下去~

时间轮算法

时间轮算法是一个被设计出来处理延时任务的算法,现实中的应用可以在kafka以及netty等项目中找到类似的实现。

时间轮的具体实现

所谓时间轮,顾名思义,他是一个类似于时钟的结构,即他的主结构是一个环形数组,如图:

一文带你深入了解Java中延时任务的实现

环形数组中存放的是一个一个的链表,链表中存放着需要执行的任务,我们设定好数组中执行的间隔,假设我们的环形数组的长度是60,每个数组的执行间隔为1s,那么我们会在每过1s就会执行数组下一个元素中的链表中的元素。如果只是这样,那么我们将无法处理60秒之外的延时任务,这显然不合适,所以我们会在每个任务中加上一个参数圈数,来表明任务会在几圈后执行。假如我们有一个任务是在150s后执行,那么他应该在30s的位置,同时圈数应该为2。我们每次执行一个链表中的任务的时候会把当圈需要执行的任务取出执行,然后把他从链表中删除,如果任务不是当圈执行,则修改他的圈数,将圈数减1,于是一个简单的时间轮出炉了。

那么这样的时间轮有什么优缺点呢?

先来说优点吧:

  • 相比DelayQueue来说,时间轮的插入更加的高效,时间复杂度为O(1)
  • 实现简单清晰,任务调度更加方便合理

当然他的缺点也不少:

  • 他和DelayQueue一样不支持分布式,并且数据放在内存中,没有持久化的支持,服务宕机会丢失数据
  • 数组间的间隔设置会影响任务的精度
  • 由于不同圈数的任务会在同一个链表中,执行到每个数组元素时需要遍历所有的链表数据,效率会很低

进阶优化版时间轮算法

刚才提到了一些时间轮算法的缺点,那么是不是有一些方法来进行下优化?这里我来介绍一下时间轮的优化版本。

之前我们提到不同圈数的任务会在同一个链表中被重复遍历影响效率,这种情况下我们可以进行如下优化:将时间轮进行分层

一文带你深入了解Java中延时任务的实现

我们可以看到图中,我们采用了多层级的设计,上图中分了三层,每层都是60格,第一个轮盘中的间隔为1小时,我们的数据每一次都是插入到这个轮盘中,每当这个轮盘经过一个小时后来到下一个刻度,就会取出其中的所有元素,按照延迟时间放入到第二个象征着分钟的轮盘中,以此类推。

这样的实现好处可以说是显而易见的:

  • 首先避免了当时间跨度较大时空间的浪费
  • 每一次到达刻度的时候我们不用再像以前那样遍历链表取出需要的数据,而是可以一次性全部拿出来,大大节约了操作的时间

时间轮算法的应用

时间轮算法可能在之前大家没有听说过,但是他在各个地方都有着不小的作用。linux的定时器的实现中就有时间轮的身影,同样如果你是一个喜好看源码的读者,你也可能会在kafka以及netty中找到他的实现。

kafka

kafka中应用了时间轮算法,他的实现和之前提到的进阶版时间轮没有太大的区别,只有在一点上:kafka内部实现的时间轮应用到了DelayQueue

@nonthreadsafe
    private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {

    private[this] val interval = tickMs * wheelSize
    private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }

    private[this] var currentTime = startMs - (startMs % tickMs)
    
    @volatile private[this] var overflowWheel: TimingWheel = null

    private[this] def addOverflowWheel(): Unit = {
        synchronized {
        if (overflowWheel == null) {
            overflowWheel = new TimingWheel(
            tickMs = interval,
            wheelSize = wheelSize,
            startMs = currentTime,
            taskCounter = taskCounter,
            queue
            )
        }
        }
    }

    def add(timerTaskEntry: TimerTaskEntry): Boolean = {
        val expiration = timerTaskEntry.expirationMs

        if (timerTaskEntry.cancelled) {
        false
        } else if (expiration < currentTime + tickMs) {
        false
        } else if (expiration < currentTime + interval) {
        val virtualId = expiration / tickMs
        val bucket = buckets((virtualId % wheelSize.toLong).toInt)
        bucket.add(timerTaskEntry)

        if (bucket.setExpiration(virtualId * tickMs)) {
            queue.offer(bucket)
        }
        true
        } else {
        if (overflowWheel == null) addOverflowWheel()
        overflowWheel.add(timerTaskEntry)
        }
    }

    def advanceClock(timeMs: Long): Unit = {
        if (timeMs >= currentTime + tickMs) {
        currentTime = timeMs - (timeMs % tickMs)

        if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
        }
    }
    }

上面是kafka内部的实现(使用的语言是scala),我们可以看到实现非常的简洁,并且使用到了DelayQueue。我们刚才已经讨论过了DelayQueue的优缺点,查看源码后我们已经可以有一个大致的结论了:DelayQueue在kafka的时间轮中的作用是负责推进任务的,为的就是防止在时间轮中由于任务比较稀疏而造成的"空推进"。DelayQueue的触发机制可以很好的避免这一点,同时由于DelayQueue的插入效率较低,所以仅用于底层的推进,任务的插入由时间轮来操作,两者配置,可以实现效率和资源的平衡。

netty

netty的内部也有时间轮的实现HashedWheelTimer

HashedWheelTimer的实现要比kafka内部的实现复杂许多,和kafka不同的是,它的内部推进不是依靠的DelayQueue而是自己实现了一套,源码太长,有兴趣的读者可以自己去看一下。

小结

时间轮说了这么多,我们可以看到他的效率是很出众的,但是还是有这么一个问题:他不支持分布式。当我们的业务很复杂,需要分布式的时候,时间轮显得力不从心,那么这个时候有什么好一点的延时队列的选择呢?我们或许可以尝试使用第三方的工具

redis延时队列

其实啊说起延时,我们如果常用redis的话,就会想起redis是存在过期机制的,那么我们是否可以利用这个机制来实现一个延时队列呢?

redis自带key的过期机制,而且可以设置过期后的回调方法。基于此特性,我们可以非常容易就完成一个延时队列,任务进来时,设定定时时间,并且配置好过期回调方法即可。

除了使用redis的过期机制之外,我们也可以利用它自带的zset来实现延时队列。zset支持高性能的排序,因此我们任务进来时可以将时间戳作为排序的依据,以此将任务的执行先后进行有序的排列,这样也能实现延时队列。

zset实现延时队列的好处:

  • 支持高性能排序
  • redis本身的高可用和高性能以及持久性

mq延时队列

rocketmq延时消息

rocketmq天然支持延时消息,他的延时消息分为18个等级,每个等级对应不同的延时时间。

那么他的原理是怎样的呢?

rocketmqbroker收到消息后会将消息写入commitlog,并且判断这个消息是否是延时消息(即delay属性是否大于0),之后如果判断确实是延时消息,那么他不会马上写入,而是通过转发的方式将消息放入对应的延时topic(18个延时级别对应18个topic

rocketmq会有一个定时任务进行轮询,如果任务的延迟时间已经到了就发往指定的topic

这个设计比较的简单粗暴,但是缺点也十分明显:

  • 延时是固定的,如果想要的延迟超出18个级别就没办法实现
  • 无法实现精准延时,队列的堆积等等情况也会导致执行产生误差

rocketmq的精准延时消息

rocketmq本身是不支持的精确延迟的,他的商业版本ons倒是支持。不过rocketmq的社区中有相应的解决方案。方案是借助于时间轮算法来实现的,感兴趣的朋友可以自行去社区查看。(社区中的一些未被合并的pr是不错的实现参考)

总结

延时队列的实现千千万,但是如果要在生产中大规模使用,那么大部分情况下其实都避不开时间轮算法。改进过的时间轮算法可以做到精准延时,持久化,高性能,高可用性,可谓是完美。但是话又说回来,其他的延时方式就无用了吗?其实不是的,所有的方式都是需要匹配自己的使用场景。如果你是极少量数据的轮询,那么定时轮询数据库或许才是最佳的解决方案,而不是无脑的引入复杂的延时队列。如果是单机的任务,那么jdk的延时队列也是不错的选择。

本文介绍的这些延时队列只是为了向大家展示他们的原理和优缺点,具体的使用还需要结合自己业务的场景。

以上就是一文带你深入了解Java中延时任务的实现的详细内容,更多关于Java延时任务的资料请关注其它相关文章!

原文地址:https://segmentfault.com/a/1190000042870188
 
标签: Java 延时 任务
反对 0举报 0 评论 0
 

免责声明:本文仅代表作者个人观点,与乐学笔记(本网)无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。
    本网站有部分内容均转载自其它媒体,转载目的在于传递更多信息,并不代表本网赞同其观点和对其真实性负责,若因作品内容、知识产权、版权和其他问题,请及时提供相关证明等材料并与我们留言联系,本网站将在规定时间内给予删除等相关处理.

  • #新闻拍一拍# Oracle 调研如何避免让 Java 开发者投奔 Rust 和 Kotlin | Linux 中国
    #新闻拍一拍# Oracle 调研如何避免让 Java 开发
     导读:• 英特尔对迟迟不被 Linux 主线接受的 SGX Enclave 进行了第 38 次修订 • ARM 支持开源的 Panfrost Gallium3D 驱动本文字数:977,阅读时长大约:1分钟作者:硬核老王Oracle 调研如何避免让 Java 开发者投奔 Rust 和 KotlinOracle 委托分析公司 Omd
    03-08
  • oogle的“ JavaScript杀手” Dart 与JavaScript的比较
    oogle的“ JavaScript杀手” Dart 与JavaScript
    JavaScript通常被称为浏览器脚本语言,但它也已扩展到许多服务器端和移动应用程序开发环境。JS已经存在了将近20年,可以肯定地说它确实是一种成熟且稳定的编程语言。在Facebook发布React和React Native框架之后,JS变得越来越流行。JavaScript具有自己的软件
    03-08
  • sf02_选择排序算法Java Python rust 实现
    Java 实现package common;public class SimpleArithmetic {/** * 选择排序 * 输入整形数组:a[n] 【4、5、3、7】 * 1. 取数组编号为i(i属于[0 , n-2])的数组值 a[i],即第一重循环 * 2. 假定a[i]为数组a[k](k属于[i,n-1])中的最小值a[min],即执行初始化 min =i
    02-09
  • Delphi XE6 通过JavaScript API调用百度地图
    Delphi XE6 通过JavaScript API调用百度地图
    参考昨天的内容,有朋友还是问如何调用百度地图,也是,谁让咱都在国内呢,没办法,你懂的。 首先去申请个Key,然后看一下百度JavaScript的第一个例子:http://developer.baidu.com/map/jsdemo.htm下一步,就是把例子中的代码,移动TWebBrower中。 unit Unit
    02-09
  • JavaScript面向对象轻松入门之抽象(demo by ES5
    抽象的概念  狭义的抽象,也就是代码里的抽象,就是把一些相关联的业务逻辑分离成属性和方法(行为),这些属性和方法就可以构成一个对象。  这种抽象是为了把难以理解的代码归纳成与现实世界关联的概念,比如小狗这样一个对象:属性可以归纳出“毛色”、
    02-09
  • Java与Objective-C的渊源 objective-c和c++的区
    java创始成员Patrick Naughton回忆,通常人们会认为Java是学Modula-3和C+,其实这些都是谣传,而对Java影响比较大的则是Objective-C:单 继承、动态绑定和加载、类对象、纯虚函数、反射、原始类型包装类等。Java的接口直接抄自OC的协议。  Objective-C是扩
    02-09
  • Java项目导出数据为 PDF 文件的操作代码
    Java项目导出数据为 PDF 文件的操作代码
    目录Java项目如何导出数据为 PDF 文件?一、代码结构如下二、代码说明1、添加依赖 pom.xml2、HTML模板文件 audit_order_record.html3、添加字体4、PDF 导出工具类5、导出接口6、打开浏览器测试三、效果图Java项目如何导出数据为 PDF 文件?一个小需求,需要将
  • 盘点Java中延时任务的多种实现方式 java 延时队列怎么实现
    盘点Java中延时任务的多种实现方式 java 延时队
    目录场景描述实现方式一、挂起线程二、ScheduledExecutorService 延迟任务线程池三、DelayQueue(延时队列)四、Redis-为key指定超时时长,并监听失效key五、时间轮六、消息队列-延迟队列场景描述①需要实现一个定时发布系统通告的功能,如何实现? ②支付超时
  • Java Semaphore信号量使用分析讲解
    Java Semaphore信号量使用分析讲解
    目录前言介绍和使用API介绍基本使用原理介绍获取许可acquire()释放许可release()总结前言大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信
  • 【Java并发入门】03 互斥锁(上):解决原子性问题
    【Java并发入门】03 互斥锁(上):解决原子性
    原子性问题的源头是线程切换Q:如果禁用 CPU 线程切换是不是就解决这个问题了?A:单核 CPU 可行,但到了多核 CPU 的时候,有可能是不同的核在处理同一个变量,即便不切换线程,也有问题。所以,解决原子性的关键是「同一时刻只有一个线程处理该变量,也被称
    02-09
点击排行