日本好好热aⅴ|国产99视频精品免费观看|日本成人aV在线|久热香蕉国产在线

  • <cite id="ikgdy"><table id="ikgdy"></table></cite>
    1. 西西軟件園多重安全檢測下載網(wǎng)站、值得信賴的軟件下載站!
      軟件
      軟件
      文章
      搜索

      首頁編程開發(fā)C#.NET → Socket Server連接客服端的簡單實(shí)現(xiàn)

      Socket Server連接客服端的簡單實(shí)現(xiàn)

      相關(guān)軟件相關(guān)文章發(fā)表評論 來源:本站整理時間:2010/9/12 21:54:41字體大。A-A+

      作者:佚名點(diǎn)擊:922次評論:0次標(biāo)簽: Socket 客服端

      • 類型:服務(wù)器區(qū)大小:21KB語言:中文 評分:6.6
      • 標(biāo)簽:
      立即下載

      一、基本原理
      有時候我們需要實(shí)現(xiàn)一個公共的模塊,需要對多個其他的模塊提供服務(wù),最常用的方式就是實(shí)現(xiàn)一個Socket Server,接受客戶的請求,并返回給客戶結(jié)果。

      這經(jīng)常涉及到如果管理多個連接及如何多線程的提供服務(wù)的問題,常用的方式就是連接池和線程池,基本流程如下:

       

      首先服務(wù)器端有一個監(jiān)聽線程,不斷監(jiān)聽來自客戶端的連接。

      當(dāng)一個客戶端連接到監(jiān)聽線程后,便建立了一個新的連接。

      監(jiān)聽線程將新建立的連接放入連接池進(jìn)行管理,然后繼續(xù)監(jiān)聽新來的連接。

      線程池中有多個服務(wù)線程,每個線程都監(jiān)聽一個任務(wù)隊列,一個建立的連接對應(yīng)一個服務(wù)任務(wù),當(dāng)服務(wù)線程發(fā)現(xiàn)有新的任務(wù)的時候,便用此連接向客戶端提供服務(wù)。

      一個Socket Server所能夠提供的連接數(shù)可配置,如果超過配置的個數(shù)則拒絕新的連接。

      當(dāng)服務(wù)線程完成服務(wù)的時候,客戶端關(guān)閉連接,服務(wù)線程關(guān)閉連接,空閑并等待處理新的任務(wù)。

      連接池的監(jiān)控線程清除其中關(guān)閉的連接對象,從而可以建立新的連接。

      二、對Socket的封裝
      Socket的調(diào)用主要包含以下的步驟:

      調(diào)用比較復(fù)雜,我們首先區(qū)分兩類Socket,一類是Listening Socket,一類是Connected Socket.

      Listening Socket由MySocketServer負(fù)責(zé),一旦accept,則生成一個Connected Socket,又MySocket負(fù)責(zé)。

      MySocket主要實(shí)現(xiàn)的方法如下:

      int MySocket::write(const char * buf, int length)
      {
      int ret = 0;
      int left = length;
      int index = 0;
      while(left > 0)
      {
      ret = send(m_socket, buf + index, left, 0);
      if(ret == 0)
      break;
      else if(ret == -1)
      {
      break;
      }
      left -= ret;
      index += ret;
      }
      if(left > 0)
      return -1;
      return 0;
      }

      int MySocket::read(char * buf, int length)
      {
      int ret = 0;
      int left = length;
      int index = 0;
      while(left > 0)
      {
      ret = recv(m_socket, buf + index, left, 0);
      if(ret == 0)
      break;
      else if(ret == -1)
      return -1;
      left -= ret;
      index += ret;
      }

      return index;
      }

      int MySocket::status()
      {
      int status;
      int ret;
      fd_set checkset;
      struct timeval timeout;

      FD_ZERO(&checkset);
      FD_SET(m_socket, &checkset);

      timeout.tv_sec = 10;
      timeout.tv_usec = 0;

      status = select((int)m_socket + 1, &checkset, 0, 0, &timeout);
      if(status < 0)
      ret = -1;
      else if(status == 0)
      ret = 0;
      else
      ret = 0;
      return ret;
      }

      int MySocket::close()
      {
      struct linger lin;
      lin.l_onoff = 1;
      lin.l_linger = 0;
      setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
      ::close(m_socket);
      return 0;
      }
       

      MySocketServer的主要方法實(shí)現(xiàn)如下:

      int MySocketServer::init(int port)
      {
      if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
      {
      return -1;
      }

      struct sockaddr_in serverAddr;
      memset(&serverAddr, 0, sizeof(struct sockaddr_in));
      serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
      serverAddr.sin_family = AF_INET;
      serverAddr.sin_port = htons(port);

      if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1)
      {
      ::close(m_socket);
      return -1;
      }

      if(listen(m_socket, SOMAXCONN) == -1)
      {
      ::close(m_socket);
      return -1;
      }

      struct linger lin;
      lin.l_onoff = 1;
      lin.l_linger = 0;

      setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
      m_port = port;
      m_inited = true;
      return 0;
      }

      MySocket * MySocketServer::accept()
      {
      int sock;
      struct sockaddr_in clientAddr;
      socklen_t clientAddrSize = sizeof(clientAddr);
      if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1)
      {
      return NULL;
      }
      MySocket* socket = new MySocket(sock);
      return socket;
      }

      MySocket * MySocketServer::accept(int timeout)
      {
      struct timeval timeout;
      timeout.tv_sec = timeout;
      timeout.tv_usec = 0;

      fd_set checkset;
      FD_ZERO(&checkset);
      FD_SET(m_socket, &checkset);

      int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout);
      if(status < 0)
      return NULL;
      else if(status == 0)
      return NULL;

      if(FD_ISSET(m_socket, &checkset))
      {
      return accept();
      }
      }
       

      三、線程池的實(shí)現(xiàn)
      一個線程池一般有一個任務(wù)隊列,啟動的各個線程從任務(wù)隊列中競爭任務(wù),得到的線程則進(jìn)行處理:list<MyTask *> m_taskQueue;

      任務(wù)隊列由鎖保護(hù),使得線程安全:pthread_mutex_t m_queueMutex

      任務(wù)隊列需要條件變量來支持生產(chǎn)者消費(fèi)者模式:pthread_cond_t m_cond

      如果任務(wù)列表為空,則線程等待,等待中的線程個數(shù)為:m_numWaitThreads

      需要一個列表來維護(hù)線程池中的線程:vector<MyThread *> m_threads

      每個線程需要一個線程運(yùn)行函數(shù):

      void * __thread_new_proc(void *p)
      {
      ((MyThread *)p)->run();
      return 0;
      }
       

      每個線程由MyThread類負(fù)責(zé),主要函數(shù)如下:

      int MyThread::start()
      {

      pthread_attr_t attr;
      pthread_attr_init(&attr);
      pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

      int ret = pthread_create(&m_thread, &attr, thread_func, args);
      pthread_attr_destroy(&attr);

      if(ret != 0)
      return –1;

      }

      int MyThread::stop()
      {

      int ret = pthread_kill(m_thread, SIGINT);

      if(ret != 0)
      return –1;
      }

      int MyThread::join()

      {

      int ret = pthread_join(m_thread, NULL);

      if(ret != 0)

      return –1;

      }

      void MyThread::run()

      {

      while (false == m_bStop)

      {

      MyTask *pTask = m_threadPool->getNextTask();

      if (NULL != pTask)

      {

      pTask->process();

      }

      }

      }
       

      線程池由MyThreadPool負(fù)責(zé),主要函數(shù)如下:

      int MyThreadPool::init()
      {

      pthread_condattr_t cond_attr;
      pthread_condattr_init(&cond_attr);
      pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
      int ret = pthread_cond_init(&m_cond, &cond_attr);
      pthread_condattr_destroy(&cond_attr);

      if (ret_val != 0)
      return –1;

      pthread_mutexattr_t attr;
      pthread_mutexattr_init(&attr);
      pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
      ret = pthread_mutex_init(&m_queueMutex, &attr);
      pthread_mutexattr_destroy(&attr);

      if (ret_val != 0)
      return –1;

      for (int i = 0; i< m_poolSize; ++i)
      {
      MyThread *thread = new MyThread(i+1, this);
      m_threads.push_back(thread);
      }

      return 0;
      }

      int MyThreadPool::start()
      {
      int ret;
      for (int i = 0; i< m_poolSize; ++i)
      {
      ret = m_threads[i]->start();
      if (ret != 0)
      break;
      }

      ret = pthread_cond_broadcast(&m_cond);

      if(ret != 0)
      return –1;
      return 0;
      }

      void MyThreadPool::addTask(MyTask *ptask)
      {
      if (NULL == ptask)
      return;

      pthread_mutex_lock(&m_queueMutex);

      m_taskQueue.push_back(ptask);

      if (m_waitingThreadCount > 0)
      pthread_cond_signal(&m_cond);

      pthread_mutex_unlock(&m_queueMutex);
      }

      MyTask * MyThreadPool::getNextTask()
      {
      MyTask *pTask = NULL;

      pthread_mutex_lock(&m_queueMutex);

      while (m_taskQueue.begin() == m_taskQueue.end())
      {
      ++m_waitingThreadCount;

      pthread_cond_wait(&n_cond, &m_queueMutex);

      --m_waitingThreadCount;
      }

      pTask = m_taskQueue.front();

      m_taskQueue.pop_front();

      pthread_mutex_unlock(&m_queueMutex);


      return pTask;
      }
       

      其中每一個任務(wù)的執(zhí)行由MyTask負(fù)責(zé),其主要方法如下:

      void MyTask::process()

      {

      //用read從客戶端讀取指令

      //對指令進(jìn)行處理

      //用write向客戶端寫入結(jié)果

      }
       

      四、連接池的實(shí)現(xiàn)
      每個連接池保存一個鏈表保存已經(jīng)建立的連接:list<MyConnection *> * m_connections

      當(dāng)然這個鏈表也需要鎖來進(jìn)行多線程保護(hù):pthread_mutex_t m_connectionMutex;

      此處一個MyConnection也是一個MyTask,由一個線程來負(fù)責(zé)。

      線程池也作為連接池的成員變量:MyThreadPool * m_threadPool

      連接池由類MyConnectionPool負(fù)責(zé),其主要函數(shù)如下:

      void MyConnectionPool::addConnection(MyConnection * pConn)
      {

      pthread_mutex_lock(&m_connectionMutex);

      m_connections->push_back(pConn);

      pthread_mutex_unlock(&m_connectionMutex);

      m_threadPool->addTask(pConn);
      }
       

      MyConnectionPool也要啟動一個背后的線程,來管理這些連接,移除結(jié)束的連接和錯誤的連接。

      void MyConnectionPool::managePool()
      {

      pthread_mutex_lock(&m_connectionMutex);

      for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); )
      {
      MyConnection *conn = *itr;
      if (conn->isFinish())
      {
      delete conn;
      conn = NULL;
      list<MyConnection *>::iterator pos = itr++;
      m_connections->erase(pos);
      }
      else if (conn->isError())
      {

      //處理錯誤的連接
      ++itr;
      }
      else
      {
      ++itr;
      }
      }

      pthread_mutex_unlock(&m_connectionMutex);

      }
       

      五、監(jiān)聽線程的實(shí)現(xiàn)
      監(jiān)聽線程需要有一個MySocketServer來監(jiān)聽客戶端的連接,每當(dāng)形成一個新的連接,查看是否超過設(shè)置的最大連接數(shù),如果超過則關(guān)閉連接,如果未超過設(shè)置的最大連接數(shù),則形成一個新的MyConnection,將其加入連接池和線程池。

      MySocketServer *pServer = new MySocketServer(port);

      MyConnectionPool *pPool = new MyConnectionPool();

      while (!stopFlag)

      {

      MySocket * sock = pServer->acceptConnection(5);

      if(sock != null)

      {

      if(m_connections.size > maxConnectionSize)

      {

      sock.close();

      }

      MyTask *pTask = new MyConnection();

      pPool->addConnection(pTask);

      }

      }

        相關(guān)評論

        閱讀本文后您有什么感想? 已有人給出評價!

        • 8 喜歡喜歡
        • 3 頂
        • 1 難過難過
        • 5 囧
        • 3 圍觀圍觀
        • 2 無聊無聊

        熱門評論

        最新評論

        發(fā)表評論 查看所有評論(0)

        昵稱:
        表情: 高興 可 汗 我不要 害羞 好 下下下 送花 屎 親親
        字?jǐn)?shù): 0/500 (您的評論需要經(jīng)過審核才能顯示)