Semaphore源码解析

2022/1/18 12:04:06

本文主要是介绍Semaphore源码解析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录

  • 1. 简介
  • 2. 小例子
  • 3. 源码解析
  • 3.1 内部类
      • 3.1.1 Sync
      • 3.1.2 NonfairSync
      • 3.1.3 FairSync
  • 3.2 成员变量
  • 3.3. 构造方法
  • 3.4 成员方法
      • 3.4.1 获取资源相关的成员方法
        • 3.4.1.1 acquire()方法
        • 3.4.1.2 acquireSharedInterruptibly(int arg)方法
        • 3.4.1.3 tryAcquireShared(int acquires)方法
        • 3.4.1.4 doAcquireSharedInterruptibly(int arg)成员方法
      • 3.4.2 释放资源相关的成员方法
        • 3.4.2.1 release()方法
        • 3.4.2.2 releaseShared(int arg)方法
        • 3.4.2.3 tryReleaseShared(int releases)方法
        • 3.4.2.4 doReleaseShared()方法
      • 3.4.3 其他成员方法
        • 3.4.3.1 acquireUninterruptibly()方法
        • 3.4.3.2 tryAcquire()方法
        • 3.4.3.3 availablePermits()方法
        • 3.4.3.4 drainPermits()方法
        • 3.4.3.5 reducePermits(int reduction)方法
        • 3.4.3.6 isFair()方法
  • 4. 总结

1. 简介

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,调用acquire方法尝试获取一个许可(如果没有许可则等待,直到有许可出现),调用release释放一个许可。

Semaphore通常用来控制同时访问特定资源线程数量,通过协调各个线程以保证资源的合理应用。

2. 小例子

比如说停车场的例子,停车场的车位有限的,同一时间内可以停的车的最大数量是一定的,当停车场停的车的数量满了的时候,这个时候如果又有车想要停的时候,必须等待停车场中的车离开车位才可以,接下来我们来模拟一下停车场的例子。

