态度决定一切

0%

Netty入门

前言

在介绍 Netty之前一定要先搞明白同步和异步、阻塞和非阻塞、BIO、NIO、AIO 都是什么,然后有哪些优缺点,再来看 Netty 出现的原因,有哪些优势,使用场景是什么。这样循序渐进的学习,更容易理解。

同步和异步、阻塞和非阻塞

同步和异步

同步和异步关注的是消息通信机制

同步:同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。

举例:你去收发室问老大爷有没快递,老大爷说你等等,我找找,然后你就等啊等啊,可能 5s 分钟就找到了,可能一天才找到,你就一直等着知道大爷回答你

异步:异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用

举例:你去收发室问老大爷有没快递,老大爷就说我找一下,找到了我给你打电话。然后你就直接走了,干自己的事去了

阻塞和非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.

阻塞:是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回

举例:你去收发室问大爷有没快递,在大爷没回复你之前,你就把自己‘挂起’啥都不干,知道大爷回复你。

非阻塞:在不能立刻得到结果之前,该调用不会阻塞当前线程

举例:你去收发室问大爷有没快递,然后你就跑去玩了,时不时的过来问一下是否有结果。

BIO/NIO/AIO是什么

BIO(Block IO)

同步阻塞,服务器实现模式是一个连接一个线程,即客户端有连接请求是服务度就需要启动一个线程进行处理。

优点:编写简单,小请求量可以接受

缺点:针对高并发,超过100000的并发连接来说该方案并不可取,它所需要的线程资源太多,而且任何时候都可能存在大量线程处于阻塞状态,等待输入或者输出数据就绪,整个方案性能太差。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class BioServer {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
System.out.println("启动服务器...");
serverSocket.bind(new InetSocketAddress("127.0.0.1", 8888));
while (true) {
System.out.println("等待客户端链接...");
//accept()方法阻塞,直到有新的连接
final Socket socket = serverSocket.accept();
System.out.println("客户已连接,创建新的线程处理");
new Thread(()->{
handle(socket);
}).start();
}
}


private static void handle(Socket socket) {
try {
byte[] bytes = new byte[1024];

System.out.println("读数据..");
//read block method
int len = socket.getInputStream().read(bytes);
System.out.println(new String(bytes,0,len));

System.out.println("写数据..");
//write block method
socket.getOutputStream().write(bytes,0,len);
socket.getOutputStream().flush();
}catch (IOException ex) {
ex.printStackTrace();
}
}
}


-------

public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
socket.getOutputStream().write("hello server".getBytes());
socket.getOutputStream().flush();

System.out.println("write over, wait for msg back");
byte[] bytes = new byte[1024];
int len = socket.getInputStream().read(bytes);
System.out.println(new String(bytes,0,len));
socket.close();
}
}

NIO(No-Block IO)

同步非阻塞,服务器实现是一个线程处理多个请求连接,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮训到连接有 IO 请求就进行处理

NIO核心组件:

  1. Channel: 对应JDK底层的Socket,它除了包含基本的I/O操作,如 bind(),connect(),read(),write()之外
  2. Buffer: 缓存 Buffer
  3. Selector:是 Java 非阻塞 I/O实现的关键,将通道Channel注册在 Selector上,如果某个通道 Channel发送 读或写事件,这个Channel处于就绪状态,会被Selector轮询出来,进而进行后续I/O操作

优点:使用 Java NIO可以让我们使用较少的线程处理很多连接,较少线程意味着减少了线程创建内存分配和线程上下文切换带来的开销。

缺点:编程复杂,需要处理各种问题,API 使用难度大,在高负载下可靠和高效地处理和调度I/O操作是一项繁琐而且容易出错的任务,使用 NIO编程很容易出错。

代码示例:单线程处理连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
public class NioServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8888));
// set block is false
serverSocketChannel.configureBlocking(false);

System.out.println("server is started, listen on :" + serverSocketChannel.getLocalAddress());
Selector selector = Selector.open();
//注册监听客户端连接事件
//注册 accept 监听事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

while (true) {
//轮训查看,阻塞方法。监听是否有事件发生
System.out.println("轮训查看是否有监听事件发生...");
selector.select();
//监听到有哪些 key 事件发生
System.out.println("监听到事件发生...");
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
//需要将 key remove 掉不然下次轮训还会处理
iterator.remove();
handle(key);
}
}


}

