Multi-threaded communication

xiaoxiao2021-03-06  41

// threadObj.h: interface for the CThreadObj class.

#if! defined (threadObj_h) #define threadObj_h

#if _MSC_VER> 1000 # prgma overce # endif //_MSC_VER> 1000

#include "stdtype.h" #include "lock.h" #include "sockobj.h"

// Thread priority levels, lowest to highest.
// NOTE(review): enumerator spelling was reconstructed from a corrupted listing
// (the only other trace is a "tpHigher" mention in the CCltThread constructor
// comment) -- confirm against the original header.
typedef enum
{
    tpIdle,
    tpLowest,
    tpLower,
    tpNormal,
    tpHigher,
    tpHighest,
    tpTimeCritical
} CThreadPriority;

// Forward declarations: the two thread classes refer to each other.
class CCltThread;
class CSrvThread;

// Lists of thread objects. The element types were stripped from the listing;
// pointers are used because CSrvThread stores CCltThread* values in
// m_ActiveThreads (push_back(pThread), m_ActiveThreads[i]->...).
typedef std::vector<CCltThread *> CCltThreadLst;
typedef std::vector<CSrvThread *> CSrvThreadLst;

// Thread object base class class cthreadobj {public: bool m_bfinished; // terminating thread flag BOOL m_bfreeonterminate; // Whether to release thread bool m_bspended; // suspension thread during the termination thread

DWORD M_DWRETURNVALUE; // Thread Return Value DWORD M_DWTHREADID; / / Thread ID

Handle m_hhandle; // thread handle

Cthreadpriority m_priority; // thread priority public: void resume (); // Wake up thread void Terminate () {m_bterminated = true;}; // Setting Termination Thread Sign Void WaitFor (); // Waiting Thread End Void Create (Bool Bcreatesuspended = false); // Create thread

Static DWORD WINAPI THREADPROC (LPVOID PPARAM); // Thread Function Virtual Void Execute () = 0; // Thread Primary Function, Thread Circulation Virtuid Doterminate ()}; // Termination Thread

PUBLIC: CTHREADOBJ (); Virtual ~ CthreadObj ();

// worker thread class CCltThread: public CThreadObj {public: int m_iTimeOut; bool m_bKeepInCache; // whether to save the thread SOCKET m_ClientSocket; // socket handle HANDLE m_hEvent; // synchronization events CSrvThread * m_pSrvThread; // thread pointer Service

Public: Virtual Bool Doread (); Virtual Void Execute (); // Thread Primary Function, Thread Circulation Virtual Void ClientExecute (); // Client Thread Division Function

Void disconnect (); void doterminate (); // End Thread Void Reactivate (Socket Socket); // Re-enable thread bool startConnect (); // Start connecting BOOL endConnect (); // Terminate Connect Bool Connected (); public : CCltthread (Socket Sock, CSRVTHREAD * PSRVTHREAD, BOOL BOOL BREATESUSPENDED = false); cCltthread () {}; virtual ~ cCltthread ();

// Service (listener) thread class: accepts client connections and hands each
// one to an idle or newly created CCltThread.
class CSrvThread : public CThreadObj
{
protected:
    UINT m_iThreadCacheSize;       // max number of client threads kept for reuse
    bool m_bActive;                // state last requested through SetActive()
    SOCKET m_SrvSock;              // listening socket, INVALID_SOCKET when not listening
    CSockLst m_Connections;        // currently connected client sockets
    CLockObjCS m_LockObjCS;        // guards m_Connections and m_ActiveThreads
    CCltThreadLst m_ActiveThreads; // client threads known to this service

public:
    int m_iPort;                   // listening port; 0 selects the default in Listen()

    bool Connected();
    void AddClient(SOCKET CltSock);
    void RemoveClient(SOCKET CltSock);
    void AddThread(CCltThread *pThread);
    void RemoveThread(CCltThread *pThread);
    void GetSrvCltThread(SOCKET CltSock);
    void SetPort(int port) { m_iPort = port; }
    void SetActive(bool bActive);
    void Disconnect();
    void Listen();
    SOCKET Accept();

public:
    int GetActiveConnections();
    int GetActiveThreads();
    int GetIdleThreads();
    virtual void Init();

public:
    CSrvThread();
    virtual void Execute();
    virtual void DoCreateThread(SOCKET CltSock);
};

#ndif // threadobj.cpp: Implementation of the cthreadobj class.///#include "threadObj.h"

//// construction / destruction //

// Puts a new thread object into its "no thread yet" state: invalid handle,
// zero id / return value and every status flag cleared.
CThreadObj::CThreadObj()
{
    m_bFinished = m_bSuspended = m_bFreeOnTerminate = m_bTerminated = false;
    m_dwThreadID = m_dwReturnValue = 0;
    m_hHandle = (HANDLE) -1;
}

