Using asynchronous I / O Complete events, the thread in the thread pool will only process the data only when data is received, and once the data is processed, the thread returns to the thread pool.
To perform asynchronous I / O calls, the operating system I / O handle must be associated with the thread pool, and a callback method must be specified. When the I / O operation is complete, the thread in the thread pool will call the callback method.
The following C # code example illustrates a simple asynchronous I / O operation.
Note that this example takes more than 100MB of available memory.
[C #]
Using system;
Using system.io;
Using system.threading;
Using system.Runtime.InteropServices;
Public class bulkimageProcasync {
Public const string imagebasename = "tmpimage-";
Public const INT NumImages = 200;
Public const Int numpixels = 512 * 512;
// processimage has a simple o (n) loop, and you can vary the number
// of Times You Repeat That Loop To Make The Application More CPU-Bound
// or more I / o-bound.
Public static int processimageRepeats = 20;
// Threads Must Decrement NumImagestofinish, and protect
// Their access to it through a mutex.
Public static int numimagestofinish = NumImages;
Public static object numimagesMutex = new object [0];
// WaitObject is Signalled When All Image Processing IS DONE.
Public static Object WaitObject = New Object [0];
Public class imageStateObject {
PUBLIC BYTE [] PIXELS;
Public int imageum;
PUBLIC FILESTREAM FS;
}
Public static void makeimagefiles () {
INT SIDES = (int) Math.sqrt (Numpixels);
Console.write ("Making" NumImages " SIDES " X " SIDES " Images ... ");
Byte [] pixels = new byte [numpixels];
For (int i = 0; i Pixels [i] = (byte) i; For (int i = 0; i FILESTREAM FS = New FileStream ImageBaseName i ". TMP", Filemode.create, FileAccess.write, FILESHARE.NOONE, 8192, False ); Fs.write (Pixels, 0, Pixels.length); FlushfileBuffers (fs.handle); fs.close (); } Console.writeline ("DONE."); } Public Static Void ReadinImageCallback (IASYNCRESULT asyncRESULT) { ImagestateObject State = (imagestateObject) asyncResult.asyncState; Stream stream = state.fs; INT BYTESREAD = stream.endread (asyncREAD); IF (BytesRead! = Numpixels) Throw New Exception ("in ReadinImageCallBack, Got Wrong Number of Bytes from the Image! Got:" BytesRead); ProcessImage (state.pixels, state.imagenum); stream.close (); // Now write out the image. No async here. FILESTREAM FS = New FileStream ImageBaseName State.Imagenum ".done", Filemode.create, FileAccess.write, FILESHARE.NOONE, 4096, False ); fs.write (state.pixels, 0, numpixels); fs.close (); // this application model Uses Too Much Memory. // Releaseing Memory As Soon As Possible Is A Good Idea, Especially Global // state. State.pixels = null; // Record That An Image Is Done Now. LOCK (NumImagesmutex) { NumImagestofinish -; IF (NumImagestofinish == 0) { LOCK (WAITOBJECT) { Monitor.pulse; WaitObject); } } } } Public static void processimage (byte [] Pixels, Int iMAGENUM) { Console.WriteLine ("ProcessImage"; // Do some cpu-intensive operation on the image. For (int i = 0; i For (int J = 0; J Pixels [J] = 1; Console.writeline ("ProcessImage" ImageNum "DONE."); } Public static void processimagesinbulk () { Console.writeline ("Processing Images ..."); Long t0 = environment.tickcount; NumImagestofinish = NumImages; AsyncCallback readimageCallback = new asyncCallback; for (int i = 0; i ImagestateObject State = new imagestateObject (); State.pixels = new byte [numpixels]; State.imagenum = i; // Very Large Items Are Read Only ONCE, SO THE // Buffer on the file stream can be beoriful. FILESTREAM FS = New FileStream ImageBaseName i ". TMP", Filemode.Open, FileAccess.read, FILESHARE.READ, 1, True ); State.fs = fs; Fs.beginread (state.pixels, 0, numpixels, readimagecallback, state); } // determine WHETHER ALL IMAGES Are Done Being Processed. // if NOT, Block Until All Are Finished. Bool Mustblock = FALSE; LOCK (NumImagesmutex) { IF (NumImagestofinish> 0) Mustblock = true; } IF (Mustblock) { Console.writeline All worker threads are queued ... blocking unsy completion. Numleft: " NumImagestofinish ); LOCK (WAITOBJECT) { Monitor.pulse; WaitObject); } } Long t1 = environment.tickcount; Console.writeline ("Total Time Processing Images: {0} MS", (T1-T0)); } Public static void cleanup () { For (int i = 0; i File.delete (ImageBaseName i ". TMP"); File.delete (ImageBaseName i ". DONE"); } } Public static void trytocleardiskcache () { // Try to force all pending writes to disk, and clear the // Disk cache of any data. Byte [] bytes = new byte [100 * (1 << 20)]; For (int i = 0; i BYTES [I] = 0; BYTES = NULL; Gc.collect (); Thread.sleep (2000); } Public static void main (String [] args) { Console.writeline ("Bulk Image Processing Sample Application, Using Asynchronous I / O"); Console.Writeline ("Simulates Applying A Simple Transformation To" NumImages "/" Images / "); Console.Writeline ("(IE, Async FileStream & Threadpool Benchmark)); Console.Writeline ("Warning - this Test Requires" (Numpixels * NumImages * 2) "BYTES OF TMP SPACE"); IF (args.length == 1) { ProcessimageRepeats = int32.parse (Args [0]); Console.WriteLine ("ProcessImage Inner Loop -" ProcessimageRepeats); } MakeImagefiles (); Trytocleardiskcache (); ProcessImagesNbulk (); Cleanup (); } [DLLIMPORT ("kernel32", setLastError = true)] Private static extern void flushfilebuffrs (intptr handle); }