一、基本原理
有時候我們需要實(shí)現(xiàn)一個公共的模塊,需要對多個其他的模塊提供服務(wù),最常用的方式就是實(shí)現(xiàn)一個Socket Server,接受客戶的請求,并返回給客戶結(jié)果。
這經(jīng)常涉及到如果管理多個連接及如何多線程的提供服務(wù)的問題,常用的方式就是連接池和線程池,基本流程如下:
首先服務(wù)器端有一個監(jiān)聽線程,不斷監(jiān)聽來自客戶端的連接。
當(dāng)一個客戶端連接到監(jiān)聽線程后,便建立了一個新的連接。
監(jiān)聽線程將新建立的連接放入連接池進(jìn)行管理,然后繼續(xù)監(jiān)聽新來的連接。
線程池中有多個服務(wù)線程,每個線程都監(jiān)聽一個任務(wù)隊列,一個建立的連接對應(yīng)一個服務(wù)任務(wù),當(dāng)服務(wù)線程發(fā)現(xiàn)有新的任務(wù)的時候,便用此連接向客戶端提供服務(wù)。
一個Socket Server所能夠提供的連接數(shù)可配置,如果超過配置的個數(shù)則拒絕新的連接。
當(dāng)服務(wù)線程完成服務(wù)的時候,客戶端關(guān)閉連接,服務(wù)線程關(guān)閉連接,空閑并等待處理新的任務(wù)。
連接池的監(jiān)控線程清除其中關(guān)閉的連接對象,從而可以建立新的連接。
二、對Socket的封裝
Socket的調(diào)用主要包含以下的步驟:
調(diào)用比較復(fù)雜,我們首先區(qū)分兩類Socket,一類是Listening Socket,一類是Connected Socket.
Listening Socket由MySocketServer負(fù)責(zé),一旦accept,則生成一個Connected Socket,又MySocket負(fù)責(zé)。
MySocket主要實(shí)現(xiàn)的方法如下:
int MySocket::write(const char * buf, int length)
{
int ret = 0;
int left = length;
int index = 0;
while(left > 0)
{
ret = send(m_socket, buf + index, left, 0);
if(ret == 0)
break;
else if(ret == -1)
{
break;
}
left -= ret;
index += ret;
}
if(left > 0)
return -1;
return 0;
}
int MySocket::read(char * buf, int length)
{
int ret = 0;
int left = length;
int index = 0;
while(left > 0)
{
ret = recv(m_socket, buf + index, left, 0);
if(ret == 0)
break;
else if(ret == -1)
return -1;
left -= ret;
index += ret;
}
return index;
}
int MySocket::status()
{
int status;
int ret;
fd_set checkset;
struct timeval timeout;
FD_ZERO(&checkset);
FD_SET(m_socket, &checkset);
timeout.tv_sec = 10;
timeout.tv_usec = 0;
status = select((int)m_socket + 1, &checkset, 0, 0, &timeout);
if(status < 0)
ret = -1;
else if(status == 0)
ret = 0;
else
ret = 0;
return ret;
}
int MySocket::close()
{
struct linger lin;
lin.l_onoff = 1;
lin.l_linger = 0;
setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
::close(m_socket);
return 0;
}
MySocketServer的主要方法實(shí)現(xiàn)如下:
int MySocketServer::init(int port)
{
if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
return -1;
}
struct sockaddr_in serverAddr;
memset(&serverAddr, 0, sizeof(struct sockaddr_in));
serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(port);
if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1)
{
::close(m_socket);
return -1;
}
if(listen(m_socket, SOMAXCONN) == -1)
{
::close(m_socket);
return -1;
}
struct linger lin;
lin.l_onoff = 1;
lin.l_linger = 0;
setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
m_port = port;
m_inited = true;
return 0;
}
MySocket * MySocketServer::accept()
{
int sock;
struct sockaddr_in clientAddr;
socklen_t clientAddrSize = sizeof(clientAddr);
if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1)
{
return NULL;
}
MySocket* socket = new MySocket(sock);
return socket;
}
MySocket * MySocketServer::accept(int timeout)
{
struct timeval timeout;
timeout.tv_sec = timeout;
timeout.tv_usec = 0;
fd_set checkset;
FD_ZERO(&checkset);
FD_SET(m_socket, &checkset);
int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout);
if(status < 0)
return NULL;
else if(status == 0)
return NULL;
if(FD_ISSET(m_socket, &checkset))
{
return accept();
}
}
三、線程池的實(shí)現(xiàn)
一個線程池一般有一個任務(wù)隊列,啟動的各個線程從任務(wù)隊列中競爭任務(wù),得到的線程則進(jìn)行處理:list<MyTask *> m_taskQueue;
任務(wù)隊列由鎖保護(hù),使得線程安全:pthread_mutex_t m_queueMutex
任務(wù)隊列需要條件變量來支持生產(chǎn)者消費(fèi)者模式:pthread_cond_t m_cond
如果任務(wù)列表為空,則線程等待,等待中的線程個數(shù)為:m_numWaitThreads
需要一個列表來維護(hù)線程池中的線程:vector<MyThread *> m_threads
每個線程需要一個線程運(yùn)行函數(shù):
void * __thread_new_proc(void *p)
{
((MyThread *)p)->run();
return 0;
}
每個線程由MyThread類負(fù)責(zé),主要函數(shù)如下:
int MyThread::start()
{
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setschedpolicy(&attr, SCHED_FIFO);
int ret = pthread_create(&m_thread, &attr, thread_func, args);
pthread_attr_destroy(&attr);
if(ret != 0)
return –1;
}
int MyThread::stop()
{
int ret = pthread_kill(m_thread, SIGINT);
if(ret != 0)
return –1;
}
int MyThread::join()
{
int ret = pthread_join(m_thread, NULL);
if(ret != 0)
return –1;
}
void MyThread::run()
{
while (false == m_bStop)
{
MyTask *pTask = m_threadPool->getNextTask();
if (NULL != pTask)
{
pTask->process();
}
}
}
線程池由MyThreadPool負(fù)責(zé),主要函數(shù)如下:
int MyThreadPool::init()
{
pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
int ret = pthread_cond_init(&m_cond, &cond_attr);
pthread_condattr_destroy(&cond_attr);
if (ret_val != 0)
return –1;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
ret = pthread_mutex_init(&m_queueMutex, &attr);
pthread_mutexattr_destroy(&attr);
if (ret_val != 0)
return –1;
for (int i = 0; i< m_poolSize; ++i)
{
MyThread *thread = new MyThread(i+1, this);
m_threads.push_back(thread);
}
return 0;
}
int MyThreadPool::start()
{
int ret;
for (int i = 0; i< m_poolSize; ++i)
{
ret = m_threads[i]->start();
if (ret != 0)
break;
}
ret = pthread_cond_broadcast(&m_cond);
if(ret != 0)
return –1;
return 0;
}
void MyThreadPool::addTask(MyTask *ptask)
{
if (NULL == ptask)
return;
pthread_mutex_lock(&m_queueMutex);
m_taskQueue.push_back(ptask);
if (m_waitingThreadCount > 0)
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_queueMutex);
}
MyTask * MyThreadPool::getNextTask()
{
MyTask *pTask = NULL;
pthread_mutex_lock(&m_queueMutex);
while (m_taskQueue.begin() == m_taskQueue.end())
{
++m_waitingThreadCount;
pthread_cond_wait(&n_cond, &m_queueMutex);
--m_waitingThreadCount;
}
pTask = m_taskQueue.front();
m_taskQueue.pop_front();
pthread_mutex_unlock(&m_queueMutex);
return pTask;
}
其中每一個任務(wù)的執(zhí)行由MyTask負(fù)責(zé),其主要方法如下:
void MyTask::process()
{
//用read從客戶端讀取指令
//對指令進(jìn)行處理
//用write向客戶端寫入結(jié)果
}
四、連接池的實(shí)現(xiàn)
每個連接池保存一個鏈表保存已經(jīng)建立的連接:list<MyConnection *> * m_connections
當(dāng)然這個鏈表也需要鎖來進(jìn)行多線程保護(hù):pthread_mutex_t m_connectionMutex;
此處一個MyConnection也是一個MyTask,由一個線程來負(fù)責(zé)。
線程池也作為連接池的成員變量:MyThreadPool * m_threadPool
連接池由類MyConnectionPool負(fù)責(zé),其主要函數(shù)如下:
void MyConnectionPool::addConnection(MyConnection * pConn)
{
pthread_mutex_lock(&m_connectionMutex);
m_connections->push_back(pConn);
pthread_mutex_unlock(&m_connectionMutex);
m_threadPool->addTask(pConn);
}
MyConnectionPool也要啟動一個背后的線程,來管理這些連接,移除結(jié)束的連接和錯誤的連接。
void MyConnectionPool::managePool()
{
pthread_mutex_lock(&m_connectionMutex);
for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); )
{
MyConnection *conn = *itr;
if (conn->isFinish())
{
delete conn;
conn = NULL;
list<MyConnection *>::iterator pos = itr++;
m_connections->erase(pos);
}
else if (conn->isError())
{
//處理錯誤的連接
++itr;
}
else
{
++itr;
}
}
pthread_mutex_unlock(&m_connectionMutex);
}
五、監(jiān)聽線程的實(shí)現(xiàn)
監(jiān)聽線程需要有一個MySocketServer來監(jiān)聽客戶端的連接,每當(dāng)形成一個新的連接,查看是否超過設(shè)置的最大連接數(shù),如果超過則關(guān)閉連接,如果未超過設(shè)置的最大連接數(shù),則形成一個新的MyConnection,將其加入連接池和線程池。
MySocketServer *pServer = new MySocketServer(port);
MyConnectionPool *pPool = new MyConnectionPool();
while (!stopFlag)
{
MySocket * sock = pServer->acceptConnection(5);
if(sock != null)
{
if(m_connections.size > maxConnectionSize)
{
sock.close();
}
MyTask *pTask = new MyConnection();
pPool->addConnection(pTask);
}
}