分析Java并發(fā)編程之信號(hào)量Semaphore
Semaphore 的使用場(chǎng)景主要用于流量控制,比如數(shù)據(jù)庫(kù)連接,同時(shí)使用的數(shù)據(jù)庫(kù)連接會(huì)有數(shù)量限制,數(shù)據(jù)庫(kù)連接不能超過(guò)一定的數(shù)量,當(dāng)連接到達(dá)了限制數(shù)量后,后面的線(xiàn)程只能排隊(duì)等前面的線(xiàn)程釋放數(shù)據(jù)庫(kù)連接后才能獲得數(shù)據(jù)庫(kù)連接。
再比如交通公路上的紅綠燈,綠燈亮起時(shí)只能讓 100 輛車(chē)通過(guò),紅燈亮起不允許車(chē)輛通過(guò)。
再比如停車(chē)場(chǎng)的場(chǎng)景中,一個(gè)停車(chē)場(chǎng)有有限數(shù)量的車(chē)位,同時(shí)能夠容納多少臺(tái)車(chē),車(chē)位滿(mǎn)了之后只有等里面的車(chē)離開(kāi)停車(chē)場(chǎng)外面的車(chē)才可以進(jìn)入。
1.2、Semaphore 使用下面我們就來(lái)模擬一下停車(chē)場(chǎng)的業(yè)務(wù)場(chǎng)景:在進(jìn)入停車(chē)場(chǎng)之前會(huì)有一個(gè)提示牌,上面顯示著停車(chē)位還有多少,當(dāng)車(chē)位為 0 時(shí),不能進(jìn)入停車(chē)場(chǎng),當(dāng)車(chē)位不為 0 時(shí),才會(huì)允許車(chē)輛進(jìn)入停車(chē)場(chǎng)。所以停車(chē)場(chǎng)有幾個(gè)關(guān)鍵因素:停車(chē)場(chǎng)車(chē)位的總?cè)萘浚?dāng)一輛車(chē)進(jìn)入時(shí),停車(chē)場(chǎng)車(chē)位的總?cè)萘?- 1,當(dāng)一輛車(chē)離開(kāi)時(shí),總?cè)萘?+ 1,停車(chē)場(chǎng)車(chē)位不足時(shí),車(chē)輛只能在停車(chē)場(chǎng)外等待。
public class CarParking { private static Semaphore semaphore = new Semaphore(10); public static void main(String[] args){for(int i = 0;i< 100;i++){ Thread thread = new Thread(new Runnable() {@Overridepublic void run() { System.out.println('歡迎 ' + Thread.currentThread().getName() + ' 來(lái)到停車(chē)場(chǎng)'); // 判斷是否允許停車(chē) if(semaphore.availablePermits() == 0) {System.out.println('車(chē)位不足,請(qǐng)耐心等待'); } try {// 嘗試獲取semaphore.acquire();System.out.println(Thread.currentThread().getName() + ' 進(jìn)入停車(chē)場(chǎng)');Thread.sleep(new Random().nextInt(10000));// 模擬車(chē)輛在停車(chē)場(chǎng)停留的時(shí)間System.out.println(Thread.currentThread().getName() + ' 駛出停車(chē)場(chǎng)');semaphore.release(); } catch (InterruptedException e) {e.printStackTrace(); }} }, i + '號(hào)車(chē)'); thread.start();} }}
在上面這段代碼中,我們給出了 Semaphore 的初始容量,也就是只有 10 個(gè)車(chē)位,我們用這 10 個(gè)車(chē)位來(lái)控制 100 輛車(chē)的流量,所以結(jié)果和我們預(yù)想的很相似,即大部分車(chē)都在等待狀態(tài)。但是同時(shí)仍允許一些車(chē)駛?cè)胪\?chē)場(chǎng),駛?cè)胪\?chē)場(chǎng)的車(chē)輛,就會(huì) semaphore.acquire 占用一個(gè)車(chē)位,駛出停車(chē)場(chǎng)時(shí),就會(huì) semaphore.release 讓出一個(gè)車(chē)位,讓后面的車(chē)再次駛?cè)搿?/p>1.3、Semaphore 信號(hào)量的模型
上面代碼雖然比較簡(jiǎn)單,但是卻能讓我們了解到一個(gè)信號(hào)量模型的五臟六腑。下面是一個(gè)信號(hào)量的模型:
來(lái)解釋一下 Semaphore ,Semaphore 有一個(gè)初始容量,這個(gè)初始容量就是 Semaphore 所能夠允許的信號(hào)量。在調(diào)用 Semaphore 中的 acquire 方法后,Semaphore 的容量 -1,相對(duì)的在調(diào)用 release 方法后,Semaphore 的容量 + 1,在這個(gè)過(guò)程中,計(jì)數(shù)器一直在監(jiān)控 Semaphore 數(shù)量的變化,等到流量超過(guò) Semaphore 的容量后,多余的流量就會(huì)放入等待隊(duì)列中進(jìn)行排隊(duì)等待。等到 Semaphore 的容量允許后,方可重新進(jìn)入。
Semaphore 所控制的流量其實(shí)就是一個(gè)個(gè)的線(xiàn)程,因?yàn)椴l(fā)工具最主要的研究對(duì)象就是線(xiàn)程。
它的工作流程如下
這幅圖應(yīng)該很好理解吧,這里就不再過(guò)多解釋啦。
二、Semaphore 深入理解在了解 Semaphore 的基本使用和 Semaphore 的模型后,下面我們還是得從源碼來(lái)和你聊一聊 Semaphore 的種種細(xì)節(jié)問(wèn)題,因?yàn)槲覍?xiě)文章最核心的東西就是想讓我的讀者 了解 xxx,看這一篇就夠了,這是我寫(xiě)文章的追求,好了話(huà)不多說(shuō),源碼走起來(lái)!
2.1、Semaphore 基本屬性Semaphore 中只有一個(gè)屬性
private final Sync sync;
Sync 是 Semaphore 的同步實(shí)現(xiàn),Semaphore 保證線(xiàn)程安全性的方式和 ReentrantLock 、CountDownLatch 類(lèi)似,都是繼承于 AQS 的實(shí)現(xiàn)。同樣的,這個(gè) Sync 也是繼承于 AbstractQueuedSynchronizer 的一個(gè)變量,也就是說(shuō),聊 Semaphore 也繞不開(kāi) AQS,所以說(shuō) AQS 真的太重要了。
2.2、Semaphore 的公平性和非公平性那么我們進(jìn)入 Sync 內(nèi)部看看它實(shí)現(xiàn)了哪些方法
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L; Sync(int permits) { setState(permits); } final int getPermits() { return getState(); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining))return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflowthrow new Error('Maximum permit count exceeded'); if (compareAndSetState(current, next))return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflowthrow new Error('Permit count underflow'); if (compareAndSetState(current, next))return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0))return current; } }}
首先是 Sync 的初始化,內(nèi)部調(diào)用了 setState 并傳遞了 permits ,我們知道,AQS 中的 State 其實(shí)就是同步狀態(tài)的值,而 Semaphore 的這個(gè) permits 就是代表了許可的數(shù)量。
getPermits 其實(shí)就是調(diào)用了 getState 方法獲取了一下線(xiàn)程同步狀態(tài)值。后面的 nonfairTryAcquireShared 方法其實(shí)是在 Semaphore 中構(gòu)造了 NonfairSync 中的 tryAcquireShared 調(diào)用的
這里需要提及一下什么是 NonfairSync,除了 NonfairSync 是不是還有 FairSync 呢?查閱 JDK 源碼發(fā)現(xiàn)確實(shí)有。
那么這里的 FairSync 和 NonfairSync 都代表了什么?為什么會(huì)有這兩個(gè)類(lèi)呢?
事實(shí)上,Semaphore 就像 ReentrantLock 一樣,也存在“公平”和'不公平'兩種,默認(rèn)情況下 Semaphore 是一種不公平的信號(hào)量
Semaphore 的不公平意味著它不會(huì)保證線(xiàn)程獲得許可的順序,Semaphore 會(huì)在線(xiàn)程等待之前為調(diào)用 acquire 的線(xiàn)程分配一個(gè)許可,擁有這個(gè)許可的線(xiàn)程會(huì)自動(dòng)將自己置于線(xiàn)程等待隊(duì)列的頭部。
當(dāng)這個(gè)參數(shù)為 true 時(shí),Semaphore 確保任何調(diào)用 acquire 的方法,都會(huì)按照先入先出的順序來(lái)獲取許可。
final int nonfairTryAcquireShared(int acquires) { for (;;) { // 獲取同步狀態(tài)值 int available = getState(); // state 的值 - 當(dāng)前線(xiàn)程需要獲取的信號(hào)量(通常默認(rèn)是 -1),只有 // remaining > 0 才表示可以獲取。 int remaining = available - acquires; // 先判斷是否小于 0 ,如果小于 0 則表示無(wú)法獲取,如果是正數(shù) // 就需要使用 CAS 判斷內(nèi)存值和同步狀態(tài)值是否一致,然后更新為同步狀態(tài)值 - 1 if (remaining < 0 ||compareAndSetState(available, remaining)) return remaining; }}
從上面這幅源碼對(duì)比圖可以看到,NonfairSync 和 FairSync 最大的區(qū)別就在于 tryAcquireShared 方法的區(qū)別。
NonfairSync 版本中,是不會(huì)管當(dāng)前等待隊(duì)列中是否有排隊(duì)許可的,它會(huì)直接判斷信號(hào)許可量和 CAS 方法的可行性。
FairSync 版本中,它首先會(huì)判斷是否有許可進(jìn)行排隊(duì),如果有的話(huà)就直接獲取失敗。
這時(shí)候可能就會(huì)有讀者問(wèn)了,你上面說(shuō)公平性和非公平性的區(qū)別一直針對(duì)的是 acquire 方法來(lái)說(shuō)的,怎么現(xiàn)在他們兩個(gè)主要的區(qū)別在于 tryAcquireShared 方法呢?
別急,讓我們進(jìn)入到 acquire 方法一探究竟
可以看到,在 acquire 方法中,會(huì)調(diào)用 tryAcquireShared 方法,根據(jù)其返回值判斷是否調(diào)用 doAcquireSharedInterruptibly 方法。
這里需要注意下,acquire 方法具有阻塞性,而 tryAcquire 方法不具有阻塞性。
這也就是說(shuō),調(diào)用 acquire 方法如果獲取不到許可,那么 Semaphore 會(huì)阻塞,直到有可用的許可。而 tryAcquire 方法如果獲取不到許可會(huì)直接返回 false。
這里還需要注意下 acquireUninterruptibly 方法,其他 acquire 的相關(guān)方法要么是非阻塞,要么是阻塞可中斷,而 acquireUninterruptibly 方法不僅在沒(méi)有許可的情況下執(zhí)著的等待,而且也不會(huì)中斷,使用這個(gè)方法時(shí)需要注意,這個(gè)方法很容易在出現(xiàn)大規(guī)模線(xiàn)程阻塞而導(dǎo)致 Java 進(jìn)程出現(xiàn)假死的情況。
有獲取許可相對(duì)應(yīng)的就有釋放許可,但是釋放許可不會(huì)區(qū)分到底是公平釋放還是非公平釋放。不管方式如何都是釋放一個(gè)許可給 Semaphore ,同樣的 Semaphore 中的許可數(shù)量會(huì)增加。
在上圖中調(diào)用 tryReleaseShared 判斷是否能進(jìn)行釋放后,再會(huì)調(diào)用 AQS 中的 releasedShared 方法進(jìn)行釋放。
上面這個(gè)釋放流程只是釋放一個(gè)許可,除此之外,還可以釋放多個(gè)許可
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits);}
后面這個(gè) releaseShared 的釋放流程和上面的釋放流程一致。
2.3、其他 Semaphore 方法除了上面基本的 acquire 和 release 相關(guān)方法外,我們也要了解一下 Semaphore 的其他方法。Semaphore 的其他方法比較少,只有下面這幾個(gè):
drainPermits : 獲取并退還所有立即可用的許可,其實(shí)相當(dāng)于使用 CAS 方法把內(nèi)存值置為 0 reducePermits:和 nonfairTryAcquireShared 方法類(lèi)似,只不過(guò) nonfairTryAcquireShared 是使用 CAS 使內(nèi)存值 + 1,而 reducePermits 是使內(nèi)存值 - 1 。 isFair:對(duì) Semaphore 許可的爭(zhēng)奪是采用公平還是非公平的方式,對(duì)應(yīng)到內(nèi)部的實(shí)現(xiàn)就是 FairSync 和 NonfairSync。 hasQueuedThreads:當(dāng)前是否有線(xiàn)程由于要獲取 Semaphore 許可而進(jìn)入阻塞。 getQueuedThreads:返回一個(gè)包含了等待獲取許可的線(xiàn)程集合。 getQueueLength:獲取正在排隊(duì)而進(jìn)入阻塞狀態(tài)的線(xiàn)程個(gè)數(shù)以上就是分析Java并發(fā)編程之信號(hào)量Semaphore的詳細(xì)內(nèi)容,更多關(guān)于Java并發(fā)編程 信號(hào)量Semaphore的資料請(qǐng)關(guān)注好吧啦網(wǎng)其它相關(guān)文章!
相關(guān)文章:
1. ASP.NET MVC實(shí)現(xiàn)城市或車(chē)型三級(jí)聯(lián)動(dòng)2. IntelliJ IDEA設(shè)置條件斷點(diǎn)的方法步驟3. PHP數(shù)組array類(lèi)常見(jiàn)操作示例4. 淺談Java HttpURLConnection請(qǐng)求方式5. 永久解決 Intellij idea 報(bào)錯(cuò):Error :java 不支持發(fā)行版本5的問(wèn)題6. Eclipse XSD 生成枚舉類(lèi)型的Schema的實(shí)例詳解7. PHP中常用的函數(shù)庫(kù)和一些小技巧8. JavaScript內(nèi)置對(duì)象之Array的使用小結(jié)9. Java正則表達(dá)式實(shí)現(xiàn)經(jīng)緯度的合法性操作10. python實(shí)現(xiàn)ftp文件傳輸系統(tǒng)(案例分析)
