博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java NIO 使用实例
阅读量:5799 次
发布时间:2019-06-18

本文共 13498 字,大约阅读时间需要 44 分钟。

hot3.png

     在JDK1.4之前,Java OutputStream的write方法、InputStream的read方法和ServerSocket的accept()方法都是阻塞方法;从JDK1.4开始,Java引入了新的输入输出系统(New Input/Output,NIO),非阻塞是Java NIO实现的重要功能之一。

 1、Buffer

缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。

2、Channel

通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。

3、Selector

选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。

使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。

4、SelectionKey

包含了事件的状态信息以及事件对应的通道的绑定。

 

使用NIO的步骤:

1、创建一个Selector实例

2、将该实例注册到各种通道,指定每个通道上感兴趣的IO操作

3、重复执行,选择器循环:

    3.1、调用一种select()方法

    3.2、获取已选键集

    3.3、对于已选键集中的每一个键:

        3.3.1、将已选键从键集中移除

        3.3.2、获取信道,并从键中获取附件

        3.3.3、确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中

        3.3.4、根据需要,修改键的兴趣操作集

如下代码:

// NIOServer.java
package org.hadoopinternal.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Single-threaded NIO server: one {@link Selector} watches the listening
 * channel for new connections and every accepted channel for read/write
 * readiness. Per-connection work is delegated to {@link NIOServerConnection},
 * which is attached to the connection's {@link SelectionKey}.
 */
public class NIOServer {
    /** Upper bound, in milliseconds, for one {@code select} call. */
    private static final int TIMEOUT = 300;
    /** TCP port the server listens on. */
    private static final int PORT = 12112;

    /**
     * Opens the selector and the non-blocking listening channel, then runs the
     * selector loop forever. A setup or selector failure is printed and ends
     * the loop; a failure on a single client connection only drops that
     * connection.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            Selector selector = Selector.open();

            ServerSocketChannel listenChannel = ServerSocketChannel.open();
            listenChannel.configureBlocking(false);
            listenChannel.socket().bind(new InetSocketAddress(PORT));
            listenChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // Nothing became ready within TIMEOUT: print a heartbeat dot.
                if (selector.select(TIMEOUT) == 0) {
                    System.out.print(".");
                    continue;
                }

                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // The selector never clears the selected-key set itself.
                    iter.remove();
                    try {
                        dispatch(selector, key);
                    } catch (IOException e) {
                        // One misbehaving client must not stop the server.
                        e.printStackTrace();
                        dropConnection(key);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Performs every operation the given key is ready for.
     *
     * @param selector selector that newly accepted channels are registered with
     * @param key      a key taken from the selected-key set
     * @throws IOException if accepting, reading or writing fails
     */
    private static void dispatch(Selector selector, SelectionKey key) throws IOException {
        //Server socket channel has pending connection request?
        if (key.isValid() && key.isAcceptable()) {
            SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
            // A non-blocking accept() returns null when no connection is pending.
            if (channel != null) {
                channel.configureBlocking(false);
                SelectionKey connkey = channel.register(selector, SelectionKey.OP_READ);
                NIOServerConnection conn = new NIOServerConnection(connkey);
                connkey.attach(conn);
            }
        }

        if (key.isValid() && key.isReadable()) {
            NIOServerConnection conn = (NIOServerConnection) key.attachment();
            conn.handleRead();
        }

        // handleRead() may have closed the channel, which cancels the key,
        // so validity is re-checked before testing for writability.
        if (key.isValid() && key.isWritable()) {
            NIOServerConnection conn = (NIOServerConnection) key.attachment();
            conn.handleWrite();
        }
    }

    /**
     * Closes the client channel behind a failed key. The listening channel is
     * left open so the server keeps accepting connections.
     *
     * @param key key whose handling threw an {@link IOException}
     */
    private static void dropConnection(SelectionKey key) {
        if (!(key.channel() instanceof SocketChannel)) {
            return;
        }
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException closeFailure) {
            closeFailure.printStackTrace();
        }
    }
}
// NIOServerConnection.java:
package org.hadoopinternal.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * State of one client connection: its selection key, its channel and a
 * fixed-size buffer. Bytes read from the channel are kept in the buffer and
 * written back to the same channel once it is writable.
 */
