21xrx.com
2024-11-22 07:29:12 Friday
登录
文章检索 我的文章 写文章
C++进程池头文件
2023-07-08 15:14:24 深夜i     --     --
C++ 进程池 头文件 并发 线程池

C++进程池是一个常见的多线程编程技术,它通过复用进程来提高程序的性能和稳定性。进程池管理着一组可复用的进程,这些进程在程序启动时就创建好并一直运行着,等待任务的到来。当任务到来时,进程池从中挑选一个进程来处理任务,任务处理完毕后进程归还给进程池,等待下一个任务。

为了方便使用进程池,通常需要封装一些头文件,来帮助用户使用。下面是一个简单的C++进程池头文件示例:


#ifndef PROCESS_POOL_H

#define PROCESS_POOL_H

#include <sys/types.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <arpa/inet.h>

#include <fcntl.h>

#include <unistd.h>

#include <errno.h>

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <signal.h>

#include <sys/wait.h>

#include <iostream>

#include <vector>

#include <exception>

using namespace std;

class process {

public:

  process() : m_pid(-1) {}

public:

  pid_t m_pid;

  int m_pipefd[2];

};

template<typename T>

class process_pool {

private:

  process_pool(int listenfd, int process_number = 8);

public:

  static process_pool<T>* create(int listenfd, int process_number = 8) {

    if (!m_instance) {

      m_instance = new process_pool<T>(listenfd, process_number);

    }

    return m_instance;

  }

  ~process_pool() {

    delete[] m_sub_process;

  }

  void run();

private:

  void setup_sig_pipe();

  void run_parent();

  void run_child();

private:

  static const int MAX_PROCESS_NUMBER = 16;

  static const int USER_PER_PROCESS = 65536;

  static const int MAX_EVENT_NUMBER = 10000;

  int m_process_number;

  int m_idx;

  int m_epollfd;

  int m_listenfd;

  int m_stop;

  vector<T*> m_users;

  vector<process> m_sub_process;

  static process_pool<T>* m_instance;

};

template<typename T>

process_pool<T>* process_pool<T>::m_instance = NULL;

static int setnonblocking(int fd) {

  int old_option = fcntl(fd, F_GETFL);

  int new_option = old_option | O_NONBLOCK;

  fcntl(fd, F_SETFL, new_option);

  return old_option;

}

static void addfd(int epollfd, int fd) {

  epoll_event event;

  event.data.fd = fd;

  event.events = EPOLLIN | EPOLLET;

  epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);

  setnonblocking(fd);

}

static void removefd(int epollfd, int fd) {

  epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, 0);

  close(fd);

}

static void sig_handler(int sig) {

  int save_errno = errno;

  int msg = sig;

  send(m_sig_pipefd[1], (char*)&msg, 1, 0);

  errno = save_errno;

}

static void addsig(int sig) {

  struct sigaction sa;

  memset(&sa, '\0', sizeof(sa));

  sa.sa_handler = sig_handler;

  sa.sa_flags |= SA_RESTART;

  sigfillset(&sa.sa_mask);

  assert(sigaction(sig, &sa, NULL) != -1);

}

template<typename T>

process_pool<T>::process_pool(int listenfd, int process_number) :

    m_listenfd(listenfd), m_process_number(process_number), m_idx(-1), m_stop(false){

  assert((process_number > 0) && (process_number <= MAX_PROCESS_NUMBER));

  m_sub_process.resize(process_number);

  for (int i = 0; i < process_number; ++i) {

    int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd);

    assert(ret != -1);

    m_sub_process[i].m_pid = fork();

    assert(m_sub_process[i].m_pid >= 0);

    if (m_sub_process[i].m_pid > 0) {

      close(m_sub_process[i].m_pipefd[1]);

      continue;

    }

    else {

      close(m_sub_process[i].m_pipefd[0]);

      m_idx = i;

      break;

    }

  }

}

template<typename T>

void process_pool<T>::setup_sig_pipe() {

  m_epollfd = epoll_create(5);

  assert(m_epollfd != -1);

  int ret = socketpair(AF_UNIX, SOCK_STREAM, 0, m_sig_pipefd);

  assert(ret != -1);

  setnonblocking(m_sig_pipefd[1]);

  addfd(m_epollfd, m_sig_pipefd[0]);

  addsig(SIGCHLD);

  addsig(SIGTERM);

  addsig(SIGINT);

  addsig(SIGPIPE);

}

template<typename T>

void process_pool<T>::run() {

  if (m_idx != -1) {

    run_child();

    return;

  }

  run_parent();

}

template<typename T>

