多线程队列的算法优化

多线程队列(Concurrent Queue)的使用场合非常多,高性能服务器中的消息队列,并行算法中的Work Stealing等都离不开它。对于一个队列来说有两个最主要的动作:添加(enqueue)和删除(dequeue)节点。在一个(或多个)线程在对一个队列进行enqueue操作的同时可能会有一个(或多个)线程对这个队列进行dequeue操作。因为enqueue和dequeue都是对同一个队列里的节点进行操作,为了保证线程安全,一般在实现中都会在队列的结构体中加入一个队列锁(典型的如pthread_mutex_t q_lock),在进行enqueue和dequeue时都会先锁住这个锁以锁住整个队列然后再进行相关的操作。这样的设计如果实现的好的话一般性能就会很不错了。以链表实现的队列的结构体一般是这样的:

struct queue_t {
    node_t *head;
    node_t *tail;
    pthread_mutex_t q_lock;
};

但是,这其中其实有一个潜在的性能瓶颈:enqueue和dequeue操作都要锁住整个队列,这在线程少的时候可能没什么问题,但是只要线程数一多,这个锁竞争所产生的性能瓶颈就会越来越严重。那么我们可不可以想办法优化一下这个算法呢?当然可以!如果我们仔细想一想enqueue和dequeue的具体操作就会发现他们的操作其实不一定是冲突的。例如:如果所有的enqueue操作都是往队列的尾部插入新节点,而所有的dequeue操作都是从队列的头部删除节点,那么enqueue和dequeue大部分时候都是相互独立的,我们大部分时候根本不需要锁住整个队列,白白损失性能!那么一个很自然就能想到的算法优化方案就呼之欲出了:我们可以把那个队列锁拆成两个:一个队列头部锁(head lock)和一个队列尾部锁(tail lock)。这样这样的设计思路是对了,但是如果再仔细思考一下它的实现的话我们会发现其实不太容易,因为有两个特殊情况非常的tricky(难搞):第一种就是往空队列里插入第一个节点的时候,第二种就是从只剩最后一个节点的队列中删除那个“最后的果实”的时候。

为什么难搞呢?当我们向空队列中插入第一个节点的时候,我们需要同时修改队列的head和tail指针,使他们同时指向这个新插入的节点,换句话说,我们此时即需要拿到head lock又需要拿到tail lock。而另一种情况是对只剩一个节点的队列进行dequeue的时候,我们也是需要同时修改head和tail指针使他们指向NULL,亦即我们需要同时获得head和tail lock。有经验的同学会立刻发现我们进入危险区了!是什么危险呢?死锁!多线程编程中最臭名昭著的一种bug就是死锁了。例如,如果线程A在锁住了资源1后还想要获取资源2,而线程B在锁住了资源2后还想要获取资源1,这时两个线程谁都不能获得自己想要的那个资源,两个线程就死锁了。所以我们要小心奕奕的设计这个算法以避免死锁,例如保证enqueue和dequeue对head lock和tail lock的请求顺序(lock ordering)是一致的等等。但是这样设计出来的算法很容易就会包含多次的加锁/解锁操作,这些都会造成不必要的开销,尤其是在线程数很多的情况下反而可能导致性能的下降。我的亲身经历就是在32线程时这个思路设计出来的算法性能反而下降了10%左右,原因就是加锁/解锁的开销增加了。

好在有聪明人早在96年就想到了一个更妙的算法。这个算法也是用了head和tail两个锁,但是它有一个关键的地方是它在队列初始化的时候head和tail指针不为空,而是指向一个空节点。在enqueue的时候只要向队列尾部添加新节点就好了。而dequeue的情况稍微复杂点,它要返回的不是头节点,而是head->next,即头节点的下一个节点。先来看伪代码:

typedef struct node_t {
    TYPE value; 
    node_t *next
} NODE;

typedef struct queue_t {
    NODE *head; 
    NODE *tail;
    LOCK q_h_lock;
    LOCK q_t_lock;
} Q;

initialize(Q *q) {
   node = new_node()   // Allocate a free node
   node->next = NULL   // Make it the only node in the linked list
   q->head = q->tail = node	// Both head and tail point to it
   q->q_h_lock = q->q_t_lock = FREE   // Locks are initially free
}

