Java线程池源码分析

2021/10/22 9:11:34

本文主要是介绍Java线程池源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Java线程池,基于jdk1.8

一些属性

//线程数量和线程池状态  高三位是状态  低29位是数量
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	//位移位数  29
    private static final int COUNT_BITS = Integer.SIZE - 3;
	//容量  2的29次方-1   00011111 11111111 11111111 11111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池状态标记
	// 11100000 00000000 00000000 00000000
    private static final int RUNNING    = -1 << COUNT_BITS;
	// 00000000 00000000 00000000 00000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
	// 00100000 00000000 00000000 00000000
    private static final int STOP       =  1 << COUNT_BITS;
	// 01000000 00000000 00000000 00000000
    private static final int TIDYING    =  2 << COUNT_BITS;
	// 01100000 00000000 00000000 00000000
    private static final int TERMINATED =  3 << COUNT_BITS;
	
	// 位运算 获取当前线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 位运算 获取当前线程池的线程数
	private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
	
	private final BlockingQueue<Runnable> workQueue;
	private final ReentrantLock mainLock = new ReentrantLock();
	private final HashSet<Worker> workers = new HashSet<Worker>();
	//是否 允许核心线程超时
	private volatile boolean allowCoreThreadTimeOut;
	
	private int largestPoolSize;

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
	//参数依次是  核心线程数  最大线程数  线程存活时间  线程存活时间的单位  阻塞队列  线程工厂  拒绝策略
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

提交任务

public void execute(Runnable command) {
		//任务为空,抛异常
        if (command == null)
            throw new NullPointerException();
		//线程计数
		int c = ctl.get();
		//当前线程池的线程数 小于 核心线程数
		if (workerCountOf(c) < corePoolSize) {
			// 添加线程
			if (addWorker(command, true))
				return;
			//获取最新的线程数
			c = ctl.get();
		}
		//在运行中并且可以将任务添加到阻塞队列(阻塞队列未满)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 线程池不是运行中 并且 将阻塞队列 里面的线程 移除
			if (! isRunning(recheck) && remove(command))
                //执行拒绝
				reject(command);
			//如果线程池中没有线程了,则创建一个线程执行任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		// 阻塞队列满了,添加线程失败(达到最大线程数) 执行拒绝
        else if (!addWorker(command, false))
            reject(command);
    }
	
	private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
	//添加线程方法	当前任务  是否核心线程
	private boolean addWorker(Runnable firstTask, boolean core) {
        retry://跳出标志
        for (;;) {
            int c = ctl.get();
			//获取线程池状态
            int rs = runStateOf(c);

            // 检查阻塞队列是否为空  阻塞队列不为空,说明里面有任务,说明线程数已经达到核心线程数的限制
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
				// 线程数 大于等于 最大线程容量  或者 线程数大于等于 核心线程数/最大线程数 不再添加线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
				//增加线程数 跳出循环,执行flag处的语句
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //线程池状态改变了,再次进行循环
				if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		// flag		添加的线程 启动 和 添加成功标记
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
			//将当前任务封装成一个Worker线程
            w = new Worker(firstTask);
			//工作线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//加锁
                try {
					//
                    int rs = runStateOf(ctl.get());

					// 当前线程池是运行状态  || 是SHUTDOWN并且当前任务为空
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 新创建的线程 已经存活  抛异常 刚创建还没启动怎么就启动了
						if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //添加到工作线程集合
						workers.add(w);
						// 判断工作线程数是否大于醉倒线程数 出现过的最大线程数
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
						//标记添加线程成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
				// 添加线程成功 启动 设置启动标志
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
			//添加工作线程失败 将Worker从Worker集合中删除,并且减少工作线程数  因为上面对工作线程数+1了
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
	
	private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
	
	private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
	
	private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
	//执行任务
	final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许中断
        boolean completedAbruptly = true;
        try {
			//当前任务不为空或者可以从阻塞队列里面获取到任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //(线程池状态为stop || (线程中断了 并且 线程池准备stop)) && 当前线程没有中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//中断线程
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
					//置空任务 任务完成数加1 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
			//获取不到任务了
            completedAbruptly = false;
        } finally {
			//销毁工作线程
            processWorkerExit(w, completedAbruptly);
        }
    }
	//获取任务方法
	private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 阻塞队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 允许核心线程超时 或者 当前线程数大于核心线程数(有非核心线程)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
				//获取任务 
                Runnable r = timed ?
					//允许核心线程超时 或者 有非核心线程 达到存活时间没有获取到任务也会返回 
					//再次循环 上面的判断队列为空的条件 会让方法返回空
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();//这里会阻塞 保持核心线程
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
	
	private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
	
	private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
	
	private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
			//增加完成任务数量
            completedTaskCount += w.completedTasks;
			//移除Worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

		//终止线程
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
			//检查是不是因为获取不到任务 进入该方法的
            if (!completedAbruptly) {
				//线程获取不到任务,就是该消亡线程了
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
			//给线程池保存一个工作线程
            addWorker(null, false);
        }
    }
	
	final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
	
//内部类
	private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        // 工作线程
        final Thread thread;
        // 工作任务
        Runnable firstTask;
        // 完成的任务数量
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // aqs的状态 防止中断
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        // 执行任务  活跃线程执行的任务在这里
        public void run() {
            runWorker(this);
        }

        // 0 未上锁, 1 上锁了
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }


这篇关于Java线程池源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程