void process_pool<T>::run_child() {

  setup_sig_pipe();

  int pipefd = m_sub_process[m_idx].m_pipefd[1];

  addfd(m_epollfd, pipefd);

  epoll_event events[MAX_EVENT_NUMBER];

  T* users = new T[USER_PER_PROCESS];

  assert(users);

  int number = 0;

  int ret = -1;

  while (!m_stop) {

    number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);

    if ((number < 0) && (errno != EINTR)) {

      cout << "epoll failure" << endl;

      break;

    }

    for (int i = 0; i < number; ++i) {

      int sockfd = events[i].data.fd;

      if ((sockfd == pipefd) && (events[i].events & EPOLLIN)) {

        int client = 0;

        ret = recv(sockfd, (char*)&client, sizeof(client), 0);

        if (((ret < 0) && (errno != EAGAIN)) || ret == 0) {

          continue;

        }

        else {

          struct sockaddr_in client_address;

          socklen_t client_addrlength = sizeof(client_address);

          int connfd = accept(m_listenfd, (struct sockaddr*)&client_address, &client_addrlength);

          if (connfd < 0) {

            cout << "accept error: " << errno << endl;

            continue;

          }

          addfd(m_epollfd, connfd);

          users[connfd].init(m_epollfd, connfd, client_address);

        }

      }

      else if ((sockfd == m_sig_pipefd[0]) && (events[i].events & EPOLLIN)) {

        int sig;

        char signals[1024];

        ret = recv(m_sig_pipefd[0], signals, sizeof(signals), 0);

        if (ret < 0) {

          continue;

        }

        else if (ret == 0) {

          continue;

        }

        else {

          for (int i = 0; i < ret; ++i) {

            switch (signals[i]) {

              case SIGCHLD:{

                pid_t pid;

                int stat;

                while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {

                  continue;

                }

                break;

              }

              case SIGTERM:

              case SIGINT: {

                m_stop = true;

                break;

              }

              default: {

                break;

              }

            }

          }

        }

      }

      else if (events[i].events & EPOLLIN) {

        users[sockfd].process();

      }

      else {

        continue;

      }

    }

  }

  delete[] users;

  users = NULL;

  close(pipefd);

  close(m_epollfd);

  exit(0);

}

template<typename T>

void process_pool<T>::run_parent() {

  setup_sig_pipe();

  addfd(m_epollfd, m_listenfd);

  epoll_event events[MAX_EVENT_NUMBER];

  int sub_process_counter = 0;

  int new_conn = 1;

  int number = 0;

  int ret = -1;

  while (!m_stop) {

    number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1);

    if ((number < 0) && (errno != EINTR)) {

      cout << "epoll failure" << endl;

      break;

    }

    for (int i = 0; i < number; ++i) {

      int sockfd = events[i].data.fd;

      if (sockfd == m_listenfd) {

        int i = sub_process_counter;

        do {

          if (m_sub_process[i].m_pid != -1) {

            break;

          }

          i = (i + 1) % m_process_number;

        } while (i != sub_process_counter);

        if (m_sub_process[i].m_pid == -1) {

          m_stop = true;

          break;

        }

        sub_process_counter = (i + 1) % m_process_number;

        send(m_sub_process[i].m_pipefd[0], (char*)&new_conn, sizeof(new_conn), 0);

        cout << "send request to child " << i << endl;

      }

      else if ((sockfd == m_sig_pipefd[0]) && (events[i].events & EPOLLIN)) {

        int sig;

        char signals[1024];

        ret = recv(m_sig_pipefd[0], signals, sizeof(signals), 0);

        if (ret <= 0) {

          continue;

        }

        else {

          for (int i = 0; i < ret; ++i) {

            switch (signals[i]) {

              case SIGCHLD:{

                pid_t pid;

                int stat;

                while ((pid = waitpid(-1, &stat, WNOHANG)) > 0) {

                  for (int i = 0; i < m_process_number; ++i) {

                    if (m_sub_process[i].m_pid == pid) {

                      cout << "child " << i << " join" << endl;

                      close(m_sub_process[i].m_pipefd[0]);

                      m_sub_process[i].m_pid = -1;

                    }

                  }

                }

                m_stop = true;

                for (int i = 0; i < m_process_number; ++i) {

                  if (m_sub_process[i].m_pid != -1) {

                    m_stop = false;

                  }

                }

                break;

              }

              case SIGTERM:

              case SIGINT: {

                cout << "kill all the child now" << endl;

                for (int i = 0; i < m_process_number; ++i) {

                  int pid = m_sub_process[i].m_pid;

                  if (pid != -1) {

                    kill(pid, SIGTERM);

                  }

                }

                break;

              }

              default: {

                break;

              }

            }

          }

        }

      }

      else {

        continue;

      }

    }

  }

  close(m_epollfd);

}

#endif

这个头文件包含了C++进程池实现所需要的各种头文件和常量定义。同时还提供了`process`和`process_pool`两个类,用于管理进程池的各个进程。

在使用C++进程池时,只需要包含这个头文件,并调用相关的函数即可。例如:


#include "process_pool.h"

int main() {

  int listenfd = socket(AF_INET, SOCK_STREAM, 0);

  // ... bind, listen ...

  process_pool<your_class> *pool = process_pool<your_class>::create(listenfd);

  if (pool) {

    pool->run();

    delete pool;

  }

  close(listenfd);

  return 0;

}

这里的`your_class`是用户自定义的类,它必须继承自`process`类,并实现`process`类的各个方法。这样,进程池才能正确的处理用户的请求。

  
  

评论区

{{item['qq_nickname']}}
()
回复
回复