资讯

精准传达 • 有效沟通

从品牌网站建设到网络营销策划,从策略到执行的一站式服务

JDK学习笔记-线程池的实现-创新互联

前言

好长时间木有写java,怕忘光了😄,今天抽空翻翻源码做些总结。总的来说实现逻辑还是比较简单清晰的。

创新互联公司专注于德兴企业网站建设,响应式网站,成都商城网站开发。德兴网站建设公司,为德兴等地区提供建站服务。全流程按需求定制开发,专业设计,全程项目跟踪,创新互联公司专业和态度为您提供的服务实现 1. 架构图

ThreadPoolExecutor 中有维护了队列,和Worker(对应一个线程)的池子,提交的任务会交给Worker执行。

2. 线程池属性

corePoolSize: 核心线程(即开启了就会常驻的线程)的数量

workQueue: 提交任务的队列(当核心池用完就会先放在队列里面)

maximumPoolSize: 线程池大的size,如果队列里放不下,就会接着起线程运行任务

RejectedExecutionHandler: 超过线程池大的size,队列也放不下就开始执行Rejected逻辑

keepAliveTime: 队列消费完数据后,线程大的keepAlive时间 ,默认allowCoreThreadTimeOut 是false,只会清理maxpool的线程。

控制

ctl: 记录线程池生命周期的状态位和运行的线程个数 ,高三位记录状态,低位记录Worker个数。通过乐观锁的方式控制线程池(查询线程个数的方法workerCountOf和workQueue中有些混淆,前者是运行的线程数,后者是提交上来存放任务)

3.线程池状态
private static final int RUNNING    = -1<< COUNT_BITS;  //运行中
​
private static final int SHUTDOWN   =  0<< COUNT_BITS;  //对应shutdown()方法,队列和Worker现有的方法会继续运行,清理掉空闲的Worker(是否空闲看能否持有到Woker的锁),甚至还能添加Worker
​
private static final int STOP       =  1<< COUNT_BITS;  //对应shutdownNow()方法,所有的Worker都会发interrupt信号(不会保证线程被释放掉),队列也会清空
​
private static final int TIDYING    =  2<< COUNT_BITS;  //当所有的任务到停掉,ctl记录的任务数为0
​
private static final int TERMINATED =  3<< COUNT_BITS;  //线程池正式停掉。TiDYING状态后执行terminated()(空方法)变成TERMINATED状态

4.提交流程:
  int c = ctl.get();
        if (workerCountOf(c)< corePoolSize) {
           //当前Worker数小于corePoolSize时候,会创建个新的Worker
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {//corePool的线程已经起完了,就会将其提交到队列中,由Worker消费
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) 
                reject(command); //重新检查状态,如果池子如果不是运行中,且task未来的及消费,发起拒绝策略
            else if (workerCountOf(recheck) == 0) // 如果从队列中删除未成功,而且Worker数等于0
                addWorker(null, false); //加个Worker把这个任务消化掉
        }
        else if (!addWorker(command, false)) //如果队列也满了,会向maximumPool增加新的Worker,去运行线程
            reject(command); //maximumPool也装不下触发拒绝策略

5.创建Worker的逻辑

5.1 Check是否可以添加Worker

根据上面提交流程我们知道:

判断是否需要添加Worker由ctl中保存的Worker数决定是否addWorkder,但是多个线程之间会有竞争,进入addWorker方法时,是会存在池子已经满了的情况。

这个时候会有个乐观锁的方式,重试检查线程池状态和Worker数量,保证其他提交线程的性能不会受影响:

  • 如果池子真的满了只能通知调用者,addWorker失败,进行其他策略

  • 如果池子未满,就可以创建一个新的Worker

private boolean addWorker(Runnable firstTask, boolean core) {//bool core 表示核心池还是maxium池
        retry:
        for (;;) {
          
            int c = ctl.get();
            int rs = runStateOf(c);
​
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
​
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

5.2 开始添加Worker

这里没啥好说的,创建个Worker并放到Woker的池子中,

  • 这里用到互斥锁,不过锁的粒度很小,只会是在加入池子的时候上锁

  • 入池子前还是会检查池子的状态

  boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
​
                    if (rs< SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s >largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }

6. Woker 介绍

Worker 对应线程池里的一个工作线程

持续从队列里获取任务执行

 boolean timed = allowCoreThreadTimeOut || wc >corePoolSize;
​
            if ((wc >maximumPoolSize || (timed && timedOut))
                && (wc >1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
​
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }

7.Worker生命周期
  • 创建: 在上文有介绍

  • 会根据线程池的的状态自行结束线程

if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                    
  • 如果是异常退出,会退出后会重新起一个新的woker(注意这个一般代码的逻辑异常不会导致Worker异常,会被封装到submit方法返回的future中,jdk留了空方法可以监听到这种异常)

  int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
  //        completedAbruptly = true 会被任务是异常退出
            addWorker(null, false);
        }
  • maxium池子的空闲Worker会被释放

// Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc >corePoolSize;
​
            if ((wc >maximumPoolSize || (timed && timedOut))
                && (wc >1 || workQueue.isEmpty())) { //判断超时
                if (compareAndDecrementWorkerCount(c))
                    return null; 
                continue;
            }
​
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true; 
            } catch (InterruptedException retry) {
                timedOut = false;
            }
  • 被主动释放(看shutdownNow()方法)

    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }

使用问题:

Q1: 在创建线程池时,在ThreadFactory接口中指定setUncaughtExceptionHandler之后,如果线程池中的任务抛异常,是否会被UncaughtExceptionHandler捕获到

不会,在线程池的实现中,Worker的会一直持有申请到的线程,不会处理提交任务的异常,异常可以在submit 方法返回的future上获取到。

Q2:showdownNow()执行之后线程就一定都执行完嘛

只是发interrupt 信号,看上文介绍只有TYDING ,TERMINATED状态,所有任务才会执行完。一定要保持代码的健壮性,控制业务代码的生命周期。

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


新闻名称:JDK学习笔记-线程池的实现-创新互联
标题路径:http://cdkjz.cn/article/jeese.html
多年建站经验

多一份参考,总有益处

联系快上网,免费获得专属《策划方案》及报价

咨询相关问题或预约面谈,可以通过以下方式与我们联系

业务热线:400-028-6601 / 大客户专线   成都:13518219792   座机:028-86922220