PPXu

关于Netty

2019-04-14

总结,备忘...

What’s Netty

Netty是一个 Java 开源框架,一个提供异步的、事件驱动的网络应用程序框架工具,作用是封装了JAVA NIO所支持的多路复用的I/O模型,还封装了Java BIO支持的步网络通信模型,对应用程序层面屏蔽网络底层的实现细节,让应用开发者快速开发高性能,高可靠性的网络服务器和客户端程序。

Why Netty

本质上Netty是一个框架,要成为一个主流的框架,首要条件,必须是好用。
尽管Java NIO、Java AIO框架己经实现了各主流操作系统的底层支持,但比之Netty还是不够,Netty能提供的好处有更多:

  1. 对信息格式的良好封装
    基于责任链模式的编码和解码功能,提供Java原生NIO没有提供的诸如针对Protocol Buffer、JSON等信息格式的封装。
  2. 处理很多上层特有服务
    框架除了本身要兼容各类操作系统的I/O底层实现,还要提供例如客户端权限,还有上面提到的信息格式封装、简单的数据读取等上层服务。事实上,不仅NIO,netty也有对BIO框架的再次封装,Netty框架是一个面向上层业务实现进行封装的“业务层”框架。
  3. 解决Java原生NIO的bug
  • 空轮询问题
    Linux内核上出现的“不能阻塞导致CPU的使用率100%”问题
    Bug出现在 Linux 系统环境,大致是说 Java NIO 框架在实现 Linux 内核 kernel 2.6+ 中的 epoll 模型时,Selector.select(timeout)方法不能阻塞指定的 timeout 时间,导致 CPU 100% 的情况。Java官方称在JDK 7版本中问题被解决,Netty 框架在JDK 轩的环境下在 JavaNIO 框架封装之上解决了这 Bug。
  • 空指针问题
    发生于Selector.open()的NPE
    JDK-6427854 : (se) NullPointerException in Selector.open() https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6427854
    这个问题在 Netty 框架中,在负责进行 Java NIO Selector 的 NIOEventLoop 类中得到了解决。
  1. 解决半包/粘包问题
    半包/粘包问题

多路复用I/O模型

Netty的核心在于封装了多路复用的I/O模型
—-I/O模型

白话Reactor模型

经典的现实举例:
一个餐厅同时有100位客人到店,他们到店后要做的第一件事情就是点菜。但是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着唯一的一本菜单等待客人进行服务。

那么最粗暴(但是最简单)的方法是(记为方法A):无论有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客人,然后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。然后再来到第二位客人面前将以上工作方式重复一次……然后是第X位客人。很明显,这样设置服务流程是不行的。因为随后的80位客人,在等待超时后就会离店,还会给差评。

于是,餐厅老板通过一种办法(记为方法B)进行了改进。老板立刻雇用99名服务员,同时印制99本新的菜单。每一名服务员手持一份菜单负责1位客人(关键不只在于服务员,还在于菜单。因为没有菜单客人也无法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(当然为了更高效,后堂厨师最好也有100名)。这样每一位客人享受的都是 VIP 服务,客人当然不会走还会给好评。但是高昂的人力成本就让人头疼了。

另外一种办法(记为方法C),就是改进点菜的方式:当客人到店后,自己领取一份菜单想好自己要点的菜后,再呼叫服务员。服务员站在客人身边记录点菜内容。将菜单递给厨师的过程也要进行改进,并不是每一份菜单记录好以后 都要交给后堂厨师。服务员可以记录好多份菜单后,同时交给厨师。那么这种方式,对于老板来说人力成本是最低的;对于客人来说,虽然不再享受VIP服务井且要等待一定的时间,但是这些都是可接受的,对于服务员来说,基本上他的时间都没有浪费,被老板榨干了每一滴血汗。

如果你是老板,会采用哪种方式呢?

到店情况:并发量。到店情况不理想时,一名服务员一份菜单,当然是足够了。所以不同的老板在不同的场合下,将会灵活选择服务员和菜单的配置。 客人:客户端请求。 点餐内容:客户端发送的实际数据。 老板:操作系统。 人力成本:系统资源。 菜单:文件状态描述符。操作系统对于一个进程能够同时持有的文件状态描述符的个数是有限制的,在Linux系统中可用$ulimit -n 命令查看这个限制值,当然也可以(并且应该)进行内核参数调整。 服务员:操作系统内核用于网络 I/O 操作的线程(内核线程)。 厨师:应用程序线程(厨房就是应用程序进程)。 餐单传递方式:包括了阻塞式和非阻塞式两种。 方法A: 阻塞式/非阻塞式,同步I/O。 方法B: 使用线程(池)进行处理的阻塞式/非阻塞式同步I/O。 方法C: 多路复用网络 I/O 模型。

Reactor线程模型

基于Reactor处理模式中,定义以下三种角色:

  • Reactor将I/O事件分派给对应的Handler
  • Acceptor处理客户端新连接,并分派请求到处理器链中
  • Handlers执行非阻塞读/写任务

单Reactor单线程模型

这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder处理。

Acceptor主要任务就是构建handler,在获取到和client相关的SocketChannel之后,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于racotor分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,由Reactor分发)。