enqueue(Q *q, TYPE value) {
   node = new_node()       // Allocate a new node from the free list
   node->value = value	  // Copy enqueued value into node
   node->next = NULL       // Set next pointer of node to NULL
   lock(&q->q_t_lock)	  // Acquire t_lock in order to access Tail
      q->tail->next = node // Link node at the end of the queue
      q->tail = node       // Swing Tail to node
   unlock(&q->q_t_lock)    // Release t_lock
}

dequeue(Q *q, TYPE *pvalue) {
   lock(&q->q_h_lock)   // Acquire h_lock in order to access Head
      node = q->head    // Read Head
      new_head = node->next	     // Read next pointer
      if new_head == NULL         // Is queue empty?
         unlock(&q->q_h_lock)     // Release h_lock before return
         return FALSE             // Queue was empty
      endif
      *pvalue = new_head->value   // Queue not empty, read value
      q->head = new_head  // Swing Head to next node
   unlock(&q->q_h_lock)   // Release h_lock
   free(node)			  // Free node
   return TRUE			  // Queue was not empty, dequeue succeeded
}

发现玄机了么?是的,这个算法中队列总会包含至少一个节点。dequeue每次返回的不是头节点,而是头节点的下一个节点中的数据:如果head->next不为空的话就把这个节点的数据取出来作为返回值,同时再把head指针指向这个节点,此时旧的头节点就可以被free掉了。这个在队列初始化时插入空节点的技巧使得enqueue和dequeue彻底相互独立了。但是,还有一个小地方在实现的时候需要注意:对第一个空节点的next指针的读写。想象一下,当一个线程对一个空队列进行第一次enqueue操作时刚刚运行完第25行的代码(对该空节点的next指针进行写操作);而此时另一个线程对这个队列进行第一次dequeue操作时恰好运行到第33行(对该空节点的next指针进行读操作),它们其实还是有冲突!不过,好在一般来讲next指针是32位数据,而现代的CPU已经能保证多线程程序中内存对齐了的32位数据读写操作的原子性,而一般来讲编译器会自动帮你对齐32位数据,所以这个不是问题。唯一需要注意的是我们要确保enqueue线程是先让要添加的新节点包含好数据再把新节点插入链表(也就是不能先插入空节点,再往节点中填入数据),那么dequeue线程就不会拿到空的节点。其实我们也可以把q_t_lock理解成生产者的锁,q_h_lock理解成消费者的锁,这样生产者(们)和消费者(们)的操作就相互独立了,只有在多个生产者对同一队列进行添加操作时,以及多个消费者对同一队列进行删除操作时才需要加锁以使访问互斥。

通过使用这个算法,我成功的把一个32线程程序的性能提升了11%!可见多线程中的锁竞争对性能影响之大!此算法出自一篇著名的论文:M. Michael and M. Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurren Queue Algorithms. 如果还想做更多优化的话可以参考这篇论文实现相应的Non Blocking版本的算法,性能还能有更多提升。当然了,这个算法早已被集成到java.util.concurrent里了(即LinkedBlockingQueue),其他的并行库例如Intel的TBB多半也有类似的算法,如果大家能用上现成的库的话就不要再重复造轮子了。为什么别造并行算法的轮子呢?因为高性能的并行算法实在太难正确地实现了,尤其是Non Blocking,Lock Free之类的“火箭工程”。有多难呢?Doug Lea提到java.util.concurrent中一个Non Blocking的算法的实现大概需要1年的时间,总共约500行代码。所以,对最广大的程序员来说,别去写Non Blocking, Lock Free的代码,只管用就行了,我看见网上很多的Non Blocking阿,无锁编程的算法实现啊什么的都非常地害怕,谁敢去用他们贴出来的这些代码啊?我之所以推荐这个two lock的算法是因为它的实现相对Non Blocking之类的来说容易多了,非常具备实用价值。虽然这篇论文出现的很早,但是我在看了几个开源软件中多线程队列的实现之后发现他们很多还是用的本文最开始提到的那种一个锁的算法。如果你想要实现更高性能的多线程队列的话,试试这个算法吧!

Update: 多线程队列算法有很多种,大家应根据不同的应用场合选取最优算法(例如是CPU密集型还是IO密集型)。本文所列的算法应用在这样一个多线程程序中:每个线程都拥有一个队列,每个队列可能被本线程进行dequeue操作,也可以被其他线程进行dequeue(即work stealing),线程数不超过CPU核心数,是一个典型的CPU/MEM密集型客户端单写者多读者场景。

二进制的二三事

