Netty网络编程

2021/7/19 17:08:49

本文主要是介绍Netty网络编程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

TCP协议

  TCP是一种面向连接的,可靠的,基于字节流的传输层通信协议.

  • 为什么要有"连接"? "连接"究竟是什么?
  • Byte stream oriented vs Message oriented

我们先来看看网络上最传统的通信,网络上的通信默认是不可靠的,例如我们从中国发一个包到美国,在这个过程,我们的包可能会丢掉,或者包的顺序是不对的,但是我们的TCP的协议是一个可靠的协议,在一个不可靠的通道上传送信息,最后的结果是可靠的.我们生活上打电话,如果信号不好,是不是会等对方确认一下是否听到,我们网络上的通信也是如此,我们往Server发一个包,我们会希望Server端会给我们返回一个回应(ACK),当我们得到这个ACK,我们Client就知道这个包发成功了,当没有收到ACK的时候,我们会去重新发这个包直到Server端给我们ACK,这是TCP最原始的一个版本,但是这个版本会有一个问题,我们可以想象,我们发一个包就得等待一次ACK,这样的效率是不是太慢了,我们发一个包到Server端返回一个包这里就会有一个等待的时间,也就是会阻塞.所以TCP就引入了一个Batch的概念,我不是一个包发过去就等待一个ACK,我是一批包发过去,等待一个ACK, 例如我们client发一个Batch,里面包含1,2,3这三个包,Server只需要返回一个ACK 3,就可以了.但是这是理想的状态,这又有一个问题,Server只收到2,3这两个包,1这个包丢了,这怎么办.当时的解决方案是"回退N步",如果有某个包丢了,Server是不会返回ACK的,Client是需要重新发送这三个包的,最后就衍生到了现在的"三次握手","四次挥手".

那为什么是"三次"而不是"五次","八次"呢?

"三次"是刚好能证明双方的通讯是没有问题的,在Client和Server的角度上来看,"三次"刚好能把自己的"发送"和"接收"能力告诉对方.

我们先来看看Socket API

(图1)

(图2)


只要是使用到Socket的技术栈,所有的语言在实现上都是大同小异的,我们一开始是初始化一个Socket,可以理解为new 一个Socket,然后我们要绑定一个端口也就是Bind()的动作,然后要监听,listen(),监听就表示我们是Server端,不是Client端,而我们的"三次握手"就是发生在Client端发送connect请求,Server端执行accept的时候,当Server端执行完accept的时候,Client和Server就可以通信了,Server端就会为这个Client分配内存等等操作,如图2所示,Server端在内部会维护两个queue(队列),sync queue就是当Client发送第一个包过来询问的时候,觉得没有问题就会放进这里,同时发送ACK,当Client又发一次包过来的时候说明连接没有问题的时候,Server端就会把这个连接放进accept queue里面,而我们应用程序从这个队列里面显式地拿一条连接出来,这个动作就是accept,一旦连接接完了,就是很传统的通信了,可以执行read()/write()操作了.

我们再来看看Java里的Socket框架

BIO & NIO

  • BIO
    • ServerSocket 这个类主要是用来接收连接的,也就是Accept()这个动作的
    • Socket 接收完连接,就是在这边操作的
  • NIO
    • ServerSocetChannel
    • SocketChannel
 1 import java.io.BufferedReader;
 2 import java.io.IOException;
 3 import java.io.InputStreamReader;
 4 import java.io.PrintWriter;
 5 import java.net.InetSocketAddress;
 6 import java.net.ServerSocket;
 7 import java.net.Socket;
 8 
 9 public class BIOServer {
10     public static void start(int port) throws IOException {
11         //socket
12         ServerSocket serverSocket = new ServerSocket();
13         //bind & listen
14         serverSocket.bind(new InetSocketAddress(port), 2);//backlog, accept queue was created in listen()
15         
16         while(true) {
17             //accept
18             final Socket clientSocket = serverSocket.accept();//block!
19             System.out.println("accept!");
20             new Thread(()-> {// or user thread pool
21                     try {
22                         BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
23                         PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
24                         String line = in.readLine();//block
25                         while(line  != null) {
26                             out.println(line);
27                             System.out.println("读到的值为:"+line);
28                             out.flush();
29                             line = in.readLine();//block
30                         }
31                         clientSocket.close();
32                     } catch (IOException e) {
33                         e.printStackTrace();
34                         try {
35                             clientSocket.close();
36                         } catch (IOException ee) {
37                             e.printStackTrace();
38                         }
39                     }
40             }).start();
41         }
42 
43     }
44     
45     public static void main( String[] args ) throws InterruptedException, IOException{
46             start(8084);
47     }
48 }

