The most embarrassing use of NIO is not careful that the CPU utilization is maintained at 100%. General reasons may be
Registered OP_WRITE events have not been processed for a registration event (or untreated) to use NIO:
Only in one thread (operation in multiple threads) only a nightmare) only registering the currently interested event to send data directly, write the OP_WRITE event once again, in the next time send
Practical code: SelectorProcessor class, distribution event (best use thread pool) package zzzhc;
import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels. Selector; import java.util.iterator;
/ ** * @Author zzzhc * / public class selectorprocessor imports runnable {private factory selector selector;
PRIVATE FINAL Queue WaitClosequeue;
PRIVATE FINAL Queue WaitregisterQueue;
Private final Queue Waitaddinterestqueue;
PRIVATE FINAL THREAD Processorthread;
private boolean shutdown = false; private final static SelectorProcessor instance; static {try {instance = new SelectorProcessor ();} catch (IOException e) {throw new RuntimeException (e);}} public static SelectorProcessor getDefaultInstance () {return instance;}
public SelectorProcessor () throws IOException {selector = Selector.open (); waitCloseQueue = new Queue (); waitRegisterQueue = new Queue (); waitAddInterestQueue = new Queue (); processorThread = new Thread (this); //processorThread.setDaemon ( TRUE); // is this needed? processorthread.start ();
public void register (SelectableChannel sc, Handler handler, int ops) {if (Thread.currentThread () == processorThread) {doRegister (sc, handler, ops);} else {ChannelAssociater r = new ChannelAssociater (sc, handler, ops) ; synchronized (waitRegisterQueue) {waitRegisterQueue.push (r); selector.wakeup ();}}} public void addInterestOps (SelectableChannel sc, int addOps) {if (Thread.currentThread () == processorThread) {addOps (sc, addOps } Else {channelociaater r = new channelDOps; synchronized (waitaddinterestqueue) {waitaddinterestqueue.push (r); selector.wakeup ();}}}
public void closeChannel (SelectableChannel sc) {if (Thread.currentThread () == processorThread) {doClose (sc);} else {synchronized (waitCloseQueue) {waitCloseQueue.push (sc); selector.wakeup ();}}}
protected void doRegister (SelectableChannel sc, Handler handler, int ops) {if (Thread.currentThread () == processorThread) {try {sc.register (selector, ops, handler);} catch (ClosedChannelException e) {e.printStackTrace ( }}}
protected void addOps (SelectableChannel sc, int addOps) {if (Thread.currentThread () == processorThread) {SelectionKey key = sc.keyFor (selector); key.interestOps (key.interestOps () | addOps);}}
protected void doClose (SelectableChannel sc) {if (Thread.currentThread () == processorThread) {try {sc.close ();} catch (IOException e) {e.printStackTrace ();}}} protected void dealRegister () { if (Thread.currentThread () == processorThread) {synchronized (waitRegisterQueue) {while {ChannelAssociater ca = (ChannelAssociater) waitRegisterQueue .shift () (waitRegisterQueue.isEmpty ()!); doRegister (ca.sc, ca.handler, ca .OPS);}}}}}
protected void dealAddInterest () {if (Thread.currentThread () == processorThread) {synchronized (waitAddInterestQueue) {while {ChannelAssociater ca = (ChannelAssociater) waitAddInterestQueue .shift () (waitAddInterestQueue.isEmpty ()!); addOps (ca.sc , ca.ops);}}}}
protected void dealClose () {if (Thread.currentThread () == processorThread) {synchronized (waitCloseQueue) {while {SelectableChannel sc = (SelectableChannel) waitCloseQueue .shift () (waitCloseQueue.isEmpty ()!); doClose (sc); }}}}
Protected void dealshutdown () {iterator ity = select.keys (). iterator (); while (item.hasnext ()) {Try {SelectionKey Key = (SelectionKey) Iterator.Next (); key.channel (). close.................... );} Catch (ooException e) {E.PrintStackTrace ();}} try {selector.close ();} catch ({ooException e) {E.PrintStackTrace ();}} public void shutdown ()}} public void shutdown ()} True; selector.wakeup ();
Public void run () {int keycount = 0; while (! shutdown) {deaLregister (); dealaddinterest (); dealclose (); try {keycount = selector.select (); if (keycount == 0) {Continue;} Iterator itrator = selector.selectedKeys (). Iterator (); while (item.hasnext ()) {SelectionKey Key = (SelectionKey) iTerator.next (); item.remove (); key.interestOps () (key.interestOps () (Key.interestOps () ~ key.readyOps ()); Handler Handler = (Handler) Key.attachment (); if (key.isacceptable ()) {(AcceptHandler) Handler .handleAccept ();} else if (key.isconnectable ()) {((ConnectHandler) .handleconnect ();} else {readwritehandler rwh = ((READWRITEHANDAL) HANDLER); IF (key.isvalid () && key.ieadable ()) {rwh.handleread ();} else if (key.isvalid () && key.iswritable ()) {rwh.handlewrite ();}}}} catch ClosedSelectorexception CSE) {system.err.println ("Selector Closed:" Cse.getMessage () "/ nquit"); return;} catch (ooException e) {E.PrintStackTrace ();}} DEALSHUTDOWN () Class ChannelAlassociater {SelectableChannel SC;
Handler handler;
Int ops;
CHANELASSOCIATER (SELECTABECHANNEL SC, HANDLER HANDAL, INT OPS) {this.sc = sc; this.handler = Handler; this.ops = OPS;}}} // Queue class, simple packaging for LinkedArrayList, providing PUSH, POP, SHIFT , unshift operation, use it to Package Zzzhc;
Import java.util. *;
/ ** * @Author zzzhc * * / public class queue {private linkedlist content = new linkedList (); public void unshift (Object O) {Content .addfirst (o); public object shift () {return content.removefirst (); public void push (Object O) {content.addlast (o);} public object pop () {return content.removelast (); ()} Public int size ();} public int size ();}
} // Handler, ConnectHandler, AcceptHandler, ReadwriteHandler interface, processing event package zzzhc;
/ ** * @Author zzzhc * / public interface handler {
Package zzzhc;
/ ** * @author zzzhc * * / public interface connecthandler extends handler {void handleconnect ();
Package zzzhc;
/ ** * @Author zzzhc * / public interface accepthandler extends handler {void handleaccept ();
Package zzzhc;
/ ** * @Author zzzhc * / public interface readwritehandler Extends Handler {void handleread (); void handlewrite ();
}