21xrx.com
2025-04-05 00:23:20 Saturday
文章检索 我的文章 写文章
C++实现线程池:从入门到实践
2023-06-30 18:55:18 深夜i     14     0
C++ 线程池 实现 入门 实践

线程池是一种常用于多线程编程中的技术,它可以提高程序的并行度和运行效率。C++作为一种高性能编程语言,实现线程池非常有优势。本文将介绍如何从入门到实践使用C++语言实现线程池。

一、线程池概述

线程池是一组线程的集合,它用于执行多个任务,而且这些任务是异步的。我们可以将多个任务添加到线程池中,线程池会自动管理这些任务,并分配合适的线程执行它们。这样就可以充分利用计算机的多核CPU,提高程序的并行度和运行效率。

二、C++实现线程池

要实现线程池,我们需要使用C++中的多线程库。C++11之后,标准库中新增了线程库,我们可以直接使用它来实现线程池。

具体思路如下:

1. 定义任务类。任务是线程池中的最小单位,我们需要定义任务类,并在任务类中实现任务执行的具体逻辑。

2. 定义线程池类。线程池是一个工作线程的集合,我们需要定义线程池类,并在线程池类中实现线程池的具体逻辑。

3. 线程池初始化。线程池需要初始化,我们需要在初始化时设置线程池的属性,比如线程池中的最小线程数、最大线程数、任务队列长度等。

4. 添加任务。任务可以通过任务队列方式添加到线程池中,添加任务时需要判断任务队列是否已满,如果已满需要等待。

5. 执行任务。当线程池中的线程空闲时,会从任务队列中取出任务执行,执行完成后再去取下一个任务执行,直至任务队列为空。

三、实践

接下来,我们来实现一个简单的线程池,让您更好地理解C++中线程池的实现。

任务类的具体实现如下:

class Task
{
public:
  Task(void* arg = NULL) : m_arg(arg) {}
  virtual ~Task() {}
  virtual void run() = 0;
private:
  void* m_arg;
};

线程池类的具体实现如下:

class ThreadPool
{
public:
  ThreadPool(int minThreadNum, int maxThreadNum, int taskQueueSize)
    : m_minThreadNum(minThreadNum), m_maxThreadNum(maxThreadNum), m_taskQueueSize(taskQueueSize)
    , m_busyThreadNum(0), m_aliveThreadNum(0), m_waitExitThreadNum(0), m_isShutdown(false)
  {
    if (m_minThreadNum <= 0) m_minThreadNum = 1;
    if (m_maxThreadNum <= m_minThreadNum) m_maxThreadNum = m_minThreadNum + 1;
    if (m_taskQueueSize <= 0) m_taskQueueSize = 1;
    m_threads.reserve(m_maxThreadNum);
    m_taskQueue.reserve(m_taskQueueSize);
    // create min thread
    createThread(m_minThreadNum);
  }
  ~ThreadPool()
  {
    shutdown();
  }
  void addTask(Task* task)
  {
    lock_guard<queue<Task*>> lock(m_taskQueueMutex);
    while (m_taskQueue.size() >= m_taskQueueSize)
    {
      m_notFull.wait(m_taskQueueMutex);
    }
    m_taskQueue.push(task);
    m_notEmpty.notify_one();
  }
  void shutdown()
  {
    unique_lock<mutex> lock(m_variableMutex); // unique lock must be used
    m_isShutdown = true;
    while (m_busyThreadNum > 0)
    {
      m_empty.wait(lock);
    }
    while (m_aliveThreadNum > 0)
    {
      m_notEmpty.notify_one();
      m_exit.wait(lock);
    }
    for (unsigned int i = 0; i < m_threads.size(); ++i)
    {
      delete m_threads[i];
      m_threads[i] = NULL;
    }
  }
private:
  void createThread(int threadNum)
  {
    for (int i = 0; i < threadNum; ++i)
    {
      try
      {
        m_threads.push_back(new thread(bind(&ThreadPool::threadFunc, this)));
      }
      catch (...)
      
        --i;
        continue;
      
      m_aliveThreadNum++;
    }
  }
  void threadFunc()
  {
    while (!m_isShutdown)
    {
      Task* task = NULL;
      {
        lock_guard<queue<Task*>> lock(m_taskQueueMutex);
        while (m_taskQueue.empty() && !m_isShutdown)
        {
          m_notEmpty.wait(m_taskQueueMutex);
        }
        if (m_taskQueue.empty()) break;
        task = m_taskQueue.front();
        m_taskQueue.pop();
      }
      if (task != NULL)
      {
        ++m_busyThreadNum;
        task->run();
        --m_busyThreadNum;
        m_notFull.notify_one();
        delete task;
      }
    }
    {
      unique_lock<mutex> lock(m_variableMutex);
      m_aliveThreadNum--;
      if (m_aliveThreadNum == m_waitExitThreadNum) m_empty.notify_one();
    }
    m_exit.notify_one();
  }
private:
  int m_minThreadNum; // minimum thread number
  int m_maxThreadNum; // maximum thread number
  int m_taskQueueSize; // task queue size
  vector<thread*> m_threads; // threads
  queue<Task*> m_taskQueue;  // task queue
  mutex m_variableMutex;   // mutex for variables
  mutex m_taskQueueMutex;   // mutex for task queue
  condition_variable m_notEmpty; // condition variable: not empty
  condition_variable m_notFull;  // condition variable: not full
  condition_variable m_empty;   // condition variable: empty
  condition_variable m_exit;   // condition variable: exit
  int m_busyThreadNum;  // number of busy thread
  int m_aliveThreadNum;  // number of alive thread
  int m_waitExitThreadNum;  // number of waiting for exit thread
  bool m_isShutdown;   // whether shutdown
};

线程池的具体使用如下:

class MyTask : public Task
{
public:
  MyTask(int id) : m_id(id) {}
  virtual void run()
  {
    cout << "Thread " << this_thread::get_id() << " running task " << m_id << endl;
    this_thread::sleep_for(chrono::seconds(1));
  }
private:
  int m_id;
};
void testThreadPool()
{
  int minThreadNum = 2;
  int maxThreadNum = 10;
  int taskQueueSize = 50;
  ThreadPool threadPool(minThreadNum, maxThreadNum, taskQueueSize);
  for (int i = 0; i < 20; ++i)
  {
    threadPool.addTask(new MyTask(i));
  }
  this_thread::sleep_for(chrono::seconds(30));
  threadPool.shutdown();
}

四、总结

本文简单介绍了C++中线程池的实现方法,采用C++11中标准库中的线程库实现,可以充分利用计算机的多核CPU,提高程序的并行度和运行效率。通过实现一个简单的线程池,可以更好地理解C++中线程池的实现。

  
  

评论区