// threadObj.h: interface for the CThreadObj class.
#if! defined (threadObj_h) #define threadObj_h
#if _MSC_VER> 1000 # prgma overce # endif //_MSC_VER> 1000
#include "stdtype.h" #include "lock.h" #include "sockobj.h"
// Thread priority levels, listed from idle up to time-critical.
// NOTE(review): enumerator spelling restored from a corrupted line (the only
// other spelling in this file is the commented-out "tpHigher") -- confirm
// against any external users.
typedef enum {
    tpIdle,
    tpLowest,
    tpLower,
    tpNormal,
    tpHigher,
    tpHighest,
    tpTimeCritical
} CThreadPriority;
// Forward declarations: the two thread classes refer to each other.
class CCltThread;
class CSrvThread;

// List of client worker threads kept by the service thread.
// NOTE(review): the template argument and the typedef name were missing from
// the original line. vector<CCltThread*> / CCltThreadLst is reconstructed from
// how CSrvThread::m_ActiveThreads is used (push_back, operator[] and find on
// CCltThread* values) -- confirm.
typedef std::vector<CCltThread*> CCltThreadLst;
// Thread object base class class cthreadobj {public: bool m_bfinished; // terminating thread flag BOOL m_bfreeonterminate; // Whether to release thread bool m_bspended; // suspension thread during the termination thread
DWORD M_DWRETURNVALUE; // Thread Return Value DWORD M_DWTHREADID; / / Thread ID
Handle m_hhandle; // thread handle
Cthreadpriority m_priority; // thread priority public: void resume (); // Wake up thread void Terminate () {m_bterminated = true;}; // Setting Termination Thread Sign Void WaitFor (); // Waiting Thread End Void Create (Bool Bcreatesuspended = false); // Create thread
Static DWORD WINAPI THREADPROC (LPVOID PPARAM); // Thread Function Virtual Void Execute () = 0; // Thread Primary Function, Thread Circulation Virtuid Doterminate ()}; // Termination Thread
PUBLIC: CTHREADOBJ (); Virtual ~ CthreadObj ();
// worker thread class CCltThread: public CThreadObj {public: int m_iTimeOut; bool m_bKeepInCache; // whether to save the thread SOCKET m_ClientSocket; // socket handle HANDLE m_hEvent; // synchronization events CSrvThread * m_pSrvThread; // thread pointer Service
Public: Virtual Bool Doread (); Virtual Void Execute (); // Thread Primary Function, Thread Circulation Virtual Void ClientExecute (); // Client Thread Division Function
Void disconnect (); void doterminate (); // End Thread Void Reactivate (Socket Socket); // Re-enable thread bool startConnect (); // Start connecting BOOL endConnect (); // Terminate Connect Bool Connected (); public : CCltthread (Socket Sock, CSRVTHREAD * PSRVTHREAD, BOOL BOOL BREATESUSPENDED = false); cCltthread () {}; virtual ~ cCltthread ();
// Service thread: owns the listening socket, accepts clients, and keeps the
// lists of open connections and client worker threads.
class CSrvThread : public CThreadObj
{
protected:
    UINT          m_iThreadCacheSize; // threads added beyond this count are not kept in cache
    bool          m_bActive;          // state last requested through SetActive()
    SOCKET        m_SrvSock;          // listening socket, INVALID_SOCKET when not listening
    CSockLst      m_Connections;      // client sockets currently open
    CLockObjCS    m_LockObjCS;        // guards m_Connections and m_ActiveThreads
    CCltThreadLst m_ActiveThreads;    // client worker threads known to this server

public:
    int m_iPort;                      // listening port

