好长时间木有写java,怕忘光了😄,今天抽空翻翻源码做些总结。总的来说实现逻辑还是比较简单清晰的。
创新互联公司专注于德兴企业网站建设,响应式网站,成都商城网站开发。德兴网站建设公司,为德兴等地区提供建站服务。全流程按需求定制开发,专业设计,全程项目跟踪,创新互联公司专业和态度为您提供的服务实现 1. 架构图ThreadPoolExecutor 中有维护了队列,和Worker(对应一个线程)的池子,提交的任务会交给Worker执行。
2. 线程池属性corePoolSize
: 核心线程(即开启了就会常驻的线程)的数量
workQueue
: 提交任务的队列(当核心池用完就会先放在队列里面)
maximumPoolSize
: 线程池大的size,如果队列里放不下,就会接着起线程运行任务
RejectedExecutionHandler
: 超过线程池大的size,队列也放不下就开始执行Rejected逻辑
keepAliveTime
: 队列消费完数据后,线程大的keepAlive时间 ,默认allowCoreThreadTimeOut 是false,只会清理maxpool的线程。
控制
ctl
: 记录线程池生命周期的状态位和运行的线程个数 ,高三位记录状态,低位记录Worker个数。通过乐观锁的方式控制线程池(查询线程个数的方法workerCountOf和workQueue中有些混淆,前者是运行的线程数,后者是提交上来存放任务)
private static final int RUNNING = -1<< COUNT_BITS; //运行中 private static final int SHUTDOWN = 0<< COUNT_BITS; //对应shutdown()方法,队列和Worker现有的方法会继续运行,清理掉空闲的Worker(是否空闲看能否持有到Woker的锁),甚至还能添加Worker private static final int STOP = 1<< COUNT_BITS; //对应shutdownNow()方法,所有的Worker都会发interrupt信号(不会保证线程被释放掉),队列也会清空 private static final int TIDYING = 2<< COUNT_BITS; //当所有的任务到停掉,ctl记录的任务数为0 private static final int TERMINATED = 3<< COUNT_BITS; //线程池正式停掉。TiDYING状态后执行terminated()(空方法)变成TERMINATED状态4.提交流程:
int c = ctl.get(); if (workerCountOf(c)< corePoolSize) { //当前Worker数小于corePoolSize时候,会创建个新的Worker if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) {//corePool的线程已经起完了,就会将其提交到队列中,由Worker消费 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); //重新检查状态,如果池子如果不是运行中,且task未来的及消费,发起拒绝策略 else if (workerCountOf(recheck) == 0) // 如果从队列中删除未成功,而且Worker数等于0 addWorker(null, false); //加个Worker把这个任务消化掉 } else if (!addWorker(command, false)) //如果队列也满了,会向maximumPool增加新的Worker,去运行线程 reject(command); //maximumPool也装不下触发拒绝策略5.创建Worker的逻辑
5.1 Check是否可以添加Worker
根据上面提交流程
我们知道:
判断是否需要添加Worker由ctl中保存的Worker数决定是否addWorkder,但是多个线程之间会有竞争,进入addWorker方法时,是会存在池子已经满了的情况。
这个时候会有个乐观锁的方式,重试检查线程池状态和Worker数量,保证其他提交线程的性能不会受影响:
如果池子真的满了只能通知调用者,addWorker失败,进行其他策略
如果池子未满,就可以创建一个新的Worker
private boolean addWorker(Runnable firstTask, boolean core) {//bool core 表示核心池还是maxium池 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
5.2 开始添加Worker
这里没啥好说的,创建个Worker并放到Woker的池子中,
这里用到互斥锁,不过锁的粒度很小,只会是在加入池子的时候上锁
入池子前还是会检查池子的状态
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs< SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s >largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); }6. Woker 介绍
Worker 对应线程池里的一个工作线程
持续从队列里获取任务执行
boolean timed = allowCoreThreadTimeOut || wc >corePoolSize; if ((wc >maximumPoolSize || (timed && timedOut)) && (wc >1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }7.Worker生命周期
创建: 在上文有介绍
会根据线程池的的状态自行结束线程
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();
如果是异常退出,会退出后会重新起一个新的woker(注意这个一般代码的逻辑异常不会导致Worker异常,会被封装到submit方法返回的future中,jdk留了空方法可以监听到这种异常)
int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } // completedAbruptly = true 会被任务是异常退出 addWorker(null, false); }
maxium池子的空闲Worker会被释放
// Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc >corePoolSize; if ((wc >maximumPoolSize || (timed && timedOut)) && (wc >1 || workQueue.isEmpty())) { //判断超时 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }
被主动释放(看shutdownNow()方法)
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
Q1: 在创建线程池时,在ThreadFactory接口中指定setUncaughtExceptionHandler
之后,如果线程池中的任务抛异常,是否会被UncaughtExceptionHandler捕获到
不会,在线程池的实现中,Worker的会一直持有申请到的线程,不会处理提交任务的异常,异常可以在submit 方法返回的future上获取到。
Q2:showdownNow()执行之后线程就一定都执行完嘛
只是发interrupt 信号,看上文介绍只有TYDING ,TERMINATED状态,所有任务才会执行完。一定要保持代码的健壮性,控制业务代码的生命周期。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