该模型适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
   /**
* 等待事件到来,分发事件处理
*/
class Reactor implements Runnable {

private Reactor() throws Exception {

SelectionKey sk =
serverSocket.register(selector,
SelectionKey.OP_ACCEPT);
// attach Acceptor 处理新连接
sk.attach(new Acceptor());
}

public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
it.remove();
//分发事件处理
dispatch((SelectionKey) (it.next()));
}
}
} catch (IOException ex) {
//do something
}
}

void dispatch(SelectionKey k) {
// 若是连接事件获取是acceptor
// 若是IO读写事件获取是handler
Runnable runnable = (Runnable) (k.attachment());
if (runnable != null) {
runnable.run();
}
}

}
/**
* 连接事件就绪,处理连接事件
*/
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null) {// 注册读写
new Handler(c, selector);
}
} catch (Exception e) {

}
}
}
/**
* 处理读写业务逻辑
*/
class Handler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;

public Handler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}

@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}

private void read() {
process();
//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}

private void write() {
process();
//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}

/**
* task 业务处理
*/
public void process() {
//do something
}
}

单Reactor多线程模型


相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 多线程处理读写业务逻辑
*/
class MultiThreadHandler implements Runnable {
public static final int READING = 0, WRITING = 1;
int state;
final SocketChannel socket;
final SelectionKey sk;

//多线程处理业务逻辑
ExecutorService executorService = Executors.
newFixedThreadPool(Runtime.getRuntime().availableProcessors());


public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
this.state = READING;
this.socket = socket;
sk = socket.register(selector, SelectionKey.OP_READ);
sk.attach(this);
socket.configureBlocking(false);
}

@Override
public void run() {
if (state == READING) {
read();
} else if (state == WRITING) {
write();
}
}

private void read() {
//任务异步处理
executorService.submit(() -> process());

//下一步处理写事件
sk.interestOps(SelectionKey.OP_WRITE);
this.state = WRITING;
}

private void write() {
//任务异步处理
executorService.submit(() -> process());

//下一步处理读事件
sk.interestOps(SelectionKey.OP_READ);
this.state = READING;
}

/**
* task 业务处理
*/
public void process() {
//do IO ,task,queue something
}
}

多Reactor多线程模型


第三种模型比起第二种模型,是将Reactor分成两部分:

  1. mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。
  2. subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另外扔给worker线程池来完成。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    /**
    * 多work 连接事件Acceptor,处理连接事件
    */
    class MultiWorkThreadAcceptor implements Runnable {

    // cpu线程数相同多work线程
    int workCount =Runtime.getRuntime().availableProcessors();
    SubReactor[] workThreadHandlers = new SubReactor[workCount];
    volatile int nextHandler = 0;

    public MultiWorkThreadAcceptor() {
    this.init();
    }

    public void init() {
    nextHandler = 0;
    for (int i = 0; i < workThreadHandlers.length; i++) {
    try {
    workThreadHandlers[i] = new SubReactor();
    } catch (Exception e) {
    }

    }
    }

    @Override
    public void run() {
    try {
    SocketChannel c = serverSocket.accept();
    if (c != null) {// 注册读写
    synchronized (c) {
    // 顺序获取SubReactor,然后注册channel
    SubReactor work = workThreadHandlers[nextHandler];
    work.registerChannel(c);
    nextHandler++;
    if (nextHandler >= workThreadHandlers.length) {
    nextHandler = 0;
    }
    }
    }
    } catch (Exception e) {
    }
    }
    }
    /**
    * 多work线程处理读写业务逻辑
    */
    class SubReactor implements Runnable {
    final Selector mySelector;

    //多线程处理业务逻辑
    int workCount =Runtime.getRuntime().availableProcessors();
    ExecutorService executorService = Executors.newFixedThreadPool(workCount);


    public SubReactor() throws Exception {
    // 每个SubReactor 一个selector
    this.mySelector = SelectorProvider.provider().openSelector();
    }

    /**
    * 注册chanel
    *
    * @param sc
    * @throws Exception
    */
    public void registerChannel(SocketChannel sc) throws Exception {
    sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
    }

    @Override
    public void run() {
    while (true) {
    try {
    //每个SubReactor 自己做事件分派处理读写事件
    selector.select();
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();
    if (key.isReadable()) {
    read();
    } else if (key.isWritable()) {
    write();
    }
    }

    } catch (Exception e) {

    }
    }
    }

    private void read() {
    //任务异步处理
    executorService.submit(() -> process());
    }

    private void write() {
    //任务异步处理
    executorService.submit(() -> process());
    }

    /**
    * task 业务处理
    */
    public void process() {
    //do IO ,task,queue something
    }
    }

