21xrx.com
2024-12-22 20:56:04 Sunday
登录
文章检索 我的文章 写文章
C++ 无锁队列线程池实现
2023-07-05 12:35:04 深夜i     --     --
C++ 无锁队列 线程池 实现 多线程编程
return !tasks_.empty() || stop_flag_;

C++是一种高效的编程语言,也是许多系统级应用和底层设备中的首选语言。而线程池则是一种线程管理技术,可以通过预先分配一定数量的线程,在多次执行任务时重用这些线程以减少线程创建和销毁的开销。而无锁队列是一种数据结构,不需要使用锁来实现同步,因而可以提升并发读写效率。

将C++、线程池和无锁队列结合起来,可以实现更高效的多线程应用。下面我们将介绍如何使用C++语言实现无锁队列线程池。

1. 线程池实现

我们可以使用thread库来实现线程池,其中线程池的任务队列可以使用无锁队列实现。线程池的核心代码如下:


class ThreadPool {

private:

  std::vector<std::thread> threads_;

  std::queue<std::function<void()>> tasks_;

  std::atomic<bool> stop_flag_;

  std::mutex mtx_;

  std::condition_variable cv_;

public:

  ThreadPool() : stop_flag_(false) {

    for (unsigned i = 0; i < std::thread::hardware_concurrency(); ++i) {

      threads_.emplace_back(std::thread([this]() {

        while (!stop_flag_) {

          std::function<void()> task;

          {

            std::unique_lock<std::mutex> lock(mtx_);

            cv_.wait(lock, [this]() { return !tasks_.empty() || stop_flag_; });

            if (stop_flag_) break;

            task = std::move(tasks_.front());

            tasks_.pop();

          }

          task();

        }

      }));

    }

  }

  ~ThreadPool() {

    std::unique_lock<std::mutex> lock(mtx_);

    stop_flag_ = true;

    cv_.notify_all();

    lock.unlock();

    for (auto& t : threads_) {

      t.join();

    }

  }

  template<typename Task>

  void enqueue(Task&& task) {

    {

      std::unique_lock<std::mutex> lock(mtx_);

      tasks_.emplace(std::forward<Task>(task));

    }

    cv_.notify_one();

  }

};

线程池通过std::vector 存储线程,通过std::queue >存储任务队列,通过std::atomic stop_flag_标记线程池是否已经停止。线程执行函数从任务队列中取出任务,并执行该任务。当任务队列为空时,线程通过std::condition_variable cv_等待新任务的到来。

2. 无锁队列实现

无锁队列一般是基于CAS操作实现的,常见的无锁队列有SPSC(单生产单消费)队列、MPMC(多生产多消费)队列等。这里我们展示一个SPSC队列的实现:


template <typename T>

class LockFreeQueue {

public:

  LockFreeQueue() : head_(new Node), tail_(head_.load()) {}

  ~LockFreeQueue() {

    while (head_) {

      Node* tmp = head_;

      head_ = tmp->next;

      delete tmp;

    }

  }

  void push(const T& val) {

    Node* node = new Node(val);

    Node* tail = tail_.load();

    while (!tail_->next.compare_exchange_weak(nullptr, node)) {

      tail_ = tail_->next;

      tail = tail_.load();

    }

    tail_.compare_exchange_weak(tail, node);

  }

  bool pop(T& val) {

    Node* head = head_.load();

    if (head == tail_.load())

      return false;

    

    Node* new_head = head->next.load();

    while (head_ != head) {

      head = head_.load();

      new_head = head->next.load();

      if (!new_head)

        return false;

      

    }

    val = new_head->data;

    head_.store(new_head);

    delete head;

    return true;

  }

private:

  struct Node {

    T data;

    std::atomic<Node*> next;

    Node() : next(nullptr) {}

    Node(const T& val) : data(val), next(nullptr) {}

  };

  std::atomic<Node*> head_;

  std::atomic<Node*> tail_;

};

LockFreeQueue通过head和tail指针标记队列头、尾,并使用CAS操作来实现入队(dequeue)和出队(enqueue)操作。当队列为空时,入队时需要更新tail指针,出队时需要更新head指针。

3. 结合实现

我们可以将线程池和无锁队列结合起来,使用线程池执行任务,使用无锁队列作为任务队列。这样可以大幅提升多线程应用的性能,减小锁竞争的开销,使得多个线程可以更好地协同工作。

结合后的代码如下:


template<typename T>

class TaskQueue {

private:

  LockFreeQueue<T> queue_;

  ThreadPool pool_;

public:

  TaskQueue() : pool_([this](int) { this->run(); }) {}

  void run() {

    while (true) {

      T task;

      if (queue_.pop(task)) {

        task();

      }

    }

  }

  template<typename F>

  auto enqueue(F&& f) -> std::future<decltype(f())> {

    auto task = std::make_shared<std::packaged_task<decltype(f())()>>(std::forward<F>(f));

    auto future = task->get_future();

    queue_.push([task]() { (*task)(); });

    return future;

  }

};

以上代码中,TaskQueue定义了一个无锁队列和一个线程池。线程池中有多个线程等待任务到来,其中每个线程通过执行run()函数从无锁队列中取出任务并执行。而enqueue()函数将待执行的任务封装成std::packaged_task并添加到无锁队列中,同时返回该任务的std::future以便于获取任务执行结果。这样,用户只需要向TaskQueue中加入任务,便可得到高效且线程安全的多线程应用。

总结

本文介绍了如何使用C++语言实现无锁队列线程池,这种多线程应用模式可以大幅提升多线程应用的性能,减小锁竞争的开销,使得多个线程可以更好地协同工作。对于需要处理大量任务的多线程应用,无锁队列线程池是一种不错的选择。

  
  

评论区

{{item['qq_nickname']}}
()
回复
回复