为什么需要发送和接收缓存

为什么需要发送和接收缓存

基于网络的应用程序都需要将接收到的数据先放入缓冲区,等一个数据包完整接收到了再传递给应用层。 大家都知道TCP是面向字节流的,发送方 send 了 n 字节,但接收方并不知道一次 read 操作收到了多少字节,可能是1,可能是n, 也可能是n-x 或 n+x (x 未知)。

发送数据也是一样,一个数据包可能只发送了一部分,剩余的放在缓冲区中在 socket 端口可写时通过 on_write 回调函数中继续发送。

这里缓冲区的设计就很有讲究,尽量避免不必要的内存分配和复制,以提高性能。它可以是一个字符队列:

发送方缓冲:从队尾追加数据,从队头取出数据发送到 socket

接收方缓冲:从队头取出数据,从队尾接收从 socket 中的数据

最简单的方法就是开辟一块内存,比如一个大数组为缓冲区,设置一个读指针 readIndex,从readIndex 位置开始读一直读到 writeIndex, 一个写指针writeIndex,数据从writeIndex 开始写一直写到 capacity。

著名的 C++ 网络编程框架 ACE 中就有 ACE_Message_Block 的设计

ACE_Message_Block 主要有读指针,写指针,数据块(ACE_Data_Block), 和连接指针(指向下一个消息体),这样就会将收到的数据串成一个链表。

再以 Netty 中的 ByteBuf 为例详细了解一下其设计思想

1)0 ~ readIndex 为无效区域

2)readIndex ~ writeIndex 为可读区域

3)writeIndex ~ capacity 为可写区域

4)capacity ~ maxCapacity 为可扩容区域

具体实现类为 AbstractByteBuf 的各个子类,主要区别在于是不是使用了内存池,是不是在堆内

内存区域主要分两类:

堆内内存: heap 堆内存

堆外内存: direct 或 native 内存

ByteBuf 实现

内存池中?

安全?

堆内?

PooledHeapByteBuf

Y

Y

Y

PooledUnsafeHeapByteBuf

Y

N

Y

PooledDirectByteBuf

Y

Y

N

PooledUnsafeDirectByteBuf

Y

N

N

UnpooledHeapByteBuf

N

Y

Y

UnpooledUnsafeHeapByteBuf

N

N

Y

UnpooledDirectByteBuf

N

Y

N

UnpooledUnsafeDirectByteBuf

N

N

N

主要方法有

方法

说明

capacity()

容量=废弃的字节数+可读字节数+可写字节数

maxCapacity()

ByteBuf 最大所能容纳的最大字节数

isWritable()

ByteBuf 是否可写, capacity() > writerIndex

writeBytes(byte[] src)

写入字节

isReadable()

ByteBuf 是否可写, writerIndex > readerIndex

readBytes(byte[] dst)

读取字节

内存的分配是交由 ByteBufAllocator 来分配的

写段代码演示一下

public static void printBufferIndex(ByteBuf buffer, String message) {

log.info("# {} -> buffer: {}, readableTypes {}, writableBytes: {}, capacity: {}",

message, buffer, buffer.readableBytes(), buffer.writableBytes(), buffer.capacity());

}

@Test

public void testByteBuf() {

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(12, 16);

buffer.writeBytes(new byte[] { 1 , 2, 3, 4, 5, 6});

printBufferIndex(buffer, "write 6 bytes");

assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 6);

buffer.writeBytes(new byte[] { 7, 8, 9, 10, 11 ,12, 13, 14, 15, 16});

printBufferIndex(buffer, "write 12 bytes");

assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 16);

int size = buffer.readableBytes();

byte[] output = new byte[size];

buffer.readBytes(output);

printBufferIndex(buffer, String.format("read %d bytes", size));

assertTrue(buffer.readerIndex() == 16 && buffer.writerIndex() == 16);

buffer.discardReadBytes();

printBufferIndex(buffer, "discardReadBytess");

assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 0);

}

执行结果如下

# 先写6个字节,readIndex = 0, writeIndex = 6

write 6 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 12/16), readableTypes 6, writableBytes: 6, capacity: 12

# 再写10个字节,readIndex = 0, writeIndex = 16

write 12 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16/16), readableTypes 16, writableBytes: 0, capacity: 16

# 再读16个字节,readIndex = 16, writeIndex = 16

read 16 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 16, widx: 16, cap: 16/16), readableTypes 0, writableBytes: 0, capacity: 16

# 已经读过的字节丢弃掉,readIndex = 0, writeIndex = 0

discardReadBytess -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16/16), readableTypes 0, writableBytes: 16, capacity: 16

零拷贝

除了通过读写指针来减少内存的复制,Netty 还应用了如下的技术来提高性能

Netty 接收及发送 ByteBuffer 用 DirectBuffer, 使用堆外直接内存进行 socket 读写,不需要进行字节缓冲区的二次拷贝

Netty 使用 ComposeByteBuffer ,可以聚合多个 ByteBuffer 对象,不需要通过内存拷贝的方式来合并几个小的 ByteBuffer 到一个大的 ByteBuffer

Netty 对于文件传输采用了 transferTo 方法,可以直接将文件缓冲区的数据发送到目标 Channel, 避免了通过循环 write() 的方式进行内存拷贝

相关推荐

世界杯史上十大乌龙球
BT365账户验证需要多久

世界杯史上十大乌龙球

06-30 👁️ 8880
三星官翻机的来源与购买渠道
365bet电脑版

三星官翻机的来源与购买渠道

07-18 👁️ 1830
巫覡宗教
365bet电脑版

巫覡宗教

07-30 👁️ 763