C++多生产者多消费者模型

2022/6/13 1:20:11

本文主要是介绍C++多生产者多消费者模型,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

// 多生产者多消费者模型
// 需要了解以下概念
// thread 线程
// mutex 互斥锁
// atomic 原子操作
// condition_variable 条件变量

#include <iostream>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <queue>
#include <random>
#include <ctime>
#include <vector>

// 随机数生成,用于模拟生产消费的耗时
std::default_random_engine e(time(0));
std::uniform_int_distribution<int> distribution(3000, 7000);

class Factory
{
public:
    Factory(int production_target, int producer_count,
        int consumer_count, int line_capacity);

    void produce(int tag);
    void consume(int tag);
    void start_working();

private:
    bool do_produce(int tag);
    bool do_consume(int tag);

private:
    const int production_target; // 生产目标,计划生产的数目
    const int producer_count; // 生产者数量
    const int consumer_count; // 消费者数量
    const int line_capacity; // 产品暂存区最大存放量
    std::atomic<int> produced_count = 0; // 已生产产品数量
    std::atomic<int> consumed_count = 0; // 已消费产品数量
    std::atomic_flag produce_done = ATOMIC_FLAG_INIT; // 产品已经生产完毕
    std::atomic_flag consume_done = ATOMIC_FLAG_INIT; // 产品已经消费完毕
    std::queue<int> product_line; // 已生产未消费产品存放区
    std::mutex mtx; // 互斥锁
    std::condition_variable line_not_full; // 消费者消耗了一个产品
    std::condition_variable line_not_empty; // 生产者产出了一个产品
};

Factory::Factory(int production_target, int producer_count,
    int consumer_count, int line_capacity) : 
    production_target(production_target),
    producer_count(producer_count),
    consumer_count(consumer_count),
    line_capacity(line_capacity)
{
    // do nothing
}

bool Factory::do_produce(int tag)
{
    // 假设生产总是成功的
    std::this_thread::sleep_for(std::chrono::duration(
        std::chrono::milliseconds(distribution(e))));
    return true;
}

bool Factory::do_consume(int tag)
{
    // 假设消费总是成功的
    std::this_thread::sleep_for(std::chrono::duration(
        std::chrono::milliseconds(distribution(e))));
    return true;
}

void Factory::produce(int tag)
{
    while(produced_count < production_target)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(produced_count < production_target && product_line.size() < line_capacity)
        {
            int id = ++produced_count; // 提前将计数器加上,防止生产超额
            std::cout << "[P" << tag << "] prodecing " << id << "\n";
            lock.unlock(); // 具体的生产过程会比较耗时,将锁释放,使用其它线程生产/消费
            do_produce(tag); // 如果生产可能失败,则需要重新设计此处逻辑
            lock.lock();
            product_line.push(id);
            std::cout << "[P" << tag << "] prodeced " << id << "\n";
            line_not_empty.notify_one(); // 只生产了一个产品
            if(product_line.size() == line_capacity && produced_count < production_target)
            {
                std::cout << "[" << tag << "] product line full\n";
                line_not_full.wait(lock);
            }
        }
    }
    if(produce_done.test_and_set()) // 生产结束,需要唤醒所有消费者
        line_not_empty.notify_all(); // 消费者在生产线为空后一直等待
    std::cout << "producer " << tag << " done\n";
}

void Factory::consume(int tag)
{
    while(consumed_count < production_target)
    {
        std::unique_lock<std::mutex> lock(mtx);
        if(consumed_count < production_target && !product_line.empty())
        {
            int id = product_line.front();
            ++consumed_count;
            product_line.pop();
            std::cout << "[C" << tag << "] consuming " << id << "\n";
            lock.unlock();
            do_consume(tag);
            lock.lock();
            std::cout << "[C" << tag << "] consumed " << id << "\n";
            line_not_full.notify_one();
            if(product_line.empty() && consumed_count < production_target)
            {
                std::cout << "[" << tag << "] product line empty\n";
                line_not_empty.wait(lock);
            }
        }
    }
    if(consume_done.test_and_set())
        line_not_full.notify_all();
    std::cout << "consumer " << tag << " done\n";
}

void Factory::start_working()
{
    std::vector<std::thread> threads;
    for(int i = 0; i < producer_count; ++i)
        threads.emplace_back(&Factory::produce, this, i); // 使用类成员函数时需要加上this指针
    for(int i = producer_count; i < producer_count + consumer_count; ++i)
        threads.emplace_back(&Factory::consume, this, i);

    for(int i = 0; i < producer_count + consumer_count; ++i)
        threads[i].join();
}

int main()
{
    Factory factory(10, 3, 3, 3);
    factory.start_working();
    std::cout << "done\n";
}


// TODO
// 1. 为每条打印信息加上时间戳
//        chrono打印时间戳有点麻烦
// 2. 处理生产/消费失败的情形
//        使用两个变量,一个记录已生产的数量,一个记录正在生产的数量(包含已生产数量)


这篇关于C++多生产者多消费者模型的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程