// 用Semaphore模拟停车场的例子
package Semaphore;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Demo01 {
    public static void main(String[] args) {
        // 代表现在停车场有三个车位
        Semaphore semaphore = new Semaphore(3);
        // 现在有5辆车
        int n = 5;
        for (int i = 1; i <= n; i++) {
            new Thread(() -> {
                try {
                    // 车辆尝试获取许可,也就是尝试停车
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    TimeUnit.SECONDS.sleep(2);
                    // 之后离开车位
                    System.out.println(Thread.currentThread().getName() + "离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 离开车位要释放许可
                    semaphore.release();
                }

            }, "线程" + i).start();
        }
    }
}

结果:

在这里插入图片描述

从上面的例子我们可以更加理解Semaphore的作用。Semaphore构造时会初始化固定数量n的许可,调用acquire()会尝试获取一个许可,调用release()会释放一个许可,同一时刻最多只能有n个线程会拿到许可,当没有许可时,只有等到其他线程释放许可才可以拿到许可。

3. 源码解析

3.1 内部类

Semaphore中有三个内部类,分别是SyncNonfairSyncFairSync,其中Sync继承了AQS,用来获取一系列多线程访问共享资源模板NonfairSyncFairSyncSync的子类,其中NonfairSyncFairSync最主要的区别是前者是非公平的获取资源、后者是公平的获取资源,在实现Semaphore的功能时,Sync起着非常重要的作用,继承了AQS,又可以利用多态的特性使用非公平模式或公平模式,非常方便。

3.1.1 Sync

 abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
	// 设置state
    Sync(int permits) {
        setState(permits);
    }
	// 获取state的值
    final int getPermits() {
        return getState();
    }
	// 非公平的获取资源(直接尝试和AQS中首部接点的后继节点争抢资源)
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            // 
            // 获取当前state的值
            int available = getState();
            // 获取资源时state的值要减小,这里得到减小后state的值
            int remaining = available - acquires;
            // 如果减小后的state的值小于0直接返回减小后state的值,否则设置state的值
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
	// 尝试释放资源
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            // 获取当前state的值
            int current = getState();
            // 释放资源state的值要增加,获得释放资源后state的值
            int next = current + releases;
            // 这里说明释放资源时传的参数为负数,非法抛出异常
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS操作设置state的值
            if (compareAndSetState(current, next))
                return true;
        }
    }
	// 减少许可
    final void reducePermits(int reductions) {
        for (;;) {
            // 获取当前state的值
            int current = getState();
            // 获取减小许可之后state的值
            int next = current - reductions;
            // 如果next > current说明传入的参数为负,抛出异常
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // 原子设置state的值
            if (compareAndSetState(current, next))
                return;
        }
    }
	// 销毁许可(清空许可)
    final int drainPermits() {
        for (;;) {
            // 获取当前state的值
            int current = getState();
            // 当前state的值为0直接返回state的值,否则原子更新state的值为0再返回state的值(也就是0)
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

3.1.2 NonfairSync

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
	// 调用父类Sync的构造方法
    NonfairSync(int permits) {
        super(permits);
    }
	// 非公平的获取资源,这个方法的实现其实父类Sync已经实现了,所以直接调用即可
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

3.1.3 FairSync

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
	// 调用父类的构造方法
    FairSync(int permits) {
        super(permits);
    }
	// 公平的获取资源
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 判断AQS队列中1. 头部结点的后继节点是否为空或者2. 头部结点的后继节点是否不是当前线程
            // 如果阻塞队列不为空直接返回-1
            if (hasQueuedPredecessors())
                return -1;
            // 执行到这里说明阻塞队列为空或者当前线程是AQS中的head的后继节点,那当前线程就可以尝试获取资源了
            // 得到当前state的值
            int available = getState();
            // 得到获取资源后state的值
            int remaining = available - acquires;
            // 得到资源后state的值小于0直接返回state的值,否则CAS更新state的值再返回state的值
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}
// 这个方法位于AQS中
// 1. 阻塞队列为空或者2. 当前正在执行线程的后继节点不是当前线程返回true
// 否则返回false
public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

3.2 成员变量

// SemaphoreSemaphore中的成员变量

// 记录版本号
private static final long serialVersionUID = -3222578661600680210L;
// SemaphoreSemaphore内部中最重要的成员变量便是Sync变量也是除了版本变量外的唯一一个变量
private final Sync sync;

3.3. 构造方法

// 只传入一个int类型参数的构造方法采用的是非公平竞争资源的方式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
// 传入一个int类型参数和一个boolean类型参数的构造方法可以指定竞争资源的方式是公平方式还是非公平方式
// 传入true代表是公平方式传入false代表是公平方式
public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

3.4 成员方法

3.4.1 获取资源相关的成员方法

3.4.1.1 acquire()方法

获取资源方法

// 获取一个资源(许可),里面调用acquireSharedInterruptibly(int permits)方法
public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
// 获取指定数量的资源(许可),里面调用acquireSharedInterruptibly(int permits)方法
public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

3.4.1.2 acquireSharedInterruptibly(int arg)方法

获取资源模板

// acquireSharedInterruptibly(int arg)方法位于AQS中
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    // 有中断就抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取共享资源,失败返回小于0的数,之后调用doAcquireSharedInterruptibly(arg)方法
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

3.4.1.3 tryAcquireShared(int acquires)方法

尝试获取资源

// 这个是内部类FairSync的成员方法,公平竞争资源
protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 判断阻塞队列是否有等待者线程,有的话直接返回-1结束方法
        if (hasQueuedPredecessors())
            return -1;
        // 没有等待者线程就尝试获取资源
        // 得到state的值
        int available = getState();
        // 得到获取资源后state的值
        int remaining = available - acquires;
        // state小于0说明获取资源失败直接返回state的值否则CAS更新state的值并返回state
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
// 这个是内部类NonfairSync的成员方法,它会调用父类Sync的成员方法nonfairTryAcquireShared(acquires)方法,非公平竞争资源
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
// 这个是内部类Sync的成员方法
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                // 如果是非公平竞争方式,就不用看有没有等待者线程了,直接尝试获取资源
                // 得到当前state的值
                int available = getState();
                // 得到获取资源后state的值
                int remaining = available - acquires;
                // 获取资源后state值小于0直接返回state的值否则CAS更新state的值并返回state
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

3.4.1.4 doAcquireSharedInterruptibly(int arg)成员方法

如果获取资源失败返回一个小于0的数就会执行这个方法

// 如果获取资源失败返回一个小于0的数就会执行这个方法,这个方法位于AQS中
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 将当前线程包装成Node结点加入AQS队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
            	// 如果当前节点是head结点的后继节点则当前线程可以继续尝试获取资源
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                	// 获取资源成功就设置新的头结点之后清空原来的头结点就直接返回
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 当前节点不是head结点的后继节点判断是否应该挂起,应该挂起就把当前线程挂起(等待状态)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

3.4.2 释放资源相关的成员方法

3.4.2.1 release()方法

// 这两个方法都位于Semaphore中,会调用内部类Sync的成员方法releaseShared(arg)
public void release() {
    sync.releaseShared(1);
}
public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

3.4.2.2 releaseShared(int arg)方法

释放资源模板

// 该方法位于AQS中
public final boolean releaseShared(int arg) {
    // 尝试释放资源,释放资源成功,调用doReleaseShared()方法
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

3.4.2.3 tryReleaseShared(int releases)方法

尝试释放资源

// 该方法位于Semaphore的内部类Sync中
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前state的值
        // 释放资源state值要增加
        int current = getState();
        // 得到释放资源后state的值
        int next = current + releases;
        // next < current说明传入参数是负数,非法,抛出异常
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 尝试CAS更新state的值,更新成功返回true
        if (compareAndSetState(current, next))
            return true;
    }
}

