// 1. 获取数据源 和 目标传输地的输入输出流(此处以数据源 = 文件为例)
FileInputStream fin = new FileInputStream(infile);
FileOutputStream fout = new FileOutputStream(outfile);
// 2. 获取数据源的输入输出通道
FileChannel fcin = fin.getChannel();
FileChannel fcout = fout.getChannel();
// 3. 创建 缓冲区 对象:Buffer(共有2种方法)
// 方法1:使用allocate()静态方法
ByteBuffer buff = ByteBuffer.allocate(256);
// 上述方法创建1个容量为256字节的ByteBuffer
// 注:若发现创建的缓冲区容量太小,则重新创建一个大小合适的缓冲区
// 方法2:通过包装一个已有的数组来创建
// 注:通过包装的方法创建的缓冲区保留了被包装数组内保存的数据
ByteBuffer buff = ByteBuffer.wrap(byteArray);
// 额外:若需将1个字符串存入ByteBuffer,则如下
String sendString="你好,服务器. ";
ByteBuffer sendBuff = ByteBuffer.wrap(sendString.getBytes("UTF-16"));
// 4. 从通道读取数据 & 写入到缓冲区
// 注:若 以读取到该通道数据的末尾,则返回-1
fcin.read(buff);
// 5. 传出数据准备:将缓存区的写模式 转换->> 读模式
buff.flip();
// 6. 从 Buffer 中读取数据 & 传出数据到通道
fcout.write(buff);
// 7. 重置缓冲区
// 目的:重用现在的缓冲区,即 不必为了每次读写都创建新的缓冲区,在再次读取之前要重置缓冲区
// 注:不会改变缓冲区的数据,只是重置缓冲区的主要索引值
buff.clear();
public class ServerSocketChannelTest {
private static byte[] data = new byte[255];
public static void main(String[] args) throws IOException {
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
//新建NIO通道
ServerSocketChannel server = ServerSocketChannel.open();
//使通道为非阻塞
server.configureBlocking(false);
//创建基于NIO通道的socket连接
ServerSocket ss = server.socket();
//新建socket通道的端口
ss.bind(new InetSocketAddress(9000));
//将NIO通道绑定到选择器
Selector selector = Selector.open();
server.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
//获取通道内是否有选择器的关心事件
int num = selector.select();
//如果小于1,停止此次循环,进行下一个循环
if (num < 1) {
continue;
}
//获取通道内关心事件的集合
Set selectedKeys = selector.selectedKeys();
Iterator iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
//移走此次事件
iterator.remove();
if (key.isAcceptable()) {
//获取对应的SocketChannel
SocketChannel client = server.accept();
System.out.println("Accepted connection from " + client);
//使此通道为非阻塞
client.configureBlocking(false);
//将数组data的大小定义为ByteBuffer缓冲区的大小
ByteBuffer source = ByteBuffer.wrap(data);
//在此通道上注册事件
SelectionKey key2 = client.register(selector,
SelectionKey.OP_WRITE);
//通道执行事件
key2.attach(source);
} else if (key.isWritable()) {
//获取此通道的SocketChannel
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
//如果缓存区没了,重置一下
if (!output.hasRemaining()) {
output.rewind();
}
//在此通道内写东西
client.write(output);
}
key.channel().close();
}
}
}
}
public class SocketChannelTest {
public static void main(String[] args) throws Exception {
//建立到服务端的链接
SocketAddress address = new InetSocketAddress("127.0.0.1", 9000);
SocketChannel client = SocketChannel.open(address);
//创建静态的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(255);
//读取数据,到buffer中
client.read(buffer);
//将position重新置为0
buffer.clear();
//输出缓冲区的数据
for (int i = 0; i < buffer.array().length; i++) {
System.out.println(buffer.array()[i]);
}
}
}
容量(Capacity):缓冲区能够容纳的数据元素的最大数量。初始设定后不能更改。 上界(Limit):缓冲区中第一个不能被读或者写的元素位置。或者说,缓冲区内现存元素的上界。 位置(Position):缓冲区内下一个将要被读或写的元素位置。在进行读写缓冲区时,位置会自动更新。 标记(Mark):一个备忘位置。初始时为“未定义”,调用mark时mark=positon,调用reset时position=mark。 这四个属性总是满足如下关系:
mark<=position<=limit<=capacity
ByteBuffer buf = ByteBuffer.allocate(1024);
ByteBuffer buf = ByteBuffer.allocateDirect(1024);
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
java.nio.HeapByteBuffer#HeapByteBuffer(int, int)
HeapByteBuffer(int cap, int lim) { // package-private
super(-1, 0, lim, cap, new byte[cap], 0);
/*
hb = new byte[cap];
offset = 0;
*/
}
heapByteBuffer就是在java程序理new byte[cap]空间
java.nio.DirectByteBuffer#DirectByteBuffer(int)
DirectByteBuffer(int cap) { // package-private
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
int ps = Bits.pageSize();
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
Bits.reserveMemory(size, cap);
long base = 0;
try {
// native方法,jvm调用c的malloc
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
// Round up to page boundary
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
这是一块在Java堆外分配的,可以在Java程序中访问的内存。
HeapByteBuffer和DirectByteBuffer的区别在于HeapByteBuffer是在Java Heap上分配的,但是Java NIO在读写到相应的Channel的时候,会先将Java Heap的buffer内容拷贝至直接内存——Direct Memory。这样的话,无疑DirectByteBuffer的IO性能肯定强于使用HeapByteBuffer,它省去了临时buffer的拷贝开销,这也是为什么各个NIO框架大多使用DirectByteBuffer的原因。
底层通过write、read、pwrite,pread函数进行系统调用时,需要传入buffer的起始地址和buffer count作为参数。具体参见:write(2): to file descriptor,read(2): read from file descriptor,pread(2) - Linux man page,pwrite(2) - Linux man page。如果使用java heap的话,我们知道jvm中buffer往往以byte[] 的形式存在,这是一个特殊的对象,由于java heap GC的存在,这里对象在堆中的位置往往会发生移动,移动后我们传入系统函数的地址参数就不是真正的buffer地址了,这样的话无论读写都会发生出错。而C Heap仅仅受Full GC的影响,相对来说地址稳定。DirectBuffer不会被GC移动
JVM规范中没有要求Java的byte[]必须是连续的内存空间,它往往受宿主语言的类型约束;而C Heap中我们分配的虚拟地址空间是可以连续的,而上述的系统调用要求我们使用连续的地址空间作为buffer。
static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd)
throws IOException
{
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
// ................略

所以传统的Java Socket编程每次发送数据的时候,都会申请一块直接内存(堆外),然后从堆内复制到堆外,最后在调用send发送
为什么要把数据从堆内复制到堆外呢?因为堆内的对象地址会随着gc改变,在send的时候会崩。
在高并发场景下,这是非常费内存的,假如每个链接发送的数据是1k, 那么堆内有1K的数据,堆外还要申请1K的数据,还要做数据拷贝,百万链接就需要2T的内存(极端场景), 无疑是瓶颈之一。
既然Socket Api一定要用堆外的内存,一个解决思路就是复用这块内存,这样就不必每次都申请一块新的内存,减少系统调用损耗。
计算机世界就是解决一个问题的同时带来新的问题,如果复用了这块内存,如何在不用的时候进行回收就是一个问题了,因为这块内存在堆外,JVM的gc管不到,放着不管又迟早触发Linux的OOM Killer.
jdk1.4以后的NIO带来了解决方案: DirectBuffer
https://www.jianshu.com/p/d30893c4d6bb
https://www.zhihu.com/question/60892134
https://zhuanlan.zhihu.com/p/27625923
https://juejin.cn/post/6844903644932866062