The queue element is an uncontrolled character array (ie the byte array). The address of the array is stored in the loop queue. This address points to a storage area, the structure of the storage area: ________________________________________________________________________________________________________________________________________________________________________________________________________________ ---------------------------------------------- This loop queue support Multithreaded synchronization operation, when the queue is changed, there is a mutex MUTEX prevents no synchronization. Working environment: Linux 9.0 compile: g bytesqueue.cpp main.cpp -o main -lpthread The following is the code section: two main.cpp functions, the previous one is a general single-threaded application. The latter is a multi-threaded application. / * Bytesqueue.h zhangggdlt 2004/11/15 to realize a queue storing bytes array. * / # IFNDef _BYTES_QUEUE_H # define _bytes_queue_h
#include
#define Operation_ok 0 # define queue_full -1 # define queue_empty -2 # Define increase_failed -3 # define no_Area -4 # define point_null -5 # define failed_lock -6
Typedef int err_number; typedef unsigned char uint_8;
/ * The class BytesQueue is used to realize store an unsigned char array into the queue which sustain mutiple thread and sycronization. This queue is a cycle queue. The size of the queue can be set when it is constructed and you can also increas the size . of the queue during the application * / class BytesQueue {private: int _size; int _head; int _rear; uint_8 ** _ buffer; pthread_mutex_t QueMutex; public: BytesQueue (int size = 512); ERR_NUMBER increaseSize (int size = 512); ERR_NUMBER inQueue (const uint_8 * data, int len); ERR_NUMBER outQueue (uint_8 * data, int & len); void destroy (); void errMessage (ERR_NUMBER err); void showBytesQueue (BytesQueue & bq);};
#ENDIF / / _ _ BYTES_QUE_H
-------------------------------------- / * bytesqueue.cpp zhangggdlt 2004/12/9 to realize a stack storing bytes array which sustain the mutitread and sycronization. * / # include
/ * You can use this number fuction to the size of the queue The data will not be lost during the increasement * / ERR_NUMBER BytesQueue :: increaseSize (int size) // size = 512 {uint_8 ** temp increase;.. Int eleCount = (this -> _REAR - this -> _ head 1 this -> _ size)% this -> _ size; int tempsize = this -> _ size; int = size; f (! Temp = new (uint_8 *) [this -> _ size])) Return Increase_failed; if (this -> _RAR == this -> _ head) // Empty queue {electrount = 0;} f ((this -> _ rear 1 )% this -> _ size == this -> _ head // full queue {electrount = this -> _ size - size;} for (i = this -> _ head, j = 0; j
/ * This function is use to accept one element into the queue. You must remember the element is a unsigned char array. Len is the length of the data. * / ERR_NUMBER BytesQueue :: inQueue (const uint_8 * data, int len) { UINT_8 * TEMP; IF (PTHREAD_MUTEX_TRYLOCK (& Quemutex)) {Printf ("Try Lock Failed! / N"); Return Failed_lock;} f ((this -> _RAR 1)% this -> _ size == this -> _ head) {printf ( "The queue is full / n!"); pthread_mutex_unlock (& QueMutex); return QUEUE_FULL;} if (! uint_8 (temp = new [len 4])) {pthread_mutex_unlock (& QueMutex); return NO_AREA;} this- > _Buffer [this -> _REAR] = Temp; memcpy (this -> _ buffer [this -> _ r] & len, 4); memcpy (this -> _ buffer [this -> _RHIS 4, DATA, LEN); this -> _ rear = (this -> _ rear 1)% this -> _ size; pthread_mutex_unlock (& QueMutex); return OPERATION_OK;} / * This function is use to set free one element from the queue You must get a buffer big enough to. Store The Data Before You Call The Function. At The Same Time You NEED A More Int & Len To Get The Data Length. * / E RR_NUMBER BytesQueue :: outQueue (uint_8 * data, int & len) {if (pthread_mutex_trylock (& QueMutex)) {printf ( "Try lock failed / n!"); Return FAILED_LOCK;} if (data!) {Pthread_mutex_unlock (& QueMutex); return Point_null;} if (this -> _ head == this -> _ {printf ("The queue is empty! / N"); pthread_mutex_unlock (& queTex); return queue_empty;} memcpy (void *) & len, this-> _Buffer [this -> _ head], 4); memcpy ((void *) Data, this -> _ buffer [this -> _ head] 4, len);
DELETE [] (this -> _ buffer [this -> _ head]);
this -> _ head = (this -> _ head 1)% this -> _ size; pthread_mutex_unlock (& QueMutex); return OPERATION_OK;} / * This function is used to set free the data structure * / void BytesQueue :: destroy (). {While (this -> _ head! = This -> _REAR) {delete [] (this -> _ buffer [this -> _ head]); this -> _ head = (this -> _ head 1)% this -> _ size; } Delete [] (this -> _ buffer; this -> _ size = 0; this -> _ buffer = null; this -> _ head = 0; this -> _RAR = 0;} / * this fuction is use to test. Show The result of the call fuction. * / void bytesqueue :: errMessage (err_number err) {switch (err) {copy operation_ok: printf ("push is ok! / n"); Break; Case Queue_Full: Printf ("Push Failed! The Queue Is Full !! / N "); Break; Case Queue_empty: Printf (" Pop Failed! The Queue IS Empty !! / N "); Break; Case Increase_failed: Printf (" Increase Queue Size Failed! / N ") Break; Default: Printf ("Other Things Are Wrong! / N"); Break;}}
/ * This fuction is buy {printf ("% s / n", "the info of the bytesqueue"; printf ("" print Size:% D / N ", BQ._SIZE); Printf (" HEAD:% D / N ", BQ._HEAD); Printf (" REAR:% D / N ", BQ._REAR); Printf (" BUF ADDR : 0x% x / n ", bq._buffer);
/ * Using Namespace NetworkProtocols;
// this is a good example to show how to use the the data structure bytesqueue.
INT main () {Int Len, I; char ch; err_number err; uint_8 bufi [] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0}; uint_8 bufo [10]; bytesqueue BS; BS.Showbytesqueue (BS); ch = getchar (); while (ch! = 'q') {switch (ch) {copy 'i': err = bs.inqueue (bufi, 10); bs.ERRMESSAGE Err); BS.Showbytesqueue (BS); CH = getcha (); Break; Case 'o': err = bs.outqueue (BUFO, LEN); BS. HerrMessage (ERR); BS.SHOWBYTESQUEUE (BS); ch = GetChar (); Break; Case 'E': err = bs.increasesize (); bs.errMessage (ERR); BS.Showbytesqueue (BS); ch = getchar (); break; case 'h': Printf (". ................... Help ................ / n "); Printf (" i: Go Into an Array INTO Queue./N "); Printf (" o: Go out of an array out of the queue./N "); Printf (" e: enlarge the size of the queue./N "); Printf (" H: Help / n "); Printf (" Q: Quit the system./n "); ch = getchar (); break; default: if (ch! = '/ n') Printf (" ....... .... Your Input Is Wrong! AG AIN! ............ / n "); ch = getchar (); break;}} bs.destroy (); bs.showbytesqueue (bs); return 0;} * / ---------------------------------------- // main.cpp # incdude
PThread_t threads [5]; pthread_mutex_t quemX; pthread_attr_ttt;
Void * inQueue (void * pvar) {INT i = 1; MyParameter * Para = (MyParameter *) PVAR; while (i) {printf ("Thread Inque:% D Is Working! / N", Para-> ID); Para-> BQ-> Inqueue (Para-> BUF, Para-> LEN); Para-> BQ-> Showbytesqueue (* (Para-> BQ)); // Para-> BS-> Push (Para-> BUF , PARA-> LEN); // Para-> BS-> Showbytesstack (* (Para-> BS)); Usleep (Para-> DELAY); i ;} pthread_exit (null);} void * Outqueue (void * pvar) {INT i = 1; MyParameter * Para = (MyParameter *) PVAR; while (i) {printf ("----------- Thread Outque:% D Is Working! / N ", para-> id); para-> bq-> Outqueue (Para-> BUF, Para-> LEN); Para-> BQ-> showbytesqueue (* (PARA-> BQ)); // Para-> BS -> POP (Para-> BUF, Para-> LEN); // Para-> BS-> Showbytesstack (* (Para-> BS)); Usleep (Para-> Delay); i ;} pthread_exit (null }
INT main () {// ipstack :: ipstack (int size) // size = 10 uint_8 mybuf1 [] = {0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x00, 0x50, 0x58, 0x0d, 0x0d, 0x0d, 0x08, 0x00, 0x45, 0x00, 0x00, 0x34, 0x00, 0x40, 0x11, 0x00, 0x40, 0x11, 0xa8, 0x21, 0x0f, 0xc0, 0xa8, 0x21, 0x02, 0x04, 0x01, 0x00, 0x05, 0x00, 0x20, 0x60, 0x4c, 0x73, 0x66, 0x61, 0x73, 0x64, 0x66, 0x73, 0x61, 0x64, 0x66, 0x61, 0x73, 0x64, 0x66, 0x73, 0x64, 0x61, 0x66, 0x61, 0x73, 0x66, 0x73, 0x64, 0x66}; uint_8 mybuf2 [100]; int Len; bytesqueue BQ (100); MyParameter Paras [4] = {{0, & BQ, MyBuf1, 66, 1000000}, {1, & BQ, Mybuf1, 66, 2000000}, {2, & BQ, Mybuf1, 66, 300000}, {3, & BQ, MyBuf2, Len, 1000000}}; //bq.showbytesqueue (BQ); pthread_attr_init (& Attr); pthread_create (& Threads [& threads [ , & attr, inqueue, (void *) & paras [0]); pthread_create (& Threads [1], & Attr, Inqueue, (Void *) & Paras [1]); pthread_create (& Threads [2], & Attr, Inqueue, (void * ) & paras [2]); PTHREA D_create (& Threads [3], & Attr, Outqueue, (Void *) & Paras [3]); // pthread_create (& Threads [4], & Attr, Outqueue, (Void *) & Paras [3]); for (int i = 0 ; i <4; i ) {pthread_join (threads [i], null);} pthread_attr_destroy (& attr);