private static void handle(SelectionKey key) {
if (key.isAcceptable()) {
//获得 channel
System.out.println("accept事件发生,建立一条通道");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
try {
//建立 channel
serverSocketChannel.accept();
//设置通道是否阻塞
serverSocketChannel.configureBlocking(false);
//在通道上放置一个 read 的监听事件
System.out.println("在 channel 通道上注册 read 事件...");
serverSocketChannel.register(key.selector(),SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}

if(key.isReadable()) {
System.out.println("read事件发生...");
SocketChannel socketChannel = (SocketChannel) key.channel();
//分配内存
ByteBuffer buffer = ByteBuffer.allocate(512);
buffer.clear();
try {
//从通道读取数据
System.out.println("读取数据...");
int len = socketChannel.read(buffer);
if (len != -1) {
System.out.println(new String(buffer.array(),0,len));
}
ByteBuffer bufferToWrite = ByteBuffer.wrap("hello client".getBytes());
//向通道写数据
socketChannel.write(bufferToWrite);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

----------

public class Client {
public static void main(String[] args) throws IOException {
Socket socket = new Socket("127.0.0.1", 8888);
socket.getOutputStream().write("hello server".getBytes());
socket.getOutputStream().flush();

System.out.println("write over, wait for msg back");
byte[] bytes = new byte[1024];
int len = socket.getInputStream().read(bytes);
System.out.println(new String(bytes,0,len));
socket.close();
}
}

代码示例:线程池处理连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
public class NioPoolServer {
ExecutorService pool = Executors.newFixedThreadPool(50);

private Selector selector;

public void initServer(int port) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// set block is false
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8888));

System.out.println("server is started, listen on :" + serverSocketChannel.getLocalAddress());
selector = Selector.open();
//注册监听客户端连接事件
//注册 accept 监听事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server run success!");
}

public static void main(String[] args) throws IOException {
NioPoolServer server = new NioPoolServer();
server.initServer(8000);
server.listen();
}



public void listen() throws IOException {
while (true) {
System.out.println("selector 轮训是否有事件..");
// block method
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
handle(key);
}

}
}

private void handle(SelectionKey key) throws IOException {
if(key.isAcceptable()) {
System.out.println("有客户端链接..");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
channel.register(key.selector(),SelectionKey.OP_READ);
}

if(key.isReadable()) {
System.out.println("客户端写入数据...");
key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
pool.execute(new ThreadHandleChannel(key));
}
}


class ThreadHandleChannel implements Runnable {
private SelectionKey selectionKey;


public ThreadHandleChannel(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}

@Override
public void run() {
System.out.println("读取客户端传入数据...");
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//分配内存
ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteOutputStream byteOutputStream = new ByteOutputStream();
try {
//从通道读取数据
int size = 0;
while ((size = socketChannel.read(buffer)) > 0) {
buffer.flip();
byteOutputStream.write(buffer.array(),0,size);
buffer.clear();
}
byte[] content = byteOutputStream.toByteArray();

ByteBuffer writeBuffer = ByteBuffer.allocate(content.length);
writeBuffer.put(content);
writeBuffer.flip();
//write data
socketChannel.write(writeBuffer);
if(size == -1){
socketChannel.close();
return;
}
selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
selectionKey.selector().wakeup();
} catch (IOException e) {
System.out.println(e.getMessage());
}
}
}

}

AIO(Async IO)

异步非阻塞,结果返回回调接口

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class AioServer {
public static void main(String[] args) throws IOException, InterruptedException {

ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,2);
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);
serverSocketChannel.bind(new InetSocketAddress(8888));
//此处非阻塞,执行完成后就干下一步,返回结果会调用CompletionHandler#completed来处理
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel channel, Object attachment) {
serverSocketChannel.accept(null,this);
try {
System.out.println(channel.getRemoteAddress());
ByteBuffer allocate = ByteBuffer.allocate(1024);
channel.read(allocate, allocate, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println(new String(attachment.array(),0,result));
channel.write(ByteBuffer.wrap("hello client".getBytes()));
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
});
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});

while (true) {
Thread.sleep(1000);
}
}
}

Netty是什么

介绍

Netty是一款异步的事件驱动的网络应用程序框架,支持快速开发可维护的高性能的面向协议的服务器和客户端。Netty主要是对java的nio包进行的封装。本质是一个NIO框架,用于服务器通信相关的多种应用场景

事件驱动:例如 Client 端发送的是个连接操作或者读请求或者断开连接,Netty 服务端可以读这几种不同的事件做定制的处理

特点

  • 对 NIO 进行封装,开发者不需要关注 NIO 的底层原理,只需要调用 Netty 组件就能够完成工作。

  • 对网络调用透明,从 Socket 建立 TCP 连接到网络异常的处理都做了包装。

  • 对数据处理灵活, Netty 支持多种序列化框架,通过“ChannelHandler”机制,可以自定义“编/解码器”。

  • 对性能调优友好,Netty 提供了线程池模式以及 Buffer 的重用机制(对象池化),不需要构建复杂的多线程模型和操作队列。

