nio       

java nio

bytebuffer

socket channel

selector

// 1. 获取数据源 和 目标传输地的输入输出流(此处以数据源 = 文件为例)
    FileInputStream fin = new FileInputStream(infile);
    FileOutputStream fout = new FileOutputStream(outfile);

    // 2. 获取数据源的输入输出通道
    FileChannel fcin = fin.getChannel();
    FileChannel fcout = fout.getChannel();

    // 3. 创建 缓冲区 对象:Buffer(共有2种方法)
     // 方法1:使用allocate()静态方法
     ByteBuffer buff = ByteBuffer.allocate(256);
     // 上述方法创建1个容量为256字节的ByteBuffer
     // 注:若发现创建的缓冲区容量太小,则重新创建一个大小合适的缓冲区

    // 方法2:通过包装一个已有的数组来创建
     // 注:通过包装的方法创建的缓冲区保留了被包装数组内保存的数据
     ByteBuffer buff = ByteBuffer.wrap(byteArray);

     // 额外:若需将1个字符串存入ByteBuffer,则如下
     String sendString="你好,服务器. ";
     ByteBuffer sendBuff = ByteBuffer.wrap(sendString.getBytes("UTF-16"));

    // 4. 从通道读取数据 & 写入到缓冲区
    // 注:若 以读取到该通道数据的末尾,则返回-1
    fcin.read(buff);

    // 5. 传出数据准备:将缓存区的写模式 转换->> 读模式
    buff.flip();

    // 6. 从 Buffer 中读取数据 & 传出数据到通道
    fcout.write(buff);

    // 7. 重置缓冲区
    // 目的:重用现在的缓冲区,即 不必为了每次读写都创建新的缓冲区,在再次读取之前要重置缓冲区
    // 注:不会改变缓冲区的数据,只是重置缓冲区的主要索引值
    buff.clear();

public class ServerSocketChannelTest {
 private static byte[] data = new byte[255];
 
 public static void main(String[] args) throws IOException {
  for (int i = 0; i < data.length; i++) {
   data[i] = (byte) i;
  }
  //新建NIO通道
  ServerSocketChannel server = ServerSocketChannel.open();
  //使通道为非阻塞
  server.configureBlocking(false);
  //创建基于NIO通道的socket连接
  ServerSocket ss = server.socket();
  //新建socket通道的端口
  ss.bind(new InetSocketAddress(9000));
  //将NIO通道绑定到选择器
  Selector selector = Selector.open();
  server.register(selector, SelectionKey.OP_ACCEPT);
 
  while (true) {
   //获取通道内是否有选择器的关心事件
   int num = selector.select();
   //如果小于1,停止此次循环,进行下一个循环
   if (num < 1) {
    continue;
   }
   //获取通道内关心事件的集合
   Set selectedKeys = selector.selectedKeys();
   Iterator iterator = selectedKeys.iterator();
   while (iterator.hasNext()) {
    SelectionKey key = (SelectionKey) iterator.next();
    //移走此次事件
    iterator.remove();
    
    if (key.isAcceptable()) {
     //获取对应的SocketChannel
     SocketChannel client = server.accept();
     System.out.println("Accepted connection from " + client);
     //使此通道为非阻塞
     client.configureBlocking(false);
     //将数组data的大小定义为ByteBuffer缓冲区的大小
     ByteBuffer source = ByteBuffer.wrap(data);
     
     //在此通道上注册事件
     SelectionKey key2 = client.register(selector,
       SelectionKey.OP_WRITE);
     //通道执行事件
     key2.attach(source);
    } else if (key.isWritable()) {
     //获取此通道的SocketChannel
     SocketChannel client = (SocketChannel) key.channel();
     ByteBuffer output = (ByteBuffer) key.attachment();
     //如果缓存区没了,重置一下
     if (!output.hasRemaining()) {
      output.rewind();
     }
     //在此通道内写东西
     client.write(output);
    }
    key.channel().close();
   }
 
  }
 
 }
}


public class SocketChannelTest {
 public static void main(String[] args) throws Exception {
  //建立到服务端的链接
  SocketAddress address = new InetSocketAddress("127.0.0.1", 9000);
  SocketChannel client = SocketChannel.open(address);
  //创建静态的缓冲区
  ByteBuffer buffer = ByteBuffer.allocate(255);
 
  //读取数据,到buffer中
  client.read(buffer);
  //将position重新置为0
  buffer.clear();
  //输出缓冲区的数据
  for (int i = 0; i < buffer.array().length; i++) {
   System.out.println(buffer.array()[i]);
  }
 }
}

nio byteBuffer

