本篇内容介绍了“KAFKA是如何处理延时任务的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
创新互联公司是专业的珠晖网站建设公司,珠晖接单;提供成都网站设计、网站制作,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行珠晖网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
首先,我们需要了解一下kafka中大概有哪些需要延时的任务,该怎么查看呢?
很简单,kafka的设计都是基于接口的,那么我们只需要找到延时任务的顶层接口,然后看一下该接口有哪些实现类就知道有哪些延时任务了。
顶层抽象类接口是:DelayedOperation
对应的子类:
DelayedHeartbeat:就是用于做消费者心跳超时检测的;
DelayedProduce:就是做生产者设置ack=-1时需要等待所有副本确认写入成功的;
DelayedFetch:就是在消费的时候该分区没有数据,需要去做延时等待;
DelayedJoin:就是去做消费者加组的时候,在JOIN阶段需要延时等待。
这个答案已经在标题中就已经回答了,就是时间轮。
那么时间轮在kafka中是如何实现的呢?
kafka中的时间轮本体是一个20长度数组,不过内部持有上层数组的一个引用,数组中每个元素都是一个List,存放处于这个时间段的所有任务。
最后将这些有任务的List引用,放入DelayQueue来实现时间的流动,每次从DelayQueue中取出到期的List进行对应的操作。
翻译一下:
就是原本把所有延时任务都一股脑全部放入DelayQueue中,实在是太多了,由于DelayQueue底层数据结构是小顶堆,插入和删除的时间复杂度都是 O(nlog(n)),
n代表的具体任务的数量,当n值非常大时,对应的性能就很差,不能满足一个高性能中间件的要求。于是就想了个办法减小n的个数,
就是把原来的一个个延时任务,通过时间区间来封装成一个List,把List作为一个基本单位存入到DelayQueue中,那么这一样一来,就能把插入和删除的时间复杂度
从O(nlog(n))降低到接近O(1)[这里为什么是近似O(1)呢?你可以理解为时间轮是一个类hash表的结构],除此之外,最重要的就是大大减小了DelayQueue中元素的个数n,
因为一层时间轮就20个List,10层也就才200个,所以对于这么小数量的元素个数,DelayQueue是完全能hold的住的。
总结一下:
其实时间轮的设计思想就是批处理的思想,把一批任务根据时间区间封装成一个List,最后把List放到DelayQueue中去实现轮转的效果。
优化点主要是两个,一个是插入/删除的时间复杂度由O(nlog(n))降低到了近似O(1),第二个是大大减小了DelayQueue元素的个数。
了解设计思想,我们再看看实现原理:
1、核心函数:加入Task到时间轮中
分为三步:
如果任务已经超期就返回false
如果任务在自己的时间跨度内,就计算应该放入哪个桶中(在哪个时间区间);如果桶没在DelayQueue中则加入到DelayQueue中去。
如果任务的超时时间超过了自己的时间跨度,就往上层时间传,直到找到一个满足时间跨度的时间轮。
def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { // 被取消 // Cancelled false } else if (expiration < currentTime + tickMs) { // 已经过期 // Already expired false } else if (expiration < currentTime + interval) { // 在有效期内 // Put in its own bucket val virtualId = expiration / tickMsval bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry)// Set the bucket expiration time // 设置超时时间,如果该桶已经设置了超时时间则说明已经存在于DelayQueue中了 // 如果不存在超时时间,则需要将当前桶加入DelayQueue中 if (bucket.setExpiration(virtualId * tickMs)) { // The bucket needs to be enqueued because it was an expired bucket // We only need to enqueue the bucket when its expiration time has changed, i.e. the wheel has advanced // and the previous buckets gets reused; further calls to set the expiration within the same wheel cycle // will pass in the same value and hence return false, thus the bucket with the same expiration will not // be enqueued multiple times. queue.offer(bucket) }true } else { // 超过了当前层时间轮的时间跨度 需要向上层时间轮传递,如果上层不存在则新建 // Out of the interval. Put it into the parent timer if (overflowWheel == null) addOverflowWheel()overflowWheel.add(timerTaskEntry) }}
2、时间轮如何推进?
每一个DelayedOperationPurgatory,都有一个线程expirationReaper,去负责推进时间轮,如果当前没有task到期就挂起200ms等待。
如果有task到期,就取出对应的桶,然后将桶中的数据全都执行reinsert,也就是从最底层的时间轮重新执行一遍add操作。
/** * A background reaper to expire delayed operations that have timed out */private class ExpiredOperationReaper extends ShutdownableThread( "ExpirationReaper-%d-%s".format(brokerId, purgatoryName), false) { override def doWork() { advanceClock(200L) } }
def advanceClock(timeoutMs: Long): Boolean = { // 从延时队列中取出到期的桶 var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) if (bucket != null) {writeLock.lock()try { // 一次性把到期的全部取出来 while (bucket != null) { // 时间轮的时间推进 timingWheel.advanceClock(bucket.getExpiration())// 把桶中的所有数据都拿去执行reinsert函数 // 本质就是去执行addTimerTaskEntry(timerTaskEntry) bucket.flush(reinsert)bucket = delayQueue.poll() } } finally { writeLock.unlock() }true } else {false } }
3、到期的任务如何执行?
其实就是接着上面的源码,当任务到期之后,reinsert函数会返回false,代表已经超期/被取消了,每个DelayedOperationPurgatory又有一个单线程的taskExecutor,
超期的任务就提交到线程池中去执行即可。
private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = { // 如果时间轮添加返回false则说明超期/被取消了,直接提交到自己的单线程线程池中去执行该task if (!timingWheel.add(timerTaskEntry)) {// Already expired or cancelled if (!timerTaskEntry.cancelled) taskExecutor.submit(timerTaskEntry.timerTask) } }
4、整个流程的运行图
整个流程概括下来,就是业务代码想TimingWheel执行add,提交任务;
TimingWheel找到合适的时间轮后插入对应的桶中,并将桶放入DelayQueue中;
DelayedOperationPurgatory组件中存在收割线程,去不停从DelayQueue中poll对应到期的task;
最后task重新执行reinsert,如果超期了就提交到taskExecutor中去执行对应的业务handler逻辑。
这个答案在第二小节的时候其实已经给出的,在这里进行一个总结:
1、DelayQueue底层数据结构是小顶堆,插入和删除的时间复杂度都是 O(nlog(n)),因此面对大量的延时操作时,该结构无法满足kafka高性能的要求。
2、时间轮采用批处理的思想将任务按照区间进行封装,形成一类类似hash表的结构,让插入/删除的时间复杂度降低为O(1),并且大大减小了DelayQueue元素的个数。
另外补充一点,kafka中每一种延时场景都会创建单独的时间轮,一个时间轮里只存放一种类型的延时任务,因为不同Task在超期/完成的时候需要执行的逻辑是不一样的,
需要一一对应去执行。举个栗子,心跳延时场景有自己的heartbeatPurgatory,生产延时有自己的delayedProducePurgatory,以此类推。
1、HEARTBEAT请求的处理
从源码中我们可以知道,心跳的维护和会话的超时,kafka的实现非常巧妙。
通常情况下,心跳3s发一次,session超时时间是10s;
kafka就在收到HEARTBEAT请求之后,就先创建一个DelayedHeartbeat延时任务,超时时间就是对应的session.timeout值即10s;
如果在10s内又收到了对应consumer的HEARTBEAT请求,就将上次提交的延时任务完成;
如果在10s内没有收到对应consumer的HEARTBEAT请求,则任务consumer出问题了,就去执行对应的超期逻辑。
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) { // complete current heartbeat expectation member.latestHeartbeat = time.milliseconds() val memberKey = MemberKey(member.groupId, member.memberId) // 完成上次的延时任务 heartbeatPurgatory.checkAndComplete(memberKey) // reschedule the next heartbeat expiration deadline // 服务端能拿到这个session.timeout,然后根据这个时间生成一个延时任务, // 例如30s,如果这么长时间么有收到心跳请求,则认为消费者出了问题,就踢掉以后执行rebalance。 val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs) heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey)) }
private[group] class DelayedHeartbeat(coordinator: GroupCoordinator, group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, sessionTimeout: Long) extends DelayedOperation(sessionTimeout, Some(group.lock)) { override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete _) override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline) override def onComplete() = coordinator.onCompleteHeartbeat() }
2、DelayedHeartbeat任务超期后的逻辑
这一块也很简单,就是去执行coordinator.onExpireHeartbeat函数,
具体逻辑就是打印一个标识日志:Member xxx has failed,这个日志为什么要单独讲呢?因为这个是我们排查消费者问题的时候的核心日志;
我们在看server.log的时候,如果查到某个消费组的消费者出现这个日志,那么我们就能肯定这个消费组的这个消费者是因为会话超时的原因被剔除了;
从而我们就可以继续往下分析这个消费者掉线的原因是因为消费者进程挂了?或者是客户端机器负载太高而心跳线程是守护线程优先级比较低拿不到CPU资源?
等等一系列定位线索。这个日志主要是用于定位消费者出问题,以及消费组rebalance原因的,是非常重要的一个标识日志!
讲完日志,我们就可以看到后续就是去开启rebalance,因为消费者个数变了,需要重新去进行分区分配,已经故障转移。
def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) { group.inLock {if (!shouldKeepMemberAlive(member, heartbeatDeadline)) { // 标识日志 info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group") removeMemberAndUpdateGroup(group, member)} } }
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata) { group.remove(member.memberId) group.currentState match {case Dead | Empty => case Stable | CompletingRebalance => maybePrepareRebalance(group) case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } }
“KAFKA是如何处理延时任务的”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!