常用的零拷贝有 mmap 和 sendFile
除了标准的文件 IO,例如 open, read, write,内核还提供接口允许应用将文件 map 到内存。使得内存中的一个字节与文件中的一个字节一一对应。
read()
和 write()
系统调用,也避免了数据的拷贝。lseek()
系统调用。#include <sys/mman.h>
void * mmap (void *addr,
size_t len,
int prot,
int flags,
int fd,
off_t offset);
场景 | 私有影射 | 共享映射 |
---|---|---|
匿名映射 | 通常用于内存分配fd=-1,flags=MAP_ANONYMOUS|MAP_PRIVATE | 通常用于进程间内存共享,常用于父子进程之间通信。FD=-1,flags=MAP_ANONYMOUS|MAP_SHARED |
文件映射 | 通常用于加载动态库flags=MAP_PRIVATE | 通常用于内存映射IO、进程间通信、读写文件。flags=MAP_SHARED |
/** Allocates large pages memory.
*/
ptr = mmap(NULL, size, PROT_READ | PROT_WRITE,
MAP_PRIVATE | OS_MAP_ANON, -1, 0);
package read;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
public class FormerReader {
public static void main(String [] f) throws Exception {
File fileIn = new File("/Users/joyce/Downloads/java.pdf"); //打开源文件
File fileOut = new File("/Users/joyce/Downloads/target.pdf");
// 普通stream方式
long formerStart = System.currentTimeMillis();
FileInputStream streamln = new FileInputStream (fileIn);
FileOutputStream streamOut = new FileOutputStream (fileOut);
int c;
while ((c = streamln.read()) != -1) {
streamOut.write(c);
}
streamln.close();
streamOut.close();
long formerEnd = System.currentTimeMillis();
System.out.println((formerStart-formerEnd)/1000);
// randomaccessFile
formerStart = System.currentTimeMillis();
RandomAccessFile randomAccessFileR = new RandomAccessFile(fileIn, "r");
RandomAccessFile randomAccessFileW = new RandomAccessFile(fileOut, "rw");
byte[] buf = new byte[1024];
while((randomAccessFileR.read(buf)) != -1) {
randomAccessFileW.write(buf);
}
formerEnd = System.currentTimeMillis();
System.out.println((formerStart-formerEnd)/1000);
// nio FileChannel
formerStart = System.currentTimeMillis();
FileChannel fileChannelIn = new RandomAccessFile(fileIn, "r").getChannel();
FileChannel fileChannelOut = new RandomAccessFile(fileOut, "rw").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = fileChannelIn.read(buffer);
while(bytesRead != -1){
buffer.flip();
fileChannelOut.write(buffer);
buffer.clear();
bytesRead = fileChannelIn.read(buffer);
}
formerEnd = System.currentTimeMillis();
System.out.println((formerStart-formerEnd)/1000);
// nio MappedByteBuffer
// 将FileChannle对于的文件的一部分直接映射到内存。(这里的内存是堆外内存)
formerStart = System.currentTimeMillis();
long len = fileIn.length();
MappedByteBuffer mappedByteBuffer = new RandomAccessFile(fileIn, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, len);
MappedByteBuffer mappedByteBufferout = new RandomAccessFile(fileOut, "rw").getChannel().map(FileChannel.MapMode.READ_WRITE, 0, len);
for (int offset = 0; offset < len; offset++) {
byte b = mappedByteBuffer.get();
mappedByteBufferout.put(b);
}
formerEnd = System.currentTimeMillis();
System.out.println((formerStart-formerEnd)/1000);
}
}
kafka的索引文件利用的是mmap,数据文件是用FileChannel
rocketmq写文件两种都有,可以配置
kafka.log.AbstractIndex#mmap
@volatile
protected var mmap: MappedByteBuffer = {
// 第1步:创建索引文件
val newlyCreated = file.createNewFile()
// 第2步:以writable指定的方式(读写方式或只读方式)打开索引文件
val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
try {
if(newlyCreated) {
if(maxIndexSize < entrySize) // 预设的索引文件大小不能太小,如果连一个索引项都保存不了,直接抛出异常
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
// 第3步:设置索引文件长度,roundDownToExactMultiple计算的是不超过maxIndexSize的最大整数倍entrySize
// 比如maxIndexSize=1234567,entrySize=8,那么调整后的文件长度为1234560
raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
}
// 第4步:更新索引长度字段_length
_length = raf.length()
// 第5步:创建MappedByteBuffer对象
val idx = {
if (writable)
raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
// 第6步:如果是新创建的索引文件,将MappedByteBuffer对象的当前位置置成0
// 如果索引文件已存在,将MappedByteBuffer对象的当前位置设置成最后一个索引项所在的位置
if(newlyCreated)
idx.position(0)
else
idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
// 第7步:返回创建的MappedByteBuffer对象
idx
} finally {
CoreUtils.swallow(raf.close(), AbstractIndex) // 关闭打开索引文件句柄
}
}
新版的rocketMQ应该是用mmap写了
网友测评
MMAP 众所周知,基于 OS 的 mmap 的内存映射技术,通过 MMU 映射文件,使随机读写文件和读写内存相似的速度。
FileChannel 快,只是因为他是基于 block 的
mmap 通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少 内核空间 到 用户空间 的拷贝次数
user buffer 和 kernel buffer 共享 data 数据。如果你想把硬盘的 data 数据传输到网络中,再也不用拷贝到用户空间,再从用户空间拷贝到 Socket 缓冲区。
你只需要从 内核缓冲区 拷贝到 Socket 缓冲区即可,这将减少一次内存拷贝(从 4 次变成了 3 次),
数据根本不经过用户态,直接从内核缓冲区进入到 Socket Buffer,同时,由于和用户态完全无关,就减少了一次上下文切换
再减少1次拷贝
再稍微讲讲 mmap 和 sendFile 的区别。
mmap 适合小数据量读写,sendFile 适合大文件传输。
mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket 缓冲区)
在这个选择上:rocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。
kafka 大部分场景是使用消息队列,基本上没有复杂场景,就是一个数据的流转,所以适合数据进来直接被消费方读取走了,在这期间不需要做其他内部业务逻辑
kafka中partition leader到follower的消息同步和consumer拉取partition中的消息都使用到zero cory。Cousumer从broker获取数据时直接使用了FileChannel.transferTo(),直接在内核态进行的channel到channel的数据传输。
https://blog.csdn.net/weixin_37782390/article/details/103833306
https://www.cnblogs.com/liang1101/p/13160828.html
https://www.jianshu.com/p/187eada7b900
https://juejin.im/post/5cd82323f265da038932b1e6
https://zhuanlan.zhihu.com/p/132393165
https://blog.csdn.net/alex_xfboy/article/details/90174840