3.4.2.4 doReleaseShared()方法

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 当前节点的状态是唤醒后继节点
            if (ws == Node.SIGNAL) {
                // 更新头结点状态为0(默认状态),更新失败自旋,更新成功唤醒后继结点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒head的后继节点
                unparkSuccessor(h);
            }
            // 当前头结点状态是0(默认状态)就更新当前头结点状态为共享锁需要无条件的传播(更新失败自旋)
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

3.4.3 其他成员方法

3.4.3.1 acquireUninterruptibly()方法

非中断获取资源,获取资源失败进入AQS队列排队

public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

3.4.3.2 tryAcquire()方法

直接获取资源,会调用非公平竞争获取资源方法,直接竞争,返回的参数大于等于0代表获取资源成功

// 直接尝试获取资源
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
// 在一定时间内获取资源
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

3.4.3.3 availablePermits()方法

得到当前许可的值

public int availablePermits() {
    return sync.getPermits();
}

3.4.3.4 drainPermits()方法

销毁(清空许可,调用内部类Sync的drainPermits()方法

public int drainPermits() {
    return sync.drainPermits();
}

3.4.3.5 reducePermits(int reduction)方法

减少指定数量许可,调用内部类Sync的reducePermits(reduction)方法

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

3.4.3.6 isFair()方法

判断竞争资源方式是否是公平的

public boolean isFair() {
    return sync instanceof FairSync;
}

4. 总结

  • Semaphore(信号量)是用来控制同时访问特定资源的线程数量,一般用于限流场景
  • 内部基于AQS共享方式实现,基于state的值控制许可数量。
  • acquire()获取一个许可,state的值减1release()释放一个许可,state的值加1
  • 当没有许可时,如果还有线程尝试获取许可,会陷入阻塞,直到获取许可的线程释放许可。


这篇关于Semaphore源码解析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程