Netty-NIO

Java支持三种网络IO模型:BIO,NIO,AIO

BIO

  • 同步阻塞

  • 一个连接一个线程

  • 场景:适用于连接数量比较小且固定的架构,JDK1.4以前只有BIO

  • 编程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public static void main(String[] args) throws IOException {
    ExecutorService executorService = Executors.newCachedThreadPool();
    ServerSocket serverSocket = new ServerSocket(9999);
    while (true) {
    Socket socket = serverSocket.accept();
    System.out.println("客户端已连接...");
    executorService.execute(() -> {
    try {
    InputStream inputStream = socket.getInputStream();
    byte[] bytes = new byte[1024];
    int length;
    while ((length = inputStream.read(bytes)) != -1) {
    System.out.println("当前线程:" + Thread.currentThread().getName()
    + ", 收到消息:" + new String(bytes, 0, length));
    }
    } catch (Exception e) {
    e.printStackTrace();
    } finally {
    try {
    socket.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }
    });
    }
    }
    • 测试:
      • 启动main方法,
      • 打开cmd窗口,输入telnet 127.0.0.1 9999
      • 按ctrl + ]
      • 发送消息 send hello
      • 启动多个cmd发送消息,查看服务端的日志可以发现,每连接一个客户端,都需要启动一个线程,并且线程是阻塞状态

NIO

  • 同步非阻塞

  • 一个线程处理多个连接,客户端发送的连接请求会注册到多路复用器上,多路复用器轮询到连接有IO请求就进行处理

  • 场景:适用于连接数量多,且连接比较短的架构,比如聊天室,弹幕,服务器间通讯,从JDK1.4支持

三大核心

NIO三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)

AIO

  • 异步非阻塞
  • 采用Proactor模式,先由操作系统完成后才通知服务端程序启动线程处理
  • 场景:适用于连接数量多,且连接比较长的架构,比如相册服务器,从JDK1.7支持

NIO和BIO的区别

  • BIO以流的方式处理数据,NIO以块的方式处理
  • BIO时阻塞的,NIO是非阻塞的
  • BIO基于字节和字符流进行操作,而NIO基于Channel和Buffer进行操作,数据总是从Channel读取到Buffer,或者从Buffer写入Channel中,Selector用于监听多个Channel事件(连接请求,可读,可写),因此使用单线程可以监听多个通道

三大核心Selector,Channel,Buffer

  • 每个Channel都会对应一个buffer
  • 一个Selector对应一个线程,一个线程对应多个Channel(连接)
  • 如上图,有三个连接注册到Selector上
  • 程序切换到哪个Channel是由事件决定的
  • Selector会根据不同的事件,在各个通道上切换
  • Buffer就是一个内存块,底层是一个数组
  • 数据的读写都是通过Buffer,BIO要么是输入流要么是输出流是单向的,NIO的Buffer是可读可写的,通过flip切换,是双向的。

Buffer

NIO是面向缓冲区编程的,数据会读取到一个缓存区中,需要时可以在缓冲区向前或向后移动,提供了非阻塞式的网络

1
2
3
4
5
6
7
8
9
10
11
12
13
//创建可存放5个int数据的IntBuffer
IntBuffer intBuffer = IntBuffer.allocate(5);
//在buffer中存数据
intBuffer.put(new int[]{10, 115, 22, 7, 1});

//读取数据
//读写模式转换
intBuffer.flip();
//是否还有数据
while (intBuffer.hasRemaining()) {
//读取数据
System.out.println(intBuffer.get());
}
  • Buffer是一个顶层父类,有7个直接子类,IntBuffer,FloatBuffer,CharBuffer,DoubleBuffer,ShortBuffer,LongBuffer,ByteBuffer,都是通过对应的一个数组来存储数据,ByteBuffer最常用。

  • 通过四个参数来控制数组的数据和位置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    // Invariants: mark <= position <= limit <= capacity
    //标记
    private int mark = -1;
    //位置,下一个要被读或写的元素的索引,每次读写缓冲区的数据都会改变该值,为下次读写做准备
    private int position = 0;
    //缓存区的终点,不能对超过limit的位置进行读写,该限制可以修改
    private int limit;
    //容量,即可以容量的最大数据量,在缓冲区被创建时设置,不能改变
    private int capacity;

Channel

  • BIO中的stream是单向的,例如FileInputStream对象只能进行读取数据,NIO中的Channel是双向的,可读可写
  • 常用的Channel类:ServerSocketChannel(TCP),SocketChannel(TCP),DatagramChannel(UDP),FileChannel(文件)