public class NIOServerConnection {
    /** Capacity, in bytes, of the per-connection buffer. */
    private static final int BUFFSIZE = 1024;

    SelectionKey key;
    SocketChannel channel;
    ByteBuffer buffer;

    /**
     * Binds this connection to the channel behind the given key.
     *
     * @param key key obtained when the client channel was registered
     */
    public NIOServerConnection(SelectionKey key) {
        this.key = key;
        this.channel = (SocketChannel) key.channel();
        this.buffer = ByteBuffer.allocate(BUFFSIZE);
    }

    /**
     * Reads available bytes into the buffer. At end-of-stream the channel is
     * closed; otherwise the key's interest set becomes read + write so the
     * buffered bytes can be sent back.
     *
     * @throws IOException if the read or the close fails
     */
    public void handleRead() throws IOException {
        final long received = channel.read(buffer);
        if (received == -1) {
            // Peer closed its side of the connection.
            channel.close();
            return;
        }
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }

    /**
     * Writes buffered bytes to the channel. When everything has been written
     * the key goes back to read-only interest; unwritten bytes are kept at the
     * front of the buffer by {@code compact()}.
     *
     * @throws IOException if the write fails
     */
    public void handleWrite() throws IOException {
        // Switch the buffer from filling mode to draining mode.
        buffer.flip();
        channel.write(buffer);

        final boolean drained = !buffer.hasRemaining();
        if (drained) {
            key.interestOps(SelectionKey.OP_READ);
        }

        // Back to filling mode, preserving anything not yet written.
        buffer.compact();
    }
}

 

如下代码为 Hadoop IPC Server 中的 Listener,它是一个标准的 NIO 应用:

/** Listens on the socket. Creates jobs for the handler threads*/  private class Listener extends Thread {        private ServerSocketChannel acceptChannel = null; //the accept channel    private Selector selector = null; //the selector that we use for the server    private Reader[] readers = null;    private int currentReader = 0;    private InetSocketAddress address; //the address we bind at    private Random rand = new Random();    private long lastCleanupRunTime = 0; //the last time when a cleanup connec-                                         //-tion (for idle connections) ran    private long cleanupInterval = 10000; //the minimum interval between                                           //two cleanup runs    private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);    private ExecutorService readPool;        public Listener() throws IOException {      address = new InetSocketAddress(bindAddress, port);      // Create a new server socket and set to non blocking mode      acceptChannel = ServerSocketChannel.open();      acceptChannel.configureBlocking(false);      // Bind the server socket to the local host and port      bind(acceptChannel.socket(), address, backlogLength);      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port      // create a selector;      selector= Selector.open();      readers = new Reader[readThreads];      readPool = Executors.newFixedThreadPool(readThreads);      for (int i = 0; i < readThreads; i++) {        Selector readSelector = Selector.open();        Reader reader = new Reader(readSelector);        readers[i] = reader;        readPool.execute(reader);      }      // Register accepts on the server socket with the selector.      
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);      this.setName("IPC Server listener on " + port);      this.setDaemon(true);    }        private class Reader implements Runnable {      private volatile boolean adding = false;      private Selector readSelector = null;      Reader(Selector readSelector) {        this.readSelector = readSelector;      }      public void run() {        LOG.info("Starting SocketReader");        synchronized (this) {          while (running) {            SelectionKey key = null;            try {              readSelector.select();              while (adding) {                this.wait(1000);              }                            Iterator
iter = readSelector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); if (key.isValid()) { if (key.isReadable()) { doRead(key); } } key = null; } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + StringUtils.stringifyException(e)); } } catch (IOException ex) { LOG.error("Error in Reader", ex); } } } } /** * This gets reader into the state that waits for the new channel * to be registered with readSelector. If it was waiting in select() * the thread will be woken up, otherwise whenever select() is called * it will return even if there is nothing to read and wait * in while(adding) for finishAdd call */ public void startAdd() { adding = true; readSelector.wakeup(); } public synchronized SelectionKey registerChannel(SocketChannel channel) throws IOException { return channel.register(readSelector, SelectionKey.OP_READ); } public synchronized void finishAdd() { adding = false; this.notify(); } } /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections * that will be cleanedup per run. The criteria for cleanup is the time * for which the connection was idle. If 'force' is true then all * connections will be looked at for the cleanup. 
*/ private void cleanupConnections(boolean force) { if (force || numConnections > thresholdIdleConnections) { long currentTime = System.currentTimeMillis(); if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) { return; } int start = 0; int end = numConnections - 1; if (!force) { start = rand.nextInt() % numConnections; end = rand.nextInt() % numConnections; int temp; if (end < start) { temp = start; start = end; end = temp; } } int i = start; int numNuked = 0; while (i <= end) { Connection c; synchronized (connectionList) { try { c = connectionList.get(i); } catch (Exception e) {return;} } if (c.timedOut(currentTime)) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); numNuked++; end--; c = null; if (!force && numNuked == maxConnectionsToNuke) break; } else i++; } lastCleanupRunTime = System.currentTimeMillis(); } } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); while (running) { SelectionKey key = null; try { selector.select(); Iterator
iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { // we can run out of memory if we have too many threads // log the event and sleep for a minute and give // some thread(s) a chance to finish LOG.warn("Out of Memory in server select", e); closeCurrentConnection(key, e); cleanupConnections(true); try { Thread.sleep(60000); } catch (Exception ie) {} } catch (Exception e) { closeCurrentConnection(key, e); } cleanupConnections(false); } LOG.info("Stopping " + this.getName()); synchronized (this) { try { acceptChannel.close(); selector.close(); } catch (IOException e) { } selector= null; acceptChannel= null; // clean up all connections while (!connectionList.isEmpty()) { closeConnection(connectionList.remove(0)); } } } private void closeCurrentConnection(SelectionKey key, Throwable e) { if (key != null) { Connection c = (Connection)key.attachment(); if (c != null) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c.getHostAddress()); closeConnection(c); c = null; } } } InetSocketAddress getAddress() { return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress(); } void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); try { reader.startAdd(); SelectionKey readKey = reader.registerChannel(channel); c = new Connection(readKey, channel, System.currentTimeMillis()); readKey.attach(c); synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } if (LOG.isDebugEnabled()) LOG.debug("Server connection from " + c.toString() + "; # active 
connections: " + numConnections + "; # queued calls: " + callQueue.size()); } finally { reader.finishAdd(); } } } void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); } catch (InterruptedException ieo) { LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo); throw ieo; } catch (Exception e) { LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e); count = -1; //so that the (count < 0) block is executed } if (count < 0) { if (LOG.isDebugEnabled()) LOG.debug(getName() + ": disconnecting client " + c + ". Number of active connections: "+ numConnections); closeConnection(c); c = null; } else { c.setLastContact(System.currentTimeMillis()); } } synchronized void doStop() { if (selector != null) { selector.wakeup(); Thread.yield(); } if (acceptChannel != null) { try { acceptChannel.socket().close(); } catch (IOException e) { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } readPool.shutdown(); } // The method that will return the next reader to work with // Simplistic implementation of round robin for now Reader getReader() { currentReader = (currentReader + 1) % readers.length; return readers[currentReader]; } }

 

转载于:https://my.oschina.net/guhai2004/blog/165492

你可能感兴趣的文章
js 经过修改改良的全浏览器支持的软键盘,随机排列
查看>>
做完小程序项目、老板给我加了6k薪资~
查看>>
脱离“体验”和“安全”谈盈利的游戏运营 都是耍流氓
查看>>
TortoiseSVN中图标的含义
查看>>
根据毫秒数计算出当前的“年/月/日/时/分/秒/星期”并不是件容易的事
查看>>
Unity Shaders and Effects Cookbook (3-5) 金属软高光
查看>>
31-hadoop-hbase-mapreduce操作hbase
查看>>
NYOJ283对称排序
查看>>
C#反射实例应用--------获取程序集信息和通过类名创建类实例
查看>>
VC中实现文字竖排的简单方法
查看>>
程序员常用的六大技术博客类
查看>>
深入理解浏览器的缓存机制
查看>>
又拍云沈志华:如何打造一款安全的App
查看>>
dubbo源码分析-架构
查看>>
6套毕业设计PPT模板拯救你的毕业答辩
查看>>
Windows phone 8 学习笔记
查看>>
我的友情链接
查看>>
sshd_config设置参数笔记
查看>>
LeetCode--112--路径总和
查看>>
感悟贴2016-05-13
查看>>