三、線程池的實現(xiàn)
一個線程池一般有一個任務(wù)隊列,啟動的各個線程從任務(wù)隊列中競爭任務(wù),得到的線程則進(jìn)行處理:list<MyTask *> m_taskQueue;
任務(wù)隊列由鎖保護(hù),使得線程安全:pthread_mutex_t m_queueMutex
任務(wù)隊列需要條件變量來支持生產(chǎn)者消費者模式:pthread_cond_t m_cond
如果任務(wù)列表為空,則線程等待,等待中的線程個數(shù)為:m_numWaitThreads
需要一個列表來維護(hù)線程池中的線程:vector<MyThread *> m_threads
每個線程需要一個線程運(yùn)行函數(shù):
void * __thread_new_proc(void *p)
{
((MyThread *)p)->run();
return 0;
}
每個線程由MyThread類負(fù)責(zé),主要函數(shù)如下:
int MyThread::start()
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
int ret = pthread_create(&m_thread, &attr, thread_func, args);
pthread_attr_destroy(&attr);
if(ret != 0)
return –1;
}
int MyThread::stop()
{
int ret = pthread_kill(m_thread, SIGINT);
if(ret != 0)
return –1;
}
int MyThread::join()
{
int ret = pthread_join(m_thread, NULL);
if(ret != 0)
return –1;
}
void MyThread::run()
{
while (false == m_bStop)
{
MyTask *pTask = m_threadPool->getNextTask();
if (NULL != pTask)
{
pTask->process();
}
}
}
線程池由MyThreadPool負(fù)責(zé),主要函數(shù)如下:
int MyThreadPool::init()
{
pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
int ret = pthread_cond_init(&m_cond, &cond_attr);
pthread_condattr_destroy(&cond_attr);
if (ret_val != 0)
return –1;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
ret = pthread_mutex_init(&m_queueMutex, &attr);
pthread_mutexattr_destroy(&attr);
if (ret_val != 0)
return –1;
for (int i = 0; i< m_poolSize; ++i)
{
MyThread *thread = new MyThread(i+1, this);
m_threads.push_back(thread);
}
return 0;
}
int MyThreadPool::start()
{
int ret;
for (int i = 0; i< m_poolSize; ++i)
{
ret = m_threads[i]->start();
if (ret != 0)
break;
}
ret = pthread_cond_broadcast(&m_cond);
if(ret != 0)
return –1;
return 0;
}
void MyThreadPool::addTask(MyTask *ptask)
{
if (NULL == ptask)
return;
pthread_mutex_lock(&m_queueMutex);
m_taskQueue.push_back(ptask);
if (m_waitingThreadCount > 0)
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_queueMutex);
}
MyTask * MyThreadPool::getNextTask()
{
MyTask *pTask = NULL;
pthread_mutex_lock(&m_queueMutex);
while (m_taskQueue.begin() == m_taskQueue.end())
{
++m_waitingThreadCount;
pthread_cond_wait(&n_cond, &m_queueMutex);
--m_waitingThreadCount;
}
pTask = m_taskQueue.front();
m_taskQueue.pop_front();
pthread_mutex_unlock(&m_queueMutex);
return pTask;
}
其中每一個任務(wù)的執(zhí)行由MyTask負(fù)責(zé),其主要方法如下:
void MyTask::process()
{
//用read從客戶端讀取指令
//對指令進(jìn)行處理
//用write向客戶端寫入結(jié)果
}
本文導(dǎo)航
- 第1頁: 首頁
- 第2頁: 對Socket的封裝
- 第3頁: 線程池的實現(xiàn)
- 第4頁: 連接池的實現(xiàn)
- 第5頁: 監(jiān)聽線程的實現(xiàn)