FileChannel

  • FileChannel用于文件数据读写
    • read方法,从通道读取数据到缓冲区
    • write方法,从缓冲区写数据到通道
    • transferFrom,目标通道复制数据到当前通道
    • transferTo,当前通道数据复制到目标通道
  • 将字符串写入到文件示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void writeToFile() throws Exception {
FileOutputStream fileOutputStream = new FileOutputStream("D:/hello.txt");
//从输出流获取FileChannel
FileChannel fileChannel = fileOutputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将字符串放入缓冲区
byteBuffer.put("Hello World".getBytes(StandardCharsets.UTF_8));
//切换读写模式
byteBuffer.flip();
//把缓冲区数据写入Channel
fileChannel.write(byteBuffer);
//关闭流
fileOutputStream.close();
}
  • 从文件读取数据打印
1
2
3
4
5
6
7
8
9
10
11
12
13
public void readFile() throws Exception {
//创建输入流
FileInputStream fileInputStream = new FileInputStream("D:/hello.txt");
//从输入流获取Channel
FileChannel fileChannel = fileInputStream.getChannel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//将Channel中的数据读到缓冲区
int length = fileChannel.read(byteBuffer);
//打印缓冲区数据
System.out.println(new String(byteBuffer.array(), 0, length));
fileInputStream.close();
}

Selector

Selector可以检测多个注册的通道上是否有事件发生,只有在通道真正有事件发生时,才会进行读写,只需要一个线程来维护多个通道,减少了系统开销

相关方法

  • select() //阻塞
  • select(1000) //阻塞1000ms
  • wakeup() //唤醒selector
  • selectNow() //不阻塞,立马返回

注册方式

  • 创建ServerSocketChannel,并绑定端口,然后注册到Selector上,监听accept事件
  • 当有新连接时,从Selector上获取accept事件的key,通过key获取SocketChannel
  • 将SocketChannel注册到Selector上,监听read事件
  • 当通道可读时,从Selector获取可读通道,并读取通道中的数据

编写一个简单,ServerSocketChannel服务端,和SocketChannel客户端,进行客户端与客户端的实时通讯

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package cn.aacopy.tools.mytest.nio;

import org.junit.jupiter.api.Test;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;

/**
* @author iseven.yang
* @date 2022/7/8 10:13
*/
public class NioTest {

@Test
public void nioServer() throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
Thread.sleep(1000);
System.out.println("+++++++++++++++++++++++" + selector.keys().size());
if (selector.select(1000) > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("===============>" + selectionKeys.size());
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
System.out.println(selectionKey);
if(selectionKey.isAcceptable()) {
SocketChannel accept = serverSocketChannel.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
}
if(selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = channel.read(byteBuffer);
if(length > -1) {
System.out.println(new String(byteBuffer.array(), 0, length, StandardCharsets.UTF_8));
//消息转发
Set<SelectionKey> keys = selector.keys();
Iterator<SelectionKey> channelKeys = keys.iterator();
while (channelKeys.hasNext()) {
SelectionKey eachKey = channelKeys.next();
if(eachKey != selectionKey) {
SelectableChannel selectableChannel = eachKey.channel();
if(selectableChannel instanceof SocketChannel) {
SocketChannel socketChannel = (SocketChannel) eachKey.channel();
byteBuffer.flip();
socketChannel.write(byteBuffer);
}
}
}
}
}
iterator.remove();
}
}
}
}

@Test
public void startClient() {
new Thread(() -> nioClient()).start();
new Thread(() -> nioClient()).start();
new Thread(() -> nioClient()).start();
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

@Test
public void nioClient() {
try {
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9999));
socketChannel.configureBlocking(false);

new Thread(() -> {
try {
while (true) {
Thread.sleep(200);
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = socketChannel.read(byteBuffer);
if (length > 0) {
System.err.println(Thread.currentThread().getName() + "接收到消息:" + new String(byteBuffer.array(), 0, length, StandardCharsets.UTF_8));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();

ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
Random random = new Random();
while (true) {
Thread.sleep(8000);
String msg = "你好:" + random.nextInt(100);
System.out.println(Thread.currentThread().getName() + "发送消息:" + msg);
byteBuffer.put(msg.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteBuffer.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

  • SelectionKey channel关心的事件
  • ServerSocketChannel 服务端监听新的客户端连接
  • SocketChannel 网络IO通道,负责具体的读写操作

零拷贝

是针对CPU而言的,不是没有copy,而是没有CPU拷贝

传统IO

4次拷贝,3次状态切换

磁盘拷贝到内核态(kernel buffer),再由内核态拷贝到用户态,再由用户态拷贝到内核态(socket buffer),再拷贝到协议栈

mmap优化

  • mmap通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据,这样在进行网络传输时,可以减少内核空间到用户空间到拷贝次数

  • 需要4次数上下文切换,3次数据拷贝

  • 适合小数据量传输

sendFile

  • 在linux2.4版本以后,数据不经过用户态,直接从kernel buffer拷贝到协议栈
  • 3次上下文切换,2次数据拷贝
  • 没有CPU拷贝
  • 适合大文件传输