我们这个代码,通过while(true)来不断的调用这个accept的方法,因为accept是个同步阻塞的方法,不会导致程序一直在死循环的创建线程,我们测试的时候可以直接在cmd里telnet这个ip端口,然后再输入模拟通讯.BIO有个缺点就是,在高并发的情况下,例如有1000个请求,就会创建1000个线程去执行任务,如果执行readLine的时候,阻塞了,那么1000个线程就不能执行完毕,不能得到销毁.

(图3)

我们来看看图3,BIO还有一个很浪费内存的情况就是,每一条线程(socket)过来都要经过"accept","read & parse","write"这些同步阻塞的操作,

"read & parse" : TCP是面向字节流的,肯定要解决一个边界的问题,哪里到哪里是一个完整的请求,我们要把它分隔开,这就是read & parse要做的事情,例如一个包是100字节,但是我只收到99字节,我还不能交给下层去用,我必须等待接收到一个完整的包才行,少一个字节都是不对的,下层就无法解析了

"write" : write为什么也会阻塞?接收端也是有一个窗口的,会有一个缓冲区域buffer,例如Server是要发1,2,3,4这四个包的,但是Client只收到1,3,4这三个包,就要等待2这个包,那么Client就会将1,3,4这三个包缓存进Buffer里面,当Buffer满了之后,Server就需要等待Buffer有空间才能进行发送.

因为BIO会导致创建过多的线程,线程间的上下文切换是很耗费资源的,所以就衍生出了NIO.

NIO核心:不想创建过多的线程

(图4)

NIO中的Selector你可以想象成是一个线程,一个线程负责多个Socket 连接,假设有一万个连接,这一万个连接都是交给这个线程管理,这个Selector会监听这么多个Socket的状态变化,这样就不会造成浪费,因为它要么是在做事情,要么就是没事可做,这个线程就是一直轮询这些Socket,有read或write操作的时候,它是有多少就读或写多少,不会去等待.

我们来看看NIO的Selector API

  • channel.register(selector) 注册监听 channel是指Server Socket的channel
  • while(true) + select() 轮询事件
  • selectedKeys()获得SelectionKey对象,表示channel的注册信息
  • SelectionKey.attach()对selectionKey关联任何对象
  • isReadable()/isAcceptable()/isWritable() 判断事件类型
  • 事件类型: OP_ACCEPT/OP_READ/OP_WRTIE/OP_CONNECT Server端只需要看OP_ACCEPT/OP_READ/OP_WRTIE

我们再来看看代码

 1 import java.io.IOException;
 2 import java.net.InetSocketAddress;
 3 import java.nio.ByteBuffer;
 4 import java.nio.channels.SelectionKey;
 5 import java.nio.channels.Selector;
 6 import java.nio.channels.ServerSocketChannel;
 7 import java.nio.channels.SocketChannel;
 8 import java.util.Iterator;
 9 import java.util.Set;
10 
11 public class NIOServer {
12     public static void start(int port) throws IOException {
13          ServerSocketChannel serverChannel = ServerSocketChannel.open();//我们可以想象成new 一个Socket
14          serverChannel.configureBlocking(false);//nonblocking
15          InetSocketAddress address = new InetSocketAddress(port);
16          //bind & listen
17          serverChannel.bind(address);
18          
19          Selector selector = Selector.open();//可以理解成创建单个线程,轮询管理多个Socket对象
20          
21          serverChannel.register(selector, SelectionKey.OP_ACCEPT);//把服务器的Socket注册到这个Selector上,告诉Selector我感兴趣的事情OP_ACCEPT
22          //开始进入Selector的工作模式
23          while(true) {
24              selector.select();//scan ,轮询一遍,查看所管理的channel是否有时间发生
25              
26              Set<SelectionKey> readyKeys = selector.selectedKeys();//这个SelectionKey就是每一个channel绑定的东西
27              System.out.println("readyKeys.size="+readyKeys.size());
28              Iterator<SelectionKey> it = readyKeys.iterator();
29              while (it.hasNext()) {
30                  SelectionKey key = it.next();
31                  //accept,如果这个key是Acceptable事件
32                  if (key.isAcceptable()) {
33                      System.out.println("isAcceptable");
34                      ServerSocketChannel server = (ServerSocketChannel)key.channel();
35                      SocketChannel socket = server.accept();//直接获取这个channel,这不是一个阻塞的方法,一经调用马上返回
36                      System.out.println("Accept !");
37                      socket.configureBlocking(false);//don't forget to set nonblocking
38                       socket.register(selector, SelectionKey.OP_READ );//tricky,在最开始selector只关注OP_ACCEPT事件,现在就可以关注这个channel的OP_READ事件了
39                  }
40                  if (key.isReadable()) {
41                      System.out.println("isReadable");
42                      SocketChannel socket = (SocketChannel) key.channel();
43                      final ByteBuffer buffer = ByteBuffer.allocate(64);
44                      final int bytesRead = socket.read(buffer);//also nonblock, how to convert byte to frame
45                      if (bytesRead > 0) {
46                          buffer.flip();
47                          int ret = socket.write(buffer);
48                          System.out.println("ret="+ret);
49                          if (ret <=0) {
50                              //register op_write
51                              socket.register(selector, SelectionKey.OP_WRITE);
52                          }
53                          
54                          buffer.clear();
55                      } else if (bytesRead < 0) {//means connection closed
56                          key.cancel();
57                          socket.close();
58                          System.out.println("Client close");
59                      }
60                  }
61                  //不能在这里写wirtable事件,因为OP_ACCEPT-->OP_READ-->OP_WRITE,一旦到了OP_WRITE,这个while循环就会一直走里面的代码块,没有必要
62 //                 if (key.isWritable()) {
63 //                     SocketChannel socket = (SocketChannel) key.channel();
64 //                     final ByteBuffer buffer = ByteBuffer.allocate(64);
65 //                     socket.write(buffer);
66 //                     //remove 
67 //                 }
68                  
69                  
70                  it.remove();//don't forget, why need to manually move?
71              }
72          }     
73     }
74     
75         
76      public static void main( String[] args ) throws InterruptedException, IOException{
77                 start(8085);
78       }
79 }    

