亚洲全黄无码一级在线看_国产剧情久久久性色_无码av一区二区三区无码_亚洲成a×人片在线观看

當(dāng)前位置: 首頁(yè) > 科技新聞 >

帶你走進(jìn)C#并發(fā)隊(duì)列ConcurrentQueue的內(nèi)部世界

時(shí)間:2020-04-11 15:57來源:網(wǎng)絡(luò)整理 瀏覽:
帶著問題出發(fā)如果是自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的隊(duì)列功能,我們?cè)撊绾卧O(shè)計(jì)它的存儲(chǔ)結(jié)構(gòu)呢?一般來說有這兩種方式:數(shù)組或者鏈表,先來簡(jiǎn)單分析下。我們都知道,
帶著問題出發(fā)

如果是自己實(shí)現(xiàn)一個(gè)簡(jiǎn)單的隊(duì)列功能,我們?cè)撊绾卧O(shè)計(jì)它的存儲(chǔ)結(jié)構(gòu)呢?一般來說有這兩種方式:數(shù)組或者鏈表,先來簡(jiǎn)單分析下。

帶你走進(jìn)C#并發(fā)隊(duì)列ConcurrentQueue的內(nèi)部世界

我們都知道,數(shù)組是固定空間的集合,意味著初始化的時(shí)候要指定數(shù)組大小,但是隊(duì)列的長(zhǎng)度是隨時(shí)變化的,超出數(shù)組大小了怎么辦?這時(shí)候就必須要對(duì)數(shù)組進(jìn)行擴(kuò)容。問題又來了,擴(kuò)容要擴(kuò)多少呢,少了不夠用多了浪費(fèi)內(nèi)存空間。與之相反的,鏈表是動(dòng)態(tài)空間類型的數(shù)據(jù)結(jié)構(gòu),元素之間通過指針相連,不需要提前分配空間,需要多少分配多少。但隨之而來的問題是,大量的出隊(duì)入隊(duì)操作伴隨著大量對(duì)象的創(chuàng)建銷毀,GC的壓力又變得非常大。

帶你走進(jìn)C#并發(fā)隊(duì)列ConcurrentQueue的內(nèi)部世界

回到主題,要實(shí)現(xiàn)一個(gè)高性能的線程安全隊(duì)列,我們?cè)囍卮鹨韵聠栴}:

存儲(chǔ)結(jié)構(gòu)是怎樣的如何初始化(初始容量給多少比較好?)常用操作(入隊(duì)出隊(duì))如何實(shí)現(xiàn)線程安全是如何保證的帶你走進(jìn)C#并發(fā)隊(duì)列ConcurrentQueue的內(nèi)部世界

存儲(chǔ)結(jié)構(gòu)

通過源碼可以看到ConcurrentQueue采用了數(shù)組+鏈表的組合模式,充分吸收了2種結(jié)構(gòu)的優(yōu)點(diǎn)。

具體來說,它的總體結(jié)構(gòu)是一個(gè)鏈表,鏈表的每個(gè)節(jié)點(diǎn)是一個(gè)包含數(shù)組的特殊對(duì)象,我們稱之為Segment(段或節(jié),原話是a queue is a linked list of small arrays, each node is called a segment.),它里面的數(shù)組是存儲(chǔ)真實(shí)數(shù)據(jù)的地方,容量固定大小是32,每一個(gè)Segment有指向下一個(gè)Segment的的指針,以此形成鏈表結(jié)構(gòu)。而隊(duì)列中維護(hù)了2個(gè)特殊的指針,他們分別指向隊(duì)列的首段(head segment)和尾段(tail segment),他們對(duì)入隊(duì)和出隊(duì)有著重要的作用。用一張圖來解釋隊(duì)列的內(nèi)部結(jié)構(gòu):

帶你走進(jìn)C#并發(fā)隊(duì)列ConcurrentQueue的內(nèi)部世界

嗯,畫圖畫到這里突然聯(lián)想到,搞成雙向鏈表的話是不是就神似B+樹的葉子節(jié)點(diǎn)?技術(shù)就是這么奇妙~

段的核心定義為:

/// <summary>
/// private class for ConcurrentQueue.
/// 鏈表節(jié)點(diǎn)(段)
/// </summary>
private class Segment
{
//實(shí)際存儲(chǔ)數(shù)據(jù)的容器
internal volatile T[] m_array;

//存儲(chǔ)對(duì)應(yīng)位置數(shù)據(jù)的狀態(tài),當(dāng)數(shù)據(jù)的對(duì)應(yīng)狀態(tài)位標(biāo)記為true時(shí)該數(shù)據(jù)才是有效的
internal volatile VolatileBool[] m_state;

//下一段的指針
private volatile Segment m_next;

//當(dāng)前段在隊(duì)列中的索引
internal readonly long m_index;

//兩個(gè)位置指針
private volatile int m_low;
private volatile int m_high;

//所屬的隊(duì)列實(shí)例
private volatile ConcurrentQueue<T> m_source;
}