典型的多路复用 I/O 实现

多路复用 I/O 模型在应用层工作效率比我们俗称的 I/O 模型快的本质原因是,前者不再使用操作系统级别的“同步 ”模型,OS操作进程/线程的挂起与恢复涉及的用户态与核心态的切换会引起较大的开销。Linux 操作系统环境下,多路复用 I/O 型就是技术人员通常简称的 NIO 技术。多路复用目前具体的实现主要包括四种: select、poll、epoll、kqueue。

多路复用I/O技术的优缺点

多路复用 I/O 技术由操作系统提供支持,并提供给各种高级语言进行使用。它针对阻塞式同步 I/O 和非阻塞式同步 I/O 而言有很多优势,最直接的效果就是它绕过了 I/O 在操作系统层面的 accept() 方法的阻塞问题。

  • 使用 多路复用I/O 技术 ,应用程序就可以不用再单纯使用多线程技术来解决并发 I/O 处理的性能问题了(针对操作系统内核 I/O 管理模块和应用程序进程而言都是这样的)。在实际业务的处理中,应用程序进程还是需要引入(一般由线程池支持)多线程技术的* 同一个端口可以处理多种网络协议。例如,使用 ServerSocketChannel 类的服务器端口监昕,既可以接收到TCP协议又可以接收 UDP协议内容。也就是说端口的数据接收规则只和 Selector 注册的需要关心的事件有关。
  • 操作系统级别的优化: 多路复用I/O技术可以使操作系统级别在一个端口上能够同时接受多个客户端的I/O时间,同时具有之前我们讲到的阻塞式同步I/O的所有特点。Selector的一部分作用更相当于“轮询代理器”。
  • 依然是同步I/O模型: 多路复用I/O,是基于操作系统级别对“同步I/O”的实现。这里所说的“同步I/O”,简单一句话解释就是:只有上层(包括上层的某种代理机制)系统询问“我”是否有某个事件发生了,否则“我”不会主动告诉上层系统事件发生了。

  1. 多路复用 I/O 技术最适用的是“高并发”场景,所谓高并发是指1毫秒内至少同时有成百上千个连接请求准备就绪,其他情况下多路复用 I/O 技术发挥不出它的明显优势。
  2. 使用Java NIO 进行功能实现,相对于传统的 Socket 套接宇实现要复杂一些,所以实际应用中,需要根据自己的业务需求进行技术选择。

