zero-copy

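The main job of zero-copy is to stop the CPU from copying data from one memory region to another. It avoids making the CPU do large amounts of copying, eliminates unnecessary copies, or hands this kind of simple data-transfer work to other components (such as the DMA controller), so the CPU is free to focus on other tasks. This uses system resources more efficiently and also reduces switches between user mode and kernel mode.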

The traditional approach is a read/write loop through a user-space buffer:

while ((n = read(diskfd, buf, BUF_SIZE)) > 0)
    write(sockfd, buf, n);
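For comparison, here is a minimal Java sketch of the same loop (the class and method names are hypothetical). Every byte passes through a user-space array. In the classic analysis of a file-to-socket copy this costs four copies (disk to kernel buffer by DMA, kernel buffer to user buffer by the CPU, user buffer to socket buffer by the CPU, socket buffer to NIC by DMA) and four user/kernel mode switches per read/write pair. The techniques below try to remove some of these.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class TraditionalCopy {
    // Size of the user-space buffer; 8 KB is an arbitrary choice for this sketch.
    private static final int BUF_SIZE = 8192;

    /** Copies in to out through a user-space buffer and returns the number of bytes copied. */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buf = new byte[BUF_SIZE]; // every byte passes through this heap array
        long total = 0;
        int n;
        // With file/socket streams, read() copies kernel -> user and write() copies user -> kernel.
        while ((n = in.read(buf)) > 0) {
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello, zero-copy".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long n = copy(new ByteArrayInputStream(data), out);
        System.out.println(n + " bytes copied");
    }
}
```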


mmap

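buf = mmap(NULL, len, PROT_READ, MAP_SHARED, diskfd, 0);  /* map the file's page cache into user space */
write(sockfd, buf, len);                                   /* CPU copy: page cache -> socket buffer */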
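A minimal Java sketch of mmap + write, assuming a blocking target channel and a file smaller than 2 GB (the size limit of a single FileChannel.map call); the class name MmapWrite is hypothetical. The mapping lets the process use the page-cache pages directly, so the kernel-to-user copy of the read/write loop disappears, but write still has the CPU copy the data into the socket buffer, which leaves three copies in total.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MmapWrite {
    /** Maps the whole file read-only and writes the mapping to target; returns bytes written. */
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            // mmap: the buffer is backed by the file's page-cache pages (no read() copy).
            // A single mapping is limited to Integer.MAX_VALUE bytes.
            MappedByteBuffer buf = fc.map(FileChannel.MapMode.READ_ONLY, 0, fc.size());
            long written = 0;
            // write: the kernel copies from the mapping into the target (e.g. a socket buffer).
            // Loop because a write may be partial; assumes a blocking target.
            while (buf.hasRemaining()) {
                written += target.write(buf);
            }
            return written;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("mmap-demo", ".txt");
        tmp.toFile().deleteOnExit();
        Files.write(tmp, "mapped bytes".getBytes());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        System.out.println(send(tmp, Channels.newChannel(out)) + " bytes: " + out);
    }
}
```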


sendfile

#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);


Hardware-optimized sendfile


Java NIO's FileChannel.transferTo() is implemented this way: it relies on the operating system's underlying sendfile().

public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;
Underneath, it calls the **sendfile()** system call:

#include <sys/sendfile.h>
ssize_t sendfile(int out_fd, int in_fd, off_t *offset, size_t count);

Java NIO transferTo / transferFrom

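try (FileChannel fisChannel = new FileInputStream(srcPath).getChannel();
     FileChannel fosChannel = new FileOutputStream(dstPath).getChannel()) {
    // transferTo may move fewer bytes than requested, so loop until done
    long size = fisChannel.size();
    long pos = 0;
    while (pos < size) {
        pos += fisChannel.transferTo(pos, size - pos, fosChannel);
    }
}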

In Java, the sendfile API consists of the transferTo and transferFrom methods.

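Note: sendfile is an I/O optimization for the disk-to-NIC direction. The reverse direction, NIC to disk, has no such optimization; in other words, transferFrom does not get this benefit.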

transferFrom

    public long transferFrom(ReadableByteChannel src,
                             long position, long count)
        throws IOException
    {
        ensureOpen();
        if (!src.isOpen())
            throw new ClosedChannelException();
        if (!writable)
            throw new NonWritableChannelException();
        if ((position < 0) || (count < 0))
            throw new IllegalArgumentException();
        if (position > size())
            return 0;
        if (src instanceof FileChannelImpl)
           return transferFromFileChannel((FileChannelImpl)src,
                                          position, count);

        return transferFromArbitraryChannel(src, position, count);
    }

If src is an ordinary FileChannel (a FileChannelImpl), the transfer goes through mmap:

    private static final long MAPPED_TRANSFER_SIZE = 8L*1024L*1024L;

    private long transferFromFileChannel(FileChannelImpl src,
                                         long position, long count)
        throws IOException
    {
        if (!src.readable)
            throw new NonReadableChannelException();
        synchronized (src.positionLock) {
            long pos = src.position();
            long max = Math.min(count, src.size() - pos);

            long remaining = max;
            long p = pos;
            while (remaining > 0L) {
                long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
                // ## Bug: Closing this channel will not terminate the write
                MappedByteBuffer bb = src.map(MapMode.READ_ONLY, p, size);
                try {
                    long n = write(bb, position);
                    assert n > 0;
                    p += n;
                    position += n;
                    remaining -= n;
                } catch (IOException ioe) {
                    // Only throw exception if no bytes have been written
                    if (remaining == max)
                        throw ioe;
                    break;
                } finally {
                    unmap(bb);
                }
            }
            long nwritten = max - remaining;
            src.position(pos + nwritten);
            return nwritten;
        }
    }

In a loop, each iteration maps the next region of the source file (starting at the current position, at most 8 MB) and writes it into the target file.
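The same chunked-mapping idea can be tried from user code. This sketch (hypothetical class name) copies one file into another one mapping at a time, using the 8 MB chunk size of MAPPED_TRANSFER_SIZE above. Unlike the JDK's internal code it has no public way to call unmap, so each mapping is released only when its buffer is garbage-collected.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

final class ChunkedMmapCopy {
    // Chunk size copied from the JDK's MAPPED_TRANSFER_SIZE (8 MB).
    static final long CHUNK = 8L * 1024 * 1024;

    /** Copies src to dst one mapped chunk at a time; returns the number of bytes copied. */
    static long copy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                long len = Math.min(CHUNK, size - pos);
                // Map only the next chunk of the source, never more than 8 MB at once.
                MappedByteBuffer chunk = in.map(FileChannel.MapMode.READ_ONLY, pos, len);
                // Positional write: chunk.position() tracks how much of this chunk is written.
                while (chunk.hasRemaining()) {
                    out.write(chunk, pos + chunk.position());
                }
                pos += len;
            }
            return pos;
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: ChunkedMmapCopy <src> <dst>");
            return;
        }
        System.out.println(copy(Paths.get(args[0]), Paths.get(args[1])) + " bytes copied");
    }
}
```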

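If src is a SocketChannelImpl instead, the transfer goes through off-heap memory: again in a loop, at most 8 KB per iteration. After use, the temporary buffer is returned to a cache for reuse where possible rather than freed.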

The data is first read into off-heap memory and then written to the target.
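A rough sketch of this off-heap path, assuming a blocking source channel: read at most 8 KB into a direct buffer, then write it to the file at an explicit position. The class and method names are hypothetical. For simplicity it allocates its own buffer instead of using the JDK's internal temporary-buffer cache.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DirectBufferTransfer {
    // Per-iteration size, matching the 8 KB described above.
    private static final int TRANSFER_SIZE = 8192;

    /** Copies up to count bytes from src into dst starting at position; returns bytes copied. */
    static long transferFrom(ReadableByteChannel src, FileChannel dst, long position, long count)
            throws IOException {
        // Off-heap buffer; the JDK would take this from its per-thread cache instead.
        ByteBuffer bb = ByteBuffer.allocateDirect((int) Math.min(count, TRANSFER_SIZE));
        long total = 0;
        while (total < count) {
            bb.clear();
            bb.limit((int) Math.min(count - total, bb.capacity()));
            int nr = src.read(bb);          // step 1: source -> off-heap buffer
            if (nr <= 0) {
                break;                      // EOF (or no data from a non-blocking source)
            }
            bb.flip();
            while (bb.hasRemaining()) {     // step 2: off-heap buffer -> file at an explicit offset
                total += dst.write(bb, position + total);
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("direct-demo", ".bin");
        tmp.toFile().deleteOnExit();
        byte[] data = new byte[20_000];
        try (FileChannel fc = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
            ReadableByteChannel src = Channels.newChannel(new ByteArrayInputStream(data));
            System.out.println(transferFrom(src, fc, 0, data.length) + " bytes transferred");
        }
    }
}
```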


The off-heap memory is obtained with ByteBuffer.allocateDirect:

    /**
     * Returns a temporary buffer of at least the given size
     */
    public static ByteBuffer getTemporaryDirectBuffer(int size) {
        // If a buffer of this size is too large for the cache, there
        // should not be a buffer in the cache that is at least as
        // large. So we'll just create a new one. Also, we don't have
        // to remove the buffer from the cache (as this method does
        // below) given that we won't put the new buffer in the cache.
        if (isBufferTooLarge(size)) {
            return ByteBuffer.allocateDirect(size);
        }

        BufferCache cache = bufferCache.get();
        ByteBuffer buf = cache.get(size);
        if (buf != null) {
            return buf;
        } else {
            // No suitable buffer in the cache so we need to allocate a new
            // one. To avoid the cache growing then we remove the first
            // buffer from the cache and free it.
            if (!cache.isEmpty()) {
                buf = cache.removeFirst();
                free(buf);
            }
            return ByteBuffer.allocateDirect(size);
        }
    }

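This bufferCache is a ThreadLocal, so each thread has its own cache and no locking is needed to keep it thread-safe. The design is similar in spirit to Netty's memory management.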
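A much-simplified sketch of the per-thread idea, assuming one cached buffer per thread (the JDK's BufferCache holds several and frees evicted ones explicitly); the class name is hypothetical. Because each thread only ever sees its own buffer, no locking is needed, but a caller must finish with a buffer before calling get again on the same thread.

```java
import java.nio.ByteBuffer;

final class ThreadLocalDirectBuffers {
    // One cached direct buffer per thread; other threads never see it, so no locking.
    private static final ThreadLocal<ByteBuffer> CACHE = new ThreadLocal<>();

    /** Returns this thread's direct buffer, cleared and limited to size, growing it if too small. */
    static ByteBuffer get(int size) {
        ByteBuffer buf = CACHE.get();
        if (buf == null || buf.capacity() < size) {
            // Too small (or first use): allocate a new one. The old buffer is left to the GC,
            // whereas the JDK's cache frees evicted buffers explicitly.
            buf = ByteBuffer.allocateDirect(size);
            CACHE.set(buf);
        }
        buf.clear();
        buf.limit(size);
        return buf;
    }

    public static void main(String[] args) throws InterruptedException {
        ByteBuffer first = get(1024);
        ByteBuffer second = get(512);
        System.out.println("reused on same thread: " + (first == second));
        Thread other = new Thread(() ->
                System.out.println("shared across threads: " + (get(512) == first)));
        other.start();
        other.join();
    }
}
```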

transferTo

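  1. If the OS supports sendfile (Windows does not), make the system call directly.
  2. Otherwise, if the target is a trusted channel type (FileChannelImpl or SelChImpl), use mmap.
  3. Otherwise, fall back to off-heap memory.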
    public long transferTo(long position, long count,
                           WritableByteChannel target)
        throws IOException
    {
        ensureOpen();
        if (!target.isOpen())
            throw new ClosedChannelException();
        if (!readable)
            throw new NonReadableChannelException();
        if (target instanceof FileChannelImpl &&
            !((FileChannelImpl)target).writable)
            throw new NonWritableChannelException();
        if ((position < 0) || (count < 0))
            throw new IllegalArgumentException();
        long sz = size();
        if (position > sz)
            return 0;
        int icount = (int)Math.min(count, Integer.MAX_VALUE);
        if ((sz - position) < icount)
            icount = (int)(sz - position);

        long n;

        // Attempt a direct transfer, if the kernel supports it
        // send file
        if ((n = transferToDirectly(position, icount, target)) >= 0)
            return n;

        // Attempt a mapped transfer, but only to trusted channel types
        // mmap
        if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
            return n;

        // Slow path for untrusted targets
        // off-heap memory
        return transferToArbitraryChannel(position, icount, target);
    }
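As the code shows, one transferTo call moves at most Integer.MAX_VALUE bytes, and for a selectable (socket) target the mmap path makes only one write attempt, so a single call can return fewer bytes than requested. A minimal sketch of a caller-side loop (the helper name is hypothetical), which stops instead of spinning if a call makes no progress:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

final class TransferAll {
    /** Repeats transferTo until count bytes (or everything up to EOF) are sent; returns bytes sent. */
    static long transferFully(FileChannel src, long position, long count, WritableByteChannel target)
            throws IOException {
        long available = Math.max(0L, src.size() - position);
        long end = position + Math.min(count, available); // never past EOF, no overflow
        long pos = position;
        while (pos < end) {
            // Each call may move fewer bytes than asked (e.g. capped at Integer.MAX_VALUE).
            long n = src.transferTo(pos, end - pos, target);
            if (n <= 0) {
                break; // no progress (e.g. a full non-blocking socket): stop instead of spinning
            }
            pos += n;
        }
        return pos - position;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: TransferAll <src> <dst>");
            return;
        }
        try (FileChannel in = FileChannel.open(Paths.get(args[0]), StandardOpenOption.READ);
             FileChannel out = FileChannel.open(Paths.get(args[1]), StandardOpenOption.CREATE,
                     StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            System.out.println(transferFully(in, 0, Long.MAX_VALUE, out) + " bytes transferred");
        }
    }
}
```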

1. transferToDirectly, which eventually calls the native transferTo0 (sendfile):

    // Transfers from src to dst, or returns -2 if kernel can't do that
    private native long transferTo0(FileDescriptor src, long position,
                                    long count, FileDescriptor dst);

2. transferToTrustedChannel (mmap):

    private long transferToTrustedChannel(long position, long count,
                                          WritableByteChannel target)
        throws IOException
    {
        boolean isSelChImpl = (target instanceof SelChImpl);
        if (!((target instanceof FileChannelImpl) || isSelChImpl))
            return IOStatus.UNSUPPORTED;

        // Trusted target: Use a mapped buffer
        long remaining = count;
        while (remaining > 0L) {
            long size = Math.min(remaining, MAPPED_TRANSFER_SIZE);
            try {
                MappedByteBuffer dbb = map(MapMode.READ_ONLY, position, size);
                try {
                    // ## Bug: Closing this channel will not terminate the write
                    int n = target.write(dbb);
                    assert n >= 0;
                    remaining -= n;
                    if (isSelChImpl) {
                        // one attempt to write to selectable channel
                        break;
                    }
                    assert n > 0;
                    position += n;
                } finally {
                    unmap(dbb);
                }
            } catch (ClosedByInterruptException e) {
                // target closed by interrupt as ClosedByInterruptException needs
                // to be thrown after closing this channel.
                assert !target.isOpen();
                try {
                    close();
                } catch (Throwable suppressed) {
                    e.addSuppressed(suppressed);
                }
                throw e;
            } catch (IOException ioe) {
                // Only throw exception if no bytes have been written
                if (remaining == count)
                    throw ioe;
                break;
            }
        }
        return count - remaining;
    }

3. transferToArbitraryChannel: works the same way as transferFromArbitraryChannel (a temporary direct buffer, at most 8 KB per iteration).

Netty memory techniques - Zero-Copy

Example 1: use logical composition instead of actual copying.

For example, CompositeByteBuf:

io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR
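CompositeByteBuf itself needs the Netty dependency, so here is a JDK-only sketch of the same idea; the CompositeView class is hypothetical and not a Netty API. Several ByteBuffers are presented as one logical sequence, and each read is routed to the component that holds the byte, instead of merging everything into a new array.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class CompositeView {
    private final ByteBuffer[] parts;
    private final int[] offsets; // offsets[i] = logical index at which parts[i] starts
    private final int length;

    CompositeView(ByteBuffer... buffers) {
        parts = new ByteBuffer[buffers.length];
        offsets = new int[buffers.length];
        int off = 0;
        for (int i = 0; i < buffers.length; i++) {
            parts[i] = buffers[i].slice(); // a view sharing the same memory; no bytes are copied
            offsets[i] = off;
            off += parts[i].remaining();
        }
        length = off;
    }

    int length() {
        return length;
    }

    /** Returns the byte at a logical index by routing the read to the component that holds it. */
    byte get(int index) {
        if (index < 0 || index >= length) {
            throw new IndexOutOfBoundsException("index " + index + ", length " + length);
        }
        int i = parts.length - 1;
        while (offsets[i] > index) {
            i--; // walk back to the last component starting at or before index
        }
        return parts[i].get(index - offsets[i]);
    }

    public static void main(String[] args) {
        byte[] header = "HDR:".getBytes(StandardCharsets.US_ASCII);
        byte[] body = "payload".getBytes(StandardCharsets.US_ASCII);
        CompositeView msg = new CompositeView(ByteBuffer.wrap(header), ByteBuffer.wrap(body));
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < msg.length(); i++) {
            sb.append((char) msg.get(i));
        }
        System.out.println(sb); // HDR:payload
    }
}
```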


Example 2: use wrapping instead of actual copying.

byte[] bytes = data.getBytes();

ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
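The same effect can be seen with the JDK's ByteBuffer.wrap, which, like Unpooled.wrappedBuffer, shares the given array instead of copying it. This small demo (class name hypothetical) mutates the array after wrapping. Note that data.getBytes() already allocates and fills a new array, so only the wrapping step is copy-free.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class WrapVsCopy {
    public static void main(String[] args) {
        // String.getBytes() itself allocates and fills a new array (an unavoidable copy).
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);

        ByteBuffer wrapped = ByteBuffer.wrap(bytes);           // shares the array, no copy
        ByteBuffer copied = ByteBuffer.allocate(bytes.length); // separate storage
        copied.put(bytes);                                     // explicit byte copy
        copied.flip();

        bytes[0] = 'J'; // mutate the original array

        System.out.println((char) wrapped.get(0)); // J: the wrapper sees the change
        System.out.println((char) copied.get(0));  // h: the copy does not
    }
}
```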


Example 3: call the JDK's zero-copy API.

Netty also implements zero-copy by wrapping NIO's FileChannel.transferTo() in DefaultFileRegion: io.netty.channel.DefaultFileRegion#transferTo

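import java.io.*;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;

public class UnitTest1 {
    // Java does not expand "~", so build the directory from user.home
    private static final String prefix = System.getProperty("user.home") + "/path/to/";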

    public static void main(String[] args) throws Exception {
        streamCopy("input", "output1");
        bufferCopy("input", "output2");
        directBufferCopy("input", "output3");
        mappedByteBufferCopy("input", "output4");
        mappedByteBufferCopyByPart("input", "output5");
        channelCopy("input", "output6");
    }

    /**
     * Using streams (byte[] buffer)
     */
    private static void streamCopy(String from, String to) throws IOException {
        long startTime = System.currentTimeMillis();

        File inputFile = new File(prefix + from);
        File outputFile = new File(prefix + to);

        FileInputStream fis = new FileInputStream(inputFile);
        FileOutputStream fos = new FileOutputStream(outputFile);

        byte[] bytes = new byte[1024];
        int len;
        while ((len = fis.read(bytes)) != -1) {
            fos.write(bytes, 0, len);
        }
        fos.flush();

        fis.close();
        fos.close();

        long endTime = System.currentTimeMillis();
        System.out.println("streamCopy cost:" + (endTime - startTime));
    }

    /**
     * Using a heap ByteBuffer
     */
    private static void bufferCopy(String from, String to) throws IOException {
        long startTime = System.currentTimeMillis();

        RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
        RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");

        FileChannel inputChannel = inputFile.getChannel();
        FileChannel outputChannel = outputFile.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (inputChannel.read(byteBuffer) != -1) {
            byteBuffer.flip();
            outputChannel.write(byteBuffer);
            byteBuffer.clear();
        }

        inputChannel.close();
        outputChannel.close();

        long endTime = System.currentTimeMillis();
        System.out.println("bufferCopy cost:" + (endTime - startTime));
    }

    /**
     * Using off-heap (direct) memory
     */
    private static void directBufferCopy(String from, String to) throws IOException {
        long startTime = System.currentTimeMillis();

        RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
        RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");

        FileChannel inputChannel = inputFile.getChannel();
        FileChannel outputChannel = outputFile.getChannel();

        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
        while (inputChannel.read(byteBuffer) != -1) {
            byteBuffer.flip();
            outputChannel.write(byteBuffer);
            byteBuffer.clear();
        }

        inputChannel.close();
        outputChannel.close();

        long endTime = System.currentTimeMillis();
        System.out.println("directBufferCopy cost:" + (endTime - startTime));
    }

    /**
     * Memory-mapping the whole file
     */
    private static void mappedByteBufferCopy(String from, String to) throws IOException {
        long startTime = System.currentTimeMillis();

        RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
        RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");

        FileChannel inputChannel = inputFile.getChannel();
        FileChannel outputChannel = outputFile.getChannel();

        MappedByteBuffer iBuffer = inputChannel.map(MapMode.READ_ONLY, 0, inputFile.length());
        MappedByteBuffer oBuffer = outputChannel.map(MapMode.READ_WRITE, 0, inputFile.length());

        // Operate on the mapped buffers directly; no explicit read/write calls
        oBuffer.put(iBuffer);

        inputChannel.close();
        outputChannel.close();

        long endTime = System.currentTimeMillis();
        System.out.println("mappedByteBufferCopy cost:" + (endTime - startTime));
    }

    /**
     * Memory-mapping in 1 KB parts
     */
    private static void mappedByteBufferCopyByPart(String from, String to) throws IOException {
        long startTime = System.currentTimeMillis();

        RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
        RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");

        FileChannel inputChannel = inputFile.getChannel();
        FileChannel outputChannel = outputFile.getChannel();

        for (long i = 0; i < inputFile.length(); i += 1024) {
            long size = 1024;
            // Clamp the last chunk to the end of the input so the output is not extended past it
            if (i + size > inputFile.length()) {
                size = inputFile.length() - i;
            }
            MappedByteBuffer iBuffer = inputChannel.map(MapMode.READ_ONLY, i, size);
            MappedByteBuffer oBuffer = outputChannel.map(MapMode.READ_WRITE, i, size);
            oBuffer.put(iBuffer);
        }

        inputChannel.close();
        outputChannel.close();

        long endTime = System.currentTimeMillis();
        System.out.println("mappedByteBufferCopyByPart cost:" + (endTime - startTime));
    }

    /**
     * zero copy
     */
    private static void channelCopy(String from, String to) throws IOException {
        long startTime = System.currentTimeMillis();

        RandomAccessFile inputFile = new RandomAccessFile(prefix + from, "r");
        RandomAccessFile outputFile = new RandomAccessFile(prefix + to, "rw");

        FileChannel inputChannel = inputFile.getChannel();
        FileChannel outputChannel = outputFile.getChannel();

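        // transferTo may move fewer bytes than requested (at most Integer.MAX_VALUE per call), so loop
        long pos = 0, size = inputFile.length();
        while (pos < size) {
            pos += inputChannel.transferTo(pos, size - pos, outputChannel);
        }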

        inputChannel.close();
        outputChannel.close();

        long endTime = System.currentTimeMillis();
        System.out.println("channelCopy cost:" + (endTime - startTime));
    }

}

Sample output (times in ms):

streamCopy cost:2718
bufferCopy cost:2604
directBufferCopy cost:2420
mappedByteBufferCopy cost:541
mappedByteBufferCopyByPart cost:11232
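channelCopy cost:330

channelCopy (transferTo) is the fastest. mappedByteBufferCopyByPart is by far the slowest, most likely because it sets up two new memory mappings for every 1 KB chunk.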


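With DMA gather copy, sendfile works as follows:

(1) The user process issues the sendfile system call.

(2) The kernel uses DMA to copy the file data from disk into a kernel buffer.

(3) The kernel copies only the descriptor information of the data in the kernel buffer (file descriptor, data length) into the socket buffer.

(4) Using the descriptor information in the socket buffer and the gather-copy capability of the DMA hardware, the kernel copies the data from the kernel buffer directly to the NIC.

(5) The sendfile system call completes and returns to the user process.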
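A minimal sketch of driving this path from Java: FileChannel.transferTo into a SocketChannel, which on Linux the JDK can serve with sendfile. Whether the kernel actually uses DMA gather copy depends on the OS and the network card. The round trip over 127.0.0.1 exists only so the example can run on its own; class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class SendfileToSocket {
    /** Sends the whole file over a connected, blocking SocketChannel using transferTo. */
    static long sendFile(Path file, SocketChannel socket) throws IOException {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = fc.size();
            long pos = 0;
            while (pos < size) {
                // The file data never enters the Java heap; one call may send only part of it.
                long n = fc.transferTo(pos, size - pos, socket);
                if (n <= 0) {
                    throw new IOException("transferTo made no progress at offset " + pos);
                }
                pos += n;
            }
            return pos;
        }
    }

    /** Loopback round trip: a background thread sends the file, this thread reads it back. */
    static byte[] roundTrip(Path file) throws Exception {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0)); // any free port
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            Thread sender = new Thread(() -> {
                try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                    sendFile(file, client);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            ByteArrayOutputStream received = new ByteArrayOutputStream();
            try (SocketChannel peer = server.accept()) {
                ByteBuffer buf = ByteBuffer.allocate(8192);
                while (peer.read(buf) >= 0) { // -1 once the sender closes its socket
                    buf.flip();
                    received.write(buf.array(), 0, buf.limit());
                    buf.clear();
                }
            }
            sender.join();
            return received.toByteArray();
        }
    }

    public static void main(String[] args) throws Exception {
        Path tmp = Files.createTempFile("sendfile-demo", ".bin");
        tmp.toFile().deleteOnExit();
        Files.write(tmp, new byte[1 << 20]);
        System.out.println(roundTrip(tmp).length + " bytes received");
    }
}
```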


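The basic flow of zero-copy with mmap + write is as follows:

(1) The user process calls mmap, which maps the file's kernel buffer (page cache) into its address space.

(2) The kernel uses DMA to copy the file data from disk into that kernel buffer; the user process can then access it without a further copy.

(3) The user process calls write, and the CPU copies the data from the kernel buffer into the socket buffer.

(4) DMA copies the data from the socket buffer to the NIC.

(5) write returns to the user process.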

In RocketMQ, the logic for storing and loading messages with mmap lives in org.apache.rocketmq.store.MappedFile. It is built on NIO's java.nio.MappedByteBuffer, and the mmap buffer is obtained with FileChannel's map method:

// Initialization
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);

When a message in the CommitLog is queried, it is looked up by its offset pos within mappedByteBuffer and its data size size:

public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
    int readPosition = getReadPosition();
    // ... various safety checks

    // Return a view of mappedByteBuffer
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    byteBuffer.position(pos);
    ByteBuffer byteBufferNew = byteBuffer.slice();
    byteBufferNew.limit(size);
    return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
}
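To make the idea concrete, here is a heavily simplified, hypothetical stand-in for MappedFile (not RocketMQ's actual API). Messages are appended by writing straight into a READ_WRITE mapping, and select returns a slice of the mapping the same way selectMappedBuffer does, so reading a message copies nothing. It assumes a single writer thread and a file smaller than 2 GB.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MiniMappedFile implements AutoCloseable {
    private final FileChannel channel;
    private final MappedByteBuffer mapped;
    private int wrotePosition; // next free offset; single writer assumed

    MiniMappedFile(Path path, int fileSize) throws IOException {
        channel = FileChannel.open(path, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        // A READ_WRITE mapping larger than the file grows the file to fileSize.
        mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
    }

    /** Appends msg by writing straight into the mapping; returns its offset, or -1 if it doesn't fit. */
    int append(byte[] msg) {
        if (wrotePosition + msg.length > mapped.capacity()) {
            return -1;
        }
        ByteBuffer view = mapped.slice(); // own position/limit, same memory
        view.position(wrotePosition);
        view.put(msg);
        int offset = wrotePosition;
        wrotePosition += msg.length;
        return offset;
    }

    /** Returns a view of [pos, pos + size) without copying, like selectMappedBuffer; null if out of range. */
    ByteBuffer select(int pos, int size) {
        if (pos < 0 || size < 0 || pos + size > wrotePosition) {
            return null;
        }
        ByteBuffer byteBuffer = mapped.slice();
        byteBuffer.position(pos);
        ByteBuffer result = byteBuffer.slice();
        result.limit(size);
        return result;
    }

    /** Asks the OS to write the mapped pages back to the storage device. */
    void flush() {
        mapped.force();
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempFile("commitlog-demo", ".bin");
        p.toFile().deleteOnExit();
        try (MiniMappedFile mf = new MiniMappedFile(p, 1024)) {
            int off = mf.append("msg-1".getBytes(StandardCharsets.US_ASCII));
            ByteBuffer view = mf.select(off, 5);
            System.out.println(StandardCharsets.US_ASCII.decode(view)); // msg-1
            mf.flush();
        }
    }
}
```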

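tips: the transientStorePoolEnable mechanism. Memory mapped through Java NIO mmap is not necessarily resident: parts of it can be swapped out to swap space (virtual memory). To improve message-sending performance, RocketMQ adds a memory-locking mechanism: the CommitLog files that are about to be operated on are mapped into memory and locked, so that they always stay in memory. This mechanism is controlled by the transientStorePoolEnable parameter.

Therefore, MappedFile can flush CommitLog data to disk in two ways:

RocketMQ implements zero-copy with mmap + write, which suits persisting and transferring small pieces of data such as business-level messages. Kafka uses sendfile, which suits high-throughput persistence and transfer of large chunks of data such as system logs.

tips: Kafka's index files use mmap + write, while its data files are sent over the network with sendfile.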

Netty zero-copy

Netty's zero-copy comes in two forms:
