21xrx.com
2024-11-05 16:33:03 Tuesday
登录
文章检索 我的文章 写文章
C++实现线程池:从入门到实践
2023-06-30 18:55:18 深夜i     --     --
C++ 线程池 实现 入门 实践

线程池是一种常用于多线程编程中的技术,它可以提高程序的并行度和运行效率。C++作为一种高性能编程语言,实现线程池非常有优势。本文将介绍如何从入门到实践使用C++语言实现线程池。

一、线程池概述

线程池是一组线程的集合,它用于执行多个任务,而且这些任务是异步的。我们可以将多个任务添加到线程池中,线程池会自动管理这些任务,并分配合适的线程执行它们。这样就可以充分利用计算机的多核CPU,提高程序的并行度和运行效率。

二、C++实现线程池

要实现线程池,我们需要使用C++中的多线程库。C++11之后,标准库中新增了线程库,我们可以直接使用它来实现线程池。

具体思路如下:

1. 定义任务类。任务是线程池中的最小单位,我们需要定义任务类,并在任务类中实现任务执行的具体逻辑。

2. 定义线程池类。线程池是一个工作线程的集合,我们需要定义线程池类,并在线程池类中实现线程池的具体逻辑。

3. 线程池初始化。线程池需要初始化,我们需要在初始化时设置线程池的属性,比如线程池中的最小线程数、最大线程数、任务队列长度等。

4. 添加任务。任务可以通过任务队列方式添加到线程池中,添加任务时需要判断任务队列是否已满,如果已满需要等待。

5. 执行任务。当线程池中的线程空闲时,会从任务队列中取出任务执行,执行完成后再去取下一个任务执行,直至任务队列为空。

三、实践

接下来,我们来实现一个简单的线程池,让您更好地理解C++中线程池的实现。

任务类的具体实现如下:


class Task

{

public:

  Task(void* arg = NULL) : m_arg(arg) {}

  virtual ~Task() {}

  virtual void run() = 0;

private:

  void* m_arg;

};

线程池类的具体实现如下:


class ThreadPool

{

public:

  ThreadPool(int minThreadNum, int maxThreadNum, int taskQueueSize)

    : m_minThreadNum(minThreadNum), m_maxThreadNum(maxThreadNum), m_taskQueueSize(taskQueueSize)

    , m_busyThreadNum(0), m_aliveThreadNum(0), m_waitExitThreadNum(0), m_isShutdown(false)

  {

    if (m_minThreadNum <= 0) m_minThreadNum = 1;

    if (m_maxThreadNum <= m_minThreadNum) m_maxThreadNum = m_minThreadNum + 1;

    if (m_taskQueueSize <= 0) m_taskQueueSize = 1;

    m_threads.reserve(m_maxThreadNum);

    m_taskQueue.reserve(m_taskQueueSize);

    // create min thread

    createThread(m_minThreadNum);

  }

  ~ThreadPool()

  {

    shutdown();

  }

  void addTask(Task* task)

  {

    lock_guard<queue<Task*>> lock(m_taskQueueMutex);

    while (m_taskQueue.size() >= m_taskQueueSize)

    {

      m_notFull.wait(m_taskQueueMutex);

    }

    m_taskQueue.push(task);

    m_notEmpty.notify_one();

  }

  void shutdown()

  {

    unique_lock<mutex> lock(m_variableMutex); // unique lock must be used

    m_isShutdown = true;

    while (m_busyThreadNum > 0)

    {

      m_empty.wait(lock);

    }

    while (m_aliveThreadNum > 0)

    {

      m_notEmpty.notify_one();

      m_exit.wait(lock);

    }

    for (unsigned int i = 0; i < m_threads.size(); ++i)

    {

      delete m_threads[i];

      m_threads[i] = NULL;

    }

  }

private:

  void createThread(int threadNum)

  {

    for (int i = 0; i < threadNum; ++i)

    {

      try

      {

        m_threads.push_back(new thread(bind(&ThreadPool::threadFunc, this)));

      }

      catch (...)

      

        --i;

        continue;

      

      m_aliveThreadNum++;

    }

  }

  void threadFunc()

  {

    while (!m_isShutdown)

    {

      Task* task = NULL;

      {

        lock_guard<queue<Task*>> lock(m_taskQueueMutex);

        while (m_taskQueue.empty() && !m_isShutdown)

        {

          m_notEmpty.wait(m_taskQueueMutex);

        }

        if (m_taskQueue.empty()) break;

        task = m_taskQueue.front();

        m_taskQueue.pop();

      }

      if (task != NULL)

      {

        ++m_busyThreadNum;

        task->run();

        --m_busyThreadNum;

        m_notFull.notify_one();

        delete task;

      }

    }

    {

      unique_lock<mutex> lock(m_variableMutex);

      m_aliveThreadNum--;

      if (m_aliveThreadNum == m_waitExitThreadNum) m_empty.notify_one();

    }

    m_exit.notify_one();

  }

private:

  int m_minThreadNum; // minimum thread number

  int m_maxThreadNum; // maximum thread number

  int m_taskQueueSize; // task queue size

  vector<thread*> m_threads; // threads

  queue<Task*> m_taskQueue;  // task queue

  mutex m_variableMutex;   // mutex for variables

  mutex m_taskQueueMutex;   // mutex for task queue

  condition_variable m_notEmpty; // condition variable: not empty

  condition_variable m_notFull;  // condition variable: not full

  condition_variable m_empty;   // condition variable: empty

  condition_variable m_exit;   // condition variable: exit

  int m_busyThreadNum;  // number of busy thread

  int m_aliveThreadNum;  // number of alive thread

  int m_waitExitThreadNum;  // number of waiting for exit thread

  bool m_isShutdown;   // whether shutdown

};

线程池的具体使用如下:


class MyTask : public Task

{

public:

  MyTask(int id) : m_id(id) {}

  virtual void run()

  {

    cout << "Thread " << this_thread::get_id() << " running task " << m_id << endl;

    this_thread::sleep_for(chrono::seconds(1));

  }

private:

  int m_id;

};

void testThreadPool()

{

  int minThreadNum = 2;

  int maxThreadNum = 10;

  int taskQueueSize = 50;

  ThreadPool threadPool(minThreadNum, maxThreadNum, taskQueueSize);

  for (int i = 0; i < 20; ++i)

  {

    threadPool.addTask(new MyTask(i));

  }

  this_thread::sleep_for(chrono::seconds(30));

  threadPool.shutdown();

}

四、总结

本文简单介绍了C++中线程池的实现方法,采用C++11中标准库中的线程库实现,可以充分利用计算机的多核CPU,提高程序的并行度和运行效率。通过实现一个简单的线程池,可以更好地理解C++中线程池的实现。

  
  

评论区

{{item['qq_nickname']}}
()
回复
回复