NIO代码裸写有很大的风险,每一步都需要很严谨,而且很难融入业务,BIO跟它比还算比较方便,后面就衍生出了Reactor模式

 

Netty是一个NIO Client Server的框架

Netty核心概念

  • Channel      -SocketChannel
  • EventLoop     -Selector
  • ChannelPipeline   
  • ChannelHandler
  • Bootstrap      -Main
  • ByteBuf      -ByteBuffer
  • Codec

ChannelPipeline,ChannelHandler面向的是业务的

(图5)

select():遍历操作

processSelectedkeys():负责一些IO时间,这里不能放阻塞的代码,不然就违反NIO的原则

runAllTasks:会去判断当前任务是否属于主线程,如果不是则放到task queue里面异步执行

(图6)

我们为了最大限度地榨干CPU的性能,通常会根据CPU的核数来创建多个EventLoop,每个EventLoop负责多个channel.

(图7)

Pipeline:流水线的概念

ChannelHandler:真正操作业务的地方,有任务来就会进入到Handler代码块 

传统的网络编程都有这几步:read,decode,compute,encode,send,每一层都可以有对应的Handler.

InboundHandler:专门处理外部进来的请求

OutboundHandler:专门处理应用的请求

Netty各组件之间关系

  • EventLoopGroup 1:n EventLoop
  • EventLoop 1:1 Thread
  • EventLoop 1:n Channel
  • Channel 1:1 ChannelPipeline
  • ChannelPipeline 1:n ChannelHandler
  • ChannelHander 1:n ChannelPipeline

粘包 & 拆包

(图8)

我们来看图8的例子,当Client发来一堆包的时候,我们要从这一堆包里面拆分出来3个完整的信息,分别是(1011),(01011),(011),为了得到完整信息,我们会去跟Client端指定协议,例如每段有用的消息要在头部发一个特殊表示的head,head包后续跟着是整段信息的包长度(length),而这个包长度是否包括head包和length包这两个包完全可以由我们自己来定.我再来看看如何指定一套完整的协议

理解LengthFieldBasedFrameDecoder

  • Length Field在帧的什么位置? -lengthFieldOffset.
  • Length Field自身长度多少? -lengthFieldLength.
  • Length Field中的长度值具体指哪部分? -lengthAdjustment.
  • Frame Decode后希望获得的内容(想要丢弃的帧部分) -initialBytesToStripe.
 1 import java.util.function.Consumer;
 2 
 3 import com.tim.nettttty.handler.EchoHandler;
 4 import com.tim.nettttty.handler.IProtocalHandler;
 5 import com.tim.nettttty.handler.PipelinePrintHandler;
 6 import com.tim.nettttty.handler.PrintInboundHandler;
 7 
 8 import io.netty.bootstrap.ServerBootstrap;
 9 import io.netty.channel.Channel;