CthreadObj :: ~ cthreadobj () {closehandle (m_hhandle);

Void CthreadObj :: Resume () {if (resumethread (m_hhandle) == 1) m_bspended = false;}

// Create Thread Void CthreadObj :: Create (Bool BcreateSuspend) {m_bspended = bcreateSuspend; DWORD DWFLAGS = 0; if (bcreateSuspended) dwflags = create_suspend;

m_hhandle = beginthreadex (0, 0, & ThreadProc, this, dwflags, & m_dwthreadid);

// Thread function DWORD CTHREADOBJ :: ThreadProc (lpvoid pparam) {cthreadobj * pBaseThread = (cthreadobj *) PParam; try {pbasethread-> execute ();} catch (...) {}

// whether to release when the end of the thread object bool bFreeThread = pBaseThread-> m_bFreeOnTerminate; // Set the thread return value DWORD Result = pBaseThread-> m_dwReturnValue; // Set the thread completion flag pBaseThread-> m_bFinished = true; // terminate the calling thread function pBaseThread -> dotermic (); if (bFreethread) delete pBaseThread; exitthread; Return Result;} void cthreadObj :: waitfor () {WaitForsingleObject (m_hhandle, infinite);}

// Create a client thread, and set the connection handle and handle service thread that thread CCltThread :: CCltThread (SOCKET sock, CSrvThread * pSrvThread, bool bCreateSuspended) {m_hEvent = CreateEvent (NULL, false, true, NULL); m_ClientSocket = Sock; m_psrvthread = psrvthread; m_bkeepincacache = false; m_bfreeonterminate = true; // priority: = tphigher; reactivate (sock); // if (! bcreateSuspended) // resume;}

CCLTTHREAD :: ~ cCltthread () {closeHandle (m_hevent);

/ / Terminate the thread, remove the thread object itself from the list of service threads, call Void cCltthread :: doterminate () {if (NULL! = M_psrvthread) m_psrvthread-> removethread (this);}

// Thread main function, thread cycle body in this cycle VOID CCLTTHREAD :: execute () {try {// thread processing cyclic body while {if (startConnect ()) // block until the connection event arrives clientExecute (); // Process IF on the new connection (endConnect ()) // terminate the current connection, return thread end condition Break;}} catch (...) {m_bkeepincacache = false;}}

// Client Processing Function, Reserved Void CCltthread :: ClientExecute () {// Current Connection Processing Cyclic body While () {

FD_set fd = {1, m_clientsocket}; timeval TV = {m_itimeout, 0}; unsigned long flag = 1; int NbytesRecv = 0;

IF (SELECT (0, & FD, NULL, NULL, & TV)> 0 &&! m_bterminated) {ioctlsocket (m_clientsocket, fion, (u_long *) & nbytesRecv);

IF (0 == nbytesRecv) {this-> disconnect (); break;}} else {this-> disconnect (); break;}}}

// Accept data BOOL CCLTTHREAD :: DoreAd () {return true;} // Release Void CCltthread :: Disconnect () {// If there is no connection, it is not processed (! ") Return;

// Remove the current connection m_psrvthread-> removeclient (m_clientsocket) from the list of service threads; csockobjclt; // Close the socket clt.closeSocket (m_clientsocket); // Reset the socket = invalid_socket;}

/ / Return to whether the connection is established, the connection is established to return True, otherwise returns falsebool ccltthread :: connect () {return! (M_clientsocket == invalid_socket);}

// After the new connection arrives, reactivate the thread void ccltthread :: repeate (socket socket) {m_clientsocket = socket; // Add the current customer thread to the list of the service thread m_psrvthread-> addthread (this); // terminate Waiting for thread, restart the thread process setEvent (m_hevent);

// Wait for the connection event, restart the thread process (WaitForsingleObject (m_hevent, infinite); return! M_bterminated;}

// Thread processing function, end the connection bool ccltthread :: endConnect () {// User termination, or connection processing, no need to save the current thread Return true return m_bterminated ||! M_bkeepincacache;}

// Add the current connection to the list void csrvthread :: addclient (Socket CLTSOCK) {Clock Lock (& ​​M_LOCKOBJCS); if (Find (m_connections.begin (), m_connections.egin (), cltsock == m_connections.end () ) {M_connections.push_back (capsock); string szlog = " connection:" FMTINT (CLTSOCK, 5, '0') "establishment connection time is:" getsysStime () ", current connection is:" FMTINT (m_connections.size (), 4, '0') "/ r / n"; printf (szlog.c_str ());}}