二进制是计算机的自然语言,逻辑门中神奇的0/1组合犹如那起起伏伏的“滴答”之声构成了曼妙的电子世界。不仅如此,二进制中的0和1往往也是我们解决实际问题的利器。

Task1:求一个固定长度集合所有子集

最直观的方法就是穷举:对集合中的每个元素来说它要么在当前子集中,要么不在当前子集中,以此依次类推穷举出所有可能的值。如果我们用0表示该元素在当前子集中,用1表示该元素不在当前子集中,我们就可以用一串0/1序列来表示当前的子集。

例如集合{a, b, c, d, e}中的子集{a, b, c}可以用“11100”来表示,子集{c, d, e}可以用“00111”来表示,空子集{}可以用“00000”来表示。

那么对任意固定长度的集合a[n],我们可以从a[0]开始穷举a[0]在子集中和a[0]不在子集中两种情况,再依次类推从a[1]开始一直穷举到a[n-1]为止。于是我们可以得到一个很直观的递归程序:

void sub_sets_recu(int index, int length, char *array, char *bits)
{
	int j = 0;

	if (index >= length) {
		for (j = 0; j < length; j++) {
			if (bits[j] == '1') {
				printf("%c", array[j]);
			}
		}
		printf("\n");
	}
	else {
		bits[index] = '1';
		sub_sets_recu(index+1, length, array, bits);
		bits[index] = '0';
		sub_sets_recu(index+1, length, array, bits);
	}
}

但是递归版本的缺点就是需要一个额外的数组bits来存储0/1序列,那么有没有办法改进呢?我们可以先试试把递归改成迭代的方式会不会有效果。

迭代的关键就在于明白你究竟要做哪些计算步骤,为此我们可以先画出递归函数的Execution Tree来看看:

—                       ○
|                     0/  \1
|                     ○    ○
|                  0/  \1   .
|                  ○    ○     .
n层             .       .      .
|                .         .
|               .           .
|             /  \        /  \
|           ○   ○     ○    ○

上图中,第i层的值为1的边来表示a[i]在子集中,值为0的边表示a[i]不在子集中,那么从根节点到所有叶子节点所经过的边的组合就是所有可能的子集组合,例如从根节点到最左叶子节点的组合为“00000”,即空子集。根据完全二叉树的性质,第n层有2^n个节点,所以共有2^n个不同的子集。既然所有的解都已经存储在这颗二叉树中,那么我们自然就可以依此构建一个遍历所有路径的迭代算法。

思路是有了,但是实际编码中我们需要用到一个跟二进制编码相关的trick了。我们知道任意一个十进制的数是可以用二进制来表示的,反过来我们也可以用十进制来表示二进制啊,“用二进制来思考”其实就是这个trick的关键。既然我们的解是“00000”~“11111”,那么它们自然可以用十进制的0~31来表示。很自然的我们就可以想到如果我们依次遍历0~31这些数,根据它们二进制值中每一位的值是0还是1即可确定当前元素是否在子集中。关于二进制的位操作等相关知识Matrix67的blog[1]有一个系列写的很不错,在此就不再赘述。

void sub_sets_iter(int length, char *array)
{
	int i = 0;
	int j = 0;
	for (i = 0; i < (1<<length); i++) {
		for (j = 0; j < length; j++) {
			if ((i & (1<<j)) != 0) {
				printf("%c", array[j]);
			}
		}
		printf("\n");
	}
}

再加上测试用的main函数:

int main()
{
	char a[5] = {'a','b','c','d','e'};
	char b[5];
	int i;

#ifdef USE_RECU
	for (i = 0; i < 10000000; i++) {
		sub_sets_recu(0, 5, a, b);
	}
#else
	for (i = 0; i < 10000000; i++) {
		sub_sets_iter(5, a);
	}
#endif
	return 0;
}

好,代码全写完了,当然应该比试比试两个算法孰优孰劣。测试平台是:cygwin + gcc 3.4.4 + Core2 Duo T7300@2.0GHz

在测试时已经把两个算法中的printf给注释掉了,只比拼速度。先把gcc的优化设成-O0看看:

$ time ./recu.exe

real    0m8.847s
user    0m8.811s
sys     0m0.046s

$ time ./iter.exe

real    0m5.093s
user    0m4.999s
sys     0m0.046s

再看看gcc -O3的结果:

$ time ./recu_o3.exe

real    0m8.285s
user    0m8.234s
sys     0m0.031s

$ time ./iter_o3.exe

