21xrx.com
2025-03-30 22:18:54 Sunday
文章检索 我的文章 写文章
C++实现生产者消费者模式
2023-07-02 12:54:57 深夜i     14     0
C++ 生产者 消费者 模式 实现

生产者消费者模式是一种常见的并发编程模式,它允许多个线程同时操作共享的数据结构。在这种模式中,有两种角色:生产者和消费者。生产者负责生产数据,并将其写入共享的数据结构中;消费者则负责消费这些数据,并从共享的数据结构中读取它们。

C++是一种流行的编程语言,支持多线程编程,并提供了各种工具和库来实现生产者消费者模式。下面介绍一些基本的C++代码实现。

使用互斥锁和条件变量

在C++中,可以使用互斥锁和条件变量来实现生产者消费者模式。互斥锁用于保护共享数据结构,只有获得锁的线程才能访问它。条件变量用于通知其他线程需要等待某个条件满足。

下面是一个简单的例子,使用互斥锁和条件变量实现一个生产者消费者队列:

#include <iostream>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
std::vector<int> queue;
std::mutex queue_mutex;
std::condition_variable queue_cond;
int queue_size = 0;
bool quit = false;
void producer() {
  for (int i = 0; i < 10; ++i) {
    std::unique_lock<std::mutex> lock(queue_mutex);
    while (queue_size >= 10) {
      queue_cond.wait(lock);
    }
    queue.push_back(i);
    ++queue_size;
    lock.unlock();
    queue_cond.notify_all();
  }
  quit = true;
  queue_cond.notify_all();
}
void consumer() {
  while (true) {
    std::unique_lock<std::mutex> lock(queue_mutex);
    while (queue_size == 0 && !quit) {
      queue_cond.wait(lock);
    }
    if (queue.empty())
      break;
    
    int data = queue.front();
    queue.erase(queue.begin());
    --queue_size;
    lock.unlock();
    queue_cond.notify_all();
    std::cout << "consume " << data << std::endl;
  }
}
int main() {
  std::thread t1(producer);
  std::thread t2(consumer);
  std::thread t3(consumer);
  t1.join();
  t2.join();
  t3.join();
  return 0;
}

在这个例子中,生产者线程循环产生0到9的整数,将它们写入队列中。消费者线程循环从队列中读取数据,并输出到控制台。我们使用标准库中的unique\_lock来获得互斥锁,以避免死锁。在队列的长度超过10之前,生产者线程会等待,并且在队列非空之前,消费者线程也会等待。当队列长度小于10时,生产者会释放锁,并通知条件变量,使得消费者线程可以继续运行。在队列为空时,消费者线程会退出循环,并结束线程。另外,在生产者线程完成工作后,设置quit标志为true,并通知所有线程退出。

使用信号量

在C++中,还可以使用信号量实现生产者消费者模式。信号量是一种线程同步的原语,它包含一个计数器,可以原子地增加或减少。当计数器大于零时,线程可以继续运行,并且计数器减1。当计数器等于零时,线程会等待,并在计数器增加时继续运行。

下面是一个使用信号量实现生产者消费者队列的例子:

#include <iostream>
#include <vector>
#include <semaphore.h>
#include <thread>
std::vector<int> queue;
int queue_size = 0;
sem_t queue_empty;
sem_t queue_full;
bool quit = false;
void producer() {
  for (int i = 0; i < 10; ++i) {
    sem_wait(&queue_empty);
    queue.push_back(i);
    ++queue_size;
    sem_post(&queue_full);
  }
  quit = true;
  sem_post(&queue_full);
}
void consumer() {
  while (true) {
    sem_wait(&queue_full);
    if (queue.empty())
      break;
    
    int data = queue.front();
    queue.erase(queue.begin());
    --queue_size;
    sem_post(&queue_empty);
    std::cout << "consume " << data << std::endl;
  }
}
int main() {
  sem_init(&queue_empty, 0, 10);
  sem_init(&queue_full, 0, 0);
  std::thread t1(producer);
  std::thread t2(consumer);
  std::thread t3(consumer);
  t1.join();
  t2.join();
  t3.join();
  sem_destroy(&queue_empty);
  sem_destroy(&queue_full);
  return 0;
}

在这个例子中,我们使用两个信号量来实现队列的同步。queue\_empty信号量用于限制队列的长度,当队列长度为0时,消费者线程会等待。queue\_full信号量用于标记队列是否已满,当队列长度达到上限时,生产者线程会等待。当生产者将数据写入队列时,它会增加queue\_full计数器,并减少queue\_empty计数器,以提示消费者可以读取数据。当消费者从队列中读取数据时,它会增加queue\_empty计数器,并减少queue\_full计数器,以提示生产者可以写入新数据。在队列的长度达到上限或为空时,线程会等待。当生产者线程完成工作时,它会设置quit标志,并发出信号通知所有线程退出。

总结

生产者消费者模式是一种重要的并发编程模式,在C++中可以使用互斥锁、条件变量和信号量来实现。使用互斥锁和条件变量时,生产者和消费者线程会持有互斥锁,以避免并发访问。使用信号量时,线程会等待计数器达到某个值,以避免队列长度超过上限。无论使用哪种方法,都需要非常小心地编写并发代码,以避免死锁和竞态条件等问题。

  
  

评论区