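The main job of zero-copy is to keep the CPU from copying data from one memory area to another. It removes unnecessary copies, or hands such simple transfer work to another component (such as DMA) so that the CPU is free for other tasks. System resources are used more efficiently, and the number of switches between kernel mode and user mode goes down.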
while((n = read(diskfd, buf, BUF_SIZE)) > 0)
write(sockfd, buf , n);
buf = mmap(NULL, len, PROT_READ, MAP_PRIVATE, diskfd, 0);
write(sockfd, buf, len);
#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
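The FileChannel.transferTo() method in Java NIO is such an implementation; it relies on the operating system's underlying sendfile() call.
public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;
Under the hood it calls the **sendfile()** system call: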
#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);
fis = new FileInputStream(srcPath);
fos = new FileOutputStream(dstPath);
fisChannel = fis.getChannel();
fosChannel = fos.getChannel();
long len = fisChannel.transferTo(0, fisChannel.size(), fosChannel);
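A single transferTo call is allowed to move fewer bytes than requested, so a copy that must be complete usually loops until everything has been sent. The following is a minimal sketch under that assumption; the class name `TransferToLoop` and its `copy` method are hypothetical and not part of the original snippet.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToLoop {

    /** Copies src to dst with FileChannel.transferTo and returns the bytes copied. */
    static long copy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst,
                     StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE,
                     StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long position = 0;
            while (position < size) {
                // transferTo may move fewer bytes than requested, so keep going.
                long n = in.transferTo(position, size - position, out);
                if (n <= 0) {
                    break; // nothing more could be transferred, e.g. the source shrank
                }
                position += n;
            }
            return position;
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("zero-copy-src", ".bin");
        Path dst = Files.createTempFile("zero-copy-dst", ".bin");
        Files.write(src, new byte[1 << 20]); // 1 MiB of zeros as sample data
        System.out.println("copied " + copy(src, dst) + " bytes");
        Files.delete(src);
        Files.delete(dst);
    }
}
```

The try-with-resources block closes both channels even when the transfer throws, which the shorter snippet above leaves to the caller.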
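The Java APIs that correspond to sendfile are the transferTo and transferFrom methods.
Note: sendfile is an I/O optimization for the disk-to-NIC direction. There is no such optimization in the reverse direction, from NIC to disk, so transferFrom does not get this benefit.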
public long transferFrom(ReadableByteChannel src,
long position, long count)
throws IOException
{
ensureOpen();
if (!src.isOpen())
throw new ClosedChannelException();
if (!writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
if (position > size())
return 0;
if (src instanceof FileChannelImpl)
return transferFromFileChannel((FileChannelImpl)src,
position, count);
return transferFromArbitraryChannel(src, position, count);
}
If src is an ordinary FileChannel, the mmap path is taken:
private static final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L;
private long transferFromFileChannel(FileChannelImpl src,
long position, long count)
throws IOException
{
if (!src.readable)
throw new NonReadableChannelException();
synchronized (src.positionLock) {
long pos = src.position();
long max = Math.min(count, src.size() - pos);
long remaining = max;
long p = pos;
while (remaining > 0L) {
long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
// ## Bug: Closing this channel will not terminate the write
MappedByteBuffer bb = src.map(MapMode.READ_ONLY, p, size);
try {
long n = write(bb, position);
assert n > 0;
p += n;
position += n;
remaining -= n;
} catch (IOException ioe) {
// Only throw exception if no bytes have been written
if (remaining == max)
throw ioe;
break;
} finally {
unmap(bb);
}
}
long nwritten = max - remaining;
src.position(pos + nwritten);
return nwritten;
}
}
Each loop iteration maps a region of the source file, starting at the current position and at most 8 MB long, and writes it to the target file.
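The same chunked-mapping idea can be sketched with public APIs only. This is an illustration under stated assumptions, not the JDK's implementation: the class name `MappedChunkCopy` and the `chunkSize` parameter are hypothetical, and since the public API has no explicit unmap, each mapping is released only when its buffer is garbage-collected.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedChunkCopy {

    /** Copies src to dst by mapping the source in chunks of at most chunkSize bytes. */
    static long copy(Path src, Path dst, long chunkSize) throws IOException {
        if (chunkSize <= 0 || chunkSize > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("chunkSize must be in 1..Integer.MAX_VALUE");
        }
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst,
                     StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE,
                     StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                long len = Math.min(size - pos, chunkSize);
                // Map only the current window of the source file.
                MappedByteBuffer chunk = in.map(FileChannel.MapMode.READ_ONLY, pos, len);
                while (chunk.hasRemaining()) {
                    // Positional write: the file offset follows the buffer's progress.
                    out.write(chunk, pos + chunk.position());
                }
                pos += len;
            }
            return pos;
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempFile("mapped-src", ".bin");
        Path dst = Files.createTempFile("mapped-dst", ".bin");
        src.toFile().deleteOnExit();
        dst.toFile().deleteOnExit();
        Files.write(src, new byte[3 * 1024 * 1024]);
        // 1 MiB chunks here; the JDK code above uses 8 MiB.
        System.out.println("copied " + copy(src, dst, 1024 * 1024) + " bytes");
    }
}
```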
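If src is a SocketChannelImpl, the transfer goes through off-heap (direct) memory. It also writes in a loop, at most 8 KB per iteration. When the buffer is no longer needed it is returned to a cache for reuse where possible, rather than freed.
The data is first read into off-heap memory and then written to the file.
The off-heap memory is obtained with ByteBuffer.allocateDirect: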
/**
* Returns a temporary buffer of at least the given size
*/
public static ByteBuffer getTemporaryDirectBuffer(int size) {
// If a buffer of this size is too large for the cache, there
// should not be a buffer in the cache that is at least as
// large. So we'll just create a new one. Also, we don't have
// to remove the buffer from the cache (as this method does
// below) given that we won't put the new buffer in the cache.
if (isBufferTooLarge(size)) {
return ByteBuffer.allocateDirect(size);
}
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size);
}
}
This bufferCache is a ThreadLocal, so it is thread-safe; the design resembles Netty's memory management.
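The per-thread caching idea can be sketched in a much reduced form. The version below keeps at most one direct buffer per thread, whereas the JDK keeps a small per-thread array of them. The class name `ThreadLocalBufferCache`, the methods `acquire` and `release`, and the `MAX_CACHED_SIZE` limit are all hypothetical.

```java
import java.nio.ByteBuffer;

final class ThreadLocalBufferCache {

    // Hypothetical limit: larger buffers are never cached.
    private static final int MAX_CACHED_SIZE = 256 * 1024;

    // One slot per thread, so no locking is needed.
    private static final ThreadLocal<ByteBuffer> CACHE = new ThreadLocal<>();

    /** Returns a direct buffer with exactly size bytes remaining. */
    static ByteBuffer acquire(int size) {
        ByteBuffer cached = CACHE.get();
        if (cached != null && cached.capacity() >= size) {
            CACHE.remove();      // hand it out; it is not in the cache while in use
            cached.clear();
            cached.limit(size);
            return cached;
        }
        return ByteBuffer.allocateDirect(size);
    }

    /** Offers a buffer back to the current thread's cache for later reuse. */
    static void release(ByteBuffer buf) {
        if (buf.capacity() > MAX_CACHED_SIZE) {
            return;              // too large to keep; leave it to the garbage collector
        }
        ByteBuffer cached = CACHE.get();
        if (cached == null || cached.capacity() < buf.capacity()) {
            CACHE.set(buf);      // keep the larger of the two
        }
    }

    public static void main(String[] args) {
        ByteBuffer first = acquire(1024);
        release(first);
        ByteBuffer second = acquire(512);
        System.out.println("reused: " + (first == second));
    }
}
```

Because each thread only ever sees its own slot, two threads can never be handed the same buffer, which is the property the ThreadLocal provides in the JDK code as well.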
public long transferTo(long position, long count,
WritableByteChannel target)
throws IOException
{
ensureOpen();
if (!target.isOpen())
throw new ClosedChannelException();
if (!readable)
throw new NonReadableChannelException();
if (target instanceof FileChannelImpl &&
!((FileChannelImpl)target).writable)
throw new NonWritableChannelException();
if ((position < 0) || (count < 0))
throw new IllegalArgumentException();
long sz = size();
if (position > sz)
return 0;
int icount = (int)Math.min(count, Integer.MAX_VALUE);
if ((sz - position) < icount)
icount = (int)(sz - position);
long n;
// Attempt a direct transfer, if the kernel supports it
// sendfile
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;
// Attempt a mapped transfer, but only to trusted channel types
// mmap
if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
return n;
// Slow path for untrusted targets
// off-heap (direct) memory
return transferToArbitraryChannel(position, icount, target);
}
1.
transferToDirectly ->
// Transfers from src to dst, or returns -2 if kernel can't do that
private native long transferTo0(FileDescriptor src, long position,
long count, FileDescriptor dst);
2.
transferToTrustedChannel ->
private long transferToTrustedChannel(long position, long count,
WritableByteChannel target)
throws IOException
{
boolean isSelChImpl = (target instanceof SelChImpl);
if (!((target instanceof FileChannelImpl) || isSelChImpl))
return IOStatus.UNSUPPORTED;
// Trusted target: Use a mapped buffer
long remaining = count;
while (remaining > 0L) {
long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
try {
MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
try {
// ## Bug: Closing this channel will not terminate the write
int n = target.write(dbb);
assert n >= 0;
remaining -= n;
if (isSelChImpl) {
// one attempt to write to selectable channel
break;
}
assert n > 0;
position += n;
} finally {
unmap(dbb);
}
} catch (ClosedByInterruptException e) {
// target closed by interrupt as ClosedByInterruptException needs
// to be thrown after closing this channel.
assert !target.isOpen();
try {
close();
} catch (Throwable suppressed) {
e.addSuppressed(suppressed);
}
throw e;
} catch (IOException ioe) {
// Only throw exception if no bytes have been written
if (remaining == count)
throw ioe;
break;
}
}
return count - remaining;
}
3.
transferToArbitraryChannel -> works the same way as transferFromArbitraryChannel
Netty memory usage techniques - Zero-Copy
Example 1: use logical composition instead of an actual copy.
For example, CompositeByteBuf:
io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR
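Netty's own API is not used in the sketch below. As a JDK-only analogue of "compose logically instead of copying", it writes a header and a body that live in separate buffers with one gathering write, without first merging them into a single array. The class name `GatheringWriteSketch` and the method `writeParts` are hypothetical.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class GatheringWriteSketch {

    /** Writes all parts to target in order, without copying them into one buffer first. */
    static long writeParts(Path target, ByteBuffer... parts) throws IOException {
        long total = 0;
        for (ByteBuffer part : parts) {
            total += part.remaining();
        }
        try (FileChannel out = FileChannel.open(target,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            long written = 0;
            while (written < total) {
                // Gathering write: the channel drains the buffers one after another.
                written += out.write(parts);
            }
            return written;
        }
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempFile("gather", ".txt");
        ByteBuffer header = ByteBuffer.wrap("HDR:".getBytes(StandardCharsets.US_ASCII));
        ByteBuffer body = ByteBuffer.wrap("payload".getBytes(StandardCharsets.US_ASCII));
        System.out.println("wrote " + writeParts(target, header, body) + " bytes");
        Files.delete(target);
    }
}
```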
Example 2: use wrapping instead of an actual copy.
byte[] bytes = data.getBytes();
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
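The JDK has a comparable wrapping call, ByteBuffer.wrap, and it can be used to see what "no copy" means in practice: the buffer and the array share the same storage, so a change to one is visible through the other. The sketch below uses the JDK call rather than Netty's; the class name `WrapSketch` and its method are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WrapSketch {

    /** Wraps the array in a buffer that shares its storage; no bytes are copied. */
    static ByteBuffer wrapWithoutCopy(byte[] bytes) {
        return ByteBuffer.wrap(bytes);
    }

    public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer buf = wrapWithoutCopy(bytes);
        bytes[0] = 'J'; // modify the array after wrapping
        // The buffer sees the change because it is a view, not a copy.
        System.out.println((char) buf.get(0));
    }
}
```

The flip side of sharing is that the caller must not reuse or modify the array while the wrapped buffer is still in use.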
Example 3: call the JDK's zero-copy interface.
Netty also implements zero-copy by wrapping NIO's FileChannel.transferTo() method in DefaultFileRegion: io.netty.channel.DefaultFileRegion#transferTo
public class UnitTest1 {
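// Java does not expand "~", so resolve the home directory explicitly
private static final String prefix = System.getProperty("user.home") + "/path/to/";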
public static void main(String[] args) throws Exception {
streamCopy("input", "output1");
bufferCopy("input", "output2");
directBufferCopy("input", "output3");
mappedByteBufferCopy("input", "output4");
mappedByteBufferCopyByPart("input", "output5");
channelCopy("input", "output6");
}
/**
* Copy using streams
*/
private static void streamCopy(String from, String to) throws IOException {
long startTime = System.currentTimeMillis();
File inputFile = new File(prefix + from);
File outputFile = new File(prefix + to);
FileInputStream fis = new FileInputStream(inputFile);
FileOutputStream fos = new FileOutputStream(outputFile);
byte[] bytes = new byte[1024];
int len;
while ((len = fis.read(bytes)) != -1) {
fos.write(bytes, 0, len);
}
fos.flush();
fis.close();
fos.close();
long endTime = System.currentTimeMillis();
System.out.println("streamCopy cost:" + (endTime - startTime));
}
/**
* Copy using a heap ByteBuffer
*/
private static void bufferCopy(String from, String to) throws IOException {
long startTime = System.currentTimeMillis();
RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");
FileChannel inputChannel = inputFile.getChannel();
FileChannel outputChannel = outputFile.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (inputChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
outputChannel.write(byteBuffer);
byteBuffer.clear();
}
inputChannel.close();
outputChannel.close();
long endTime = System.currentTimeMillis();
System.out.println("bufferCopy cost:" + (endTime - startTime));
}
/**
* Copy using off-heap (direct) memory
*/
private static void directBufferCopy(String from, String to) throws IOException {
long startTime = System.currentTimeMillis();
RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");
FileChannel inputChannel = inputFile.getChannel();
FileChannel outputChannel = outputFile.getChannel();
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
while (inputChannel.read(byteBuffer) != -1) {
byteBuffer.flip();
outputChannel.write(byteBuffer);
byteBuffer.clear();
}
inputChannel.close();
outputChannel.close();
long endTime = System.currentTimeMillis();
System.out.println("directBufferCopy cost:" + (endTime - startTime));
}
/**
* Copy by memory-mapping the whole file
*/
private static void mappedByteBufferCopy(String from, String to) throws IOException {
long startTime = System.currentTimeMillis();
RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");
FileChannel inputChannel = inputFile.getChannel();
FileChannel outputChannel = outputFile.getChannel();
MappedByteBuffer iBuffer = inputChannel.map(MapMode.READ_ONLY, 0, inputFile.length());
MappedByteBuffer oBuffer = outputChannel.map(MapMode.READ_WRITE, 0, inputFile.length());
// Operate on the buffers directly; no other I/O calls
oBuffer.put(iBuffer);
inputChannel.close();
outputChannel.close();
long endTime = System.currentTimeMillis();
System.out.println("mappedByteBufferCopy cost:" + (endTime - startTime));
}
/**
* Copy by memory-mapping the file piece by piece
*/
private static void mappedByteBufferCopyByPart(String from, String to) throws IOException {
long startTime = System.currentTimeMillis();
RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");
FileChannel inputChannel = inputFile.getChannel();
FileChannel outputChannel = outputFile.getChannel();
for (long i = 0; i < inputFile.length(); i += 1024) {
long size = 1024;
// Clamp the last chunk so the output file gets no trailing gap
if (i + size > inputFile.length()) {
size = inputFile.length() - i;
}
MappedByteBuffer iBuffer = inputChannel.map(MapMode.READ_ONLY, i, size);
MappedByteBuffer oBuffer = outputChannel.map(MapMode.READ_WRITE, i, size);
oBuffer.put(iBuffer);
}
inputChannel.close();
outputChannel.close();
long endTime = System.currentTimeMillis();
System.out.println("mappedByteBufferCopyByPart cost:" + (endTime - startTime));
}
/**
* zero copy
*/
private static void channelCopy(String from, String to) throws IOException {
long startTime = System.currentTimeMillis();
RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");
FileChannel inputChannel = inputFile.getChannel();
FileChannel outputChannel = outputFile.getChannel();
inputChannel.transferTo(0, inputFile.length(), outputChannel);
inputChannel.close();
outputChannel.close();
long endTime = System.currentTimeMillis();
System.out.println("channelCopy cost:" + (endTime - startTime));
}
}
streamCopy cost:2718
bufferCopy cost:2604
directBufferCopy cost:2420
mappedByteBufferCopy cost:541
mappedByteBufferCopyByPart cost:11232
channelCopy cost:330
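(1) The user process issues the sendfile system call.
(2) The kernel uses DMA copy to move the file data from disk into the kernel buffer.
(3) The kernel copies only the descriptor information for the data (buffer location and data length) from the kernel buffer into the socket buffer.
(4) Using the descriptor information in the socket buffer and the gather-copy capability of the DMA hardware, the kernel copies the data from the kernel buffer to the NIC.
(5) The sendfile system call completes and returns to the user process.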
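The steps above describe what the kernel does. From Java, the usual way to request that path is FileChannel.transferTo with a socket as the target. The sketch below sends a file over a loopback connection and counts the bytes on the receiving side. The class name `FileToSocket` and its methods are hypothetical, and whether the JVM actually ends up in sendfile depends on the platform and the JDK version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;

final class FileToSocket {

    /** Sends the whole file over a blocking socket; returns the bytes sent. */
    static long send(Path file, SocketChannel socket) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                long n = in.transferTo(pos, size - pos, socket);
                if (n <= 0) {
                    break; // nothing more could be sent, e.g. the file shrank
                }
                pos += n;
            }
            return pos;
        }
    }

    /** Loopback demo: sends the file and returns the bytes the receiver counted. */
    static long demo(Path file) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            // Receiver: accept one connection and count bytes until end of stream.
            CompletableFuture<Long> received = CompletableFuture.supplyAsync(() -> {
                try (SocketChannel peer = server.accept()) {
                    ByteBuffer buf = ByteBuffer.allocate(64 * 1024);
                    long total = 0;
                    int n;
                    while ((n = peer.read(buf)) != -1) {
                        total += n;
                        buf.clear();
                    }
                    return total;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            // Sender: closing the client signals end of stream to the receiver.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                send(file, client);
            }
            return received.get();
        }
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("sendfile-demo", ".bin");
        Files.write(file, new byte[512 * 1024]);
        System.out.println("receiver got " + demo(file) + " bytes");
        Files.delete(file);
    }
}
```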
The basic flow of zero-copy with mmap + write is as follows:
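In RocketMQ, the logic for storing and loading messages via mmap lives in org.apache.rocketmq.store.MappedFile. Internally it builds on java.nio.MappedByteBuffer from NIO and obtains the mmap buffer through FileChannel's map method:
// Initialization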
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
When querying messages in the CommitLog, the lookup uses the offset pos within mappedByteBuffer and the data size size:
public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
int readPosition = getReadPosition();
// ... various safety checks
// Return a view of mappedByteBuffer
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
byteBuffer.position(pos);
ByteBuffer byteBufferNew = byteBuffer.slice();
byteBufferNew.limit(size);
return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
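The view-by-offset idea can be tried out with the JDK alone. The sketch below maps a file and returns a window of `size` bytes starting at `pos` that shares memory with the mapping. It uses duplicate() plus slice() rather than the two slice() calls above, and the class name `MappedView` and its methods are hypothetical, not RocketMQ's API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedView {

    /** Maps the first size bytes of file read-write (the file is extended if needed). */
    static MappedByteBuffer mapFile(Path file, int size) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            return ch.map(FileChannel.MapMode.READ_WRITE, 0, size);
        }
    }

    /** Returns a view of size bytes starting at pos; no data is copied. */
    static ByteBuffer view(ByteBuffer mapped, int pos, int size) {
        if (pos < 0 || size < 0 || pos > mapped.capacity() - size) {
            throw new IllegalArgumentException("window out of range");
        }
        ByteBuffer window = mapped.duplicate(); // own position/limit, same memory
        window.position(pos);
        window.limit(pos + size);
        return window.slice();                  // index 0 of the slice is pos
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mapped-view", ".bin");
        file.toFile().deleteOnExit();
        MappedByteBuffer mapped = mapFile(file, 64);
        byte[] hello = {'h', 'e', 'l', 'l', 'o'};
        for (int i = 0; i < hello.length; i++) {
            mapped.put(10 + i, hello[i]);
        }
        ByteBuffer window = view(mapped, 10, hello.length);
        System.out.println("window holds " + window.remaining() + " bytes");
    }
}
```

Using duplicate() leaves the position and limit of the shared mapping untouched, so concurrent readers can each take their own window.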
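Tip (the transientStorePoolEnable mechanism): memory mapped by Java NIO mmap is not guaranteed to stay resident and can be paged out to swap (virtual memory). To improve message-sending performance, RocketMQ introduces a memory-locking mechanism: the CommitLog files that will be needed soon are mapped into memory and locked so that they always remain in memory. The parameter that controls this mechanism is transientStorePoolEnable.
Consequently, there are two ways for MappedFile to flush CommitLog data to disk:
RocketMQ implements zero-copy with mmap + write, which suits persisting and transferring small pieces of data such as business-level messages. Kafka implements zero-copy with sendfile, which suits persisting and transferring large, high-throughput data such as system log messages.
Tip: Kafka uses mmap + write for its index files and sendfile when sending data files over the network.
Netty's zero-copy comes in two kinds: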