10 import io.netty.channel.ChannelFuture;
11 import io.netty.channel.ChannelInitializer;
12 import io.netty.channel.ChannelPipeline;
13 import io.netty.channel.EventLoopGroup;
14 import io.netty.channel.nio.NioEventLoopGroup;
15 import io.netty.channel.socket.nio.NioServerSocketChannel;
16 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
17 import io.netty.handler.codec.LineBasedFrameDecoder;
18 import io.netty.handler.codec.string.StringDecoder;
19 import io.netty.handler.codec.string.StringEncoder;
20 import io.netty.util.CharsetUtil;
21 import io.netty.util.concurrent.DefaultEventExecutorGroup;
22 
23 
24 public class Server {
25     
26     private static void use( ChannelPipeline pipeline, Consumer<ChannelPipeline> strategy) {
27         strategy.accept(pipeline);
28     }
29     
30     private static Consumer<ChannelPipeline> echo = p ->{
31         p.addLast(
32                 new LineBasedFrameDecoder(80,false,false),
33                 new StringDecoder(),
34                 new EchoHandler(),
35                 new PipelinePrintHandler(),
36                 new StringEncoder(CharsetUtil.UTF_8)
37                 
38                 );
39     };
40     
41     private static Consumer<ChannelPipeline> print = p ->{
42         p.addLast(
43                 new PrintInboundHandler("id1")
44                 );
45     };
46     
47     private static Consumer<ChannelPipeline> decode = p ->{
48         p.addLast( 
49                 new LengthFieldBasedFrameDecoder(1024,2,2,-2,0))
50                 .addLast(new DefaultEventExecutorGroup(16), new IProtocalHandler())
51                 .addLast( new StringEncoder(CharsetUtil.UTF_8))
52                 ;
53     };
54     
55     private static void start(int port) throws InterruptedException {
56         EventLoopGroup bossGroup = new NioEventLoopGroup();
57         EventLoopGroup workerGroup = new NioEventLoopGroup();
58         try {
59             ServerBootstrap b = new ServerBootstrap();//Bootstrap for client
60             b.group(bossGroup, workerGroup);
61             b.channel(NioServerSocketChannel.class);//always
62             b.childHandler(new ChannelInitializer() {
63                 @Override
64                 protected void initChannel(Channel ch) throws Exception {
65                     use(ch.pipeline(), decode);
66                 }                
67             });
68             
69             ChannelFuture f = b.bind(port).sync();
70             f.channel().closeFuture().sync();
71         } finally {
72             bossGroup.shutdownGracefully();
73             workerGroup.shutdownGracefully();
74         }
75     }
76     
77     public static void main( String[] args ) throws InterruptedException{
78          start(8084);
79     }
80 }
 1 import java.util.Random;
 2 
 3 import io.netty.buffer.ByteBuf;
 4 import io.netty.channel.ChannelHandlerContext;
 5 import io.netty.channel.ChannelInboundHandlerAdapter;
 6 import io.netty.util.CharsetUtil;
 7 
 8 public class IProtocalHandler extends ChannelInboundHandlerAdapter {
 9     
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, final Object msg) throws Exception {
12         int sleep = 500 * new Random().nextInt(5);
13         System.out.println("sleep:" + sleep);
14         Thread.sleep(sleep);
15         
16         final ByteBuf buf = (ByteBuf) msg;
17         char c1 = (char) buf.readByte();
18         char c2 = (char) buf.readByte();
19         
20         if (c1 != 'J' || c2 != 'W') {
21             ctx.fireExceptionCaught(new Exception("magic error"));
22             return ;
23         }
24         
25         buf.readShort();//skip length
26         
27         String outputStr = buf.toString(CharsetUtil.UTF_8);
28         System.out.println(outputStr);
29         
30         ctx.channel().writeAndFlush(outputStr+"\n");
31         
32     }
33     
34 }
 1 import java.io.BufferedReader;
 2 import java.io.DataOutputStream;
 3 import java.io.IOException;
 4 import java.io.InputStreamReader;
 5 import java.net.Socket;
 6 
 7 public class OIOClient {
 8     
 9     public static final String[] commands = new String[] {
10             "hi",
11             "i am client",
12             "helloworld",
13             "java and netty" 
14     };
15     public static void main(String[] args) throws IOException {
16         int concurrent = 1;
17         Runnable task = () ->{
18             try {
19                 Socket socket = new Socket("127.0.0.1", 8084);
20                 DataOutputStream out = new DataOutputStream(socket.getOutputStream());
21                 /**
22                  *  HEADER(2)|LENGTH(2)|BODY
23                  *  LENGTH = (self(2) + BODY),  not include header
24                  */
25                 for(String str : commands) {
26                     out.writeByte('J');
27                     out.writeByte('W');
28                     int length = str.length();
29                     out.writeShort(length*2 + 2);//why *2 here?
30                     out.writeChars(str);
31                 }
32                 out.flush();
33                 
34                 BufferedReader br=new BufferedReader(new InputStreamReader(socket.getInputStream()));
35                 String line = null;
36                 while(!((line =br.readLine())==null)){
37                     System.out.println(line);
38                 }
39                 
40                 socket.close();
41             }catch (Exception e) {
42                 e.printStackTrace();
43             }
44         };
45         
46         
47         for(int i =0;i<concurrent;i++) {
48             new Thread(task).start();
49         }
50         
51         
52     }
53 }

 



这篇关于Netty网络编程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程