How to use NIO

xiaoxiao2021-03-06  45

The most embarrassing use of NIO is not careful that the CPU utilization is maintained at 100%. General reasons may be

Registered OP_WRITE events have not been processed for a registration event (or untreated) to use NIO:

Only in one thread (operation in multiple threads) only a nightmare) only registering the currently interested event to send data directly, write the OP_WRITE event once again, in the next time send

Practical code: SelectorProcessor class, distribution event (best use thread pool) package zzzhc;

import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels. Selector; import java.util.iterator;

/ ** * @Author zzzhc * / public class selectorprocessor imports runnable {private factory selector selector;

PRIVATE FINAL Queue WaitClosequeue;

PRIVATE FINAL Queue WaitregisterQueue;

Private final Queue Waitaddinterestqueue;

PRIVATE FINAL THREAD Processorthread;

private boolean shutdown = false; private final static SelectorProcessor instance; static {try {instance = new SelectorProcessor ();} catch (IOException e) {throw new RuntimeException (e);}} public static SelectorProcessor getDefaultInstance () {return instance;}

public SelectorProcessor () throws IOException {selector = Selector.open (); waitCloseQueue = new Queue (); waitRegisterQueue = new Queue (); waitAddInterestQueue = new Queue (); processorThread = new Thread (this); //processorThread.setDaemon ( TRUE); // is this needed? processorthread.start ();

public void register (SelectableChannel sc, Handler handler, int ops) {if (Thread.currentThread () == processorThread) {doRegister (sc, handler, ops);} else {ChannelAssociater r = new ChannelAssociater (sc, handler, ops) ; synchronized (waitRegisterQueue) {waitRegisterQueue.push (r); selector.wakeup ();}}} public void addInterestOps (SelectableChannel sc, int addOps) {if (Thread.currentThread () == processorThread) {addOps (sc, addOps } Else {channelociaater r = new channelDOps; synchronized (waitaddinterestqueue) {waitaddinterestqueue.push (r); selector.wakeup ();}}}

public void closeChannel (SelectableChannel sc) {if (Thread.currentThread () == processorThread) {doClose (sc);} else {synchronized (waitCloseQueue) {waitCloseQueue.push (sc); selector.wakeup ();}}}

protected void doRegister (SelectableChannel sc, Handler handler, int ops) {if (Thread.currentThread () == processorThread) {try {sc.register (selector, ops, handler);} catch (ClosedChannelException e) {e.printStackTrace ( }}}

protected void addOps (SelectableChannel sc, int addOps) {if (Thread.currentThread () == processorThread) {SelectionKey key = sc.keyFor (selector); key.interestOps (key.interestOps () | addOps);}}

protected void doClose (SelectableChannel sc) {if (Thread.currentThread () == processorThread) {try {sc.close ();} catch (IOException e) {e.printStackTrace ();}}} protected void dealRegister () { if (Thread.currentThread () == processorThread) {synchronized (waitRegisterQueue) {while {ChannelAssociater ca = (ChannelAssociater) waitRegisterQueue .shift () (waitRegisterQueue.isEmpty ()!); doRegister (ca.sc, ca.handler, ca .OPS);}}}}}

protected void dealAddInterest () {if (Thread.currentThread () == processorThread) {synchronized (waitAddInterestQueue) {while {ChannelAssociater ca = (ChannelAssociater) waitAddInterestQueue .shift () (waitAddInterestQueue.isEmpty ()!); addOps (ca.sc , ca.ops);}}}}

protected void dealClose () {if (Thread.currentThread () == processorThread) {synchronized (waitCloseQueue) {while {SelectableChannel sc = (SelectableChannel) waitCloseQueue .shift () (waitCloseQueue.isEmpty ()!); doClose (sc); }}}}

Protected void dealshutdown () {iterator ity = select.keys (). iterator (); while (item.hasnext ()) {Try {SelectionKey Key = (SelectionKey) Iterator.Next (); key.channel (). close.................... );} Catch (ooException e) {E.PrintStackTrace ();}} try {selector.close ();} catch ({ooException e) {E.PrintStackTrace ();}} public void shutdown ()}} public void shutdown ()} True; selector.wakeup ();

Public void run () {int keycount = 0; while (! shutdown) {deaLregister (); dealaddinterest (); dealclose (); try {keycount = selector.select (); if (keycount == 0) {Continue;} Iterator itrator = selector.selectedKeys (). Iterator (); while (item.hasnext ()) {SelectionKey Key = (SelectionKey) iTerator.next (); item.remove (); key.interestOps () (key.interestOps () (Key.interestOps () ~ key.readyOps ()); Handler Handler = (Handler) Key.attachment (); if (key.isacceptable ()) {(AcceptHandler) Handler .handleAccept ();} else if (key.isconnectable ()) {((ConnectHandler) .handleconnect ();} else {readwritehandler rwh = ((READWRITEHANDAL) HANDLER); IF (key.isvalid () && key.ieadable ()) {rwh.handleread ();} else if (key.isvalid () && key.iswritable ()) {rwh.handlewrite ();}}}} catch ClosedSelectorexception CSE) {system.err.println ("Selector Closed:" Cse.getMessage () "/ nquit"); return;} catch (ooException e) {E.PrintStackTrace ();}} DEALSHUTDOWN () Class ChannelAlassociater {SelectableChannel SC;

Handler handler;

Int ops;

CHANELASSOCIATER (SELECTABECHANNEL SC, HANDLER HANDAL, INT OPS) {this.sc = sc; this.handler = Handler; this.ops = OPS;}}} // Queue class, simple packaging for LinkedArrayList, providing PUSH, POP, SHIFT , unshift operation, use it to Package Zzzhc;

Import java.util. *;

/ ** * @Author zzzhc * * / public class queue {private linkedlist content = new linkedList (); public void unshift (Object O) {Content .addfirst (o); public object shift () {return content.removefirst (); public void push (Object O) {content.addlast (o);} public object pop () {return content.removelast (); ()} Public int size ();} public int size ();}

} // Handler, ConnectHandler, AcceptHandler, ReadwriteHandler interface, processing event package zzzhc;

/ ** * @Author zzzhc * / public interface handler {

Package zzzhc;

/ ** * @author zzzhc * * / public interface connecthandler extends handler {void handleconnect ();

Package zzzhc;

/ ** * @Author zzzhc * / public interface accepthandler extends handler {void handleaccept ();

Package zzzhc;

/ ** * @Author zzzhc * / public interface readwritehandler Extends Handler {void handleread (); void handlewrite ();

}

转载请注明原文地址:https://www.9cbs.com/read-82664.html

New Post(0)