核心组件

  • EventLoop:线程,负责处理IO 实践
  • EventLoopGroup:线程池,包含了多个NioEventLoop
  • Channel:类似 Socket 的抽象,提供了 Socket 的,read(),wirte(),flush()等操作,可以通过 Channel 获取这个 Channel 绑定到的NioEventLoop

  • Bootstrap:客户端的引导类应用程序网络层配置提供容器
  • ServerBootstrap:服务端的引导类应用程序网络层配置提供容器
  • ChannelHandler:从应用开发者看来,ChannelHandler是最重要的组件,其中存放用来处理进站和出站数据的用户逻辑。ChannelHandler的方法被网络事件触发,ChannelHandler可以用于几乎任何类型的操作,如将数据从一种格式转换为另一种格式或处理抛出的异常。例如,其子接口ChannelInboundHandler,接受进站的事件和数据以便被用户定义的逻辑处理,或者当响应所连接的客户端时刷新ChannelInboundHandler的数据
  • ChannelPipeline:ChannelPipeline为ChannelHandler链提供了一个容器并定义了用于沿着链传播入站和出站事件流的API。当创建Channel时,会自动创建一个附属的ChannelPipeline

设计

统一的API,适用于不同的协议(阻塞和非阻塞)
基于灵活、可扩展的事件驱动模型
高度可定制的线程模型
可靠的无连接数据Socket支持(UDP)

性能

更好的吞吐量,低延迟
更低的资源消耗
最少的内存复制

健壮性

不再因过快、过慢或超负载连接导致OutOfMemoryError
不再有在高速网络环境下NIO读写频率不一致的问题

安全性:

完整的SSL/TLS和STARTTLS的支持
可用于受限环境下,如 Applet 和OSGI

易用:

详实的Javadoc和大量的示例集

示例代码:
server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class EchoServer {
private final int port;

public EchoServer(int port) {
this.port = port;
}

public static void main(String[] args) throws InterruptedException {
new EchoServer(8888).start();
}

public void start() throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
//创建EventLoopGroup,处理事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss,worker)
//指定所使用的NIO传输 Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//添加一个EchoServerHandler到子Channel的ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//EchoServerHandler标志为@Shareable,所以我们可以总是使用同样的实例
socketChannel.pipeline().addLast(serverHandler);
}
});
//异步的绑定服务器,调用sync()方法阻塞等待直到绑定完成
ChannelFuture future = b.bind().sync();
future.channel().closeFuture().sync();
} finally {
//关闭EventLoopGroup,释放所有的资源
group.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
}
}
}

@ChannelHandler.Sharable //标识一个 ChannelHandler可以被多个Channel安全地共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
//将消息记录到控制台
System.out.println("Server received: " + buffer.toString(CharsetUtil.UTF_8));
//将接受到消息回写给发送者
ctx.write(buffer);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
hbv ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//打印异常栈跟踪
cause.printStackTrace();
//关闭该Channel
ctx.close();
}
}

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class EchoClient {
private final String host;
private final int port;


public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}

public void start() throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture channelFuture = b.connect().sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}

public static void main(String[] args) throws InterruptedException {
new EchoClient("127.0.0.1", 8888).start();
}
}

@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println("Client received: "+byteBuf.toString());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks",CharsetUtil.UTF_8));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

Netty能够做什么

  • 远程服务调用,高性能网络通信如 Dubbo
  • 开发异步、非阻塞的 TCP 网络应用程序;
  • 开发异步、非阻塞的 UDP 网络应用程序;
  • Flink,Cassandra,Spark 等
  • 开发异步文件传输应用程序;
  • 开发异步 HTTP 服务端和客户端应用程序;
  • 提供对多种编解码框架的集成,包括谷歌的 Protobuf、Jbossmarshalling、Java 序列化、压缩编解码、XML 解码、字符串编解码等,这些编解码框架可以被用户直接使用;
  • 提供形式多样的编解码基础类库,可以非常方便的实现私有协议栈编解码框架的二次定制和开发;
  • 基于职责链模式的 Pipeline-Handler 机制,用户可以非常方便的对网络事件进行拦截和定制;
  • 所有的 IO 操作都是异步的,用户可以通过 Future-Listener 机制主动 Get 结果或者由 IO 线程操作完成之后主动 Notify 结果,用户的业务线程不需要同步等待
  • IP 黑白名单控制;
  • 打印消息码流;
  • 流量控制和整形;
  • 性能统计;