void CSrvThread :: RemoveClient (SOCKET CltSock) {CLock lock (& ​​m_LockObjCS); CSockLst :: iterator pos; pos = find (m_Connections.begin (), m_Connections.end (), CltSock);! if (pos = m_Connections.end ( )) {M_connections.ed (POS); string szlog = "- connection:" FMTINT (CLTSOCK, 5, '0') "Lens connection time is:" getSysStime () ", the current number of connections is: " Fmtint (m_connections.size (), 4, '0') " / r / n "; printf (szlog.c_str ());}} void csrvthread :: addthread (ccltthread * pthread) {Clock Lock (& ​​M_LOCKOBJCS ); if (m_ActiveThreads.end () == find (m_ActiveThreads.begin (), m_ActiveThreads.end (), pThread)) {m_ActiveThreads.push_back (pThread); if (m_ActiveThreads.size () <= m_iThreadCacheSize) pThread-> m_bkeepincacache = true; else pthread-> m_bkeepincache = false;}}

// Removes a client thread from the list, if it is present.
void CSrvThread::RemoveThread(CCltThread *pThread)
{
    CLock lock(&m_LockObjCS);

    CCltThreadLst::iterator pos = std::find(m_ActiveThreads.begin(), m_ActiveThreads.end(), pThread);
    if (pos != m_ActiveThreads.end())
        m_ActiveThreads.erase(pos);
}

INT CSRVTHREAD :: getActiveConnections () {// clock lock (& ​​m_cslockobj); return m_connections.size ();

INT CSRVTHREADS () {Clock Lock (& ​​M_LOCKOBJCS); int IactiveCount = 0; for (size_t i = 0; i connection ()) IactiveCount;

return iActiveCount;} int CSrvThread :: GetIdleThreads () {CLock lock (& ​​m_LockObjCS); int iIdleCount = 0; for (size_t i = 0; i Connection ()) iidlecount;

Return iidlecount;}

Void csrvthread :: init () {}

// Thread execution body, if no one is terminating thread, loop reception connection void csrvthread :: execute () {string szlog = getsysStime () "service [" fmtint (m_iport, 5, '0') "] startup Success / r / n "; Printf (Szlog.c_str ()); while (! M_bterminated) accept ();} // Settings the active status of the service thread Void Csrvthread :: SetAtAnt (BOOL BACTIVE) {if (BACTIVE! = M_BACTIVE ) {M_bactive = BACTIVE; if (connection ()) // If it is connected, the connection disconnect (); ELSE // is not connected, launch listen ();}}

// Service thread termination connection, disconnecting Void csrvthread :: disconnect () {int isaVachesize = m_ithreadcachesize; terminnesize (); // Call Termination Multi-Phase M_ithreadCachesize = 0; cCltthread * pCltthread;

While (0! = m_activethreads.size ()) {PCLTTHREAD = M_ActiveTHReads [0];

PCltthread-> m_bfreeonterminate = false; pCltthread-> Terminate (); // If the client thread is blocked, reactivate SetEvent (PCLTTHREAD-> m_hevent);

PCLTTHREAD-> WAITFOR (); pCltthread-> disconnect (); delete pCltthread; /// // If the thread is still in an active connection, the connection status IF is ended by closing the socket (M_ActiveThreads [i] -> m_clientsocket! = INVALID_SOCKET) {CSOCKOBJ CLT; // Close socket CLT.CloseSocket (m_activethreads [i] -> m_clientsocket);} * /}

m_ithreadcachesize; if (Connected ()) {// If the connection service is active, turn off the connection CSOCKOBJ SRV; Srv.closeSocket; m_srvsock = invalid_socket;}}

// Start service thread void csrvthread :: listen () {init ();

CSOCKOBJ SRV; M_SRVSOCK = srv.createsocket (); if (m_srvsock == invalid_socket) return; if (0 == m_iport) m_iport = 75333;

Srv.bindsocketex (M_SRVSOCK, M_IPORT); SRV.Listensocket (M_SRVSOCK, 100);

CREATE ();

// Accept the client connection socket csrvthread :: acce () {csockobj srv; socket cltsock = srv.accept_block (m_srvsock); // Accept client connection

If (CLTSOCK! = Invalid_Socket) {addclient (CLTSOCK); // Add the current client connection to getSrvCltthread (CLTSOCK); // Get client threads used to serve}; Return CLTSOCK;

// Acquisition of client threads for services Void Csrvthread :: getsrvcltthread (socket cltsock) {clock lock (& ​​m_lockobjcs); size_t i; // Looking for idle client threads for (i = 0; i m_clientsocket == invalid_socket) {m_activethreads [i] -> reactivate (CLTSOCK); // If you are found, activate Break;} // If you are not found, create a new thread if (m_activethreads.size () == i) DOCREATTHREAD (CLTSOCK);

Void CSRVTHREAD :: DocreThread (Socket CLTSOCK) {(New CCLTTHREAD (CLTSOCK, THIS, FALSE) -> CREATE ();

/ / Return the connection status of the service thread BOOL CSRVTHREAD :: Connected () {return! (M_srvsock == invalid_socket);}

CSRVTHREAD :: CSRVTHREAD () {m_ithreadcachesize = 100; m_bactive = false; m_srvsock = INVALID_SOCKET; m_IPORT = 0;}

转载请注明原文地址:https://www.9cbs.com/read-74779.html

New Post(0)