我们知道,对于一个队列而言,最主要的两个操作是添加元素(Add)和获取/删除元素(Get),中实现了一个linux下通用的阻塞队列BlockQueue,通过代码可以看出,为了保证在多线程环境下安全正确的运行,BlockQueue定义中的几个关键函数都进行了加锁保护,而锁住的对象是整个队列,加锁粒度较大。当交易量增大,线程数增长时,会对性能产生一定的影响:同一时刻只能有一个线程添加或获取元素,而其他线程都需要等待。
队列的特性在于它只允许在表的头部(head)进行删除操作,而在表的尾部(tail)进行插入操作,根据这个特性,我们可以认为这两个操作其实是不冲突的,毕竟一个在头一个在尾,那么如果我们使用两个锁,head lock和tail lock,分别在处理插入和删除时对队列进行加锁,使插入和删除操作互不影响,而不是两个操作使用同一个锁,锁住整个队列,这样,同一时刻可以有两个线程对队列分别进行插入和删除操作,可以使性能得到一定提高。分析代码中的Add和Get函数,发现在向空队列中插入第一个元素和获取(删除)最后一个元素时,需要同时操作head和tail指针,按照之前的说法,就要同时获取队列的head lock和tail lock,这里使得程序更加复杂,因为如果加锁顺序错误,很容易出现,而即便能够写出正确的代码,每次操作进行两次加锁解锁,同样会使性能下降。
那么如何才能像上文那样使用head lock和tail lock来提高程序的性能呢,其实这个算法已经被实现,它很巧妙的在队列构造时,将head和tail指针指向了同一个data域为空的头结点,根据此算法,对上文中的BlockQueue进行修改:
1 template2 class BlockQueue 3 { 4 public: 5 BlockQueue(); 6 ~BlockQueue(); 7 8 void Add(T *pData); 9 T *Get(int timeout=0); 10 void SetBlockFlag(int nBlockFlag); 11 private: 12 typedef struct _Node 13 { 14 T * m_pData; 15 struct _Node * m_pNext; 16 } Node; 17 Node * m_pHead; 18 Node * m_pTail; 19 20 bool m_bBlockFlag; //是否阻塞 21 CMutex m_cHeadLock; //头部锁 22 CMutex m_cTailLock; //尾部锁 23 CCond m_condQueue; 24 }; 25 26 template 27 BlockQueue ::BlockQueue() 28 { 29 Node *pTrivial = new Node; 30 pTrivial->m_pData = NULL; 31 pTrivial->m_pNext = NULL; 32 m_pHead = m_pTail = pTrivial; 33 34 m_bBlockFlag = false; 35 } 36 37 template 38 void BlockQueue ::SetBlockFlag() 39 { 40 m_bBlockFlag = true; 41 } 42 43 template 44 BlockQueue ::~BlockQueue() 45 { 46 Node *pTmp = NULL; 47 while (m_pHead != NULL) 48 { 49 pTmp = m_pHead; 50 m_pHead = m_pHead->m_pNext; 51 delete pTmp; 52 pTmp = NULL; 53 } 54 m_pTail = NULL; 55 } 56 57 template 58 void BlockQueue ::Add(T *pData) 59 { 60 Node *pTmp = new Node; 61 pTmp->m_pData = pData; 62 pTmp->m_pNext = NULL; 63 64 m_cTailLock.EnterMutex(); 65 m_pTail->m_pNext = pTmp; 66 m_pTail = pTmp; 67 68 if(m_bBlockFlag) 69 m_condQueue.Signal(); 70 71 m_cTailLock.LeaveMutex(); 72 } 73 74 template 75 T *BlockQueue ::Get(int timeout) 76 { 77 m_cHeadLock.EnterMutex(); 78 while (m_pHead->m_pNext == NULL) 79 { 80 if (!m_bBlockFlag) 81 { 82 m_cHeadLock.LeaveMutex(); 83 return NULL; 84 } 85 else 86 { 87 if (m_condQueue.WaitLock(m_cHeadLock.GetMutex(), timeout) == 1) 88 { 89 m_cHeadLock.LeaveMutex(); 90 return NULL; 91 } 92 } 93 } 94 95 Node * pTmp = m_pHead; 96 Node * pNewHead = pTmp->next; 97 T *pTmpData = pNewHead->m_pData; 98 m_pHead = pNewHead; 99 100 m_cHeadLock.LeaveMutex();101 102 delete pTmp;103 pTmp = NULL;104 105 return pTmpData;106 }
与之前版本的代码进行对比:
- 在BlockQueue的声明中,使用了两个锁,就是上面提到的head lock和tail lock,分别对Add和Get两个操作进行同步。
- 删除了m_nCount成员。在更新BlockQueue中元素个数时,Add和Get函数都会对m_nCount进行操作,而Add和Get函数分别用了两个不同的锁来进行同步,多线程环境下,如果同时有两个线程对队列分别进行Add和Get操作,无法保证m_nCount更新的原子性,会导致m_nCount的值是不正确的。如果再为m_nCount单独添加一个锁,更新时保证原子性,那么每次操作都会进行两次加锁解锁,性能又会下降。
- BlockQueue构造函数中,创建了一个data元素为空的头结点,并让头指针和尾指针都指向这个它,这个头结点使得队列的Add和Get操作相互独立起来:
- 向队列中Add元素时,将新的结点添加到队列尾部,并且更新tail指针即可,不再对head指针进行操作;
- 从队列中Get元素时,每次返回的并不是头结点指向的data元素,而是头结点的m_pNext指针的data元素,更新时只需要将head指针指向下一个结点,并将头结点释放;
- 某一线程从队列中Get元素时,如果队列为空,那么需要条件变量将此线程阻塞,队列不为空将线程唤醒,这里注意队列判空的条件是头结点的m_pNext域不为空,配合条件变量使用的锁是head lock,当另一线程向队列中Add元素时,会将阻塞的线程唤醒。
用图来描述描述队列的创建,添加元素和删除元素:
像前一篇文章中的代码之初一样,如果设置了队列是阻塞的,那么只要Add操作,那么就发送信号,将阻塞在Get操作的线程唤醒。前面说过m_nCount在读写线程中不能保留原子性,那么如何在队列为空时才发送信号唤醒阻塞线程,首先想到根据两个指针的关系,将Add和Get函数定义如下:
1 template2 void BlockQueue ::Add(T *pData) 3 { 4 Node *pTmp = new Node; 5 pTmp->m_pData = pData; 6 pTmp->m_pNext = NULL; 7 8 m_cTailLock.EnterMutex(); 9 bool bEmpty = m_pHead == m_pTail; //队列是否为空10 m_pTail->m_pNext = pTmp;11 m_pTail = pTmp;12 13 if(m_bBlockFlag && bEmpty) //队列为空发送信号14 m_condQueue.Signal();15 16 m_cTailLock.LeaveMutex();17 }18 19 template 20 T *BlockQueue ::Get(int timeout)21 {22 m_cHeadLock.EnterMutex();23 while (m_pHead == m_pTail) //队列为空,条件也可改为 while(m_pHead->m_pNext == NULL)24 {25 if (!m_bBlockFlag)26 {27 m_cHeadLock.LeaveMutex();28 return NULL;29 }30 else31 {32 if (m_condQueue.WaitLock(m_cHeadLock.GetMutex(), timeout) == 1)33 {34 m_cHeadLock.LeaveMutex();35 return NULL;36 }37 }38 }39 40 Node * pTmp = m_pHead;41 Node * pNewHead = pTmp->next;42 T *pTmpData = pNewHead->m_pData;43 m_pHead = pNewHead;44 45 m_cHeadLock.LeaveMutex();46 47 delete pTmp;48 pTmp = NULL;49 50 return pTmpData;51 }
试想这种情况:在一读一写两个线程环境中,某一时刻,队列已经为空(假设为阻塞模式),此时读线程调用Get操作,发现队列为空,m_pHead == m_pTail,那么准备在条件变量上等待,同一时刻写线程调用Add操作,此时队列为空,那么线程会发送信号,进行唤醒。注意此时如果写线程运行的快(两个线程并不互斥),或者说单核CPU中,写线程获得的时间片稍长,那么有可能在读线程进行等待之前就发送了信号,即line14在line32之前运行,此时这一唤醒动作会被忽略掉,写线程更新m_pTail指针后,调用结束,之后再次调用Add,由于此时队列非空,所以不在发送信号,这样的话会导致读线程一直阻塞。如果像前一篇文章一样,使用一个计数器来表示当前阻塞在Get操作上的线程个数,在它的值大于零时进行激活,这样处理是否行之有效呢,根据前一篇文章的方法,修改Add和Get函数:
1 template2 void BlockQueue ::Add(T *pData) 3 { 4 Node *pTmp = new Node; 5 pTmp->m_pData = pData; 6 pTmp->m_pNext = NULL; 7 8 m_cTailLock.EnterMutex(); 9 m_pTail->m_pNext = pTmp;10 m_pTail = pTmp;11 12 if(m_bBlockFlag && m_nBlockCnt > 0) //当有线程阻塞时,发送信号进行唤醒13 m_condQueue.Signal();14 15 m_cTailLock.LeaveMutex();16 }17 18 template 19 T *BlockQueue ::Get(int timeout)20 {21 m_cHeadLock.EnterMutex();22 while (m_pHead == m_pTail) //队列为空,条件也可改为 while(m_pHead->m_pNext == NULL)23 {24 if (!m_bBlockFlag)25 {26 m_cHeadLock.LeaveMutex();27 return NULL;28 }29 else30 {31 m_nBlockCnt++;32 if (m_condQueue.WaitLock(m_cHeadLock.GetMutex(), timeout) == 1)33 {34 m_cHeadLock.LeaveMutex();35 m_nBlockCnt--;36 return NULL;37 }38 m_nBlockCnt--;39 }40 }41 42 Node * pTmp = m_pHead;43 Node * pNewHead = pTmp->next;44 T *pTmpData = pNewHead->m_pData;45 m_pHead = pNewHead;46 47 m_cHeadLock.LeaveMutex();48 49 delete pTmp;50 pTmp = NULL;51 52 return pTmpData;53 }
进行类似的分析,队列为空,读线程调用Get函数时阻塞,此时切换到写线程调用Add函数,那么就可能在读线程Get函数中阻塞之前就发送了信号,即line13在line32之前运行了,那么这一唤醒动作会被忽略掉,而此时读线程阻塞在line32,所以m_nBlockCnt仍然大于零,所以之后再次写线程调用Add函数时,仍然会将阻塞的读线程唤醒,可以通过这种方式,提高Signal函数的有效率。
未完待续……