隊(duì)列的核心定義為:

/// <summary>
/// 線程安全的先進(jìn)先出集合,
/// </summary>
public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
//首段
[NonSerialized]
private volatile Segment m_head;

//尾段
[NonSerialized]
private volatile Segment m_tail;

//每一段的大小
private const int SEGMENT_SIZE = 32;

//截取快照的操作數(shù)量
[NonSerialized]
internal volatile int m_numSnapshotTakers = 0;
}


常規(guī)操作

先從初始化一個(gè)隊(duì)列開始看起。

創(chuàng)建隊(duì)列實(shí)例

與普通Queue不同的是,ConcurrentQueue不再支持初始化時(shí)指定隊(duì)列大小(capacity),僅僅提供一個(gè)無參構(gòu)造函數(shù)和一個(gè)IEnumerable<T>參數(shù)的構(gòu)造函數(shù)。

/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
m_head = m_tail = new Segment(0, this);
}

無參構(gòu)造函數(shù)很簡(jiǎn)單,創(chuàng)建了一個(gè)Segment實(shí)例并把首尾指針都指向它,此時(shí)隊(duì)列只包含一個(gè)Segment,它的索引是0,隊(duì)列容量是32。繼續(xù)看一下Segment是如何被初始化的:

/// <summary>
/// Create and initialize a segment with the specified index.
/// </summary>
internal Segment(long index, ConcurrentQueue<T> source)
{
m_array = new T[SEGMENT_SIZE];
m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false
m_high = -1;
Contract.Assert(index >= 0);
m_index = index;
m_source = source;
}

Segment只提供了一個(gè)構(gòu)造函數(shù),接受的參數(shù)分別是隊(duì)列索引和隊(duì)列實(shí)例,它創(chuàng)建了一個(gè)長(zhǎng)度為32的數(shù)組,并創(chuàng)建了與之對(duì)應(yīng)的狀態(tài)數(shù)組,然后初始化了位置指針(m_low=0,m_high=-1,此時(shí)表示一個(gè)空的Segment)。到這里,一個(gè)并發(fā)隊(duì)列就創(chuàng)建好了。

使用集合創(chuàng)建隊(duì)列的過程和上面類似,只是多了兩個(gè)步驟:入隊(duì)和擴(kuò)容,下面會(huì)重點(diǎn)描述這兩部分所以這里不再過多介紹。


元素入隊(duì)

先亮出源碼:

/// <summary>
/// Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <param name="item">The object to add to the end of the <see
/// cref="ConcurrentQueue{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.
/// </param>
public void Enqueue(T item)
{
SpinWait spin = new SpinWait();
while (true)
{
Segment tail = m_tail;
if (tail.TryAppend(item))
return;
spin.SpinOnce();
}
}

通過源碼可以看到,入隊(duì)操作是在隊(duì)尾(m_tail)進(jìn)行的,它嘗試在最后一個(gè)Segment中追加指定的元素,如果成功了就直接返回,失敗的話就自旋等待,直到成功為止。那什么情況下會(huì)失敗呢?這就要繼續(xù)看看是如何追加元素的:

internal bool TryAppend(T value)
{
//先判斷一下高位指針有沒有達(dá)到數(shù)組邊界(也就是數(shù)組是否裝滿了)
if (m_high >= SEGMENT_SIZE - 1)
{
return false;
}
int newhigh = SEGMENT_SIZE;
try
{ }
finally
{
//使用原子操作讓高位指針加1
newhigh = Interlocked.Increment(ref m_high);
//如果數(shù)組還有空位
if (newhigh <= SEGMENT_SIZE - 1)
{
//把數(shù)據(jù)放到數(shù)組中,同時(shí)更新狀態(tài)
m_array[newhigh] = value;
m_state[newhigh].m_value = true;
}
//數(shù)組滿了要觸發(fā)擴(kuò)容
if (newhigh == SEGMENT_SIZE - 1)
{
Grow();
}
}
return newhigh <= SEGMENT_SIZE - 1;
}

所以,只有當(dāng)尾段m_tail裝滿的情況下追加元素才會(huì)失敗,這時(shí)候必須要等待下一個(gè)段產(chǎn)生,也就是擴(kuò)容(細(xì)細(xì)品一下Grow這個(gè)詞真的很妙),自旋就是在等擴(kuò)容完成才能有地方放數(shù)據(jù)。而在保存數(shù)據(jù)的時(shí)候,通過原子自增操作保證了同一個(gè)位置只會(huì)有一個(gè)數(shù)據(jù)被寫入,從而實(shí)現(xiàn)了線程安全。