    bool   Connected();
    void   AddClient(SOCKET CltSock);
    void   RemoveClient(SOCKET CltSock);
    void   AddThread(CCltThread* pThread);
    void   RemoveThread(CCltThread* pThread);
    void   GetSrvCltThread(SOCKET CltSock);
    void   SetPort(int port) { m_iPort = port; }
    void   SetActive(bool bActive);
    void   Disconnect();
    void   Listen();
    SOCKET Accept();

public:
    int GetActiveConnections();
    int GetActiveThreads();
    int GetIdleThreads();
    virtual void Init();

public:
    CSrvThread();
    virtual void Execute();
    virtual void DoCreateThread(SOCKET CltSock);
};
#ndif // threadobj.cpp: Implementation of the cthreadobj class.///#include "threadObj.h"
//// construction / destruction //
// Starts the object in its "no thread yet" state: sentinel handle, zeroed
// IDs and return value, and every status flag cleared.
CThreadObj::CThreadObj()
{
    m_bTerminated = m_bFreeOnTerminate = m_bSuspended = m_bFinished = false;
    m_dwReturnValue = m_dwThreadID = 0;
    m_hHandle = INVALID_HANDLE_VALUE; // same value as (HANDLE)-1
}
CthreadObj :: ~ cthreadobj () {closehandle (m_hhandle);
Void CthreadObj :: Resume () {if (resumethread (m_hhandle) == 1) m_bspended = false;}
// Create Thread Void CthreadObj :: Create (Bool BcreateSuspend) {m_bspended = bcreateSuspend; DWORD DWFLAGS = 0; if (bcreateSuspended) dwflags = create_suspend;
m_hhandle = beginthreadex (0, 0, & ThreadProc, this, dwflags, & m_dwthreadid);
// Thread function DWORD CTHREADOBJ :: ThreadProc (lpvoid pparam) {cthreadobj * pBaseThread = (cthreadobj *) PParam; try {pbasethread-> execute ();} catch (...) {}
// whether to release when the end of the thread object bool bFreeThread = pBaseThread-> m_bFreeOnTerminate; // Set the thread return value DWORD Result = pBaseThread-> m_dwReturnValue; // Set the thread completion flag pBaseThread-> m_bFinished = true; // terminate the calling thread function pBaseThread -> dotermic (); if (bFreethread) delete pBaseThread; exitthread; Return Result;} void cthreadObj :: waitfor () {WaitForsingleObject (m_hhandle, infinite);}
// Create a client thread, and set the connection handle and handle service thread that thread CCltThread :: CCltThread (SOCKET sock, CSrvThread * pSrvThread, bool bCreateSuspended) {m_hEvent = CreateEvent (NULL, false, true, NULL); m_ClientSocket = Sock; m_psrvthread = psrvthread; m_bkeepincacache = false; m_bfreeonterminate = true; // priority: = tphigher; reactivate (sock); // if (! bcreateSuspended) // resume;}
CCLTTHREAD :: ~ cCltthread () {closeHandle (m_hevent);
/ / Terminate the thread, remove the thread object itself from the list of service threads, call Void cCltthread :: doterminate () {if (NULL! = M_psrvthread) m_psrvthread-> removethread (this);}
// Thread main function, thread cycle body in this cycle VOID CCLTTHREAD :: execute () {try {// thread processing cyclic body while {if (startConnect ()) // block until the connection event arrives clientExecute (); // Process IF on the new connection (endConnect ()) // terminate the current connection, return thread end condition Break;}} catch (...) {m_bkeepincacache = false;}}
// Client Processing Function, Reserved Void CCltthread :: ClientExecute () {// Current Connection Processing Cyclic body While () {
FD_set fd = {1, m_clientsocket}; timeval TV = {m_itimeout, 0}; unsigned long flag = 1; int NbytesRecv = 0;
IF (SELECT (0, & FD, NULL, NULL, & TV)> 0 &&! m_bterminated) {ioctlsocket (m_clientsocket, fion, (u_long *) & nbytesRecv);
IF (0 == nbytesRecv) {this-> disconnect (); break;}} else {this-> disconnect (); break;}}}
// Accept data BOOL CCLTTHREAD :: DoreAd () {return true;} // Release Void CCltthread :: Disconnect () {// If there is no connection, it is not processed (! ") Return;
// Remove the current connection m_psrvthread-> removeclient (m_clientsocket) from the list of service threads; csockobjclt; // Close the socket clt.closeSocket (m_clientsocket); // Reset the socket = invalid_socket;}
/ / Return to whether the connection is established, the connection is established to return True, otherwise returns falsebool ccltthread :: connect () {return! (M_clientsocket == invalid_socket);}
// After the new connection arrives, reactivate the thread void ccltthread :: repeate (socket socket) {m_clientsocket = socket; // Add the current customer thread to the list of the service thread m_psrvthread-> addthread (this); // terminate Waiting for thread, restart the thread process setEvent (m_hevent);
// Wait for the connection event, restart the thread process (WaitForsingleObject (m_hevent, infinite); return! M_bterminated;}
// Thread processing function, end the connection bool ccltthread :: endConnect () {// User termination, or connection processing, no need to save the current thread Return true return m_bterminated ||! M_bkeepincacache;}
// Add the current connection to the list void csrvthread :: addclient (Socket CLTSOCK) {Clock Lock (& M_LOCKOBJCS); if (Find (m_connections.begin (), m_connections.egin (), cltsock == m_connections.end () ) {M_connections.push_back (capsock); string szlog = " connection:" FMTINT (CLTSOCK, 5, '0') "establishment connection time is:" getsysStime () ", current connection is:" FMTINT (m_connections.size (), 4, '0') "/ r / n"; printf (szlog.c_str ());}}
void CSrvThread :: RemoveClient (SOCKET CltSock) {CLock lock (& m_LockObjCS); CSockLst :: iterator pos; pos = find (m_Connections.begin (), m_Connections.end (), CltSock);! if (pos = m_Connections.end ( )) {M_connections.ed (POS); string szlog = "- connection:" FMTINT (CLTSOCK, 5, '0') "Lens connection time is:" getSysStime () ", the current number of connections is: " Fmtint (m_connections.size (), 4, '0') " / r / n "; printf (szlog.c_str ());}} void csrvthread :: addthread (ccltthread * pthread) {Clock Lock (& M_LOCKOBJCS ); if (m_ActiveThreads.end () == find (m_ActiveThreads.begin (), m_ActiveThreads.end (), pThread)) {m_ActiveThreads.push_back (pThread); if (m_ActiveThreads.size () <= m_iThreadCacheSize) pThread-> m_bkeepincacache = true; else pthread-> m_bkeepincache = false;}}
// Removes a client thread from the thread list; does nothing when the thread
// is not in the list.
void CSrvThread::RemoveThread(CCltThread* pThread)
{
    CLock lock(&m_LockObjCS);

    CCltThreadLst::iterator pos = std::find(m_ActiveThreads.begin(), m_ActiveThreads.end(), pThread);
    if (pos != m_ActiveThreads.end())
        m_ActiveThreads.erase(pos);
}
INT CSRVTHREAD :: getActiveConnections () {// clock lock (& m_cslockobj); return m_connections.size ();
INT CSRVTHREADS () {Clock Lock (& M_LOCKOBJCS); int IactiveCount = 0; for (size_t i = 0; i
return iActiveCount;} int CSrvThread :: GetIdleThreads () {CLock lock (& m_LockObjCS); int iIdleCount = 0; for (size_t i = 0; i
Return iidlecount;}
Void csrvthread :: init () {}
// Thread execution body, if no one is terminating thread, loop reception connection void csrvthread :: execute () {string szlog = getsysStime () "service [" fmtint (m_iport, 5, '0') "] startup Success / r / n "; Printf (Szlog.c_str ()); while (! M_bterminated) accept ();} // Settings the active status of the service thread Void Csrvthread :: SetAtAnt (BOOL BACTIVE) {if (BACTIVE! = M_BACTIVE ) {M_bactive = BACTIVE; if (connection ()) // If it is connected, the connection disconnect (); ELSE // is not connected, launch listen ();}}
// Service thread termination connection, disconnecting Void csrvthread :: disconnect () {int isaVachesize = m_ithreadcachesize; terminnesize (); // Call Termination Multi-Phase M_ithreadCachesize = 0; cCltthread * pCltthread;
While (0! = m_activethreads.size ()) {PCLTTHREAD = M_ActiveTHReads [0];
PCltthread-> m_bfreeonterminate = false; pCltthread-> Terminate (); // If the client thread is blocked, reactivate SetEvent (PCLTTHREAD-> m_hevent);
PCLTTHREAD-> WAITFOR (); pCltthread-> disconnect (); delete pCltthread; /// // If the thread is still in an active connection, the connection status IF is ended by closing the socket (M_ActiveThreads [i] -> m_clientsocket! = INVALID_SOCKET) {CSOCKOBJ CLT; // Close socket CLT.CloseSocket (m_activethreads [i] -> m_clientsocket);} * /}
m_ithreadcachesize; if (Connected ()) {// If the connection service is active, turn off the connection CSOCKOBJ SRV; Srv.closeSocket; m_srvsock = invalid_socket;}}
// Start service thread void csrvthread :: listen () {init ();
CSOCKOBJ SRV; M_SRVSOCK = srv.createsocket (); if (m_srvsock == invalid_socket) return; if (0 == m_iport) m_iport = 75333;
Srv.bindsocketex (M_SRVSOCK, M_IPORT); SRV.Listensocket (M_SRVSOCK, 100);
CREATE ();
// Accept the client connection socket csrvthread :: acce () {csockobj srv; socket cltsock = srv.accept_block (m_srvsock); // Accept client connection
If (CLTSOCK! = Invalid_Socket) {addclient (CLTSOCK); // Add the current client connection to getSrvCltthread (CLTSOCK); // Get client threads used to serve}; Return CLTSOCK;
// Acquisition of client threads for services Void Csrvthread :: getsrvcltthread (socket cltsock) {clock lock (& m_lockobjcs); size_t i; // Looking for idle client threads for (i = 0; i
Void CSRVTHREAD :: DocreThread (Socket CLTSOCK) {(New CCLTTHREAD (CLTSOCK, THIS, FALSE) -> CREATE ();
/ / Return the connection status of the service thread BOOL CSRVTHREAD :: Connected () {return! (M_srvsock == invalid_socket);}
CSRVTHREAD :: CSRVTHREAD () {m_ithreadcachesize = 100; m_bactive = false; m_srvsock = INVALID_SOCKET; m_IPORT = 0;}