Non-Blocking Socket I / O in JDK 1.4
By Tim Burns, Senior Software Engineer, Owl Mountain Software, LLC
Last Changed: 12/14/2001 7:11 am
JDK 1.4 provides developers non-blocking I / O on both sockets and files. For Java network programmers, non-blocking I / O is very exciting, because it makes writing scalable, portable socket applications simpler.
Previously, Java programmers would have to deal with multiple socket connections by starting a thread for each connection. Inevitably, they would encounter issues such as operating system limits, deadlocks, or thread safety violations. Now, the developer can use selectors to manage multiple simultaneous SOCKET Connections On A Single Thread. This Article DEALS with How to Write A Simple Non-Blocking Server Using The New I / O IN JDK 1.4 for non-blocking sockets.
Selectors and Channels
Non-blocking I / O is built around the Selector object for multiplexing selectable channels. The Selector object maintains a set of selected keys that may be active at a given time in a server program. Keys can be added or removed from the selection channel. Each Key Corresponds TO A Client Connection. The Selector Itself Manages The Keys and The Program Uses The key-state to manage Callbacks To Perform Individual Client Requests.
A Selector object can be created using itself as a factory or using a SelectorProvider factory. The external SelectorProvider factory is available because programmers may want to take advantage of existing proprietary non-blocking socket libraries for higher or tuned performance.
The Simplest Method of Creating A Socket Is Using The Following Command:
Selector selector = selector.open ();
Similiarly, One Can Use The SelectorProvider Factory:
Selector selector = SelectorProvider.provider () openSelector ();.. To demultiplex incoming data, create a channel and register that channel with the Selector Initially, one should configure a ServerSocketChannel to accept new connections by binding it to a port on the local host .
ServersocketChannel Channel = ServersocketChannel.Open ();
Channel.configureblocking (false);
Inetaddress lh = inetaddress.getlocalhost ();
Inetsocketaddress isa = new inetsocketaddress (lh, port);
Channel.socket (). Bind (ISA);
Channels SHOULD BEY WILL PERFORM in The Program. For instance, the channel That Accepts New Connections Should Be Registered As Follows:
SelectionKey Acceptkey = Channel.Register (selector, selectionKey.op_accept);
A Channel That Reads and Writes Data Will Be Registered As Follows:
SelectionKey ReadWriteKey = Channel.register (selector, selectionKey. Op_read | selectionKey. Op_write);
The class..........
While (Keysadded = selector.select ())> 0) {
Set readyKeys = select .selectedKeys ();
Iterator I = ReadyKeys.Itemrator ();
// Walk THROUGH The Ready Keys Collection and Process // Requests.
While (I.hasNext ()) {
SelectionKey SK = (SelectionKey) i.next ();
(Accept The Connection and Process The Request)
}
}
Serving sockets with keys
The key in the previous example will either be acceptable, readable, writable, or connectable. The boolean check on the flag is key.isAcceptable (), key.isWritable (), key.isReadable () and key.isConnectable (). On a new connection from the server, the key returned by the selector is an acceptable key. The selector will return keys on I / O events that correspond to the event and will either be writeable or readable.On a new connection a key is set isAcceptable (). During the connection . In the meantime, We attach the socket to the key so we can pick it up on the callback.
IF (Key.isacceptable ()) {
ServersocketChannel NextReady =
(Serversocketchannel) key.channel ();
SocketChannel Channel = NextReady.accept ();
Channel.configureblocking (false);
SelectionKey Readkey =
Channel.Register (this.selector,
SelectionKey.op_read | SelectionKey.op_write;
Readkey.attach (Channel);
This.Callback.Put (Channel, New ChannelCallback (Channel);
}
...................... ..
While ((this._keysadded = acceptkey.selector (). select ())> 0) {
While (I.hasNext ()) {
SelectionKey Key = (SelectionKey) i.next ();
I.Remove ();
IF (Key.isacceptable ()) {
ACCEPT The Socket Channel
}
Else IF (key.iswritable ()) {
? (Write to the socket channel channel channel channel
}
}
}
When the client sends a request to the server, the Selector returns a key with isReadble () true. There is new data pending so we should process that data. Note that during the accept process, we attached the Socket to the channel, so we Can RETRIEVE IT AGAIN for Reading and Writing. Sockets Become Writable Once A Read-Request Has Been Completed, So We Don Need To Check In this case if () {
SelectableChannel NextReady =
SelectableChannel "key.channel ();
(? Read and Write stuff to and from the socket)
}
Writing to a non-blocking socket
Writing to a non-blocking socket is a little tricky. Here is what the na 飗 e Programmer Might Do:
Else IF (key.iswritable ()) {
Socket Socket = (Socket) Key.attachment ();
PrintWriter out =
New printwriter (socket.getoutputstream (), true);
Out.println ("What is your name?");
}
The problem with this code is that the PrintWriter blocks I / O and does not support the underlying asynchronous I / O mechanisms. To deal with this problem, we can not use any of the standard I / O utilities, but instead must wrap our message in A bytebuffer object and send it through the socketchannel object.
Else IF (key.iswritable ()) {
Socketchannel Channel = (SocketChannel) Key.attachment ();
String message = "what is your name?";
Bytebuffer buf = bytebuffer.wrap (message.getbytes ());
INT nbytes = channel.write (buf);
}
Reading from a non-blocking socket
Non-blocking socket reads also relely.
Bytebuffer Bytebuffer = bytebuffer.allocate (bufsize);
int nbytes = channel.read (byteBuffer); When the channel has been read, the byteBuffer is not yet ready for decoding You need to fix the limit at the current position, and then set the current position to zero before decoding the message..
Bytebuffer.flip ();
Extracting the data from the ByteBuffer requires the package java.nio.charset * This package has three abstract classes:... Charset, CharsetDecoder, and CharsetEncoder Generally, one will decode ByteBuffer messages using CharsetDecoder.
Charset Charset = Charset.Forname ("US-ASCII");
CharSetDecoder decoder = charset.newdecoder ();
Charbuffer charbuffer = decoder.decode (bytebuffer);
String result = charbuffer.tostring ();
The Callback Pattern
Context
.
Problem
We can not execute a command until we get a complete message. Messages may be arriving from multiple clients in any order. We need to complete the messages and execute a command when the messages are complete. The design should be simple.
Solution
Attach an ChannelCallback object to every unique socket containing that can service an append (String) message and a execute () message. Append the results of the read to the object every time a new ByteBuffer is received. If a read is complete, execute the Callback.
Conclusions
Non-blocking I / O is a very exciting development in Java. It will make writing portable, scalable server-side applications in Java simpler. To use it, you need to use the package java.nio. * And the subpackages. The java .nio. * package contains the buffers needed to marshall the data in and out of the channel, the java.nio.channels. * package contains the objects needed to perform the I / O operations, and the java.nio.charset is needed To Translate Byte Data To and from Specified Character Sets Such As US-ASCII, UTF-8, ETC.THANKS
Thanks to Members of The Saint Louis Developers RoundTable, Especial Brian Button, And Edwin Van Der Elst.
References
[SUN01] Sun Microsystems, 揘 EW I / O APS? Http://java.sun.com/j2se/1.4/docs/guide/nio.
............................
[OUS96] OUSTERHOUT, JOHN.? A href = "http://home.pacbell.net/ouster/threads.ppt"> Why Threads Are A Bad Idea (for MOST PURPOSES)? Usenix Technical Conference, 1996
[REN98] Renesse, Robbert Van,? A href = "http://www.cs.cornell.edu/info/people/rvr/papers/event/event.ps"> Goal-Oriented Programming, or Composition Using Events, Or Threads Considered Harmful? ACM Sigops 98.
[VER96] ALLAN VERMEULEN,? A href = "http://www.ddj.com/articles/960606/9606D/9606d.htm"> An asynchronous design pattern? Dr. Dobbs Journal, June, 1996.
Source Code
The Source Code Here Is Derived from Sun 抯 EXAMPLE Server [SUN01].
Package com.owlmountain.concurrent;
Import java.io. *;
Import java.nio. *;
Import java.nio.channels. *;
Import java.nio.channels.spi. *;
Import java.nio.charset. *;
Import java.net. *;
Import java.util. *;
Import org.apache.log4j. *;
Public class nonblockingserver {
INT port = 4001; selector selector = null;
ServersocketChannel SelectableChannel = NULL;
INT keysadded = 0;
Static category log =
Category.getInstance (NonblockingServer.class.getname ());
Static string quit_server = "quit";
STATIC STRING Shutdown = "shutdown";
Public nonblockingserver () {
}
Public NonblockingServer (int port) {
THIS.PORT = Port;
}
Public void initialize ()
THROWS IOEXCEPTION {
This.Selector = selectorProvider.Provider (). OpenSelector ();
This.selectablechannel = serversocketchannel.open ();
this.selectablechannel.configureblocking (false);
Inetaddress lh = inetaddress.getlocalhost ();
Inetsocketaddress isa = new inetsocketaddress (lh, this.port);
This.SelectableChannel.Socket (). Bind (ISA);
}
Public void finalize ()
THROWS IOEXCEPTION {
This.SelectableChannel.Close ();
This.Selector.Close ();
}
Public void acceptconnections ()
THROWS IEXCEPTION, INTERRUPTEDEXCEPTION {
SelectionKey AcceptKey =
This.SelectableChannel.Register (this.selector,
SelectionKey.op_Accept);
Log.debug ("acceptor loop ...");
While ((this.keysadded = acceptkey.selector (). select ())> 0) {
Log.debug ("Selector Returned"
this.keysadded "Ready for IO Operations");
Set readyKeys = this.Selector.selectedKeys ();
Iterator I = ReadyKeys.Itemrator ();
While (I.hasNext ()) {
SelectionKey Key = (SelectionKey) i.next ();
I.Remove ();
IF (Key.isacceptable ()) {
ServersocketChannel NextReady =
(Serversocketchannel) key.channel ();
Log.debug ("Processing Selection Key Read ="
Key.isreadable () "Write =" key.iswritable ()
"accept =" key.isacceptable ()); socketchannel channel ();
Channel.configureblocking (false);
SelectionKey Readkey =
Channel.Register (this.selector,
SelectionKey.op_read | SelectionKey.op_write;
Readkey.attach (New ChannelCallback (Channel);
}
Else if (Key.IsReadable ()) {
SelectableChannel NextReady =
SelectableChannel "key.channel ();
Log.debug ("Processing Selection Key Read ="
Key.isreadable () "Write =" key.iswritable ()
"accept =" key.isacceptable ());
This.ReadMessage (CHANNELLBACK) key.attachment ());
}
Else IF (key.iswritable ()) {
ChannelCallback Callback = (CHANNELLBACK) Key.attachment ();
String message = "what is your name?";
Bytebuffer buf = bytebuffer.wrap (message.getbytes ());
INT nbytes = Callback.getChannel (). Write (buf);
}
}
}
Log.debug ("End Acceptor Loop ...");
}
Public Void WriteMessage (Socketchannel Channel, String Message)
THROWS IOEXCEPTION {
Bytebuffer buf = bytebuffer.wrap (message.getbytes ());
INT nbytes = channel.write (buf);
Log.debug ("Wrote" Nbytes "to Channel.");
}
Static final int buffs = 8;
Public string decode (Bytebuffer Bytebuffer)
Throws charactercodingexception {
Charset Charset = Charset.Forname ("US-ASCII");
CharSetDecoder decoder = charset.newdecoder ();
Charbuffer charbuffer = decoder.decode (bytebuffer);
String result = charbuffer.tostring ();
Return Result;
}
Public void ReadMessage (ChannelCallback Callback)
THROWS IEXCEPTION, INTERRUPTEDEXCEPTION {
Bytebuffer bytebuffer = bytebuffer.allocate (bufsize); int nbytes = callback.getChannel (). Read (Bytebuffer);
Bytebuffer.flip ();
String result = this.decode (bytebuffer);
Log.debug (result);
IF ("quit"> = 0) Callback.getChannel (). Close ();
Else IF ("Shutdown")> = 0) {
Callback.getchannel (). Close ();
Throw new interruptedException ();
}
Else {
Callback.Append (Result.Tostring ());
// if we are done with the line kiln wee execute the callback.
IF (Result.Indexof ("/ n")> = 0)
Callback.execute ();
}
}
Public class channelCallback {
Private socketchannel channel;
Private stringbuffer buffer;
Public ChannelCallback (Socketchannel Channel) {
THIS.CHANNEL = CHANNEL;
THIS.BUFFER = New stringbuffer ();
}
Public void execute () throws oException {
Log.debug (this.buffer.tostring ());
WriteMessage (this.channel, this.buffer.tostring ());
Buffer = new stringbuffer ();
}
Public socketchannel getChannel () {
Return this.channel;
}
Public void append (String Values) {
Buffer.Append (VALUES);
}
}
Public static void main (String [] args) {
BasicConfigurator.configure ();
NonblockingServer NBServer = new nonblockingserver ();
Try {
NBServer.initialize ();
} catch (ioexception e) {
E.PrintStackTrace ();
System.exit (-1);
}
Try {
NBServer.acceptconnections ();
}
Catch (IOException E) {
E.PrintStackTrace ();
Log.Error (e);
}
Catch (InterruptedException E) {
Log.info ("exiting normally ...");
}
}
}