注意:這里的裝滿并不是指數(shù)組每個(gè)位置都有數(shù)據(jù),而是指最后一個(gè)位置已被使用。

繼續(xù)看一下擴(kuò)容是怎么一個(gè)過程:

/// <summary>
/// Create a new segment and append to the current one
/// Update the m_tail pointer
/// This method is called when there is no contention
/// </summary>
internal void Grow()
{
//no CAS is needed, since there is no contention (other threads are blocked, busy waiting)
Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow
m_next = newSegment;
Contract.Assert(m_source.m_tail == this);
m_source.m_tail = m_next;
}

在普通隊(duì)列中,擴(kuò)容是通過創(chuàng)建一個(gè)更大的數(shù)組然后把數(shù)據(jù)拷貝過去實(shí)現(xiàn)擴(kuò)容的,這個(gè)操作比較耗時(shí)。而在并發(fā)隊(duì)列中就非常簡(jiǎn)單了,首先創(chuàng)建一個(gè)新Segment,然后把當(dāng)前Segment的next指向它,最后掛到隊(duì)列的末尾去就可以了,全部是指針操作非常高效。而且從代碼注釋中可以看到,這里不會(huì)出現(xiàn)線程競(jìng)爭(zhēng)的情況,因?yàn)槠渌€程都因?yàn)槲恢貌粔虮蛔枞荚谧孕却小?/p>元素出隊(duì)

還是先亮出源碼:

public bool TryDequeue(out T result)
{
while (!IsEmpty)
{
Segment head = m_head;
if (head.TryRemove(out result))
return true;
//since method IsEmpty spins, we don't need to spin in the while loop
}
result = default(T);
return false;
}

可以看到只有在隊(duì)列不為空(IsEmpty==false)的情況下才會(huì)嘗試出隊(duì)操作,而出隊(duì)是在首段上進(jìn)行操作的。關(guān)于如何判斷隊(duì)列是否為空總結(jié)就一句話:當(dāng)首段m_head不包含任何數(shù)據(jù)且沒有下一段的時(shí)候隊(duì)列才為空,詳細(xì)的判斷過程源碼注釋中寫的很清楚,限于篇幅不詳細(xì)介紹。

出隊(duì)的本質(zhì)是從首段中移除低位指針?biāo)赶虻脑?,看一下具體實(shí)現(xiàn)步驟:

internal bool TryRemove(out T result)
{
SpinWait spin = new SpinWait();
int lowLocal = Low, highLocal = High;
//判斷當(dāng)前段是否為空
while (lowLocal <= highLocal)
{
//判斷低位指針位置是否可以移除
if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal)
{
SpinWait spinLocal = new SpinWait();
//判斷元素是否有效
while (!m_state[lowLocal].m_value)
{
spinLocal.SpinOnce();
}
//取出元素
result = m_array[lowLocal];
//釋放引用關(guān)系
if (m_source.m_numSnapshotTakers <= 0)
{
m_array[lowLocal] = default(T);
}
//判斷當(dāng)前段的元素是否全部被移除了,要丟棄它
if (lowLocal + 1 >= SEGMENT_SIZE)
{
spinLocal = new SpinWait();
while (m_next == null)
{
spinLocal.SpinOnce();
}
Contract.Assert(m_source.m_head == this);
m_source.m_head = m_next;
}
return true;
}
else
{
//線程競(jìng)爭(zhēng)失敗,自旋等待并重置
spin.SpinOnce();
lowLocal = Low; highLocal = High;
}
}//end of while
result = default(T);
return false;
}

首先,只有當(dāng)前Segment不為空的情況下才嘗試移除元素,否則就直接返回false。然后通過一個(gè)原子操作Interlocked.CompareExchange判斷當(dāng)前低位指針上是否有其他線程同時(shí)也在移除,如果有那就進(jìn)入自旋等待,沒有的話就從這個(gè)位置取出元素并把低位指針往前推進(jìn)一位。如果當(dāng)前隊(duì)列沒有正在進(jìn)行截取快照的操作,那取出元素后還要把這個(gè)位置給釋放掉。當(dāng)這個(gè)Segment的所有元素都被移除掉了,這時(shí)候要把它丟棄,簡(jiǎn)單來說就是讓隊(duì)列的首段指針指向它的下一段即可,丟棄的這一段等著GC來收拾它。

這里稍微提一下Interlocked.CompareExchange,它的意思是比較和交換,也就是更為大家所熟悉的CAS(Compare-and-Swap),它主要做了以下2件事情:

比較m_low和lowLocal的值是否相等如果相等則m_low=lowLocal+1,如果不相等就什么都不做,不管是否相等,始終返回m_low的原始值

