// Description: I have used this code for a long time. I removed the code that
// automatically adjusted the pool size (because it is still immature).
//
// ****************************************************************************
// THREAD POOL for Win32
// VC 6, BC 5.5 (Free), GCC (Free)
// Update: 2004.6.9  llbird  wushaojian@21cn.com
//
// Usage:
// 1) Plain function as the job:
//      void threadfunc(void *p) { /* ... */ }
//      threadpool tp;
//      for (i = 0; i < 100; i++)
//          tp.call(threadfunc);
//
//      threadpool tp(20);             // 20 is the initial thread pool size
//      tp.call(threadfunc, lppara);
//      tp.AdjustSize(50);             // add 50 threads
//      tp.AdjustSize(-30);            // remove 30 threads
//
// 2) Job object derived from threadjob:
//      class mythreadjob : public threadjob {
//      public:
//          virtual void dojob(void *p) { /* ... */ }   // custom virtual function
//      };
//      mythreadjob mt[10];
//      threadpool tp;
//      for (i = 0; i < 10; i++)
//          tp.call(mt + i);           // or: tp.call(mt + i, para);
// ****************************************************************************
#ifndef _threadpool_h_
#define _threadpool_h_
#pragma Warning (Disable: 4530) #pragma Warning (Disable: 4786)
#include
// Abstract base class for work items that are executed through a virtual
// call. Derive from it and implement dojob(); a pointer to the derived object
// is what threadpool::call(threadjob *, void *) accepts.
class threadjob {
public:
    // Virtual destructor so that deleting a job through a threadjob* also runs
    // the derived class's destructor.
    virtual ~threadjob() {}