Netty的几个重要概念

  1. Netty线程机制
    采用的多Reactor模型:Boss线程+Work线程

    Boss线程负责发现连接到服务器的新的 Channel (SocketServerChannel 的 ACCEPT事件),并且将这个 Channel经过检查后注册到 Work 连接池的某个 EventLoop 线程中.而当 Work 线程发现操作系统有它感兴趣的 I/O 事件时(例如SocketChannel的READ事件),则调用相应的ChannelHandler事件。当某个channel失效后(例如显示调用ctx.close()),这个channel将从绑定的EventLoop中剔除。

    在Netty中,如果我们使用的是Java NIO框架实现的对多路复用I/O模型的支持,那么进行这个循环的是NIOEventLoop类(processSelectedKeyPlain方法、processSelectedKey方法)。另外这个类中Netty解决了空轮询bug及Selector.open()的NPE。

    一个Work线程池的线程将按照底层封装Java NIO框架中Selector的事件状态,决定要执行ChannelHandler中的哪一个事件方法(Netty中包括了channelRegistered、channelUnregistered、channelActive、channelInactive等事件方法)。执行完成后,Work线程将一直轮询直到操作系统回复下一个它所管理的channel发生了新的I/O事件。

  2. ByteBuf
    Netty 重写了 Java NIO 框架中的缓存结构,井将这个结构应用在更上层的封装中。
    • io.netty.buffer.EmptyByteBuf:这是一个初始容量和最大容量都为0的缓存区。一般我们用这种缓存区描述“没有任何处理结果”,并将其向下 Handler 传递。
    • io.netty.buffer.ReadOnlyByteBuf:这是一个不允许任何“写请求”的只读缓存区。一般通过Unpooled.unmodifiableBuffer(ByteBuf)方法将某一个可正常读写缓存区转变而成。如果我们需要在下一个 Handler 理的过程中禁止写入任何数据到缓存区,那么就可以在这个 Ha ndler 中进行“只读缓存区”的转换。
    • io.netty.buffer.UnpooledDirectByteBuf:基本的 Java NIO 框架的 ByteBuffer 封装。直接使用这个缓存区实现来处理 Handler 事件。
    • io.netty.buffer.PooledByteBuf: Netty 4.x 版本的缓存新特性。主要是为了减少之前unpoolByteBuf 在创建和销毁时的 GC 时间。
  3. Channel
    Channel可译为通道。你可以使用 Java NIO 中的 Channel 去初步理解它,但实际上它的意义和 Java NIO 中的通道意义还不 样。我们可以解释成 “更抽象、更丰富”。
    • Netty中的channel专门指网络通信,不同于Java NIO中的Channel,后者还有指类似FileChannel本地文件的I/O通道。
    • Netty更加抽象,它不仅封装了多路复用I/O模型,还封装了Java BIO模型。
  4. ChannelPipeline和ChannelHandler
    Netty 中的每一个 Channel ,都有一个独立的 ChannelPipeline 中文名称为“通道水管”。只不过这个水管是双向的,里面流淌着数据,数据可以通过这个“水管”流入到服务器,也可以通过这个“水管”从服务器流出。
    (1) 责任链和适配器的应用
    (2) ChannellnboundHandler 类举例
    • HttpRequestDecoder
    • ByteArrayDecoder
    • DelimiterBasedFrameDecoder
    • ProtobuIDecoder和ProtobufVarint32FrameDecoder等标准数据格式解析处理器
    (3) ChannelOutboundHandler 类举例
    • HttpResponseEncoder
    • ByteArrayEncoder
    • ProtobutEncoder、ProtobufVarint32LengthFieldPrepender、MarshallingEncoder、JZlibEncoder等。

半包/粘包问题

在TCP连接中,指令和指令之间没有间隔,接收方可能为了接收两条连贯的指令,一共做了三次的接收,而第二次接收会收到一部分的包1的部分内容和包2的部分内容。

半包是指,接收方应用程序在接收信息时,没有收到一个完整的信息格式块。粘包是指,接收方应用程序在接收信息时,除了接收到发送方应用程序发送的某一个完整数据信息描述,还接收到了发送方应用程序发送的下一个数据信息的一部分。

半包和粘包问题产生的根本原因是TCP本质上没有“数据块”的概念,而是一连串的数据流。在应用程序层面上、业务层面上,我们自行定义的“数据块”在TCP层面上并不被协议认可。

这个问题只会发生在TCP协议进行连续发送数据时(TCP长连接),由于UDP都是有边界的数据报,所以UDP不会出现这个问题。而TCP短连接也不会出现这问题,因为发送完一个指令信息后连接就断开了,不会再发送第二个指令数据。

半包/粘包是一个应用层问题。要解决半包/粘包问题,就是在应用程序层面建立协商一致的信息还原依据。常见的有两种方式: * 消息定长 即保证每一个完整的信息描述的长度都是一定的,这样无论 TCP/IP 协议如何进行分片,数据接收方都可以按照固定长度进行消息的还原。 * 增加分隔符 在完整的一块数据结束后增加协商一致的分隔符(例如增加一个回车符)

Netty提供了多种解码器的封装帮助解决半包/粘包问题。 > * FixedLengthFrameDecoder > * DelimiterBasedFrameDecoder > * LineBasedFrameDecoder > * 甚至针对不同的数据格式, Netty都提供了半包和粘包问题的现成解决方式。例如 ProtobuN arint32FrameDecoder 解码器,就是专门解决 Protobuf 数据格式在 TCP 长连接传输时的半包问题的。

Reference

《高性能服务系统构建与实战》–银文杰/编著
【NIO系列】——之Reactor模型

扫描二维码,分享此文章