Linux线程编程之生产者消费者问题
作者:Linux 发布时间:[ 2014/10/21 10:53:36 ] 推荐标签:Linux 操作系统 线程
基于上述结构,定义初始化、入队出队和显式函数:
1 //初始化循环队列
2 void InitQue(PT_QUEUE ptQue)
3 {
4 memset(ptQue, 0, sizeof(*ptQue));
5 }
6
7 //向循环队列中插入元素
8 void EnterQue(PT_QUEUE ptQue, int dwElem)
9 {
10 if(IsQueFull(ptQue))
11 {
12 printf("Elem %d cannot enter Queue %p(Full)! ", dwElem, ptQue);
13 return;
14 }
15 ptQue->aData[ptQue->dwTail]= dwElem;
16 ptQue->dwTail = (ptQue->dwTail + 1) % QUEUE_SIZE;
17 }
18
19 //从循环队列中取出元素
20 int LeaveQue(PT_QUEUE ptQue)
21 {
22 if(IsQueEmpty(ptQue))
23 {
24 printf("Queue %p is Empty! ", ptQue);
25 return -1;
26 }
27 int dwElem = ptQue->aData[ptQue->dwHead];
28 ptQue->dwHead = (ptQue->dwHead + 1) % QUEUE_SIZE;
29 return dwElem;
30 }
31
32 //从队首至队尾依次显示队中元素值
33 void DisplayQue(PT_QUEUE ptQue)
34 {
35 if(IsQueEmpty(ptQue))
36 {
37 printf("Queue %p is Empty! ", ptQue);
38 return;
39 }
40
41 printf("Queue Element: ");
42 int dwIdx = ptQue->dwHead;
43 while((dwIdx % QUEUE_SIZE) != ptQue->dwTail)
44 printf("%d ", ptQue->aData[(dwIdx++) % QUEUE_SIZE]);
45
46 printf(" ");
47 }
然后提供判空、判满、查询等辅助函数:
1 //判断循环队列是否为空
2 int IsQueEmpty(PT_QUEUE ptQue)
3 {
4 return ptQue->dwHead == ptQue->dwTail;
5 }
6
7 //判断循环队列是否为满
8 int IsQueFull(PT_QUEUE ptQue)
9 {
10 return (ptQue->dwTail + 1) % QUEUE_SIZE == ptQue->dwHead;
11 }
12
13 //获取循环队列元素数目
14 int QueDataNum(PT_QUEUE ptQue)
15 {
16 return (ptQue->dwTail - ptQue->dwHead + QUEUE_SIZE) % QUEUE_SIZE;
17 }
18
19 //获取循环队列队首位置
20 int GetQueHead(PT_QUEUE ptQue)
21 {
22 return ptQue->dwHead;
23 }
24 //获取循环队列队首元素
25 int GetQueHeadData(PT_QUEUE ptQue)
26 {
27 return ptQue->aData[ptQue->dwHead];
28 }
29 //获取循环队列队尾位置
30 int GetQueTail(PT_QUEUE ptQue)
31 {
32 return ptQue->dwTail;
33 }
后,通过QueueTest()函数来测试队列函数集:
1 static T_QUEUE gtQueue;
2 void QueueTest(void)
3 {
4 InitQue(>Queue);
5 printf("Enter Queue 1,2,3,4,5... ");
6 EnterQue(>Queue, 1);
7 EnterQue(>Queue, 2);
8 EnterQue(>Queue, 3);
9 EnterQue(>Queue, 4);
10 EnterQue(>Queue, 5);
11 printf("Queue Status: Empty(%d), Full(%d) ", IsQueEmpty(>Queue), IsQueFull(>Queue));
12 printf("Queue Elem Num: %u ", QueDataNum(>Queue));
13 printf("Head: %u(%d) ", GetQueHead(>Queue), GetQueHeadData(>Queue));
14 printf("Tail: %u ", GetQueTail(>Queue));
15 DisplayQue(>Queue);
16
17 printf(" Leave Queue... ");
18 printf("Leave %d ", LeaveQue(>Queue));
19 printf("Leave %d ", LeaveQue(>Queue));
20 printf("Leave %d ", LeaveQue(>Queue));
21 DisplayQue(>Queue);
22
23 printf(" Enter Queue 6,7... ");
24 EnterQue(>Queue, 6);
25 EnterQue(>Queue, 7);
26 DisplayQue(>Queue);
27
28 printf(" Leave Queue... ");
29 printf("Leave %d ", LeaveQue(>Queue));
30 printf("Leave %d ", LeaveQue(>Queue));
31 printf("Leave %d ", LeaveQue(>Queue));
32 DisplayQue(>Queue);
33 }
编译链接后,运行结果如下:
1 Enter Queue 1,2,3,4,5...
2 Elem 5 cannot enter Queue 0x8053f9c(Full)!
3 Queue Status: Empty(0), Full(1)
4 Queue Elem Num: 4
5 Head: 0(1)
6 Tail: 4
7 Queue Element: 1 2 3 4
8
9 Leave Queue...
10 Leave 1
11 Leave 2
12 Leave 3
13 Queue Element: 4
14
15 Enter Queue 6,7...
16 Queue Element: 4 6 7
17
18 Leave Queue...
19 Leave 4
20 Leave 6
21 Leave 7
22 Queue 0x8053f9c is Empty!
可见,队列行为完全符合期望。
二 生产者消费者问题
2.1 问题概述
本节将讨论Linux并发编程中经典的生产者/消费者(producer-consumer)问题。该问题涉及一个大小限定的有界缓冲区(bounded buffer)和两类线程或进程(生产者和消费者)。
在缓冲区中有可用空间时,一个或一组生产者(线程或进程)将创建的产品(数据条目)放入缓冲区,然后由一个或一组消费者(线程或进程)提取这些产品。产品在生产者和消费者之间通过某种类型的IPC传递。
在多个进程间共享一个公共数据缓冲区需要某种形式的共享内存区(如存储映射或共享内存),而多线程天然地共享存储空间。为简便起见,本节的讨论于多线程。
多个生产者和消费者线程的场景如下图所示:

在生产者/消费者问题中,生产者线程必须在缓冲区中有可用空间后才能向其中放置内容,否则将阻塞(进入休眠状态)直到出现下一个可用的空位置。生产者线程可使用互斥量原子性地检查缓冲区,而不受其他线程干扰。当发现缓冲区已满后,生产者阻塞自己并在缓冲区变为非满时被唤醒,这可由条件变量实现。
消费者线程必须在生产者向缓冲区中写入之后才能从中提取内容。同理,可用互斥量和条件变量以无竞争的方式等待缓冲区由空变为非空。
2.2 多线程实现
本节将采用队列模拟任务,给出生产者/消费者问题的多线程示例。
为简单起见,生产者将队列元素下标作为元素值入队,消费者使元素出队并验证元素值正确性。
首先定义一组全局数据:
1 #define PRODUCER_NUM 5 //生产者数
2 #define CONSUMER_NUM 3 //消费者数
3 #define PRD_NUM 20 //多生产的产品数
4 #define DELAY_TIME 3 //生产(或消费)任务之间的大时间间隔
5
6 #define QUEUE_SIZE (PRD_NUM+1) //队列大容纳QUEUE_SIZE-1个元素
7
8 T_QUEUE gtQueue;
9 pthread_mutex_t gtQueLock = PTHREAD_MUTEX_INITIALIZER;
10 pthread_cond_t gtPrdCond = PTHREAD_COND_INITIALIZER; //Full->Not Full
11 pthread_cond_t gtCsmCond = PTHREAD_COND_INITIALIZER; //Empty->Not Empty
此处,QUEUE_SIZE按照产品数重新定义。互斥量gtQueLock用于保护全局队列gtQueue和两个条件变量。条件变量gtPrdCond用于队列由满变为非满时通知(唤醒)生产者线程,而gtCsmCond用于队列由空变为非空时通知消费者线程。
若首先创建并启动生产者线程,再立即或稍候创建消费者线程,则不需要条件变量gtCsmCond(队列中始终有产品)。本节为全面展现线程间的同步,约定消费者线程创建和启动完毕之后,再创建生产者线程。这样,所有消费者线程将会阻塞,在条件变量gtCsmCond的线程列表里条件状态的改变。生产者线程并开始“产出”后,广播通知所有消费者线程。反之,因为消费者线程不会全部阻塞,可单播唤醒某个消费者。

sales@spasvo.com