容量(Capacity):缓冲区能够容纳的数据元素的最大数量。初始设定后不能更改。 上界(Limit):缓冲区中第一个不能被读或者写的元素位置。或者说,缓冲区内现存元素的上界。 位置(Position):缓冲区内下一个将要被读或写的元素位置。在进行读写缓冲区时,位置会自动更新。 标记(Mark):一个备忘位置。初始时为“未定义”,调用mark时mark=positon,调用reset时position=mark。 这四个属性总是满足如下关系:

mark<=position<=limit<=capacity
ByteBuffer buf = ByteBuffer.allocate(1024);

direct buffer

ByteBuffer buf = ByteBuffer.allocateDirect(1024);
   public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }

    public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }

java.nio.HeapByteBuffer#HeapByteBuffer(int, int)

HeapByteBuffer(int cap, int lim) {            // package-private

    super(-1, 0, lim, cap, new byte[cap], 0);
    /*
    hb = new byte[cap];
    offset = 0;
    */




}

heapByteBuffer就是在java程序理new byte[cap]空间

java.nio.DirectByteBuffer#DirectByteBuffer(int)

DirectByteBuffer(int cap) {                   // package-private

    super(-1, 0, cap, cap);
    boolean pa = VM.isDirectMemoryPageAligned();
    int ps = Bits.pageSize();
    long size = Math.max(1L, (long)cap + (pa ? ps : 0));
    Bits.reserveMemory(size, cap);

    long base = 0;
    try {
      	// native方法,jvm调用c的malloc
        base = unsafe.allocateMemory(size);
    } catch (OutOfMemoryError x) {
        Bits.unreserveMemory(size, cap);
        throw x;
    }
    unsafe.setMemory(base, size, (byte) 0);
    if (pa && (base % ps != 0)) {
        // Round up to page boundary
        address = base + ps - (base & (ps - 1));
    } else {
        address = base;
    }
    cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
    att = null;



}

这是一块在Java堆外分配的,可以在Java程序中访问的内存。

HeapByteBuffer和DirectByteBuffer的区别在于HeapByteBuffer是在Java Heap上分配的,但是Java NIO在读写到相应的Channel的时候,会先将Java Heap的buffer内容拷贝至直接内存——Direct Memory。这样的话,无疑DirectByteBuffer的IO性能肯定强于使用HeapByteBuffer,它省去了临时buffer的拷贝开销,这也是为什么各个NIO框架大多使用DirectByteBuffer的原因。

  1. 底层通过write、read、pwrite,pread函数进行系统调用时,需要传入buffer的起始地址和buffer count作为参数。具体参见:write(2): to file descriptorread(2): read from file descriptorpread(2) - Linux man pagepwrite(2) - Linux man page。如果使用java heap的话,我们知道jvm中buffer往往以byte[] 的形式存在,这是一个特殊的对象,由于java heap GC的存在,这里对象在堆中的位置往往会发生移动,移动后我们传入系统函数的地址参数就不是真正的buffer地址了,这样的话无论读写都会发生出错。而C Heap仅仅受Full GC的影响,相对来说地址稳定。DirectBuffer不会被GC移动

  2. JVM规范中没有要求Java的byte[]必须是连续的内存空间,它往往受宿主语言的类型约束;而C Heap中我们分配的虚拟地址空间是可以连续的,而上述的系统调用要求我们使用连续的地址空间作为buffer。

   static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd) 
        throws IOException
    {   
        if (src instanceof DirectBuffer)
            return writeFromNativeBuffer(fd, src, position, nd);

        // Substitute a native buffer
        int pos = src.position();
        int lim = src.limit();
        assert (pos <= lim);
        int rem = (pos <= lim ? lim - pos : 0); 
        ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
        try {
            bb.put(src);
            bb.flip();
        // ................略

image-20211123151758236

所以传统的Java Socket编程每次发送数据的时候,都会申请一块直接内存(堆外),然后从堆内复制到堆外,最后在调用send发送

为什么要把数据从堆内复制到堆外呢?因为堆内的对象地址会随着gc改变,在send的时候会崩

在高并发场景下,这是非常费内存的,假如每个链接发送的数据是1k, 那么堆内有1K的数据,堆外还要申请1K的数据,还要做数据拷贝,百万链接就需要2T的内存(极端场景), 无疑是瓶颈之一。

既然Socket Api一定要用堆外的内存,一个解决思路就是复用这块内存,这样就不必每次都申请一块新的内存,减少系统调用损耗

计算机世界就是解决一个问题的同时带来新的问题,如果复用了这块内存,如何在不用的时候进行回收就是一个问题了,因为这块内存在堆外,JVM的gc管不到,放着不管又迟早触发Linux的OOM Killer.

jdk1.4以后的NIO带来了解决方案: DirectBuffer

reference

https://www.jianshu.com/p/d30893c4d6bb

https://www.zhihu.com/question/60892134

https://zhuanlan.zhihu.com/p/27625923

https://juejin.cn/post/6844903644932866062