本文最后更新于:2024年5月7日 下午

之前介绍过 生产者、消费者模式,是一种常用的多线程并发设计模式,本文记录 C++ 实现的过程。

生产者消费者模式

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

根据生产者和消费者数量的多少,程序复杂程度也不同,可以分为 :单生产者-单消费者模型单生产者-多消费者模型多生产者-单消费者模型多生产者-多消费者模型

单生产者-单消费者模型

单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。

  • C++11 实现单生产者单消费者模型的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

std::chrono::seconds t(1);//a new feature of c++ 11 standard

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

//lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
//lck.unlock();

return data;
}

void Producer_thread()
{
for (int i = 1; i <= item_total; ++i)
{
//std::this_thread::sleep_for(t);
std::cout << "生产者生产第" << i << "个产品" << std::endl;
produce_item(i);
}
}

void Consumer_thread()
{
static int cnt = 0;
while (1)
{
//std::this_thread::sleep_for(t);
int item = consume_item();
std::cout << "消费者消费第" << item << "个产品" << std::endl;

if (++cnt == item_total)
break;
}
}

int main()
{
std::thread producer(Producer_thread); // 创建生产者线程.
std::thread consumer(Consumer_thread); // 创建消费之线程.
producer.join();
consumer.join();
}

单生产者-多消费者模型

与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器。

  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex mtx_counter;//互斥量,保护产品计数器

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

static std::size_t item_counter = 0;//消费者消费产品计数器

std::chrono::seconds t(1);//a new feature of c++ 11 standard

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
//item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
lck.unlock();

return data;
}

void Producer_thread()
{
for (int i = 1; i <= item_total; ++i)
{
//std::this_thread::sleep_for(t);
std::cout << "生产者生产第" << i << "个产品" << std::endl;
produce_item(i);
}
}

void Consumer_thread()
{
bool read_to_exit = false;
while (1)
{
std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lck(mtx_counter);
if (item_counter < item_total)
{
int item = consume_item();
++item_counter;
std::cout << "消费者线程" << std::this_thread::get_id()
<< "消费第" << item << "个产品" << std::endl;
}
else
{
read_to_exit = true;
}
if (read_to_exit == true)
break;
}

std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;

}

int main()
{
std::thread producer(Producer_thread); // 创建生产者线程.
std::vector<std::thread> thread_vector;
for (int i = 0; i != 5; ++i)
{
thread_vector.push_back(std::thread(Consumer_thread));// 创建消费者线程.
}

producer.join();
for (auto &thr : thread_vector)
{
thr.join();
}

}

多生产者-单消费者模型

与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器。

  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex mtx_counter;

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

static std::size_t item_counter = 0;//计数器

std::chrono::seconds t(1);//a new feature of c++ 11 standard

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
// item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
lck.unlock();

return data;
}

void Producer_thread()
{
bool read_to_exit = false;
while (1)
{
std::unique_lock<std::mutex> lck(mtx_counter);
if (item_counter < item_total)
{
++item_counter;
produce_item(item_counter);
std::cout << "生产者线程 " << std::this_thread::get_id()
<< "生产第 " << item_counter << "个产品" << std::endl;
}
else
{
read_to_exit = true;
}

if (read_to_exit == true)
break;
}

std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;

}

void Consumer_thread()
{
static int cnt = 0;
while (1)
{
std::this_thread::sleep_for(t);
int item = consume_item();
std::cout << "消费者消费第" << item << "个产品" << std::endl;

if (++cnt == item_total)
break;
}
}

int main()
{
std::vector<std::thread> thread_vector;
for (int i = 0; i != 5; ++i)
{
thread_vector.push_back(std::thread(Producer_thread));// 创建消费者线程.
}

std::thread consumer(Consumer_thread); // 创建消费之线程.

for (auto &thr : thread_vector)
{
thr.join();
}

consumer.join();
}

多生产者-多消费者模型

该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。

  • 示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex producer_count_mtx;
std::mutex consumer_count_mtx;

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

static size_t produced_item_counter = 0;
static size_t consumed_item_counter = 0;

std::chrono::seconds t(1);//a new feature of c++ 11 standard
std::chrono::microseconds t1(1000);

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
// item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
lck.unlock();

return data;
}

void Producer_thread()
{
bool ready_to_exit = false;
while (1)
{
//std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lock(producer_count_mtx);
if (produced_item_counter < item_total)
{
++produced_item_counter;
produce_item(produced_item_counter);
std::cout << "生产者线程 " << std::this_thread::get_id()
<< "生产第 " << produced_item_counter << "个产品" << std::endl;
}
else
{
ready_to_exit = true;
}

lock.unlock();

if (ready_to_exit == true)
{
break;
}
}

std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}

void Consumer_thread()
{
bool read_to_exit = false;
while (1)
{
std::this_thread::sleep_for(t1);
std::unique_lock<std::mutex> lck(consumer_count_mtx);
if (consumed_item_counter < item_total)
{
int item = consume_item();
++consumed_item_counter;
std::cout << "消费者线程" << std::this_thread::get_id()
<< "消费第" << item << "个产品" << std::endl;
}
else
{
read_to_exit = true;
}

if (read_to_exit == true)
{
break;
}
}

std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}

int main()
{
std::vector<std::thread> thread_vector1;
std::vector<std::thread> thread_vector2;
for (int i = 0; i != 5; ++i)
{
thread_vector1.push_back(std::thread(Producer_thread));// 创建生产者线程.
thread_vector2.push_back(std::thread(Consumer_thread));// 创建消费者线程.

}

for (auto &thr1 : thread_vector1)
{
thr1.join();
}

for (auto &thr2 : thread_vector2)
{
thr2.join();
}
}

参考资料



文章链接:
https://www.zywvvd.com/notes/coding/cpp/cpp-producer-consumer/cpp-producer-consumer/


“觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付

C++ 实现多线程生产者消费者模式
https://www.zywvvd.com/notes/coding/cpp/cpp-producer-consumer/cpp-producer-consumer/
作者
Yiwei Zhang
发布于
2023年3月20日
许可协议