多线程队列的算法优化

多线程队列(Concurrent Queue)的使用场合非常多,高性能服务器中的消息队列,并行算法中的Work Stealing等都离不开它。对于一个队列来说有两个最主要的动作:添加(enqueue)和删除(dequeue)节点。在一个(或多个)线程在对一个队列进行enqueue操作的同时可能会有一个(或多个)线程对这个队列进行dequeue操作。因为enqueue和dequeue都是对同一个队列里的节点进行操作,为了保证线程安全,一般在实现中都会在队列的结构体中加入一个队列锁(典型的如pthread_mutex_t q_lock),在进行enqueue和dequeue时都会先锁住这个锁以锁住整个队列然后再进行相关的操作。这样的设计如果实现的好的话一般性能就会很不错了。以链表实现的队列的结构体一般是这样的:

struct queue_t {
    node_t *head;
    node_t *tail;
    pthread_mutex_t q_lock;
};

但是,这其中其实有一个潜在的性能瓶颈:enqueue和dequeue操作都要锁住整个队列,这在线程少的时候可能没什么问题,但是只要线程数一多,这个锁竞争所产生的性能瓶颈就会越来越严重。那么我们可不可以想办法优化一下这个算法呢?当然可以!如果我们仔细想一想enqueue和dequeue的具体操作就会发现他们的操作其实不一定是冲突的。例如:如果所有的enqueue操作都是往队列的尾部插入新节点,而所有的dequeue操作都是从队列的头部删除节点,那么enqueue和dequeue大部分时候都是相互独立的,我们大部分时候根本不需要锁住整个队列,白白损失性能!那么一个很自然就能想到的算法优化方案就呼之欲出了:我们可以把那个队列锁拆成两个:一个队列头部锁(head lock)和一个队列尾部锁(tail lock)。这样这样的设计思路是对了,但是如果再仔细思考一下它的实现的话我们会发现其实不太容易,因为有两个特殊情况非常的tricky(难搞):第一种就是往空队列里插入第一个节点的时候,第二种就是从只剩最后一个节点的队列中删除那个“最后的果实”的时候。

为什么难搞呢?当我们向空队列中插入第一个节点的时候,我们需要同时修改队列的head和tail指针,使他们同时指向这个新插入的节点,换句话说,我们此时即需要拿到head lock又需要拿到tail lock。而另一种情况是对只剩一个节点的队列进行dequeue的时候,我们也是需要同时修改head和tail指针使他们指向NULL,亦即我们需要同时获得head和tail lock。有经验的同学会立刻发现我们进入危险区了!是什么危险呢?死锁!多线程编程中最臭名昭著的一种bug就是死锁了。例如,如果线程A在锁住了资源1后还想要获取资源2,而线程B在锁住了资源2后还想要获取资源1,这时两个线程谁都不能获得自己想要的那个资源,两个线程就死锁了。所以我们要小心奕奕的设计这个算法以避免死锁,例如保证enqueue和dequeue对head lock和tail lock的请求顺序(lock ordering)是一致的等等。但是这样设计出来的算法很容易就会包含多次的加锁/解锁操作,这些都会造成不必要的开销,尤其是在线程数很多的情况下反而可能导致性能的下降。我的亲身经历就是在32线程时这个思路设计出来的算法性能反而下降了10%左右,原因就是加锁/解锁的开销增加了。

好在有聪明人早在96年就想到了一个更妙的算法。这个算法也是用了head和tail两个锁,但是它有一个关键的地方是它在队列初始化的时候head和tail指针不为空,而是指向一个空节点。在enqueue的时候只要向队列尾部添加新节点就好了。而dequeue的情况稍微复杂点,它要返回的不是头节点,而是head->next,即头节点的下一个节点。先来看伪代码:

typedef struct node_t {
    TYPE value; 
    node_t *next
} NODE;

typedef struct queue_t {
    NODE *head; 
    NODE *tail;
    LOCK q_h_lock;
    LOCK q_t_lock;
} Q;

initialize(Q *q) {
   node = new_node()   // Allocate a free node
   node->next = NULL   // Make it the only node in the linked list
   q->head = q->tail = node	// Both head and tail point to it
   q->q_h_lock = q->q_t_lock = FREE   // Locks are initially free
}