整個(gè)操作是原子性的,對(duì)CPU而言就是一條指令,這樣就可以保證當(dāng)前位置只有一個(gè)線程執(zhí)行出隊(duì)操作。

還有一個(gè)TryPeek()方法和出隊(duì)類似,它是從隊(duì)首獲取一個(gè)元素但是無需移除該元素,可以看做Dequeue的簡(jiǎn)化版,不再詳細(xì)介紹。


獲取隊(duì)列中元素的數(shù)量

與普通Queue不同的是,ConcurrentQueue并沒有維護(hù)一個(gè)表示隊(duì)列中元素個(gè)數(shù)的計(jì)數(shù)器,那就意味著要得到這個(gè)數(shù)量必須實(shí)時(shí)去計(jì)算。我們看一下計(jì)算過程:

public int Count
{
get
{
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

if (head == tail)
{
return tailHigh - headLow + 1;
}

int count = SEGMENT_SIZE - headLow;
count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1));
count += tailHigh + 1;

return count;
}
}

大致思路是,先計(jì)算(GetHeadTailPositions)出首段的低位指針和尾段的高位指針,這中間的總長(zhǎng)度就是我們要的數(shù)量,然后分成3節(jié)依次累加每一個(gè)Segment包含的元素個(gè)數(shù)得到最終的隊(duì)列長(zhǎng)度,可以看到這是一個(gè)開銷比較大的操作。正因?yàn)槿绱耍④浌俜酵扑]使用IsEmpty屬性來判斷隊(duì)列是否為空,而不是使用隊(duì)列長(zhǎng)度Count==0來判斷,使用ConcurrentStack也是一樣。

截取快照(take snapshot)

所謂的take snapshot就是指一些格式轉(zhuǎn)換的操作,例如ToArray()、ToList()、GetEnumerator()這種類型的方法。在前面隊(duì)列的核心定義中我們提到有一個(gè)m_numSnapshotTakers字段,這時(shí)候就派上用場(chǎng)了。下面以比較典型的ToList()源碼舉例說明:

private List<T> ToList()
{
// Increments the number of active snapshot takers. This increment must happen before the snapshot is
// taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
// eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0.
Interlocked.Increment(ref m_numSnapshotTakers);

List<T> list = new List<T>();
try
{
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

if (head == tail)
{
head.AddToList(list, headLow, tailHigh);
}
else
{
head.AddToList(list, headLow, SEGMENT_SIZE - 1);
Segment curr = head.Next;
while (curr != tail)
{
curr.AddToList(list, 0, SEGMENT_SIZE - 1);
curr = curr.Next;
}
tail.AddToList(list, 0, tailHigh);
}
}
finally
{
// This Decrement must happen after copying is over.
Interlocked.Decrement(ref m_numSnapshotTakers);
}
return list;
}

可以看到,ToList的邏輯和Count非常相似,都是先計(jì)算出兩個(gè)首尾位置指針,然后把隊(duì)列分為3節(jié)依次遍歷處理,最大的不同之處在于方法的開頭和結(jié)尾分別對(duì)m_numSnapshotTakers做了一個(gè)原子操作。在方法的第一行,使用Interlocked.Increment做了一次遞增,這時(shí)候表示隊(duì)列正在進(jìn)行一次截取快照操作,在處理完后又在finally中用Interlocked.Decrement做了一次遞減表示當(dāng)前操作已完成,這樣確保了在進(jìn)行快照時(shí)不被出隊(duì)影響。感覺這塊很難描述的特別好,所以保留了原始的英文注釋,大家慢慢體會(huì)。

到這里,基本把ConcurrentQueue的核心說清楚了。

總結(jié)一下

回到文章開頭提出的幾個(gè)問題,現(xiàn)在應(yīng)該有了很清晰的答案:

存儲(chǔ)結(jié)構(gòu) -- 采用數(shù)組和鏈表的組合形式如何初始化 -- 創(chuàng)建固定大小的段,無需指定初始容量常用操作如何實(shí)現(xiàn) -- 尾段入隊(duì),首段出隊(duì)線程安全問題 -- 使用SpinWait自旋等待和原子操作實(shí)現(xiàn)最后

多說一句,很多人學(xué)Python過程中會(huì)遇到各種煩惱問題,沒有人解答容易放棄。小編是一名python開發(fā)工程師,這里有我自己整理了一套最新的python系統(tǒng)學(xué)習(xí)教程,包括從基礎(chǔ)的python腳本到web開發(fā)、爬蟲、數(shù)據(jù)分析、數(shù)據(jù)可視化、機(jī)器學(xué)習(xí)等。想要這些資料的可以關(guān)注小編,并在后臺(tái)私信小編:“01”即可領(lǐng)取。

推薦內(nèi)容