An implementation of a loop (circular) queue that supports multi-threaded synchronization

The queue element is an unsigned character array (i.e. a byte array). The address of each array is stored in the loop queue. This address points to a storage area with the following structure:

----------------------------------------------
| length of data (4 bytes) | data (length bytes) |
----------------------------------------------

This loop queue supports multi-threaded synchronized operation: whenever the queue is changed, a mutex prevents unsynchronized access. Working environment: Linux 9.0. Compile: g++ bytesqueue.cpp main.cpp -o main -lpthread

The following is the code section. There are two main.cpp functions: the first one is a general single-threaded application, the second one is a multi-threaded application.

/* BytesQueue.h zhangggdlt 2004/11/15
 * Implements a queue that stores byte arrays.
 */
#ifndef _BYTES_QUEUE_H
#define _BYTES_QUEUE_H

#include <pthread.h> /* pthread_mutex_t, used by the BytesQueue class */
#include <stdio.h>

/* Result codes returned by the BytesQueue operations (0 = success, negative = failure). */
#define OPERATION_OK 0       /* the operation succeeded */
#define QUEUE_FULL (-1)      /* inQueue: no free slot left in the queue */
#define QUEUE_EMPTY (-2)     /* outQueue: nothing stored in the queue */
#define INCREASE_FAILED (-3) /* increaseSize: the queue could not be enlarged */
#define NO_AREA (-4)         /* no storage area could be allocated for the element */
#define POINT_NULL (-5)      /* a required pointer argument was NULL */
#define FAILED_LOCK (-6)     /* the queue mutex could not be acquired */

typedef int ERR_NUMBER;      /* type of the result codes above */
typedef unsigned char uint_8; /* one byte of queue payload */

/ * The class BytesQueue is used to realize store an unsigned char array into the queue which sustain mutiple thread and sycronization. This queue is a cycle queue. The size of the queue can be set when it is constructed and you can also increas the size . of the queue during the application * / class BytesQueue {private: int _size; int _head; int _rear; uint_8 ** _ buffer; pthread_mutex_t QueMutex; public: BytesQueue (int size = 512); ERR_NUMBER increaseSize (int size = 512); ERR_NUMBER inQueue (const uint_8 * data, int len); ERR_NUMBER outQueue (uint_8 * data, int & len); void destroy (); void errMessage (ERR_NUMBER err); void showBytesQueue (BytesQueue & bq);};


-------------------------------------- / * bytesqueue.cpp zhangggdlt 2004/12/9 to realize a stack storing bytes array which sustain the mutitread and sycronization. * / # include #include #include "BytesQueue.h" / * Constructor. This BytesQueue can sustain sycronization among the mutiThread. It Means you can users :: bytesqueue :: Bytesqueue (int size) // size = 512 {this -> _ size = size; this -> _ buffer = new (uint_8 *) [this -> _ size]; this -> _ head = 0; this -> _RAR = 0; pthread_mutex_init (& quemutex, null);}

/ * You can use this number fuction to the size of the queue The data will not be lost during the increasement * / ERR_NUMBER BytesQueue :: increaseSize (int size) // size = 512 {uint_8 ** temp increase;.. Int eleCount = (this -> _REAR - this -> _ head 1 this -> _ size)% this -> _ size; int tempsize = this -> _ size; int = size; f (! Temp = new (uint_8 *) [this -> _ size])) Return Increase_failed; if (this -> _RAR == this -> _ head) // Empty queue {electrount = 0;} f ((this -> _ rear 1 )% this -> _ size == this -> _ head // full queue {electrount = this -> _ size - size;} for (i = this -> _ head, j = 0; j _ size) {TEMP [I] = this -> _ buffer [i% tempsize]; this -> _RAR = i;} delete [] this -> _ buffer; this -> _ buffer = temp; return Operation_ok;}

