信号量,用来限制能同时访问共享资源的线程上限。
作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数
Semaphore维护了一个许可集,其实就是一定数量的“许可证”。
当有线程想要访问共享资源时,需要先获取(acquire)的许可;如果许可不够了,线程需要一直等待,直到许可可用。当线程使用完共享资源后,可以归还(release)许可,以供其它需要的线程使用。
和CountDownLatch区别
CountDownLatch:
同步状态State > 0
表示资源不可用,所有线程需要等待;State == 0
表示资源可用,所有线程可以同时访问
Semaphore:
剩余许可数 < 0
表示没有许可数了共享资源不可用,所有线程需要等待; 许可剩余数 ≥ 0
表示存在许可数了共享资源可用,所有线程可以同时访问
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6; i++) {
new Thread(()->{
//得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
限流
-
使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机
线程数量,并且仅是限制线程数,而不是限制资源数(例如连接数,请对比 Tomcat LimitLatch 的实现)当资源数和线程数使用semohore 比较合适比如数据库连接池:一个线程数对应一个线程资源
-
用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好,
注意下面的实现中线程数和数据库连接数是相等的
public class PoolSemaphoreStream {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
}
}
class Pool {
// 1. 连接池大小
private final int poolSize;
// 2. 连接对象数组
private final Connection[] connections;
// 3. 连接状态数组 0 表示空闲, 1 表示繁忙
private final AtomicIntegerArray states;
private final Semaphore semaphore;
// 4. 构造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 让许可数与资源数一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
try {
Driver driver =new com.mysql.cj.jdbc.Driver();
DriverManager.registerDriver(driver);
for (int i = 0; i < poolSize; i++) {
connections[i] = DriverManager.getConnection("jdbc:mysql://localhost:3306/myemployees","root","123123");;
}
} catch (SQLException e) {
e.printStackTrace();
}
}
// 5. 借连接
public Connection borrow() {// t1, t2, t3
// 获取许可
try {
semaphore.acquire(); // 没有许可的线程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 获取空闲连接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
System.out.println(("borrow {}" + connections[i]));
return connections[i];
}
}
}
// 不会执行到这里
return null;
}
// 6. 归还连接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
System.out.println(("free {}" + conn));
semaphore.release();
break;
}
}
}
}
单位时间内限流
public class TestController {
private RateLimiter limiter = RateLimiter.create(50);
@GetMapping("/test")
public String test() {
// limiter.acquire();
return "ok";
}
}
原理
semaphore.acquire();
获得,假设如果已经满了,等待,等待被释放为止!semaphore.release();
释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一 刚开始,permits(state)为 3,这时 5 个线程来获取资源。
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
内部结构
Semaphore 通过内部类实现了AQS框架提供的接口, 内部类分别实现了公平/非公平策略。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
//可以看出Semaphore分公平与非公平
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//抽象的Sync继承与AQS
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);//设置Semaphore的许可数
}
final int getPermits() {
return getState();
}
//非公平的尝试获取共享锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();//获取可以使用的许可数
int remaining = available - acquires;//剩余许可数
//剩下许可数为<0或者CAS 修改总许可数为剩余许可数成功返回remaining
//如果许可已经用完, 返回负数, 表示获取失败;如果 cas 重试成功, 返回正数, 表示获取成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//尝试释放共享锁
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();//当前许可数
//其他的锁比如countDownLatch 和ReentrantLock这里都是减,因为释放许可的时候需要将使用掉的许可数+releases
//这样其他线程才能获取新的许可
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))//修改当前许可数为新的许可数next
return true;
}
}
//减少许可数
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 立即获取所有可用许可证,当许可证数为0或者修改许可证为0的时候返回
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
//非公平许可
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
//调用 Sync的nonfairTryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
//公平许可
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
//获取共享锁
protected int tryAcquireShared(int acquires) {
for (;;) {
//有前驱节点在同步队列里,说明等待队列里有其他线程正在获取许可,则直接返回-1,体现公平性
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;//剩余许可数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//AQS的可中断获取共享锁操作
}
public void release() {
sync.releaseShared(1);//AQS的释放共享锁操作
}
Semaphore 资源就是许可证的数量:
- 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) ≥ 0:资源可用
- 剩余许可证数(State值) - 尝试获取的许可数(acquire方法入参) < 0:资源不可用
- 只能限制同时访问资源的线程数,至于对数据一致性的控制,Semaphore是不关心的。如果是只有一个许可的Semaphore,可以当作锁使用。
- Semaphore 的限流是限制线程数,而不是限制资源数当资源数和线程数使用semohore 比较合适比如数据库连接池:一个线程数对应一个线程资源