21xrx.com
2025-03-27 22:07:43 Thursday
文章检索 我的文章 写文章
C++ 无锁队列线程池实现
2023-07-05 12:35:04 深夜i     66     0
C++ 无锁队列 线程池 实现 多线程编程
return !tasks_.empty() || stop_flag_;

C++是一种高效的编程语言,也是许多系统级应用和底层设备中的首选语言。而线程池则是一种线程管理技术,可以通过预先分配一定数量的线程,在多次执行任务时重用这些线程以减少线程创建和销毁的开销。而无锁队列是一种数据结构,不需要使用锁来实现同步,因而可以提升并发读写效率。

将C++、线程池和无锁队列结合起来,可以实现更高效的多线程应用。下面我们将介绍如何使用C++语言实现无锁队列线程池。

1. 线程池实现

我们可以使用thread库来实现线程池,其中线程池的任务队列可以使用无锁队列实现。线程池的核心代码如下:

class ThreadPool {
private:
  std::vector<std::thread> threads_;
  std::queue<std::function<void()>> tasks_;
  std::atomic<bool> stop_flag_;
  std::mutex mtx_;
  std::condition_variable cv_;
public:
  ThreadPool() : stop_flag_(false) {
    for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {
      threads_.emplace_back(std::thread([this]() {
        while (!stop_flag_) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lock(mtx_);
            cv_.wait(lock, [this]() { return !tasks_.empty() || stop_flag_; });
            if (stop_flag_) break;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();
        }
      }));
    }
  }
  ~ThreadPool() {
    std::unique_lock<std::mutex> lock(mtx_);
    stop_flag_ = true;
    cv_.notify_all();
    lock.unlock();
    for (auto& t : threads_) {
      t.join();
    }
  }
  template<typename Task>
  void enqueue(Task&& task) {
    {
      std::unique_lock<std::mutex> lock(mtx_);
      tasks_.emplace(std::forward<Task>(task));
    }
    cv_.notify_one();
  }
};

线程池通过std::vector 存储线程,通过std::queue >存储任务队列,通过std::atomic stop_flag_标记线程池是否已经停止。线程执行函数从任务队列中取出任务,并执行该任务。当任务队列为空时,线程通过std::condition_variable cv_等待新任务的到来。

2. 无锁队列实现

无锁队列一般是基于CAS操作实现的,常见的无锁队列有SPSC(单生产单消费)队列、MPMC(多生产多消费)队列等。这里我们展示一个SPSC队列的实现:

template <typename T>
class LockFreeQueue {
public:
  LockFreeQueue() : head_(new Node), tail_(head_.load()) {}
  ~LockFreeQueue() {
    while (head_) {
      Node* tmp = head_;
      head_ = tmp->next;
      delete tmp;
    }
  }
  void push(const T& val) {
    Node* node = new Node(val);
    Node* tail = tail_.load();
    while (!tail_->next.compare_exchange_weak(nullptr, node)) {
      tail_ = tail_->next;
      tail = tail_.load();
    }
    tail_.compare_exchange_weak(tail, node);
  }
  bool pop(T& val) {
    Node* head = head_.load();
    if (head == tail_.load())
      return false;
    
    Node* new_head = head->next.load();
    while (head_ != head) {
      head = head_.load();
      new_head = head->next.load();
      if (!new_head)
        return false;
      
    }
    val = new_head->data;
    head_.store(new_head);
    delete head;
    return true;
  }
private:
  struct Node {
    T data;
    std::atomic<Node*> next;
    Node() : next(nullptr) {}
    Node(const T& val) : data(val), next(nullptr) {}
  };
  std::atomic<Node*> head_;
  std::atomic<Node*> tail_;
};

LockFreeQueue通过head和tail指针标记队列头、尾,并使用CAS操作来实现入队(dequeue)和出队(enqueue)操作。当队列为空时,入队时需要更新tail指针,出队时需要更新head指针。

3. 结合实现

我们可以将线程池和无锁队列结合起来,使用线程池执行任务,使用无锁队列作为任务队列。这样可以大幅提升多线程应用的性能,减小锁竞争的开销,使得多个线程可以更好地协同工作。

结合后的代码如下:

template<typename T>
class TaskQueue {
private:
  LockFreeQueue<T> queue_;
  ThreadPool pool_;
public:
  TaskQueue() : pool_([this](int) { this->run(); }) {}
  void run() {
    while (true) {
      T task;
      if (queue_.pop(task)) {
        task();
      }
    }
  }
  template<typename F>
  auto enqueue(F&& f) -> std::future<decltype(f())> {
    auto task = std::make_shared<std::packaged_task<decltype(f())()>>(std::forward<F>(f));
    auto future = task->get_future();
    queue_.push([task]() { (*task)(); });
    return future;
  }
};

以上代码中,TaskQueue定义了一个无锁队列和一个线程池。线程池中有多个线程等待任务到来,其中每个线程通过执行run()函数从无锁队列中取出任务并执行。而enqueue()函数将待执行的任务封装成std::packaged_task并添加到无锁队列中,同时返回该任务的std::future以便于获取任务执行结果。这样,用户只需要向TaskQueue中加入任务,便可得到高效且线程安全的多线程应用。

总结

本文介绍了如何使用C++语言实现无锁队列线程池,这种多线程应用模式可以大幅提升多线程应用的性能,减小锁竞争的开销,使得多个线程可以更好地协同工作。对于需要处理大量任务的多线程应用,无锁队列线程池是一种不错的选择。

  
  

评论区