/ * This function is use to accept one element into the queue. You must remember the element is a unsigned char array. Len is the length of the data. * / ERR_NUMBER BytesQueue :: inQueue (const uint_8 * data, int len) { UINT_8 * TEMP; IF (PTHREAD_MUTEX_TRYLOCK (& Quemutex)) {Printf ("Try Lock Failed! / N"); Return Failed_lock;} f ((this -> _RAR 1)% this -> _ size == this -> _ head) {printf ( "The queue is full / n!"); pthread_mutex_unlock (& ​​QueMutex); return QUEUE_FULL;} if (! uint_8 (temp = new [len 4])) {pthread_mutex_unlock (& ​​QueMutex); return NO_AREA;} this- > _Buffer [this -> _REAR] = Temp; memcpy (this -> _ buffer [this -> _ r] & len, 4); memcpy (this -> _ buffer [this -> _RHIS 4, DATA, LEN); this -> _ rear = (this -> _ rear 1)% this -> _ size; pthread_mutex_unlock (& ​​QueMutex); return OPERATION_OK;} / * This function is use to set free one element from the queue You must get a buffer big enough to. Store The Data Before You Call The Function. At The Same Time You NEED A More Int & Len To Get The Data Length. * / E RR_NUMBER BytesQueue :: outQueue (uint_8 * data, int & len) {if (pthread_mutex_trylock (& ​​QueMutex)) {printf ( "Try lock failed / n!"); Return FAILED_LOCK;} if (data!) {Pthread_mutex_unlock (& ​​QueMutex); return Point_null;} if (this -> _ head == this -> _ {printf ("The queue is empty! / N"); pthread_mutex_unlock (& ​​queTex); return queue_empty;} memcpy (void *) & len, this-> _Buffer [this -> _ head], 4); memcpy ((void *) Data, this -> _ buffer [this -> _ head] 4, len);

DELETE [] (this -> _ buffer [this -> _ head]);

this -> _ head = (this -> _ head 1)% this -> _ size; pthread_mutex_unlock (& ​​QueMutex); return OPERATION_OK;} / * This function is used to set free the data structure * / void BytesQueue :: destroy (). {While (this -> _ head! = This -> _REAR) {delete [] (this -> _ buffer [this -> _ head]); this -> _ head = (this -> _ head 1)% this -> _ size; } Delete [] (this -> _ buffer; this -> _ size = 0; this -> _ buffer = null; this -> _ head = 0; this -> _RAR = 0;} / * this fuction is use to test. Show The result of the call fuction. * / void bytesqueue :: errMessage (err_number err) {switch (err) {copy operation_ok: printf ("push is ok! / n"); Break; Case Queue_Full: Printf ("Push Failed! The Queue Is Full !! / N "); Break; Case Queue_empty: Printf (" Pop Failed! The Queue IS Empty !! / N "); Break; Case Increase_failed: Printf (" Increase Queue Size Failed! / N ") Break; Default: Printf ("Other Things Are Wrong! / N"); Break;}}

/ * This fuction is buy {printf ("% s / n", "the info of the bytesqueue"; printf ("" print Size:% D / N ", BQ._SIZE); Printf (" HEAD:% D / N ", BQ._HEAD); Printf (" REAR:% D / N ", BQ._REAR); Printf (" BUF ADDR : 0x% x / n ", bq._buffer);

/ * Using Namespace NetworkProtocols;

// this is a good example to show how to use the the data structure bytesqueue.

INT main () {Int Len, I; char ch; err_number err; uint_8 bufi [] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; uint_8 bufo [10]; bytesqueue BS; BS.Showbytesqueue (BS); ch = getchar (); while (ch! = 'q') {switch (ch) {copy 'i': err = bs.inqueue (bufi, 10); bs.ERRMESSAGE Err); BS.Showbytesqueue (BS); CH = getcha (); Break; Case 'o': err = bs.outqueue (BUFO, LEN); BS. HerrMessage (ERR); BS.SHOWBYTESQUEUE (BS); ch = GetChar (); Break; Case 'E': err = bs.increasesize (); bs.errMessage (ERR); BS.Showbytesqueue (BS); ch = getchar (); break; case 'h': Printf (". ................... Help ................ / n "); Printf (" i: Go Into an Array INTO Queue./N "); Printf (" o: Go out of an array out of the queue./N "); Printf (" e: enlarge the size of the queue./N "); Printf (" H: Help / n "); Printf (" Q: Quit the system./n "); ch = getchar (); break; default: if (ch! = '/ n') Printf (" ....... .... Your Input Is Wrong! AG AIN! ............ / n "); ch = getchar (); break;}} bs.destroy (); bs.showbytesqueue (bs); return 0;} * / ---------------------------------------- // main.cpp # incdude #include "bytesqueue.h" typef struct {INT ID; bytesqueue * bq; uint_8 * buf; int Len; int delay;} myparameter;

/* Handles of the worker threads started by main(). */
pthread_t threads[5];
/* NOTE(review): this mutex is not referenced by the code visible here - confirm it is still needed. */
pthread_mutex_t quemX;
/* Thread attributes passed to pthread_create() in main(). */
pthread_attr_t attr;

Void * inQueue (void * pvar) {INT i = 1; MyParameter * Para = (MyParameter *) PVAR; while (i) {printf ("Thread Inque:% D Is Working! / N", Para-> ID); Para-> BQ-> Inqueue (Para-> BUF, Para-> LEN); Para-> BQ-> Showbytesqueue (* (Para-> BQ)); // Para-> BS-> Push (Para-> BUF , PARA-> LEN); // Para-> BS-> Showbytesstack (* (Para-> BS)); Usleep (Para-> DELAY); i ;} pthread_exit (null);} void * Outqueue (void * pvar) {INT i = 1; MyParameter * Para = (MyParameter *) PVAR; while (i) {printf ("----------- Thread Outque:% D Is Working! / N ", para-> id); para-> bq-> Outqueue (Para-> BUF, Para-> LEN); Para-> BQ-> showbytesqueue (* (PARA-> BQ)); // Para-> BS -> POP (Para-> BUF, Para-> LEN); // Para-> BS-> Showbytesstack (* (Para-> BS)); Usleep (Para-> Delay); i ;} pthread_exit (null }

INT main () {// ipstack :: ipstack (int size) // size = 10 uint_8 mybuf1 [] = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x50, 0x58, 0x0d, 0x0d, 0x0d, 0x08, 0x00, 0x45, 0x00, 0x00, 0x34, 0x00, 0x40, 0x11, 0x00, 0x40, 0x11, 0xa8, 0x21, 0x0f, 0xc0, 0xa8, 0x21, 0x02, 0x04, 0x01, 0x00, 0x05, 0x00, 0x20, 0x60, 0x4c, 0x73, 0x66, 0x61, 0x73, 0x64, 0x66, 0x73, 0x61, 0x64, 0x66, 0x61, 0x73, 0x64, 0x66, 0x73, 0x64, 0x61, 0x66, 0x61, 0x73, 0x66, 0x73, 0x64, 0x66}; uint_8 mybuf2 [100]; int Len; bytesqueue BQ (100); MyParameter Paras [4] = {{0, & BQ, MyBuf1, 66, 1000000}, {1, & BQ, Mybuf1, 66, 2000000}, {2, & BQ, Mybuf1, 66, 300000}, {3, & BQ, MyBuf2, Len, 1000000}}; //bq.showbytesqueue (BQ); pthread_attr_init (& Attr); pthread_create (& Threads [& threads [ , & attr, inqueue, (void *) & paras [0]); pthread_create (& Threads [1], & Attr, Inqueue, (Void *) & Paras [1]); pthread_create (& Threads [2], & Attr, Inqueue, (void * ) & paras [2]); PTHREA D_create (& Threads [3], & Attr, Outqueue, (Void *) & Paras [3]); // pthread_create (& Threads [4], & Attr, Outqueue, (Void *) & Paras [3]); for (int i = 0 ; i <4; i ) {pthread_join (threads [i], null);} pthread_attr_destroy (& attr);