enqueue(Q *q, TYPE value) {
   node = new_node()       // Allocate a new node from the free list
   node->value = value	  // Copy enqueued value into node
   node->next = NULL       // Set next pointer of node to NULL
   lock(&q->q_t_lock)	  // Acquire t_lock in order to access Tail
      q->tail->next = node // Link node at the end of the queue
      q->tail = node       // Swing Tail to node
   unlock(&q->q_t_lock)    // Release t_lock
}

dequeue(Q *q, TYPE *pvalue) {
   lock(&q->q_h_lock)   // Acquire h_lock in order to access Head
      node = q->head    // Read Head
      new_head = node->next	     // Read next pointer
      if new_head == NULL         // Is queue empty?
         unlock(&q->q_h_lock)     // Release h_lock before return
         return FALSE             // Queue was empty
      endif
      *pvalue = new_head->value   // Queue not empty, read value
      q->head = new_head  // Swing Head to next node
   unlock(&q->q_h_lock)   // Release h_lock
   free(node)			  // Free node
   return TRUE			  // Queue was not empty, dequeue succeeded
}

发现玄机了么?是的,这个算法中队列总会包含至少一个节点。dequeue每次返回的不是头节点,而是头节点的下一个节点中的数据:如果head->next不为空的话就把这个节点的数据取出来作为返回值,同时再把head指针指向这个节点,此时旧的头节点就可以被free掉了。这个在队列初始化时插入空节点的技巧使得enqueue和dequeue彻底相互独立了。但是,还有一个小地方在实现的时候需要注意:对第一个空节点的next指针的读写。想象一下,当一个线程对一个空队列进行第一次enqueue操作时刚刚运行完第25行的代码(对该空节点的next指针进行写操作);而此时另一个线程对这个队列进行第一次dequeue操作时恰好运行到第33行(对该空节点的next指针进行读操作),它们其实还是有冲突!不过,好在一般来讲next指针是32位数据,而现代的CPU已经能保证多线程程序中内存对齐了的32位数据读写操作的原子性,而一般来讲编译器会自动帮你对齐32位数据,所以这个不是问题。唯一需要注意的是我们要确保enqueue线程是先让要添加的新节点包含好数据再把新节点插入链表(也就是不能先插入空节点,再往节点中填入数据),那么dequeue线程就不会拿到空的节点。其实我们也可以把q_t_lock理解成生产者的锁,q_h_lock理解成消费者的锁,这样生产者(们)和消费者(们)的操作就相互独立了,只有在多个生产者对同一队列进行添加操作时,以及多个消费者对同一队列进行删除操作时才需要加锁以使访问互斥。

通过使用这个算法,我成功的把一个32线程程序的性能提升了11%!可见多线程中的锁竞争对性能影响之大!此算法出自一篇著名的论文:M. Michael and M. Scott. Simple, Fast, and Practical Non-Blocking and Blocking Concurren Queue Algorithms. 如果还想做更多优化的话可以参考这篇论文实现相应的Non Blocking版本的算法,性能还能有更多提升。当然了,这个算法早已被集成到java.util.concurrent里了(即LinkedBlockingQueue),其他的并行库例如Intel的TBB多半也有类似的算法,如果大家能用上现成的库的话就不要再重复造轮子了。为什么别造并行算法的轮子呢?因为高性能的并行算法实在太难正确地实现了,尤其是Non Blocking,Lock Free之类的“火箭工程”。有多难呢?Doug Lea提到java.util.concurrent中一个Non Blocking的算法的实现大概需要1年的时间,总共约500行代码。所以,对最广大的程序员来说,别去写Non Blocking, Lock Free的代码,只管用就行了,我看见网上很多的Non Blocking阿,无锁编程的算法实现啊什么的都非常地害怕,谁敢去用他们贴出来的这些代码啊?我之所以推荐这个two lock的算法是因为它的实现相对Non Blocking之类的来说容易多了,非常具备实用价值。虽然这篇论文出现的很早,但是我在看了几个开源软件中多线程队列的实现之后发现他们很多还是用的本文最开始提到的那种一个锁的算法。如果你想要实现更高性能的多线程队列的话,试试这个算法吧!