    // Performs the job.
    // ppara: opaque caller-supplied argument (presumably the ppara that was
    //        given to threadpool::call - confirm against the dispatch code);
    //        may be null.
    virtual void dojob(void *ppara) = 0;
};
// Win32 thread pool. Jobs are pushed onto a queue guarded by a critical
// section and worker threads are woken through a semaphore; a second semaphore
// asks workers to exit and a manual-reset event asks the whole pool to end.
// NOTE(review): this copy of the class is garbled - keyword/identifier casing
// is altered, some operators and every template argument list are dropped, and
// several spans of code are missing - so it does not compile as-is. The notes
// below describe only what is still visible.
Class threadpool {
// Constructor (default size 4): zeroes the thread counters and initialises the
// two critical sections (thread vector, work queue).
// NOTE(review): the second initialise call is garbled on this line.
Public: // dwnum thread pool size Threadpool (DWORD DWNUM = 4): _lthreadnum (0), _lrunningnum (0) {InitializeCriticalsection (& _ CSTHREADVector); InitializationCriticalSection; & _ csworkQueue);
// _EventComplete is created auto-reset, _EventEnd manual-reset, both initially
// non-signaled. Both semaphores start at count 0 with a maximum of 0x7FFFFFFF.
_EventComplete = CreateEvent (0, false, false, NULL); _EventEnd = CreateEvent (0, true, false, NULL); _SemaphoreCall = CreateSemaphore (0, 0, 0x7FFFFFFF, NULL); _SemaphoreDel = CreateSemaphore (0, 0, 0x7FFFFFFF, NULL );
// Debug-only checks on the four handles.
// NOTE(review): CreateEvent/CreateSemaphore report failure by returning NULL,
// not INVALID_HANDLE_VALUE, so these asserts would not fire on a failed create.
Assert (_SemaphoreCall! = INVALID_HANDLE_VALUE); assert (_eventComplete! = INVALID_HANDLE_VALUE); assert (_eventend! = invalid_handle_value); assert (_semaphoredel! = invalid_handle_value);
// Sizes the pool through AdjustSize, substituting 4 when the requested size is
// 0 (DWORD is unsigned, so "<= 0" can only mean 0). The destructor starts here
// and deletes the work-queue critical section first.
Adjustsize (dwnum <= 0? 4: dwnum);} ~ threadpool () {deletecriticalsection (& _ csworkQueue);
// Closes the two events and two semaphores.
// NOTE(review): the statement starting with "vector" is cut off; whatever the
// destructor did with the thread vector is missing from this copy.
CloseHandle (_EventEnd); CloseHandle (_EventComplete); CloseHandle (_SemaphoreCall); CloseHandle (_SemaphoreDel); vector
// End of the destructor (deletes the thread-vector critical section), then
// AdjustSize(int iNum): for iNum > 0 it takes the thread-vector lock and starts
// a loop over _i.
// NOTE(review): the loop body, the iNum <= 0 branch and the return statement
// are missing from this copy.
DeleteCriticalSection (& _ csThreadVector);} // adjust the thread pool size int AdjustSize (int iNum) {if (iNum> 0) {int id; ThreadItem * pNew; EnterCriticalSection (& _ csThreadVector); for (int _i = 0; _i
// NOTE(review): the head of this function is missing; judging by the call on
// the next line it is presumably the call(function pointer, parameter)
// overload. The visible body pushes a heap-allocated JobItem built from
// (PFUNC, PPAR) onto the job queue while holding the work-queue lock.
EntercriticalSection (& _ csworkQueue); _Jobqueue.push (New JobItem (PFUNC, PPAR)); LeavecriticalSection (& _ CSWorkQueue);
// Releases one count on _SemaphoreCall after the job has been queued.
// call(threadjob *, void *): wraps (p, ppara) in a new callprocpara and
//   forwards it to the other call overload with CallProc as the function.
// EndAndWait(dwWaitTime): sets _EventEnd, waits on _EventComplete for up to
//   dwWaitTime and returns whether the wait result was WAIT_OBJECT_0.
// end(): sets _EventEnd without waiting.
// Size() / getRunningSize(): return the thread counter / running counter as
//   DWORD. isrunning(): true while the running counter is above 0.
ReleaseseSemaphore (_SemaphoreCall, 1, null);} // call thread pool inline void call (threadjob * p, void * ppara = null) {call (CallProc, new callprocpara (p, ppara);} // End thread pool, and synchronization wait bool EndAndWait (DWORD dwWaitTime = INFINITE) {SetEvent (_EventEnd); return WaitForSingleObject (_EventComplete, dwWaitTime) == WAIT_OBJECT_0;} // end of the thread pool inline void end () {SetEvent (_EventEnd);} inline DWORD Size ( ) {RETURN (DWORD) _Lthreadnum;} inline dword getRunningSize () {return (dword) _lrunningnun,} Bool isrunning () {return_lrunningnum> 0;} protected:
// DefaultJobProc: static worker-thread entry point.
// NOTE(review): the static_cast is cut off; lpParameter is presumably cast to
// ThreadItem* - confirm against an intact copy.
// Working Thread Static DWord WinAPI DefaultJobProc (LPVOID LPPARETER = NULL) {ThreadItem * pthread = static_cast
// The owning pool is read from the thread item's _pthis field.
Threadpool * pthreadpoolobj = pthread -> _ pthis; assert (pthreadpoolobj);
// Each worker adds itself to the pool's thread counter when it starts.
InterlockedIncrement (& PthreadPoolobj -> _ LTHREADNUM);
// Wait set: index 0 = job available, index 1 = delete one thread,
// index 2 = end the pool.
Handle hwaithandle [3]; hwaithandle [0] = pthreadpoolobj -> _ semaphorecall; hwaithandle [1] = pthreadpoolobj -> _ semaphoredel; hwaithandle [2] = pthreadpoolobj - >_ eventend;
// Worker loop: blocks without timeout until any one of the three handles is
// signaled (the wait-all argument is false).
JobItem * PJOB; Bool FhasJob; for (;;) {DWORD WR = WaitFormultipleObjects (3, HwaitHandle, False, Infinite);
// Wait index 1 (delete-thread semaphore) leaves the loop. Otherwise the front
// job, if any, is popped from the queue under the work-queue lock.
// NOTE(review): the "+" after WAIT_OBJECT_0 was dropped, and the placement of
// "!" around FHasJob / empty() is garbled; the intent appears to be
// "has job = queue not empty" - confirm.
// delete the threaded response signal if (wr == WAIT_OBJECT_0 1) break; // get user EnterCriticalSection job from the queue (& pThreadPoolObj -> _ csWorkQueue); if (! FHasJob = pThreadPoolObj -> _ JobQueue.empty ()) {pJob = PthreadPoolobj -> _ jobqueue.front (); pthreadpoolobj -> _ jobqueue.pop (); assert (pjob);} Leavecriticalsection (& pthreadpoolobj -> _ csworkQueue);
// Wait index 2 (end event) leaves the loop only when no job was dequeued, so a
// job fetched in this iteration still runs before the thread exits.
/ / Is subject to ending the thread signal to determine if the thread (end thread signal && is still working) IF (WR == Wait_Object_0 2 &&! Fhaasjob) Break;
// Runs the dequeued job: bumps the running counter, stamps the start tick and
// per-thread bookkeeping fields, calls the job function with its parameter,
// deletes the JobItem and drops the running counter again. After the loop the
// thread erases its own ThreadItem from the pool's vector under the
// thread-vector lock.
// NOTE(review): the operator after _dwCount was dropped (presumably "++").
if (fHasJob && pJob) {InterlockedIncrement (& pThreadPoolObj -> _ lRunningNum); pThread -> _ dwLastBeginTime = GetTickCount (); pThread -> _ dwCount ; pThread -> _ fIsRunning = true; pJob -> _ pFunc (pJob -> _ pPara); // Run user job delete pJob; pThread -> _ fIsRunning = false; InterlockedDecrement (& pThreadPoolObj -> _ lRunningNum);}} // remove its structure EnterCriticalSection (& pThreadPoolObj -> _ csThreadVector); pThreadPoolObj -> _ ThreadVector.erase (find (pThreadPoolObj -> _ ThreadVector.begin (), pthreadpoolobj -> _ threadVector.end (), pthread); LeavecriticalSection (& PthreadPoolobj -> _ csthreadvector);
// The exiting thread frees its own ThreadItem.
Delete pthread;
// Removes this thread from the pool's thread counter.
InterlockedDecrement (& pthreadpoolobj -> _ lthreadnum);
// When the counter reads zero, the completion event that EndAndWait waits on
// is signaled.
// NOTE(review): the counter is re-read here instead of using the value
// returned by InterlockedDecrement - confirm this is race-free when several
// threads exit at once.
IF (! pthreadpoolobj -> _ lthreadnum) // All thread ends set set settevent (pthreadpoolobj -> _ evenetComplete);
// End of DefaultJobProc (thread exit code 0), then callproc: the static
// function that call(threadjob *, void *) passes as the job function.
// NOTE(review): its body is cut off after the static_cast; the remainder,
// along with the ThreadItem / JobItem / callprocpara definitions and the
// container and critical-section members, is missing from this copy.
Return 0;} // Call user object virtual function static void callproc (void * ppara) {callprocpara * cp = static_cast
// Synchronisation handles (end notification, completion event, job semaphore,
// delete-thread semaphore) and the two counters updated with Interlocked*.
Handle _EVENTEND, _EVENTCOMPLETE, _SEMAPHORECALL, _SEMAPHOREDEL; / / End Notification, Complete event, work signal, delete thread signal long _lthreadnum, _lrunningnum; // thread number, running thread number
// NOTE(review): a class definition needs a trailing ';' after this brace.
}
#ENDIF / / _ THREADPOOL_H_