本文最后更新于:2024年5月7日 下午
之前介绍过 生产者、消费者模式,是一种常用的多线程并发设计模式,本文记录 C++ 实现的过程。
生产者消费者模式
生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。
生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。
该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
根据生产者和消费者数量的多少,程序复杂程度也不同,可以分为 :单生产者-单消费者模型,单生产者-多消费者模型,多生产者-单消费者模型,多生产者-多消费者模型。
单生产者-单消费者模型
单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| #include <iostream> #include <condition_variable> #include <mutex> #include <thread> static const int repository_size = 10; static const int item_total = 20; std::mutex mtx; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; int item_buffer[repository_size]; static std::size_t read_position = 0; static std::size_t write_position = 0; std::chrono::seconds t(1); void produce_item(int i) { std::unique_lock<std::mutex> lck(mtx); while (((write_position + 1) % repository_size) == read_position) { std::cout << "Producer is waiting for an empty slot..." << std::endl; repo_not_full.wait(lck); } item_buffer[write_position] = i; write_position++; if (write_position == repository_size) { write_position = 0; } repo_not_empty.notify_all(); } int consume_item() { int data; std::unique_lock<std::mutex> lck(mtx); while (write_position == read_position) { std::cout << "Consumer is waiting for items..." << std::endl; repo_not_empty.wait(lck); } data = item_buffer[read_position]; read_position++; if (read_position >= repository_size) { read_position = 0; } repo_not_full.notify_all(); return data; } void Producer_thread() { for (int i = 1; i <= item_total; ++i) { std::cout << "生产者生产第" << i << "个产品" << std::endl; produce_item(i); } } void Consumer_thread() { static int cnt = 0; while (1) { int item = consume_item(); std::cout << "消费者消费第" << item << "个产品" << std::endl; if (++cnt == item_total) break; } } int main() { std::thread producer(Producer_thread); std::thread consumer(Consumer_thread); producer.join(); consumer.join(); }
|
单生产者-多消费者模型
与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
| #include <iostream> #include <condition_variable> #include <mutex> #include <thread> #include <vector> static const int repository_size = 10; static const int item_total = 20; std::mutex mtx; std::mutex mtx_counter; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; int item_buffer[repository_size]; static std::size_t read_position = 0; static std::size_t write_position = 0; static std::size_t item_counter = 0; std::chrono::seconds t(1); void produce_item(int i) { std::unique_lock<std::mutex> lck(mtx); while (((write_position + 1) % repository_size) == read_position) { std::cout << "Producer is waiting for an empty slot..." << std::endl; repo_not_full.wait(lck); } item_buffer[write_position] = i; write_position++; if (write_position == repository_size) { write_position = 0; } repo_not_empty.notify_all(); lck.unlock(); } int consume_item() { int data; std::unique_lock<std::mutex> lck(mtx); while (write_position == read_position) { std::cout << "Consumer is waiting for items..." << std::endl; repo_not_empty.wait(lck); } data = item_buffer[read_position]; read_position++; if (read_position >= repository_size) { read_position = 0; } repo_not_full.notify_all(); lck.unlock(); return data; } void Producer_thread() { for (int i = 1; i <= item_total; ++i) { std::cout << "生产者生产第" << i << "个产品" << std::endl; produce_item(i); } } void Consumer_thread() { bool read_to_exit = false; while (1) { std::this_thread::sleep_for(t); std::unique_lock<std::mutex> lck(mtx_counter); if (item_counter < item_total) { int item = consume_item(); ++item_counter; std::cout << "消费者线程" << std::this_thread::get_id() << "消费第" << item << "个产品" << std::endl; } else { read_to_exit = true; } if (read_to_exit == true) break; } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } int main() { std::thread producer(Producer_thread); std::vector<std::thread> thread_vector; for (int i = 0; i != 5; ++i) { thread_vector.push_back(std::thread(Consumer_thread)); } producer.join(); for (auto &thr : thread_vector) { thr.join(); } }
|
多生产者-单消费者模型
与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
| #include <iostream> #include <condition_variable> #include <mutex> #include <thread> #include <vector> static const int repository_size = 10; static const int item_total = 20; std::mutex mtx; std::mutex mtx_counter; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; int item_buffer[repository_size]; static std::size_t read_position = 0; static std::size_t write_position = 0; static std::size_t item_counter = 0; std::chrono::seconds t(1); void produce_item(int i) { std::unique_lock<std::mutex> lck(mtx); while (((write_position + 1) % repository_size) == read_position) { std::cout << "Producer is waiting for an empty slot..." << std::endl; repo_not_full.wait(lck); } item_buffer[write_position] = i; write_position++; if (write_position == repository_size) { write_position = 0; } repo_not_empty.notify_all(); lck.unlock(); } int consume_item() { int data; std::unique_lock<std::mutex> lck(mtx); while (write_position == read_position) { std::cout << "Consumer is waiting for items..." << std::endl; repo_not_empty.wait(lck); } data = item_buffer[read_position]; read_position++; if (read_position >= repository_size) { read_position = 0; } repo_not_full.notify_all(); lck.unlock(); return data; } void Producer_thread() { bool read_to_exit = false; while (1) { std::unique_lock<std::mutex> lck(mtx_counter); if (item_counter < item_total) { ++item_counter; produce_item(item_counter); std::cout << "生产者线程 " << std::this_thread::get_id() << "生产第 " << item_counter << "个产品" << std::endl; } else { read_to_exit = true; } if (read_to_exit == true) break; } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void Consumer_thread() { static int cnt = 0; while (1) { std::this_thread::sleep_for(t); int item = consume_item(); std::cout << "消费者消费第" << item << "个产品" << std::endl; if (++cnt == item_total) break; } } int main() { std::vector<std::thread> thread_vector; for (int i = 0; i != 5; ++i) { thread_vector.push_back(std::thread(Producer_thread)); } std::thread consumer(Consumer_thread); for (auto &thr : thread_vector) { thr.join(); } consumer.join(); }
|
多生产者-多消费者模型
该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
| #include <iostream> #include <condition_variable> #include <mutex> #include <thread> #include <vector> static const int repository_size = 10; static const int item_total = 20; std::mutex mtx; std::mutex producer_count_mtx; std::mutex consumer_count_mtx; std::condition_variable repo_not_full; std::condition_variable repo_not_empty; int item_buffer[repository_size]; static std::size_t read_position = 0; static std::size_t write_position = 0; static size_t produced_item_counter = 0; static size_t consumed_item_counter = 0; std::chrono::seconds t(1); std::chrono::microseconds t1(1000); void produce_item(int i) { std::unique_lock<std::mutex> lck(mtx); while (((write_position + 1) % repository_size) == read_position) { std::cout << "Producer is waiting for an empty slot..." << std::endl; repo_not_full.wait(lck); } item_buffer[write_position] = i; write_position++; if (write_position == repository_size) { write_position = 0; } repo_not_empty.notify_all(); lck.unlock(); } int consume_item() { int data; std::unique_lock<std::mutex> lck(mtx); while (write_position == read_position) { std::cout << "Consumer is waiting for items..." << std::endl; repo_not_empty.wait(lck); } data = item_buffer[read_position]; read_position++; if (read_position >= repository_size) { read_position = 0; } repo_not_full.notify_all(); lck.unlock(); return data; } void Producer_thread() { bool ready_to_exit = false; while (1) { std::unique_lock<std::mutex> lock(producer_count_mtx); if (produced_item_counter < item_total) { ++produced_item_counter; produce_item(produced_item_counter); std::cout << "生产者线程 " << std::this_thread::get_id() << "生产第 " << produced_item_counter << "个产品" << std::endl; } else { ready_to_exit = true; } lock.unlock(); if (ready_to_exit == true) { break; } } std::cout << "Producer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } void Consumer_thread() { bool read_to_exit = false; while (1) { std::this_thread::sleep_for(t1); std::unique_lock<std::mutex> lck(consumer_count_mtx); if (consumed_item_counter < item_total) { int item = consume_item(); ++consumed_item_counter; std::cout << "消费者线程" << std::this_thread::get_id() << "消费第" << item << "个产品" << std::endl; } else { read_to_exit = true; } if (read_to_exit == true) { break; } } std::cout << "Consumer thread " << std::this_thread::get_id() << " is exiting..." << std::endl; } int main() { std::vector<std::thread> thread_vector1; std::vector<std::thread> thread_vector2; for (int i = 0; i != 5; ++i) { thread_vector1.push_back(std::thread(Producer_thread)); thread_vector2.push_back(std::thread(Consumer_thread)); } for (auto &thr1 : thread_vector1) { thr1.join(); } for (auto &thr2 : thread_vector2) { thr2.join(); } }
|
参考资料
文章链接:
https://www.zywvvd.com/notes/coding/cpp/cpp-producer-consumer/cpp-producer-consumer/