Update: 多线程队列算法有很多种,大家应根据不同的应用场合选取最优算法(例如是CPU密集型还是IO密集型)。本文所列的算法应用在这样一个多线程程序中:每个线程都拥有一个队列,每个队列可能被本线程进行dequeue操作,也可以被其他线程进行dequeue(即work stealing),线程数不超过CPU核心数,是一个典型的CPU/MEM密集型客户端单写者多读者场景。

《多线程队列的算法优化》上有38条评论

  1. 不过,好在一般来讲next指针是32位数据,而现代的CPU已经能保证多线程程序中32位数据读写操作的原子性,所以这个不是问题。
    这个不能一定保证的。

    这种方法不一定好,程序会变得很复杂。

    ”在进行enqueue和dequeue时都会先锁住这个锁以锁住整个队列然后再进行相关的操作“
    这个也是可以改善的,在一个线程写队列,一个线程读队列,这两个线程是通过pthread_cond_signal通信的话。在这种情况下的话,不需要锁住整个队列,只要锁住一个队列的一个单元

    1. @jet
      “这个不一定能保证”
      请说说具体什么时候不能保证?我原文漏说了条件 应该是内存对齐了的32位数据的操作是原子操作,具体我另一篇博文详细讲过了

      用pthread_cond_signal的话那就是阻塞型等待了,这种同步方式让线程停止工作,我在服务器端的程序中见过这样的用法,但是在多核上就不太可行了。因为在多核上(例如4个核)让一个线程停下来的代价太高了,所以一般如果dequeue一个空队列没取到值的话当前线程就会尝试对这个队列或另一个队列再进行一次dequeue操作

      1. 这个原子操作会存在memory barrier的语义么? 不存在的话 多核时候应该是无法保证正确性的。

  2. “用pthread_cond_signal的话那就是阻塞型等待了,这种同步方式让线程停止工作,我在服务器端的程序中见过这样的用法,但是在多核上就不太可行了。因为在多核上(例如4个核)让一个线程停下来的代价太高了”

    不知道博主说的是什么意思?难道是pthread_cond_signal()引起上下文切换太多?

    1. @spriteray
      我的本意是指pthread_cond_singal()会完成一次unlock,然后让该线程sleep,然后再等待被唤醒,然后再完成一次lock这个函数才执行完毕,调用它过程会让线程整个停下来。如果是每个线程拥有一个独立队列,用work stealing算法来分配任务的多线程程序,意味着对每个队列来说是单写者多读者的情况,更高效的办法就不是调用pthread_cond_singal()来阻塞的等待新任务,而是在dequeue直接返回false然后紧接着再去调用一次dequeue从另一个队列里去取任务,不让CPU闲着。当然,这得看应用场景,我文中所举的算法不是对任何场景都应用的,要不然java.util.concurrent就不会有那么多不同的concurrent queue算法了

  3. ”在进行enqueue和dequeue时都会先锁住这个锁以锁住整个队列然后再进行相关的操作“
    关于上面的解析。
    举个场景:一个主线程和一个工作线程池,他们之间有个环形数据队列。主线程写队列,工作线程读这个队列并且改变读到的那个队列单元的其中一个标识字段,使该单元重新变为可写的。主线程通过pthread_cond_signal驱动工作线程池工作。
    那么这个场景的特性是,主线程每次只会驱动一个工作线程去读队列(这点是最关键的)。
    根据这个特性,所以每次主线程写这个队列时是不需要对整个队列加锁的,只是在找到可写的单元时才需要加锁。工作线程读队列也一样,只要找到可读的单元时才需要加锁。

    1. @jet
      我文中的算法比较适合每个线程拥有一个独立的队列,不同线程间用work stealing的算法来分配任务的情况。你所列是一个全局共享的队列再加上线程池的情况,各有各的优势,关键还是得看应用场景,举例来说Java Concurrent包里的队列算法就有很多个。其实关键的出发点都是尽可能分离读和写,然后尽可能多并发。多谢指教。

  4. 不客气,共同学习!
    请问楼主,有没有研究c/c++下的 volatile,多线程,对齐,原子操作,锁等的关系?
    我发现他们之间是一个很有趣的话题,而且有点复杂。当然这些也跟cpu,系统,编译器有关了
    希望能和你共同探讨一下! 🙂

  5. 谢谢分享。学习、收藏了。
    文章中的方法可以很地处理队列插入/删除两个操作的互斥的问题。
    对于多线程同时插入的动作,不知道有没有办法优化呢?

    1. 如果想要对同时插入的情况做最更多的优化就必须使用硬件直接提供的CompareAndSwitch指令了,这已经是non blocking算法的范畴了,我的建议是:除非你是这方面的专家,一般情况下别去碰non blocking,因为太容易出错了

  6. 我们在为一个多级并行编程模型写底层runtime的时候,也用到了这些non-blocking,甚至更进一步的Non-fence队列;有个叫fastflow的工作,其实做得挺好的了;剑桥大学写出xen的那个小组的工作也很pratical。有机会交流一下,哈哈!

  7. 不知道博主是否认真想过你的例子没?虽然是伪码,但是也不能误导大家啊~
    enqueue 是没问题,但是 dequeue就是错的。这里必须要对tail指针进行赋值。
    比如剩下最后一个节点了,tail指针是指向这个node的,但是你dequeue的时候将它取出,free掉了,这里不改tail不是一个bug?那么又回到和原始模型一样,你得给tail 加锁,指回null 节点,性能提升的仅仅是enqueue少加了一次锁。
    ps:无锁编程也不是太难的事吧,把cas 搞熟,还是比较好写queue和stack的

    1. 这个算法本身在initialize()里面初始化了一个伪头结点,所以在任何时候都能保证至少还剩一个节点(即这个伪节点),因此free(node)不可能free掉tail指向的node,也就不会出现你说的bug。
      无锁编程保证正确性太难,java.util.concurrent库里面的无锁算法实现的时候1个人年只能写500行。

      1. 是的,是我看错,以为第一个节点是固定不动的,这个算法是对的

  8. 有个问题请教博主,也是类似的一类问题,那就是c++风格的容器和 linux c风格的容器 的实现的差异。
    linux 的容器 都是类似list一样的嵌入式的实现,有个很好的好处就是一个 obj可以归属于过个容器(比如链表)也就是多维的,而c++的容器是一维的,就很难办到同样的事情。就比如说这个例子,dequeue 取值是 拷贝的语义来实现,内部是通过node节点来管理的。相比linux的风格来说,这里就有比较多的内存分配销毁的负担。
    请教一下博主对linux c 的并发容器的理解,或者再写篇博文出来 : )

  9. 冠诚你好。
    看了你这篇博文,受教很多,想请问一下你文中提到的消息队列还有Work stealing有没有什么比较好的参考资料?
    关于队列这块,我自己目前也就是在线程池中实现了用pthread_cond_wait/signal的等待队列。不知道还有没有在什么场景下需要有什么样其他的队列实现呢?请指教~~

    1. 关于Concurrent Queue这个数据结构,The Art of Multiprocessor Programming有专门的一章内容:)

  10. 楼主你好!
    我们最近遇到队列的问题,碰到楼主的文章,真是救星啊。
    不过使用楼主说的这种算法有个问题,我们的机器是64位的,指针大小也都是64位的,不知道会不会影响?
    我们在测试过程发现了问题:数据量比较大的时候就会core。现在就是怀疑指针的赋值操作是不是原子的
    还有个问题请教,能不能测试指针的操作是否原子性的,怎么测试?
    谢谢!

    1. 是C/C++还是Java?前者的话可以用GCC提供的原子操作,Java的话有专门的原子操作API。请参考这篇文章:http://www.parallellabs.com/2010/04/15/atomic-operation-in-multithreaded-application/

  11. hi,请问下,上面代码的dequeue出队的是head的next节点上的数据,为何free的却是head指向的节点?即上面代码的L38和L41。虽然这样理论上也没啥影响,依然可以获得所有入队的数据,但是总感觉好奇怪啊,为啥不能直接free掉head的next节点呢?还是我理解有误?请帮忙回复下,谢谢。

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注