博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
生产者/消费者模式(二)
阅读量:5260 次
发布时间:2019-06-14

本文共 6928 字,大约阅读时间需要 23 分钟。

  我们知道,对于一个队列而言,最主要的两个操作是添加元素(Add)和获取/删除元素(Get),中实现了一个linux下通用的阻塞队列BlockQueue,通过代码可以看出,为了保证在多线程环境下安全正确的运行,BlockQueue定义中的几个关键函数都进行了加锁保护,而锁住的对象是整个队列,加锁粒度较大。当交易量增大,线程数增长时,会对性能产生一定的影响:同一时刻只能有一个线程添加或获取元素,而其他线程都需要等待。

  队列的特性在于它只允许在表的头部(head)进行删除操作,而在表的尾部(tail)进行插入操作,根据这个特性,我们可以认为这两个操作其实是不冲突的,毕竟一个在头一个在尾,那么如果我们使用两个锁,head lock和tail lock,分别在处理插入和删除时对队列进行加锁,使插入和删除操作互不影响,而不是两个操作使用同一个锁,锁住整个队列,这样,同一时刻可以有两个线程对队列分别进行插入和删除操作,可以使性能得到一定提高。分析代码中的Add和Get函数,发现在向空队列中插入第一个元素和获取(删除)最后一个元素时,需要同时操作head和tail指针,按照之前的说法,就要同时获取队列的head lock和tail lock,这里使得程序更加复杂,因为如果加锁顺序错误,很容易出现,而即便能够写出正确的代码,每次操作进行两次加锁解锁,同样会使性能下降。

  那么如何才能像上文那样使用head lock和tail lock来提高程序的性能呢,其实这个算法已经被实现,它很巧妙的在队列构造时,将head和tail指针指向了同一个data域为空的头结点,根据此算法,对上文中的BlockQueue进行修改:

1 template
2 class BlockQueue 3 { 4 public: 5 BlockQueue(); 6 ~BlockQueue(); 7 8 void Add(T *pData); 9 T *Get(int timeout=0); 10 void SetBlockFlag(int nBlockFlag); 11 private: 12 typedef struct _Node 13 { 14 T * m_pData; 15 struct _Node * m_pNext; 16 } Node; 17 Node * m_pHead; 18 Node * m_pTail; 19 20 bool m_bBlockFlag; //是否阻塞 21 CMutex m_cHeadLock; //头部锁 22 CMutex m_cTailLock; //尾部锁 23 CCond m_condQueue; 24 }; 25 26 template
27 BlockQueue
::BlockQueue() 28 { 29 Node *pTrivial = new Node; 30 pTrivial->m_pData = NULL; 31 pTrivial->m_pNext = NULL; 32 m_pHead = m_pTail = pTrivial; 33 34 m_bBlockFlag = false; 35 } 36 37 template
38 void BlockQueue
::SetBlockFlag() 39 { 40 m_bBlockFlag = true; 41 } 42 43 template
44 BlockQueue
::~BlockQueue() 45 { 46 Node *pTmp = NULL; 47 while (m_pHead != NULL) 48 { 49 pTmp = m_pHead; 50 m_pHead = m_pHead->m_pNext; 51 delete pTmp; 52 pTmp = NULL; 53 } 54 m_pTail = NULL; 55 } 56 57 template
58 void BlockQueue
::Add(T *pData) 59 { 60 Node *pTmp = new Node; 61 pTmp->m_pData = pData; 62 pTmp->m_pNext = NULL; 63 64 m_cTailLock.EnterMutex(); 65 m_pTail->m_pNext = pTmp; 66 m_pTail = pTmp; 67 68 if(m_bBlockFlag) 69 m_condQueue.Signal(); 70 71 m_cTailLock.LeaveMutex(); 72 } 73 74 template
75 T *BlockQueue
::Get(int timeout) 76 { 77 m_cHeadLock.EnterMutex(); 78 while (m_pHead->m_pNext == NULL) 79 { 80 if (!m_bBlockFlag) 81 { 82 m_cHeadLock.LeaveMutex(); 83 return NULL; 84 } 85 else 86 { 87 if (m_condQueue.WaitLock(m_cHeadLock.GetMutex(), timeout) == 1) 88 { 89 m_cHeadLock.LeaveMutex(); 90 return NULL; 91 } 92 } 93 } 94 95 Node * pTmp = m_pHead; 96 Node * pNewHead = pTmp->next; 97 T *pTmpData = pNewHead->m_pData; 98 m_pHead = pNewHead; 99 100 m_cHeadLock.LeaveMutex();101 102 delete pTmp;103 pTmp = NULL;104 105 return pTmpData;106 }

  与之前版本的代码进行对比:

  • 在BlockQueue的声明中,使用了两个锁,就是上面提到的head lock和tail lock,分别对Add和Get两个操作进行同步。
  • 删除了m_nCount成员。在更新BlockQueue中元素个数时,Add和Get函数都会对m_nCount进行操作,而Add和Get函数分别用了两个不同的锁来进行同步,多线程环境下,如果同时有两个线程对队列分别进行Add和Get操作,无法保证m_nCount更新的原子性,会导致m_nCount的值是不正确的。如果再为m_nCount单独添加一个锁,更新时保证原子性,那么每次操作都会进行两次加锁解锁,性能又会下降。
  • BlockQueue构造函数中,创建了一个data元素为空的头结点,并让头指针和尾指针都指向这个它,这个头结点使得队列的Add和Get操作相互独立起来:
  1. 向队列中Add元素时,将新的结点添加到队列尾部,并且更新tail指针即可,不再对head指针进行操作;
  2. 从队列中Get元素时,每次返回的并不是头结点指向的data元素,而是头结点的m_pNext指针的data元素,更新时只需要将head指针指向下一个结点,并将头结点释放;
  • 某一线程从队列中Get元素时,如果队列为空,那么需要条件变量将此线程阻塞,队列不为空将线程唤醒,这里注意队列判空的条件是头结点的m_pNext域不为空,配合条件变量使用的锁是head lock,当另一线程向队列中Add元素时,会将阻塞的线程唤醒。

  用图来描述描述队列的创建,添加元素和删除元素:

    

 

  像前一篇文章中的代码之初一样,如果设置了队列是阻塞的,那么只要Add操作,那么就发送信号,将阻塞在Get操作的线程唤醒。前面说过m_nCount在读写线程中不能保留原子性,那么如何在队列为空时才发送信号唤醒阻塞线程,首先想到根据两个指针的关系,将Add和Get函数定义如下:

1 template
2 void BlockQueue
::Add(T *pData) 3 { 4 Node *pTmp = new Node; 5 pTmp->m_pData = pData; 6 pTmp->m_pNext = NULL; 7 8 m_cTailLock.EnterMutex(); 9 bool bEmpty = m_pHead == m_pTail; //队列是否为空10 m_pTail->m_pNext = pTmp;11 m_pTail = pTmp;12 13 if(m_bBlockFlag && bEmpty) //队列为空发送信号14 m_condQueue.Signal();15 16 m_cTailLock.LeaveMutex();17 }18 19 template
20 T *BlockQueue
::Get(int timeout)21 {22 m_cHeadLock.EnterMutex();23 while (m_pHead == m_pTail) //队列为空,条件也可改为 while(m_pHead->m_pNext == NULL)24 {25 if (!m_bBlockFlag)26 {27 m_cHeadLock.LeaveMutex();28 return NULL;29 }30 else31 {32 if (m_condQueue.WaitLock(m_cHeadLock.GetMutex(), timeout) == 1)33 {34 m_cHeadLock.LeaveMutex();35 return NULL;36 }37 }38 }39 40 Node * pTmp = m_pHead;41 Node * pNewHead = pTmp->next;42 T *pTmpData = pNewHead->m_pData;43 m_pHead = pNewHead;44 45 m_cHeadLock.LeaveMutex();46 47 delete pTmp;48 pTmp = NULL;49 50 return pTmpData;51 }

  试想这种情况:在一读一写两个线程环境中,某一时刻,队列已经为空(假设为阻塞模式),此时读线程调用Get操作,发现队列为空,m_pHead == m_pTail,那么准备在条件变量上等待,同一时刻写线程调用Add操作,此时队列为空,那么线程会发送信号,进行唤醒。注意此时如果写线程运行的快(两个线程并不互斥),或者说单核CPU中,写线程获得的时间片稍长,那么有可能在读线程进行等待之前就发送了信号,即line14在line32之前运行,此时这一唤醒动作会被忽略掉,写线程更新m_pTail指针后,调用结束,之后再次调用Add,由于此时队列非空,所以不在发送信号,这样的话会导致读线程一直阻塞。如果像前一篇文章一样,使用一个计数器来表示当前阻塞在Get操作上的线程个数,在它的值大于零时进行激活,这样处理是否行之有效呢,根据前一篇文章的方法,修改Add和Get函数:

1 template
2 void BlockQueue
::Add(T *pData) 3 { 4 Node *pTmp = new Node; 5 pTmp->m_pData = pData; 6 pTmp->m_pNext = NULL; 7 8 m_cTailLock.EnterMutex(); 9 m_pTail->m_pNext = pTmp;10 m_pTail = pTmp;11 12 if(m_bBlockFlag && m_nBlockCnt > 0) //当有线程阻塞时,发送信号进行唤醒13 m_condQueue.Signal();14 15 m_cTailLock.LeaveMutex();16 }17 18 template
19 T *BlockQueue
::Get(int timeout)20 {21 m_cHeadLock.EnterMutex();22 while (m_pHead == m_pTail) //队列为空,条件也可改为 while(m_pHead->m_pNext == NULL)23 {24 if (!m_bBlockFlag)25 {26 m_cHeadLock.LeaveMutex();27 return NULL;28 }29 else30 {31 m_nBlockCnt++;32 if (m_condQueue.WaitLock(m_cHeadLock.GetMutex(), timeout) == 1)33 {34 m_cHeadLock.LeaveMutex();35 m_nBlockCnt--;36 return NULL;37 }38 m_nBlockCnt--;39 }40 }41 42 Node * pTmp = m_pHead;43 Node * pNewHead = pTmp->next;44 T *pTmpData = pNewHead->m_pData;45 m_pHead = pNewHead;46 47 m_cHeadLock.LeaveMutex();48 49 delete pTmp;50 pTmp = NULL;51 52 return pTmpData;53 }

  进行类似的分析,队列为空,读线程调用Get函数时阻塞,此时切换到写线程调用Add函数,那么就可能在读线程Get函数中阻塞之前就发送了信号,即line13在line32之前运行了,那么这一唤醒动作会被忽略掉,而此时读线程阻塞在line32,所以m_nBlockCnt仍然大于零,所以之后再次写线程调用Add函数时,仍然会将阻塞的读线程唤醒,可以通过这种方式,提高Signal函数的有效率。

  未完待续……

posted on
2014-11-19 16:18 阅读(
...) 评论(
...)

转载于:https://www.cnblogs.com/Tour/p/4108521.html

你可能感兴趣的文章
mysql-5.7 innodb 的并行任务调度详解
查看>>
shell脚本
查看>>
Upload Image to .NET Core 2.1 API
查看>>
Js时间处理
查看>>
Java项目xml相关配置
查看>>
三维变换概述
查看>>
vue route 跳转
查看>>
【雷电】源代码分析(二)-- 进入游戏攻击
查看>>
Entityframework:“System.Data.Entity.Internal.AppConfig”的类型初始值设定项引发异常。...
查看>>
Linux中防火墙centos
查看>>
mysql新建用户,用户授权,删除用户,修改密码
查看>>
FancyCoverFlow
查看>>
JS博客
查看>>
如何设置映射网络驱动器的具体步骤和方法
查看>>
ASP.NET WebApi 基于OAuth2.0实现Token签名认证
查看>>
283. Move Zeroes把零放在最后面
查看>>
Visual Studio Code 打开.py代码报Linter pylint is not installed解决办法
查看>>
Python 数据类型
查看>>
S5PV210根文件系统的制作(一)
查看>>
centos下同时启动多个tomcat
查看>>