Netty学习笔记(4) Netty源码 - accept 和 read流程
2022/1/30 14:04:53
本文主要是介绍Netty学习笔记(4) Netty源码 - accept 和 read流程,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
文章目录
- 前言
- 1. nio中的accept回顾
- 2. netty中的accept流程
- 1. int localRead = doReadMessages(readBuf)
- 2. pipeline.fireChannelRead(readBuf.get(i))
- 1. childGroup.register(child).addListener(new ChannelFutureListener()
- 3. netty 中的 read 流程
前言
笔记基于黑马的Netty教学讲义加上自己的一些理解,感觉这是看过的视频中挺不错的,基本没有什么废话,视频地址:黑马Netty。下面是。
还是这一段代码:
public class TestSourceServer { public static void main(String[] args) { new ServerBootstrap() //EventLoop有一个线程和执行器selector,用于关注事件,解决一些任务 .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>(){ @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); } }).bind(8080); } }
[1] 标记
1. nio中的accept回顾
- selector.selecr() 阻塞直到事件发生
- 遍历处理 selectedKeys
- 拿到一个 key,判断类型是不是 accept
- 创建 SocketChannel,设置非阻塞
- 将 SocketChannel注册到 selector
- 设置 SocketChannel 关注 read 事件
2. netty中的accept流程
接着上一篇文章 NioServerSocketChannel 的第10点,下面这里就是进入 accept事件,可以说到了这里完成了nio 的1,2,3
点,而unsafe.read();
完成剩下的三点
下面就是 read 方法中的代码,这里面我们主要观察4、5、6
三点在哪会被执行
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { //4、创建 SocketChannel,设置非阻塞 //下面看了源码后这里是 localRead =1 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; //pipeline:拿到NioServertSocketChannel的流水线 //调用上面的handler处理 //这一步其实上面的处理器只有三个 head-accept-end //都是前面的文章说过的 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
下面是重要的方法,作用写在上面了
1. int localRead = doReadMessages(readBuf)
@Override protected int doReadMessages(List<Object> buf) throws Exception { //建立连接,创建SocketChannel返回 SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { //创建了 NioSocketChannel,下面就是把NioSocketChannel当成一个消息放到结果里面 //到时候pipeline上的处理器会获取到这些信息并进行处理 buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; } //accept public static SocketChannel accept(final ServerSocketChannel serverSocketChannel) throws IOException { try { return AccessController.doPrivileged(new PrivilegedExceptionAction<SocketChannel>() { @Override public SocketChannel run() throws IOException { //调用serverSocketChannel.accept把连接建立完成 return serverSocketChannel.accept(); } }); } catch (PrivilegedActionException e) { throw (IOException) e.getCause(); } }
总结:这个方法主要是
- 创建了 SocketChannel 和 NioServerSocketChannel
- 把NioServerSocketChannel作为一个消息设置到结果里面
- 下面运行到 pipeline.fireChannelRead 方法的时候就会调用handler去处理accept
- 至此,第四步完成 nio步骤
2. pipeline.fireChannelRead(readBuf.get(i))
运行到这一步,意思就是调用 pipeline 上的 handler 来处理消息,一旦调用就会跳转到下面 ServerBootstrapAcceptor 这个 accept 处理器的 read 方法中,下面就是这个方法的流程
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; //设置处理器 child.pipeline().addLast(childHandler); //下面设置一些参数 setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs); try { //这时比较重要的一些流程,其实就是把一个新的 eventLoop //在里面找到一个 selector 来和channel进行绑定,并设置一个 //线程监听绑定的channel的事件 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
1. childGroup.register(child).addListener(new ChannelFutureListener()
我们一层层进入这个方法
注意下面这个方法为什么不走 if ,因为我们运行到这里的时候走的线程是 NioEventGroup 里面的 ServerSocketChannel 的线程,而我们新建的 SocketChannel 和 当前的线程应该不能是同一个才对
似曾相识的 doRegister()
,这个方法在之前的文章(netty的源码中)也说过,里面的作用就是 把 nioServerSocketChannel 和 selector 绑定起来,并且没有关注事件,到这里,第五步完成,5. 将 SocketChannel注册到 selector
, nio步骤
我们在这个方法中继续运行,在 diRegister() 方法下面有一个方法,这个方法的作用就是触发我们新创建的 channel 上面的初始化事件我们继续运行之后就会来到我们编写的客户端的 initChannel 方法里面,主要的作用看名字也知道,就是添加处理器handler 的
我们继续沿着上面的代码向下走,到下面的 fireChannelActive() 方法中,这个方法就是用来关注 read 事件的
看这个方法的调用链
来到最终的调用方法,可以看到就是在这里调用了关注 read事件,至此,第六步完成 nio步骤,当然中间的调用链不用管,只是说看到这里能意识到最终确实是完成了 nio 中accept 流程的这步,只不过 netty 中对这六步做了层层的封装。
3. netty 中的 read 流程
还是这个方法,客户端连接上之后发送一条数据给服务端,注意第一次进入是accept,第二次进入才是 read,可以看到这里 readyOps 变成了 1
@Override public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } //获取 pipeline,要用到里面的 handler 处理器来处理 final ChannelPipeline pipeline = pipeline(); // 获取 ByteBuf,因为消息在这里面 final ByteBufAllocator allocator = config.getAllocator(); //allocHandle :动态调整上面的ByteBuf大小,使用直接内存,因为是 io操作,使用直接内存效率高 final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //分配具体的 ByteBuf,分完就可以读数据了 byteBuf = allocHandle.allocate(allocator); //这个方法是读取客户端发送过来的数据 allocHandle.lastBytesRead(doReadBytes(byteBuf)); //证明读完了 if (allocHandle.lastBytesRead() <= 0) { //没有东西读了,就把 ByteBuf 释放掉 byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // There is nothing left to read as we received an EOF. readPending = false; } break; } //读一次消息就增加一次 allocHandle.incMessagesRead(1); readPending = false; //这也是个重要的方法,意思是调用我们服务端的handler来处理发送的消息 //这个方法调用之后就会进入我们写的handler那里 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }
如有错误,欢迎指出!!!!
这篇关于Netty学习笔记(4) Netty源码 - accept 和 read流程的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-05-09flutter3.x_macos桌面os实战
- 2024-05-09Rust中的并发性:Sync 和 Send Traits
- 2024-05-08使用Ollama和OpenWebUI在CPU上玩转Meta Llama3-8B
- 2024-05-08完工标准(DoD)与验收条件(AC)究竟有什么不同?
- 2024-05-084万 star 的 NocoDB 在 sealos 上一键起,轻松把数据库编程智能表格
- 2024-05-08Mac 版Stable Diffusion WebUI的安装
- 2024-05-08解锁CodeGeeX智能问答中3项独有的隐藏技能
- 2024-05-08RAG算法优化+新增代码仓库支持,CodeGeeX的@repo功能效果提升
- 2024-05-08代码报错不用愁,CodeGeeX一键完成代码修复、错误解释的功能上线了!
- 2024-05-08今天开始程序员不用再发愁写commit message了,全部由CodeGeeX自动完成!