决议从这篇文章最先,开一个读源码系列,不限制平台语言或工具,任何自己感兴趣的都会写。前几天碰着一个小问题又读了一遍ConcurrentQueue的源码,那就拿C#中对照常用的并发行列ConcurrentQueue作为开篇来聊一聊它的实现原理。
话不多说,直奔主题。
要提前说明下的是,本文剖析的源码是基于.NET Framework 4.8版本,地址是:https://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs 本来是计划用.NET Core版本的,然则找了一下竟然没找到:https://github.com/dotnet/runtime/tree/master/src/libraries/System.Collections.Concurrent/src/System/Collections/Concurrent 不知道是我找错位置了照样咋回事,有知道的大佬见告一下。不外我以为实现原理应该类似吧,后面找到了我对比一下,差别的话再写一篇来剖析。
带着问题出发
若是是自己实现一个简朴的行列功效,我们该若何设计它的存储结构呢?一般来说有这两种方式:数组或者链表,先来简朴剖析下。
我们都知道,数组是牢固空间的聚集,意味着初始化的时刻要指定数组巨细,然则行列的长度是随时转变的,超出数组巨细了怎么办?这时刻就必须要对数组举行扩容。问题又来了,扩容要扩若干呢,少了不够用多了虚耗内存空间。与之相反的,链表是动态空间类型的数据结构,元素之间通过指针相连,不需要提前分配空间,需要若干分配若干。但随之而来的问题是,大量的出队入队操作伴随着大量工具的建立销毁,GC的压力又变得异常大。 事实上,在C#的通俗行列Queue
类型中选择使用数组举行实现,它实现了一套扩容机制,这里不再详细形貌,有兴趣的直接看源码,对照简朴。
回到主题,要实现一个高性能的线程平安行列,我们试着回覆以下问题:
- 存储结构是怎样的
- 若何初始化(初始容量给若干对照好?)
- 常用操作(入队出队)若何实现
- 线程平安是若何保证的
存储结构
通过源码可以看到ConcurrentQueue
接纳了数组+链表的组合模式,充实吸收了2种结构的优点。
详细来说,它的总体结构是一个链表,链表的每个节点是一个包罗数组的特殊工具,我们称之为Segment(段或节,原话是a queue is a linked list of small arrays, each node is called a segment.
),它内里的数组是存储真实数据的地方,容量牢固巨细是32,每一个Segment有指向下一个Segment的的指针,以此形成链表结构。而行列中维护了2个特殊的指针,他们划分指向行列的首段(head segment)和尾段(tail segment),他们对入队和出队有着主要的作用。用一张图来注释行列的内部结构:
嗯,绘图画到这里突然联想到,搞成双向链表的话是不是就神似B+树的叶子节点?手艺就是这么巧妙~
段的焦点界说为:
/// <summary>
/// private class for ConcurrentQueue.
/// 链表节点(段)
/// </summary>
private class Segment
{
//现实存储数据的容器
internal volatile T[] m_array;
//存储对应位置数据的状态,当数据的对应状态位标记为true时该数据才是有用的
internal volatile VolatileBool[] m_state;
//下一段的指针
private volatile Segment m_next;
//当前段在行列中的索引
internal readonly long m_index;
//两个位置指针
private volatile int m_low;
private volatile int m_high;
//所属的行列实例
private volatile ConcurrentQueue<T> m_source;
}
行列的焦点界说为:
/// <summary>
/// 线程平安的先进先出聚集,
/// </summary>
public class ConcurrentQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>
{
//首段
[NonSerialized]
private volatile Segment m_head;
//尾段
[NonSerialized]
private volatile Segment m_tail;
//每一段的巨细
private const int SEGMENT_SIZE = 32;
//截取快照的操作数目
[NonSerialized]
internal volatile int m_numSnapshotTakers = 0;
}
通例操作
先从初始化一个行列最先看起。
建立行列实例
与通俗Queue
差别的是,ConcurrentQueue
不再支持初始化时指定行列巨细(capacity),仅仅提供一个无参组织函数和一个IEnumerable<T>
参数的组织函数。
/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
/// </summary>
public ConcurrentQueue()
{
m_head = m_tail = new Segment(0, this);
}
无参组织函数很简朴,建立了一个Segment实例并把首尾指针都指向它,此时行列只包罗一个Segment,它的索引是0,行列容量是32。 继续看一下Segment是若何被初始化的:
/// <summary>
/// Create and initialize a segment with the specified index.
/// </summary>
internal Segment(long index, ConcurrentQueue<T> source)
{
m_array = new T[SEGMENT_SIZE];
m_state = new VolatileBool[SEGMENT_SIZE]; //all initialized to false
m_high = -1;
Contract.Assert(index >= 0);
m_index = index;
m_source = source;
}
Segment只提供了一个组织函数,接受的参数划分是行列索引和行列实例,它建立了一个长度为32的数组,并建立了与之对应的状态数组,然后初始化了位置指针(m_low=0,m_high=-1,此时示意一个空的Segment)。 到这里,一个并发行列就建立好了。
使用聚集建立行列的历程和上面类似,只是多了两个步骤:入队和扩容,下面会重点形貌这两部分以是这里不再过多先容。
元素入队
先亮出源码:
/// <summary>
/// Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.
/// </summary>
/// <param name="item">The object to add to the end of the <see
/// cref="ConcurrentQueue{T}"/>. The value can be a null reference
/// (Nothing in Visual Basic) for reference types.
/// </param>
public void Enqueue(T item)
{
SpinWait spin = new SpinWait();
while (true)
{
Segment tail = m_tail;
if (tail.TryAppend(item))
return;
spin.SpinOnce();
}
}
通过源码可以看到,入队操作是在队尾(m_tail)举行的,它实验在最后一个Segment中追加指定的元素,若是乐成了就直接返回,失败的话就自旋守候,直到乐成为止。那什么情形下会失败呢?这就要继续看看是若何追加元素的:
docker-compose搭建redis哨兵集群
internal bool TryAppend(T value)
{
//先判断一下高位指针有没有到达数组界限(也就是数组是否装满了)
if (m_high >= SEGMENT_SIZE - 1)
{
return false;
}
int newhigh = SEGMENT_SIZE;
try
{ }
finally
{
//使用原子操作让高位指针加1
newhigh = Interlocked.Increment(ref m_high);
//若是数组另有空位
if (newhigh <= SEGMENT_SIZE - 1)
{
//把数据放到数组中,同时更新状态
m_array[newhigh] = value;
m_state[newhigh].m_value = true;
}
//数组满了要触发扩容
if (newhigh == SEGMENT_SIZE - 1)
{
Grow();
}
}
return newhigh <= SEGMENT_SIZE - 1;
}
以是,只有当尾段m_tail装满的情形下追加元素才会失败,这时刻必须要守候下一个段发生,也就是扩容(细细品一下Grow这个词真的很妙),自旋就是在等扩容完成才气有地方放数据。而在保留数据的时刻,通过原子自增操作保证了同一个位置只会有一个数据被写入,从而实现了线程平安。
注重:这里的装满并不是指数组每个位置都有数据,而是指最后一个位置已被使用。
继续看一下扩容是怎么一个历程:
/// <summary>
/// Create a new segment and append to the current one
/// Update the m_tail pointer
/// This method is called when there is no contention
/// </summary>
internal void Grow()
{
//no CAS is needed, since there is no contention (other threads are blocked, busy waiting)
Segment newSegment = new Segment(m_index + 1, m_source); //m_index is Int64, we don't need to worry about overflow
m_next = newSegment;
Contract.Assert(m_source.m_tail == this);
m_source.m_tail = m_next;
}
**在通俗行列中,扩容是通过建立一个更大的数组然后把数据拷贝已往实现扩容的,这个操作对照耗时。而在并发行列中就异常简朴了,首先建立一个新Segment,然后把当前Segment的next指向它,最后挂到行列的末尾去就可以了,所有是指针操作异常高效。**而且从代码注释中可以看到,这里不会泛起线程竞争的情形,由于其他线程都由于位置不够被壅闭都在自旋守候中。
元素出队
照样先亮出源码:
public bool TryDequeue(out T result)
{
while (!IsEmpty)
{
Segment head = m_head;
if (head.TryRemove(out result))
return true;
//since method IsEmpty spins, we don't need to spin in the while loop
}
result = default(T);
return false;
}
可以看到只有在行列不为空(IsEmpty==false)的情形下才会实验出队操作,而出队是在首段上举行操作的。关于若何判断行列是否为空总结就一句话:当首段m_head不包罗任何数据且没有下一段的时刻行列才为空,详细的判断历程源码注释中写的很清晰,限于篇幅不详细先容。
出队的本质是从首段中移除低位指针所指向的元素,看一下详细实现步骤:
internal bool TryRemove(out T result)
{
SpinWait spin = new SpinWait();
int lowLocal = Low, highLocal = High;
//判断当前段是否为空
while (lowLocal <= highLocal)
{
//判断低位指针位置是否可以移除
if (Interlocked.CompareExchange(ref m_low, lowLocal + 1, lowLocal) == lowLocal)
{
SpinWait spinLocal = new SpinWait();
//判断元素是否有用
while (!m_state[lowLocal].m_value)
{
spinLocal.SpinOnce();
}
//取出元素
result = m_array[lowLocal];
//释放引用关系
if (m_source.m_numSnapshotTakers <= 0)
{
m_array[lowLocal] = default(T);
}
//判断当前段的元素是否所有被移除了,要抛弃它
if (lowLocal + 1 >= SEGMENT_SIZE)
{
spinLocal = new SpinWait();
while (m_next == null)
{
spinLocal.SpinOnce();
}
Contract.Assert(m_source.m_head == this);
m_source.m_head = m_next;
}
return true;
}
else
{
//线程竞争失败,自旋守候并重置
spin.SpinOnce();
lowLocal = Low; highLocal = High;
}
}//end of while
result = default(T);
return false;
}
首先,只有当前Segment不为空的情形下才实验移除元素,否则就直接返回false。然后通过一个原子操作Interlocked.CompareExchange
判断当前低位指针上是否有其他线程同时也在移除,若是有那就进入自旋守候,没有的话就从这个位置取出元素并把低位指针往前推进一位。若是当前行列没有正在举行截取快照的操作,那取出元素后还要把这个位置给释放掉。当这个Segment的所有元素都被移除掉了,这时刻要把它抛弃,简朴来说就是让行列的首段指针指向它的下一段即可,抛弃的这一段等着GC来摒挡它。
这里稍微提一下Interlocked.CompareExchange,它的意思是对照和交流,也就是更为人人所熟悉的CAS(Compare-and-Swap),它主要做了以下2件事情:
- 对照m_low和lowLocal的值是否相等
- 若是相等则m_low=lowLocal+1,若是不相等就什么都不做,不管是否相等,始终返回m_low的原始值
整个操作是原子性的,对CPU而言就是一条指令,这样就可以保证当前位置只有一个线程执行出队操作。
另有一个
TryPeek()
方式和出队类似,它是从队首获取一个元素然则无需移除该元素,可以看做Dequeue的简化版,不再详细先容。
获取行列中元素的数目
与通俗Queue
差别的是,ConcurrentQueue
并没有维护一个示意行列中元素个数的计数器,那就意味着要获得这个数目必须实时去盘算。我们看一下盘算历程:
public int Count
{
get
{
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
{
return tailHigh - headLow + 1;
}
int count = SEGMENT_SIZE - headLow;
count += SEGMENT_SIZE * ((int)(tail.m_index - head.m_index - 1));
count += tailHigh + 1;
return count;
}
}
大致思绪是,先盘算(GetHeadTailPositions)出首段的低位指针和尾段的高位指针,这中心的总长度就是我们要的数目,然后分成3节依次累加每一个Segment包罗的元素个数获得最终的行列长度,可以看到这是一个开销对照大的操作。 正由于如此,微软官方推荐使用IsEmpty
属性来判断行列是否为空,而不是使用行列长度Count==0
来判断,使用ConcurrentStack
也是一样。
截取快照(take snapshot)
所谓的take snapshot就是指一些花样转换的操作,例如ToArray()
、ToList()
、GetEnumerator()
这种类型的方式。在前面行列的焦点界说中我们提到有一个m_numSnapshotTakers
字段,这时刻就派上用场了。下面以对照典型的ToList()
源码举例说明:
private List<T> ToList()
{
// Increments the number of active snapshot takers. This increment must happen before the snapshot is
// taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
// eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0.
Interlocked.Increment(ref m_numSnapshotTakers);
List<T> list = new List<T>();
try
{
Segment head, tail;
int headLow, tailHigh;
GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);
if (head == tail)
{
head.AddToList(list, headLow, tailHigh);
}
else
{
head.AddToList(list, headLow, SEGMENT_SIZE - 1);
Segment curr = head.Next;
while (curr != tail)
{
curr.AddToList(list, 0, SEGMENT_SIZE - 1);
curr = curr.Next;
}
tail.AddToList(list, 0, tailHigh);
}
}
finally
{
// This Decrement must happen after copying is over.
Interlocked.Decrement(ref m_numSnapshotTakers);
}
return list;
}
可以看到,ToList的逻辑和Count异常相似,都是先盘算出两个首尾位置指针,然后把行列分为3节依次遍历处置,最大的差别之处在于方式的开头和末端划分对m_numSnapshotTakers
做了一个原子操作。 在方式的第一行,使用Interlocked.Increment
做了一次递增,这时刻示意行列正在举行一次截取快照操作,在处置完后又在finally中用Interlocked.Decrement
做了一次递减示意当前操作已完成,这样确保了在举行快照时不被出队影响。感受这块很难形貌的稀奇好,以是保留了原始的英文注释,人人逐步体会。
到这里,基本把ConcurrentQueue的焦点说清晰了。
总结一下
回到文章开头提出的几个问题,现在应该有了很清晰的谜底:
- 存储结构 — 接纳数组和链表的组合形式
- 若何初始化 — 建立牢固巨细的段,无需指定初始容量
- 常用操作若何实现 — 尾段入队,首段出队
- 线程平安问题 — 使用SpinWait自旋守候和原子操作实现
以上所述均是小我私家明白,若是有错误的地方还请不吝指正,以免误导他人。
推荐相关阅读,篇篇都是干货:https://www.cnblogs.com/lucifer1982/category/126755.html
原创文章,作者:dddof新闻网,如若转载,请注明出处:https://www.dddof.com/archives/4687.html