real    0m1.887s
user    0m1.796s
sys     0m0.031s

结果显示迭代算法优于递归算法。感兴趣的朋友可以反汇编一下编译器优化后的代码,看看为什么迭代算法优化后能快那么多。关于递归与迭代已经有很多分析了,简单说来递归更加直观易懂,但是栈的开销比较大;迭代的算法不容易一开始就想出来,但是一般来说效率更高。从编译器的角度来讲,部分的编译器能把相对简单的递归算法转化成迭代算法,因为递归的算法对编译器来说是更友好的算法,更易于在此基础上做更多的优化。从体系结构的角度来讲,递归算法更易于发挥CPU流水线的特点让多步运算同时执行,而某些特定的架构,例如GPU就没有对递归提供有效的硬件支持,因为栈空间是一个大问题。

Task2:小白鼠试毒药问题

实验室里有1000个一模一样的瓶子,但是其中的一瓶有毒。可以用实验室的小白鼠来测试哪一瓶是毒药。如果小白鼠喝掉毒药的话,会在一个星期的时候死去,其他瓶子里的药水没有任何副作用。请问最少用多少只小白鼠可以在一个星期以内查出哪瓶是毒药?

这个问题关键是跳出思维,并不是说瓶子和小白鼠必须一一对应,然后我们用1000只小白鼠,每只各试一瓶药水,等一个礼拜然后出结果。其实我们仔细想想,小白鼠试毒只有两种结果,非“死”即“生”,又是一个0/1状态。很显然我们可以再次构建一个0/1组成的二叉树,由此即可表示不同的试毒结果。假设我们需要n个小白鼠,那么2^n-1应大于等于1000(因为有1000种可能的结果,减一是指要除去小白鼠全部不死的情况 — 多谢danqi指正),那么最小的n就是10了。

总结

解决此类问题的关键在于找到能用0/1表示的状态,然后想办法用0/1组成的二进制数来表示不同的结果,从而达到节省存储空间,提高解题效率的目的。

二进制真是神奇,文章的最后附一张0和1构成的Fibonacci数列(来自[2]):

相关文献

[1] 位运算讲解系列文章
[2] Fibonacci数列转二进制图形的惊异发现

转载请注明来自www.parallellabs.com

An interesting algorithm problem: the longest plateau

Recently I met an interesting algorithm problem:

Problem:

Given an array, try to develop an efficient algorithm which can compute the length of the longest plateau. A plateau is a consecutive segment of an array with equal contents. For example, if x[] = {1, 2, 3, 4, 4, 4, 5, 5, 6}, then we have six plateaus which are 1, 2, 3, 4-4-4, 5-5 and 6. And obviously the length of the longest plateaus is 3.

Analysis:

Well, a straightforward idea is try to firstly compute all the length of different plateaus from left to right and then select the longest length. The pseudo-code is like this:

for each element in the array a[]
     if a[i] is equal to a[i-1]
          add 1 to the length for the current plateau
          check whether the current length is the longest one so far
     else
          reset length to 1 // plateau length is at least 1

Whether we need line 5&6 depends on whether we need to store the length of every plateau. If we just want to calculate the longest length then we can keep the code and use the “length” as a temp variable which is only used inside the loop. On the other hand, if we need to keep track of the length of all plateaus, we need to use an array of “length[]” to store the needed information.

/*
 * input: an array a[], the length of the array n
 * output: the length of the longest plateau
 */
int longestPlateau (int a[], int n)
{
	if (n == 0)
 		return 0;

	int length = 1;
	int longestLength = 1;

	for (int i = 1; i<n; i++)
 	{
		if (a[i] == a[i-1])
 		{
 			length++;
			longestLength = max(longestLength, length);
 		}
 		else
 			length = 1;
	}
	return longestLength;
}

Some more:

What if the given array is sorted (in the increasing order) already?

Actually if the array is sorted, the algorithm can be much simpler:

assume the longest length now is L, then we just need to compare a[i] and a[i-L], if they are equal then all the elements between them are also equal (since this is a sorted array!), and we can add 1 to the current longest length. The code looks like this:

/*
 * input: an sorted array a[] (increasing order), the length of the array n
 * output: the length of the longest plateau
 */
int longestPlateau (int a[], int n)
{
	if (n == 0)
 		return 0;

	int length = 1;
	for (int i = 1; i<n; i++)
 	{
		if (a[i] == a[i-length])
 			length++;
	}
	return length;
}