永州建设网站公司,百度网盟推广多少钱,怎么选择做网站的公司,中国最著名的40个建筑http://blog.csdn.net/qq_25425023/article/details/53914609 线程池中的线程#xff0c;在任务队列为空的时候#xff0c;等待任务的到来#xff0c;任务队列中有任务时#xff0c;则依次获取任务来执行#xff0c;任务队列需要同步。 Linux线程同步有多种方法#xff… http://blog.csdn.net/qq_25425023/article/details/53914609 线程池中的线程在任务队列为空的时候等待任务的到来任务队列中有任务时则依次获取任务来执行任务队列需要同步。 Linux线程同步有多种方法互斥量、信号量、条件变量等。 下面是根据互斥量、信号量、条件变量封装的三个类。 线程池中用到了互斥量和信号量。 [cpp] view plain copy #ifndef _LOCKER_H_ #define _LOCKER_H_ #include pthread.h #include stdio.h #include semaphore.h /*信号量的类*/ class sem_locker { private: sem_t m_sem; public: //初始化信号量 sem_locker() { if(sem_init(m_sem, 0, 0) ! 0) printf(sem init error\n); } //销毁信号量 ~sem_locker() { sem_destroy(m_sem); } //等待信号量 bool wait() { return sem_wait(m_sem) 0; } //添加信号量 bool add() { return sem_post(m_sem) 0; } }; /*互斥 locker*/ class mutex_locker { private: pthread_mutex_t m_mutex; public: mutex_locker() { if(pthread_mutex_init(m_mutex, NULL) ! 0) printf(mutex init error!); } ~mutex_locker() { pthread_mutex_destroy(m_mutex); } bool mutex_lock() //lock mutex { return pthread_mutex_lock(m_mutex) 0; } bool mutex_unlock() //unlock { return pthread_mutex_unlock(m_mutex) 0; } }; /*条件变量 locker*/ class cond_locker { private: pthread_mutex_t m_mutex; pthread_cond_t m_cond; public: // 初始化 m_mutex and m_cond cond_locker() { if(pthread_mutex_init(m_mutex, NULL) ! 0) printf(mutex init error); if(pthread_cond_init(m_cond, NULL) ! 0) { //条件变量初始化是被释放初始化成功的mutex pthread_mutex_destroy(m_mutex); printf(cond init error); } } // destroy mutex and cond ~cond_locker() { pthread_mutex_destroy(m_mutex); pthread_cond_destroy(m_cond); } //等待条件变量 bool wait() { int ans 0; pthread_mutex_lock(m_mutex); ans pthread_cond_wait(m_cond, m_mutex); pthread_mutex_unlock(m_mutex); return ans 0; } //唤醒等待条件变量的线程 bool signal() { return pthread_cond_signal(m_cond) 0; } }; #endif 下面的是线程池类是一个模版类 [cpp] view plain copy #ifndef _PTHREAD_POOL_ #define _PTHREAD_POOL_ #include locker.h #include list #include stdio.h #include exception #include errno.h #include pthread.h #include iostream templateclass T class threadpool { private: int thread_number; //线程池的线程数 int max_task_number; //任务队列中的最大任务数 pthread_t *all_threads; //线程数组 std::listT * task_queue; //任务队列 mutex_locker queue_mutex_locker; //互斥锁 sem_locker queue_sem_locker; //信号量 bool is_stop; //是否结束线程 public: threadpool(int thread_num 20, int max_task_num 30); ~threadpool(); bool append_task(T *task); void start(); void stop(); private: //线程运行的函数。执行run()函数 static void *worker(void *arg); void run(); }; template class T threadpoolT::threadpool(int thread_num, int max_task_num): thread_number(thread_num), max_task_number(max_task_num), is_stop(false), all_threads(NULL) { if((thread_num 0) || max_task_num 0) printf(threadpool cant init because thread_number 0 or max_task_number 0); all_threads new pthread_t[thread_number]; if(!all_threads) printf(cant init threadpool because thread array cant new); } template class T threadpoolT::~threadpool() { delete []all_threads; is_stop true; } template class T void threadpoolT::stop() { is_stop true; //queue_sem_locker.add(); } template class T void threadpoolT::start() { for(int i 0; i thread_number; i) { printf(create the %dth pthread\n, i); if(pthread_create(all_threads i, NULL, worker, this) ! 0) {//创建线程失败清除成功申请的资源并抛出异常 delete []all_threads; throw std::exception(); } if(pthread_detach(all_threads[i])) {//将线程设置为脱离线程失败则清除成功申请的资源并抛出异常 delete []all_threads; throw std::exception(); } } } //添加任务进入任务队列 template class T bool threadpoolT::append_task(T *task) { //获取互斥锁 queue_mutex_locker.mutex_lock(); //判断队列中任务数是否大于最大任务数 if(task_queue.size() max_task_number) {//是则释放互斥锁 queue_mutex_locker.mutex_unlock(); return false; } //添加进入队列 task_queue.push_back(task); queue_mutex_locker.mutex_unlock(); //唤醒等待任务的线程 queue_sem_locker.add(); return true; } template class T void *threadpoolT::worker(void *arg) { threadpool *pool (threadpool *)arg; pool-run(); return pool; } template class T void threadpoolT::run() { while(!is_stop) { //等待任务 queue_sem_locker.wait(); if(errno EINTR) { printf(errno); continue; } //获取互斥锁 queue_mutex_locker.mutex_lock(); //判断任务队列是否为空 if(task_queue.empty()) { queue_mutex_locker.mutex_unlock(); continue; } //获取队头任务并执行 T *task task_queue.front(); task_queue.pop_front(); queue_mutex_locker.mutex_unlock(); if(!task) continue; // printf(pthreadId %ld\n, (unsigned long)pthread_self()); task-doit(); //doit是T对象中的方法 } //测试用 printf(close %ld\n, (unsigned long)pthread_self()); } #endif 以上参考《Linux高性能服务器编程》 写个程序对线程池进行测试 [cpp] view plain copy #include stdio.h #include iostream #include unistd.h #include thread_pool.h class task { private: int number; public: task(int num) : number(num) { } ~task() { } void doit() { printf(this is the %dth task\n, number); } }; int main() { task *ta; threadpooltask pool(10, 15); // pool.start(); for(int i 0; i 20; i) { ta new task(i); // sleep(2); pool.append_task(ta); } pool.start(); sleep(10); printf(close the thread pool\n); pool.stop(); pause(); return 0; } 经测试线程池可以正常使用。 下一篇博客使用线程池来实现回射服务器测试可以达到多大的并发量。