In applications, we often need THREAD buffer pools to do something to improve program efficiency and concurrency. This article demonstrates how to use Queue this data structure to implement a simple Thread buffer pool.
A Thread buffer can be designed below: The buffer pool consists of several work Thread and a Queue, and the client is responsible for putting the task into the Queue (PUT method), and the working thread takes out these tasks and executes them (GET method) .
A classic implementation of Queue is to use a loop array (this implementation has an introduction in many data structures), as shown below:
The icon is an array of sizes that can be imagined to be a ring connected to the first Oldest points to the position where the oldest data in Queue, NEXT points to the next position that can be reproduced.
After adding a new data to Next, you need to update next: next = (next 1)% size;
After removing a data from the OLDEST position, you need to update Oldest: Oldest = (Oldest 1)% size;
When Oldest == next, Queue is empty.
When (NEXT 1)% size == Oldest, Queue is full.
(Note: In order to distinguish Queue is empty and full, you can actually place up to SIZE-1 data in Queue.)
Because this queue will be accessed by multiple threads at the same time, it is necessary to consider how Queue works in this case. First, Queue needs to be threaded, you can use the synchronized keyword in Java to make sure there is only one thread in accessing Queue.
We can also notice that when Queue is empty, the GET operation is unable; when Queue is full, the PUT operation cannot be performed. When multi-threaded access encounters this, the thread that typically wants to perform the operation can wait for (block) until the operation can be done. For example, when a Thread executes a get method on an empty Queue, this Thread should wait until the additional Thread executes the Queue's PUT method, continue. In Java, the WAIT () of the Object object provides such a function.
Combine the above content is a class: SyncQueue:
Public class syncqueue {
Public syncqueue (int size) {
_Array = New Object [size];
_size = size;
_OLDEST = 0;
_next = 0;
}
Public Synchronized Void Put (Object O) {
While (full ()) {
Try {
Wait ();
} catch (interruptedexception ex) {
Throw new ExceptionAdapter (ex);
}
}
_Array [_next] = O;
_next = (_next 1)% _size;
NOTIFY ();
}
Public synchronized object get () {
While (Empty ()) {
Try {
Wait ();
} catch (interruptedexception ex) {
Throw New ExceptionAdapter (ex);
}
Object ret = _Array [_OLDEST];
_OLDEST = (_OLDEST 1)% _size;
NOTIFY ();
Return Ret;
}
Protected Boolean Empty () {
Return _next == _old;
}
Protected Boolean Full () {
Return (_next 1)% _Size == _oldEST;
}
protected object [] _Array;
protected int _next;
Protected Int_old;
protected int _size;
}
You can pay attention to the use of while in the GET and PUT methods, if you change to IF, there is a problem. This is a very easy mistake.
;-) Using ExceptionAdapter in the above code, its role is to package a checked exception into runtimeException. Detailed instructions can be referred to in my avoidance using Checked Exception article in Java.
Next we need an object to express the tasks to be performed by the Thread buffer pool. You can find that Runnable Interface in JDK is very suitable for this role.
Finally, the implementation of the remaining working thread is very simple: a runnable object is taken from SyncQueue and execute it.
Public Class Worker Implements Runnable {
Public worker (syncqueue queue) {
_Queue = queue;
}
Public void run () {
While (true) {
Runnable task = (runnable) _Queue.get ();
Task.run ();
}
}
Protected syncqueue _queue = null;
}
Here is an example of using this Thread buffer pool:
// Construct the Thread buffer pool
Syncqueue queue = new syncqueue (10);
For (int i = 0; i <5; i ) {
New Thread (New Worker (Queue). start ();
}
// Use the Thread buffer pool
Runnable task = new mytask ();
Queue.Put (TASK);
In order to make the code in this article as simple as possible, this THREAD buffer pool is an essential framework. Some other features can also be added on this basis, such as exception processing, dynamically adjustment buffer size, and so on.
Author: DaiJiaLin mailto: woodydai@gmail.com
http://blog.9cbs.net/daijiang