Last updated on 8 months ago
线程池是什么
举个例子,在以前文章又提到过 好几种reactor模式
可以大概分为这几种
单 Reactor 单进程 / 线程: **接收数据 –> 解析数据 –> 发送数据 **
单 Reactor 多线程 : 接收数据 –> fd 交给线程 –> 线程全程处理
接收数据 –> 从fd中抽出数据 –> 数据交给线程
如果接收一次数据,创建一次线程处理完后又销毁线程,这样太消耗cpu资源了,为了改善这个性能,可以先创建好一定数量的线程,线程处理饥渴的模式,有业务需要处理了,就派一个线程去处理,处理完后,又处于饥渴的转态,其实典型的空间换时间的问题,有很设计都是这样,如内存池(减少系统调用的次数),连接池等。
如何设计呢一个池呢? 好像Java有现成的类可以调,这次用C++ 写一个线程池,第一次写,有很多东西没考虑到,以后继续迭代吧
看了一个视频,用c写线程池,大致理解了思路,于是自己 用 c++写一遍( c with class …)
思路:
- 一个线程池类, 可以初始初始化线程 ,任务队列,锁
- 工作类, 一个线程对应个工作类,工作类从任务队列中抽取任务
- 任务类 ,产生一个任务 插入任务队列中
- 线程锁类, 用的还是 POSIX标准下的互斥锁,不过封装一遍(我看别人写的代码好多都是基本api在封装一遍的,所以我也尝试下), 还有一个条件变量,没任务全部线程等待,有任务 发送信号,通知一个等待的线程获取任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| class pool{ private: list<Job*> Jobs; mylocker *mutelock = new mylocker(); public: pool(int maxNumber){ for(int i = 0; i < maxNumber; i++){ Worker *work = new Worker(); work->lock = mutelock; work->Jobs = &Jobs; int ret = pthread_create(&work->tid, NULL, fun, (void *)work); pthread_detach(work->tid);
} } void addjob(Job *job){ mutelock->mutelock(); if(job->state == run){ cout << "添加队列中" << endl; Jobs.push_back(job); mutelock->condsignal(); } mutelock->muteunlock(); } int jobSize(){ return Jobs.size(); } ~pool(){ delete(mutelock); } };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class Job{ public: int state = run; int data; void (*job_function)(int fd);
public: Job(){ } void runFun(){ job_function(data); } };
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| void *fun(void *arg){ Worker *work = (Worker *)arg; while (1){ work->lock->mutelock(); while (work->Jobs->empty()){ cout << "等待任务队列 " << work->Jobs->empty() << endl; work->lock->condwait(); } if(work->getTer() == 1){ pthread_exit(NULL); } Job *job = work->Jobs->front(); work->Jobs->pop_front(); work->lock->muteunlock(); cout << pthread_self() << " data : "; job->runFun(); delete(job); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class Worker{ private: int terminate; public: Worker(){ terminate = 0; }; int getTer(){ return terminate; } void run(){
} public: pthread_t tid; mylocker *lock; list<Job*> *Jobs;
};
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| class mylocker{ private: pthread_mutex_t mutexlock; pthread_cond_t cond; public: mylocker(){ mutexlock = PTHREAD_MUTEX_INITIALIZER; cond = PTHREAD_COND_INITIALIZER; } ~mylocker(){ pthread_mutex_destroy(&mutexlock); pthread_cond_destroy(&cond); } bool mutelock(){ return pthread_mutex_lock(&mutexlock) == 0; } bool muteunlock(){ return pthread_mutex_unlock(&mutexlock) == 0; } bool condwait(){ return pthread_cond_wait(&cond,&mutexlock); } bool condsignal(){ return pthread_cond_signal(&cond); } pthread_mutex_t *get(){ return &mutexlock; } };
|