Daniel Robbins, President and CEO, Gentoo Technologies, Inc. September 2000
This article, the last in the POSIX threads series, discusses how to use condition variables in detail. Condition variables are POSIX thread structures that let you "wake up" threads when certain conditions are met. You can think of them as a thread-safe form of signaling. Daniel then uses everything covered so far to implement a multithreaded work crew application, and the discussion is built around that example.
Condition variables explained

At the end of the previous article, I described a particular problem: how should a thread handle the situation where it is waiting for a specific condition to become true? It could repeatedly lock and unlock a mutex, checking a shared data structure for a certain value each time. But that wastes time and resources, and this kind of busy polling is very inefficient. The best way to solve the problem is to use the pthread_cond_wait() call to wait for the condition to become true.

It is important to understand what pthread_cond_wait() does: it is the heart of the POSIX threads signaling system, and also the hardest part to grasp.

First, consider a scenario in which a thread has locked a mutex in order to look at a linked list, and the list happens to be empty. This particular thread can't do anything: it is designed to remove a node from the list, and there are no nodes available. So this is what it does: while still holding the mutex lock, the thread calls pthread_cond_wait(&mycond, &mymutex). The pthread_cond_wait() call is rather complex, so we will step through its operations one at a time.

The first thing pthread_cond_wait() does is unlock the mutex (so that other threads can modify the linked list) and wait on the condition mycond (so that pthread_cond_wait() wakes up when it is "signaled" by another thread). Now that the mutex is unlocked, other threads can access and modify the linked list, possibly adding items.

At this point, the pthread_cond_wait() call has not yet returned. Unlocking the mutex happens immediately, but waiting on mycond is normally a blocking operation, meaning that the thread goes to sleep and consumes no CPU cycles until it is woken. This is exactly what we want. The thread sleeps until a particular condition becomes true, without any busy polling that would waste CPU time.
From the thread's perspective, it is simply waiting for the pthread_cond_wait() call to return.

Now suppose that another thread (call it "thread 2") locks mymutex and adds an item to the linked list. Immediately after unlocking the mutex, thread 2 calls pthread_cond_broadcast(&mycond). By doing so, thread 2 causes every thread waiting on the mycond condition variable to wake up immediately. This means that the first thread (which is still inside its pthread_cond_wait() call) is now woken.

Now look at what happens to the first thread. After thread 2 calls pthread_cond_broadcast(&mycond), you might think that thread 1's pthread_cond_wait() returns immediately. Not so! Instead, pthread_cond_wait() performs one last operation: it re-locks mymutex. Only once pthread_cond_wait() holds the lock does it return and allow thread 1 to continue execution. At that point, thread 1 can immediately check the list for any changes that interest it.

Stop and review!

That process was fairly complicated, so let's review it. The first thread begins by calling:

pthread_mutex_lock(&mymutex);
Then it examines the list. Finding nothing of interest, it calls:
pthread_cond_wait(&mycond, &mymutex);
The pthread_cond_wait() call then performs a number of operations before returning. The first is equivalent to:
pthread_mutex_unlock(&mymutex);
It unlocks mymutex and then goes to sleep, waiting for mycond to receive a POSIX threads "signal". Once it gets a "signal" (in quotes because we are not talking about traditional UNIX signals, but about signals from pthread_cond_signal() or pthread_cond_broadcast() calls), it wakes up. But pthread_cond_wait() doesn't return immediately; it has one more thing to do, which is to re-lock the mutex:
pthread_mutex_lock(&mymutex);
pthread_cond_wait() knows that we are looking for changes "behind" mymutex, so it goes ahead and locks the mutex for us, and then returns.

pthread_cond_wait() quiz

Now that you have reviewed pthread_cond_wait(), you should understand how it works. You should be able to recite, in order, all of the steps that pthread_cond_wait() performs. Try it. Once you understand pthread_cond_wait(), the rest is easy, so re-read the section above until you have it memorized. Now, can you tell me what state the mutex must be in before pthread_cond_wait() is called? And what state is the mutex in when the pthread_cond_wait() call returns? The answer to both questions is "locked". Now that pthread_cond_wait() is fully understood, let's move on to the easier parts: initialization, and the actual signaling and broadcasting. After that, we will look at C code that contains a multithreaded work queue.

Initialization and cleanup

A condition variable is a real data structure that needs to be initialized. Here is how to initialize one. First, define or allocate a condition variable, as follows:
pthread_cond_t mycond;
Then call the following function to initialize it:
pthread_cond_init(&mycond, NULL);
Voila, initialization complete! Before a condition variable is freed or discarded, it needs to be destroyed, as follows:
pthread_cond_destroy(&mycond);
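As a minimal sketch of that lifecycle, the function below initializes a mutex and a condition variable, checks each return code, and destroys both again. The name cond_lifecycle_demo is hypothetical and used only for illustration; the pthread calls themselves are the standard ones, which return 0 on success.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical helper: returns 0 when the whole lifecycle succeeded. */
int cond_lifecycle_demo(void) {
    pthread_mutex_t mymutex;
    pthread_cond_t mycond;
    int rc;

    if (pthread_mutex_init(&mymutex, NULL) != 0)
        return 1;
    if (pthread_cond_init(&mycond, NULL) != 0) {  /* NULL: default attributes */
        pthread_mutex_destroy(&mymutex);
        return 1;
    }

    /* Both objects are usable here; signaling with no waiters has no effect. */
    rc = pthread_cond_signal(&mycond);
    assert(rc == 0);

    /* Destroy only once no thread is waiting on the condition variable. */
    if (pthread_cond_destroy(&mycond) != 0)
        return 1;
    return pthread_mutex_destroy(&mymutex) != 0;
}
```

The mutex is created first and destroyed last here simply because the condition variable is only ever used together with it.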
Simple enough. Next, let's look at the pthread_cond_wait() call.

Waiting

Once a mutex and a condition variable have been initialized, you can wait on the condition as follows:

pthread_cond_wait(&mycond, &mymutex);
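The sequence reviewed earlier (lock, check, wait, wake with the mutex held again) can be sketched as a small producer/consumer pair. This is an illustration under assumptions, not code from the article: the items counter, the consumer() function, and run_wait_demo() are hypothetical names, and the static initializers PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER are used in place of the init calls for brevity.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
static int items = 0;     /* shared data protected by mymutex */
static int consumed = 0;  /* updated by the consumer while it holds mymutex */

static void *consumer(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mymutex);              /* locked before the wait */
    while (items == 0)                         /* re-check after every wakeup */
        pthread_cond_wait(&mycond, &mymutex);  /* unlock, sleep, re-lock */
    items--;                                   /* mutex is held again here */
    consumed++;
    pthread_mutex_unlock(&mymutex);
    return NULL;
}

/* Starts one consumer, hands it one item, and returns how many were consumed. */
int run_wait_demo(void) {
    pthread_t tid;
    int rc = pthread_create(&tid, NULL, consumer, NULL);
    assert(rc == 0);

    pthread_mutex_lock(&mymutex);
    items++;                                   /* change the shared data */
    pthread_mutex_unlock(&mymutex);
    pthread_cond_signal(&mycond);              /* wake the waiting consumer */

    rc = pthread_join(tid, NULL);
    assert(rc == 0);
    return consumed;
}
```

If the producer happens to run before the consumer reaches its wait, the consumer finds items already nonzero and never sleeps, which is why the predicate is tested before waiting rather than only after.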
Note that your code should logically group mycond and mymutex. A particular condition should have only one mutex, and the condition variable should represent a specific kind of change to the data "inside" the mutex. One mutex can have many condition variables (for example, cond_empty, cond_full, cond_cleanup), but each condition variable should have only one mutex.

Signaling and broadcasting

There is one thing to note about signaling and broadcasting. If a thread changes some shared data and wants to wake up all waiting threads, it should use the pthread_cond_broadcast call, as follows:

pthread_cond_broadcast(&mycond);
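A small sketch of broadcasting, assuming three waiter threads that all sleep on the same condition until a shared flag changes. The names ready, woken, waiter(), NUM_WAITERS, and run_broadcast_demo() are hypothetical, and static initializers are used for brevity.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_WAITERS 3   /* hypothetical thread count for the demo */

static pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
static int ready = 0;   /* shared flag protected by mymutex */
static int woken = 0;   /* number of waiters that observed ready != 0 */

static void *waiter(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mymutex);
    while (!ready)
        pthread_cond_wait(&mycond, &mymutex);
    woken++;
    pthread_mutex_unlock(&mymutex);
    return NULL;
}

/* Returns the number of waiters that woke up and finished. */
int run_broadcast_demo(void) {
    pthread_t tids[NUM_WAITERS];
    int i, rc;

    for (i = 0; i < NUM_WAITERS; i++) {
        rc = pthread_create(&tids[i], NULL, waiter, NULL);
        assert(rc == 0);
    }

    pthread_mutex_lock(&mymutex);
    ready = 1;                        /* the change every waiter cares about */
    pthread_mutex_unlock(&mymutex);
    pthread_cond_broadcast(&mycond);  /* wake all threads waiting on mycond */

    for (i = 0; i < NUM_WAITERS; i++) {
        rc = pthread_join(tids[i], NULL);
        assert(rc == 0);
    }
    return woken;
}
```

Because every waiter depends on the same change, a broadcast is the right call in this sketch; waking only one thread could leave the others asleep with nobody left to wake them.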
In some cases, the active thread only needs to wake up the first sleeping thread. Suppose you add just one job to the queue. Then only one worker thread needs to be woken (waking the others would be impolite!):

pthread_cond_signal(&mycond);

This function wakes only one thread. It would be even better if the POSIX threads standard let you specify an integer so that you could wake a given number of sleeping threads. Unfortunately, I wasn't invited to the meeting.

The work crew

I will now demonstrate how to create a multithreaded work crew. In this scenario, we create a number of worker threads. Each thread checks wq (the "work queue") to see whether there is any work to do. If there is, the thread removes a node from the queue, performs the work, and then waits for new work to arrive.

Meanwhile, the main thread is responsible for creating the worker threads, adding work to the queue, and then collecting all of the worker threads when it exits. You are about to see a lot of C code, so get ready!

The queue

A queue is needed for two reasons. First, a queue is needed to hold the work jobs. Second, a data structure is needed to keep track of terminated threads. Remember how, in earlier articles (see the references at the end of this article), I mentioned that pthread_join has to be called with a specific thread ID? The "cleanup queue" (called "cq") solves the problem of not being able to wait for an arbitrary terminated thread (more on this later). Here is the standard queue code. Save it in the files queue.h and queue.c:

queue.h
/* queue.h
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
*/
typedef struct node {
  struct node *next;
} node;
typedef struct queue {
  node *head, *tail;
} queue;
void queue_init(queue *myroot);
void queue_put(queue *myroot, node *mynode);
node *queue_get(queue *myroot);

queue.c
/* queue.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** This set of queue functions was originally thread-aware. I
** redesigned the code to make this set of queue routines
** thread-ignorant (just a generic, boring yet very fast set of queue
** routines). Why the change? Because it makes more sense to have
** the thread support as an optional add-on. Consider a situation
** where you want to add 5 nodes to the queue. With the
** thread-enabled version, each call to queue_put() would
** automatically lock and unlock the queue mutex 5 times - that's a
** lot of unnecessary overhead. However, by moving the thread stuff
** out of the queue routines, the caller can lock the mutex once at
** the beginning, then insert 5 items, and then unlock at the end.
** Moving the lock/unlock code out of the queue functions allows for
** optimizations that aren't possible otherwise. It also makes this
** code useful for non-threaded applications.
**
** We can easily thread-enable this data structure by using the
** data_control type defined in control.c and control.h. */
#include <stdio.h>
#include "queue.h"
void queue_init(queue *myroot) {
  myroot->head = NULL;
  myroot->tail = NULL;
}
void queue_put(queue *myroot, node *mynode) {
  mynode->next = NULL;
  if (myroot->tail != NULL)
    myroot->tail->next = mynode;
  myroot->tail = mynode;
  if (myroot->head == NULL)
    myroot->head = mynode;
}
node *queue_get(queue *myroot) {
  /* get from root */
  node *mynode;
  mynode = myroot->head;
  if (myroot->head != NULL)
    myroot->head = myroot->head->next;
  /* when the queue drains, clear tail so that a later queue_put()
     never writes through a pointer to a removed (possibly freed) node */
  if (myroot->head == NULL)
    myroot->tail = NULL;
  return mynode;
}
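The "lock once, insert five items" idea from the header comment can be sketched as follows. This is an illustration under assumptions: the queue routines are restated compactly so the block compiles on its own, and the job type, qmutex, and run_queue_demo() are hypothetical names. The sketch embeds a node as the first member of the payload, a slightly stricter variant of placing a next pointer first.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Compact restatement of the queue types and routines. */
typedef struct node { struct node *next; } node;
typedef struct queue { node *head, *tail; } queue;

static void queue_init(queue *q) { q->head = NULL; q->tail = NULL; }

static void queue_put(queue *q, node *n) {
    n->next = NULL;
    if (q->tail != NULL)
        q->tail->next = n;
    q->tail = n;
    if (q->head == NULL)
        q->head = n;
}

static node *queue_get(queue *q) {
    node *n = q->head;
    if (n != NULL) {
        q->head = n->next;
        if (q->head == NULL)
            q->tail = NULL;   /* queue drained: forget the removed node */
    }
    return n;
}

/* Hypothetical payload: the link is the first member, so a pointer to
   the link can be converted back to a pointer to the whole job. */
typedef struct job { node link; int jobnum; } job;

static pthread_mutex_t qmutex = PTHREAD_MUTEX_INITIALIZER;

/* Returns 1 if five jobs inserted under one lock come back in FIFO order. */
int run_queue_demo(void) {
    job jobs[5];
    queue q;
    int i, rc, in_order = 1;

    queue_init(&q);
    rc = pthread_mutex_lock(&qmutex);   /* one lock for all five inserts */
    assert(rc == 0);
    for (i = 0; i < 5; i++) {
        jobs[i].jobnum = i;
        queue_put(&q, &jobs[i].link);
    }
    for (i = 0; i < 5; i++) {
        job *j = (job *)queue_get(&q);
        if (j == NULL || j->jobnum != i)
            in_order = 0;
    }
    if (queue_get(&q) != NULL)          /* the queue must now be empty */
        in_order = 0;
    pthread_mutex_unlock(&qmutex);
    return in_order;
}
```

Keeping the mutex outside the queue routines is what allows the single lock/unlock pair around the whole batch.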
The data_control code

Rather than writing thread-safe queue routines, I created a "data wrapper" or "control" structure that can be used to make any data structure thread-aware. Take a look at control.h:

control.h

#include <pthread.h>
typedef struct data_control {
  pthread_mutex_t mutex;
  pthread_cond_t cond;
  int active;
} data_control;

Now that you have seen the definition of the data_control structure, here is how to read its visual representation. In the picture of the data_control structure, the lock represents the mutex, which provides mutually exclusive access to the data structure. The yellow star represents the condition variable, which a thread can sleep on until the data structure in question changes. The on/off switch represents the integer "active", which tells threads whether this data is active. In the code, I use the integer active as a flag to tell the work queue when to shut down. Here is control.c:

control.c
/* control.c
** Copyright 2000 Daniel Robbins, Gentoo Technologies, Inc.
** Author: Daniel Robbins
** Date: 16 Jun 2000
**
** These routines provide an easy way to make any type of
** data-structure thread-aware. Simply associate a data_control
** structure with the data structure (by creating a new struct, for
** example). Then, simply lock and unlock the mutex, or
** wait/signal/broadcast on the condition variable in the data_control
** structure as needed.
**
** data_control structs contain an int called "active". This int is
** intended to be used for a specific kind of multithreaded design,
** where each thread checks the state of "active" every time it locks
** the mutex. If active is 0, the thread knows that instead of doing
** its normal routine, it should stop itself. If active is 1, it
** should continue as normal. So, by setting active to 0, a
** controlling thread can easily inform a thread work crew to shut
** down instead of processing new jobs. Use the control_activate()
** and control_deactivate() functions, which will also broadcast on
** the data_control struct's condition variable, so that all threads
** stuck in pthread_cond_wait() will wake up, have an opportunity to
** notice the change, and then terminate.
*/
#include "control.h"
/* returns 0 on success, 1 on failure */
int control_init(data_control *mycontrol) {
  if (pthread_mutex_init(&(mycontrol->mutex), NULL))
    return 1;
  if (pthread_cond_init(&(mycontrol->cond), NULL))
    return 1;
  mycontrol->active = 0;
  return 0;
}
/* returns 0 on success, 1 on failure */
int control_destroy(data_control *mycontrol) {
  if (pthread_cond_destroy(&(mycontrol->cond)))
    return 1;
  /* the mutex has to be destroyed as well as the condition variable */
  if (pthread_mutex_destroy(&(mycontrol->mutex)))
    return 1;
  mycontrol->active = 0;
  return 0;
}
/* note: unlike the two routines above, returns 1 on success, 0 on failure */
int control_activate(data_control *mycontrol) {
  if (pthread_mutex_lock(&(mycontrol->mutex)))
    return 0;
  mycontrol->active = 1;
  pthread_mutex_unlock(&(mycontrol->mutex));
  pthread_cond_broadcast(&(mycontrol->cond));
  return 1;
}
/* returns 1 on success, 0 on failure */
int control_deactivate(data_control *mycontrol) {
  if (pthread_mutex_lock(&(mycontrol->mutex)))
    return 0;
  mycontrol->active = 0;
  pthread_mutex_unlock(&(mycontrol->mutex));
  pthread_cond_broadcast(&(mycontrol->cond));
  return 1;
}
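To illustrate how the "active" flag and the broadcast work together, here is a reduced sketch in which one worker sleeps until its structure is deactivated. It is an assumption-laden illustration rather than part of the article's code: control_set_active() is a hypothetical helper that folds control_activate() and control_deactivate() into one function, and counter_box, idle_worker(), and run_control_demo() are invented for the example.

```c
#include <assert.h>
#include <pthread.h>

typedef struct data_control {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int active;
} data_control;

/* Compact stand-in for control_init(): returns 0 on success. */
static int control_init(data_control *c) {
    if (pthread_mutex_init(&c->mutex, NULL)) return 1;
    if (pthread_cond_init(&c->cond, NULL)) return 1;
    c->active = 0;
    return 0;
}

/* Hypothetical helper combining activate and deactivate. */
static void control_set_active(data_control *c, int value) {
    pthread_mutex_lock(&c->mutex);
    c->active = value;
    pthread_mutex_unlock(&c->mutex);
    pthread_cond_broadcast(&c->cond);   /* wake every waiter to re-check */
}

/* Hypothetical protected structure: data bundled with its control. */
static struct counter_box {
    data_control control;
    int shutdowns_seen;
} box;

static void *idle_worker(void *arg) {
    (void)arg;
    pthread_mutex_lock(&box.control.mutex);
    while (box.control.active)          /* sleep until deactivated */
        pthread_cond_wait(&box.control.cond, &box.control.mutex);
    box.shutdowns_seen++;
    pthread_mutex_unlock(&box.control.mutex);
    return NULL;
}

/* Returns the number of workers that noticed the shutdown request. */
int run_control_demo(void) {
    pthread_t tid;
    int rc;

    rc = control_init(&box.control);
    assert(rc == 0);
    box.shutdowns_seen = 0;
    control_set_active(&box.control, 1);

    rc = pthread_create(&tid, NULL, idle_worker, NULL);
    assert(rc == 0);
    control_set_active(&box.control, 0);   /* ask the worker to stop */
    rc = pthread_join(tid, NULL);
    assert(rc == 0);

    pthread_cond_destroy(&box.control.cond);
    pthread_mutex_destroy(&box.control.mutex);
    return box.shutdowns_seen;
}
```

The worker checks active under the mutex both before and after sleeping, so it terminates whether the deactivation arrives before or after it starts waiting.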
Debug time

One more miscellaneous file is needed before we get to the main code. Here is dbug.h:

dbug.h

#define dabort() \
 { printf("Aborting at line %d in source file %s\n", __LINE__, __FILE__); abort(); }

This code is used to handle unrecoverable errors in the work crew code.

The work crew code

Speaking of the work crew code, here it is:

workcrew.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>   /* sleep() */
#include "control.h"
#include "queue.h"
#include "dbug.h"
/* the work_queue holds tasks for the various threads to complete. */
struct work_queue {
  data_control control;
  queue work;
} wq;
/* I added a job number to the work node. Normally, the work node
   would contain additional data that needed to be processed. */
typedef struct work_node {
  struct node *next;
  int jobnum;
} wnode;
/* the cleanup queue holds stopped threads. Before a thread
   terminates, it adds itself to this list. Since the main thread is
   waiting for changes in this list, it will then wake up and clean up
   the newly terminated thread. */
struct cleanup_queue {
  data_control control;
  queue cleanup;
} cq;
/* I added a thread number (for debugging/instructional purposes) and
   a thread id to the cleanup node. The cleanup node gets passed to
   the new thread on startup, and just before the thread stops, it
   attaches the cleanup node to the cleanup queue. The main thread
   monitors the cleanup queue and is the one that performs the
   necessary cleanup. */
typedef struct cleanup_node {
  struct node *next;
  int threadnum;
  pthread_t tid;
} cnode;
void *threadfunc(void *myarg) {
  wnode *mywork;
  cnode *mynode;
  mynode = (cnode *) myarg;
  pthread_mutex_lock(&wq.control.mutex);
  while (wq.control.active) {
    while (wq.work.head == NULL && wq.control.active) {
      pthread_cond_wait(&wq.control.cond, &wq.control.mutex);
    }
    if (!wq.control.active)
      break;
    /* we got something! */
    mywork = (wnode *) queue_get(&wq.work);
    pthread_mutex_unlock(&wq.control.mutex);
    /* perform processing... */
    printf("Thread number %d processing job %d\n", mynode->threadnum, mywork->jobnum);
    free(mywork);
    pthread_mutex_lock(&wq.control.mutex);
  }
  pthread_mutex_unlock(&wq.control.mutex);
  pthread_mutex_lock(&cq.control.mutex);
  queue_put(&cq.cleanup, (node *) mynode);
  pthread_mutex_unlock(&cq.control.mutex);
  pthread_cond_signal(&cq.control.cond);
  printf("thread %d shutting down...\n", mynode->threadnum);
  return NULL;
}
#define NUM_WORKERS 4
int numthreads;
void join_threads(void) {
  cnode *curnode;
  printf("joining threads...\n");
  while (numthreads) {
    pthread_mutex_lock(&cq.control.mutex);
    /* below, we sleep until there really is a new cleanup node. This
       takes care of any false wakeups... even if we break out of
       pthread_cond_wait(), we don't make any assumptions that the
       condition we were waiting for is true. */
    while (cq.cleanup.head == NULL) {
      pthread_cond_wait(&cq.control.cond, &cq.control.mutex);
    }
    /* at this point, we hold the mutex and there is an item in the
       list that we need to process. First, we remove the node from
       the queue. Then, we call pthread_join() on the tid stored in
       the node. When pthread_join() returns, we have cleaned up
       after a thread. Only then do we free() the node, decrement the
       number of additional threads we need to wait for and repeat the
       entire process, if necessary */
    curnode = (cnode *) queue_get(&cq.cleanup);
    pthread_mutex_unlock(&cq.control.mutex);
    pthread_join(curnode->tid, NULL);
    printf("joined with thread %d\n", curnode->threadnum);
    free(curnode);
    numthreads--;
  }
}
int create_threads(void) {
  int x;
  cnode *curnode;
  for (x = 0; x < NUM_WORKERS; x++) {
    curnode = malloc(sizeof(cnode));
    if (!curnode)
      return 1;
    curnode->threadnum = x;
    if (pthread_create(&curnode->tid, NULL, threadfunc, (void *) curnode))
      return 1;
    printf("created thread %d\n", x);
    numthreads++;
  }
  return 0;
}
void initialize_structs(void) {
  numthreads = 0;
  if (control_init(&wq.control))
    dabort();
  queue_init(&wq.work);
  if (control_init(&cq.control)) {
    control_destroy(&wq.control);
    dabort();
  }
  /* initialize the cleanup queue (not the work queue a second time) */
  queue_init(&cq.cleanup);
  control_activate(&wq.control);
}
void cleanup_structs(void) {
  control_destroy(&cq.control);
  control_destroy(&wq.control);
}
int main(void) {
  int x;
  wnode *mywork;
  initialize_structs();
  /* CREATION */
  if (create_threads()) {
    printf("Error starting threads... cleaning up.\n");
    join_threads();
    dabort();
  }
  pthread_mutex_lock(&wq.control.mutex);
  for (x = 0; x < 16000; x++) {
    mywork = malloc(sizeof(wnode));
    if (!mywork) {
      printf("ouch! can't malloc!\n");
      break;
    }
    mywork->jobnum = x;
    queue_put(&wq.work, (node *) mywork);
  }
  pthread_mutex_unlock(&wq.control.mutex);
  pthread_cond_broadcast(&wq.control.cond);
  printf("sleeping...\n");
  sleep(2);
  printf("deactivating work queue...\n");
  control_deactivate(&wq.control);
  /* CLEANUP */
  join_threads();
  cleanup_structs();
  return 0;
}

Code walkthrough

Now let's quickly walk through the code. The first struct defined is called "wq", and it contains a data_control and a queue header. The data_control structure is used to arbitrate access to the entire queue, including the nodes in the queue. The next step is to define the actual work nodes. To keep the code lean for the examples in this article, all they contain is a job number.

Next comes the cleanup queue. The comments explain how it works. OK, let's skip over threadfunc(), join_threads(), create_threads(), and initialize_structs() and jump to main(). The first thing main() does is initialize the structures, which includes initializing the data_controls and the queues, as well as activating the work queue.
Cleanup specifics

Now it's time to initialize the threads. If you look at the create_threads() call, everything looks normal, except for one thing. Notice that we allocate a cleanup node and initialize its thread number and TID members. We also pass the cleanup node as the initial argument to each new worker thread. Why do this?

Because when a worker thread exits, it attaches its cleanup node to the cleanup queue and terminates. The main thread then detects this node in the cleanup queue (by using the condition variable) and removes it from the queue. Because the TID (thread ID) is stored in the cleanup node, the main thread knows exactly which thread has terminated. The main thread then calls pthread_join() with that TID to join with the appropriate worker thread. Without this bookkeeping, the main thread would need to join the worker threads in some arbitrary order, probably the order in which they were created. Since the threads do not necessarily terminate in that order, the main thread could be waiting to join one thread when it could already have joined ten others. Can you see how this design decision speeds up the shutdown code (especially with hundreds of worker threads)?

Creating work

We have now started the worker threads (they have begun executing threadfunc(), which is discussed shortly), and the main thread starts inserting work nodes into the work queue. First, it locks wq's control mutex, and then it allocates 16000 work packets and inserts them into the queue one by one. When that is done, it calls pthread_cond_broadcast() so that all sleeping threads are woken and can start working. The main thread then sleeps for two seconds, after which it deactivates the work queue to tell the worker threads to terminate. Finally, the main thread calls the join_threads() function to clean up all of the worker threads.
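To make the cleanup-queue idea concrete, here is a reduced sketch in which each thread announces its own termination and the main thread joins threads in the order they announce themselves rather than in creation order. It is a simplification under assumptions: a small array used as a stack replaces the article's linked queue, and finished, short_lived(), NUM_THREADS, and run_cleanup_demo() are hypothetical names.

```c
#include <assert.h>
#include <pthread.h>

#define NUM_THREADS 4   /* hypothetical crew size */

static pthread_mutex_t cq_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cq_cond = PTHREAD_COND_INITIALIZER;
static int finished[NUM_THREADS];  /* indices of threads ready to be joined */
static int nfinished = 0;          /* entries currently in finished[] */
static pthread_t tids[NUM_THREADS];
static int ids[NUM_THREADS];

static void *short_lived(void *arg) {
    int id = *(int *)arg;
    /* ... real work would go here ... */
    pthread_mutex_lock(&cq_mutex);
    finished[nfinished++] = id;     /* announce: this thread is exiting */
    pthread_mutex_unlock(&cq_mutex);
    pthread_cond_signal(&cq_cond);  /* only the main thread waits here */
    return NULL;
}

/* Returns the number of threads joined. */
int run_cleanup_demo(void) {
    int i, rc, joined = 0;

    for (i = 0; i < NUM_THREADS; i++) {
        ids[i] = i;
        rc = pthread_create(&tids[i], NULL, short_lived, &ids[i]);
        assert(rc == 0);
    }

    while (joined < NUM_THREADS) {
        int id;
        pthread_mutex_lock(&cq_mutex);
        while (nfinished == 0)      /* guard against false wakeups */
            pthread_cond_wait(&cq_cond, &cq_mutex);
        id = finished[--nfinished]; /* take any thread that has announced */
        pthread_mutex_unlock(&cq_mutex);

        rc = pthread_join(tids[id], NULL);  /* this thread is already exiting */
        assert(rc == 0);
        joined++;
    }
    return joined;
}
```

Each pthread_join() call here targets a thread that has already reached its final statements, so the main thread is not held up behind a worker that is still busy.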
threadfunc()

Now for threadfunc(), the code that all worker threads execute. When a worker thread starts, it immediately locks the work queue mutex, gets a work node (if one is available), and processes it. If no work is available, it calls pthread_cond_wait(). Notice that this call sits inside a very tight while() loop, which is very important. When you wake up from a pthread_cond_wait() call, you should never assume that your condition is definitely true: it probably is, but it might not be. If the thread was woken in error and the list is still empty, the while loop calls pthread_cond_wait() again.

If there is a work node, we simply print its job number, free it, and loop back to look for the next job. Real code would, of course, do something more substantial.
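The need to re-check the condition can be sketched with two threads competing for jobs that arrive one at a time. In this illustration (all names hypothetical, static initializers used for brevity), each broadcast may wake more threads than there are jobs, and the while loop sends the surplus thread back to sleep instead of letting it take a job that is not there.

```c
#include <assert.h>
#include <pthread.h>

static pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t mycond = PTHREAD_COND_INITIALIZER;
static int items = 0;   /* jobs available, protected by mymutex */
static int taken = 0;   /* jobs successfully taken */

static void *take_one(void *arg) {
    (void)arg;
    pthread_mutex_lock(&mymutex);
    while (items == 0)                      /* a loop, not a single "if" */
        pthread_cond_wait(&mycond, &mymutex);
    items--;                                /* safe: items > 0 is guaranteed */
    taken++;
    pthread_mutex_unlock(&mymutex);
    return NULL;
}

static void add_one_and_broadcast(void) {
    pthread_mutex_lock(&mymutex);
    items++;
    pthread_mutex_unlock(&mymutex);
    pthread_cond_broadcast(&mycond);        /* may wake more threads than jobs */
}

/* Returns 1 if both jobs were taken and the count never went negative. */
int run_recheck_demo(void) {
    pthread_t a, b;
    int rc;

    rc = pthread_create(&a, NULL, take_one, NULL);
    assert(rc == 0);
    rc = pthread_create(&b, NULL, take_one, NULL);
    assert(rc == 0);

    add_one_and_broadcast();   /* one job, possibly two wakeups */
    add_one_and_broadcast();   /* the second job lets the other thread finish */

    rc = pthread_join(a, NULL);
    assert(rc == 0);
    rc = pthread_join(b, NULL);
    assert(rc == 0);
    return taken == 2 && items == 0;
}
```

With an "if" in place of the "while", a thread woken by the first broadcast after the job had already been taken would decrement items below zero.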