netty源码 之接收连接

2021/7/27 22:07:25

本文主要是介绍netty源码 之接收连接,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录

接收链接

netty的接收连接

前话

1、bossGroup 轮询链接事件

2、bossGroup 创建socketChannel

3、ServerBootstrapAcceptor注册到worker线程

4、workerGroup 将 socketChannel 注册到选择的NioEventLoop的selector

5、workerGroup 注册读事件


接收链接

NIO的读事件

 while(!stop){//循环遍历selector,休眠时间为1S,当又处于就绪状态的CHannel时,selector将返回该channel的集合。通过对Channel集合的迭代,可进行网络异步读写操作
        try {
            selector.select(1000);
            Set<SelectionKey> selectKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectKeys.iterator();
            SelectionKey key = null;
            while (it.hasNext()){
                key = it.next();
                it.remove();
                handleInput(key);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

netty的接收连接

 

前话

轮询连接和读事件都是在NioEventLoop对象里。这里有一个run方法。

 

1、bossGroup 轮询链接事件

//NioEventLoop  ,不管是Boss 还是 Worker都是在这里监听
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    //这里是在轮询selector 等待事件 
                    //这里是在轮询selector 等待事件
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
            }
​
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    //处理刚才监听到的事件
                    processSelectedKeys();
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}
​
// 处理selectedKeys
private void processSelectedKeys() {
        if (selectedKeys != null) {
            //这个是优化的方法 , 据说性能提高2%
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
 }
​
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
            //channel绑定的线程不是自己,那么不做处理,可以看到,这是线程安全的。
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
​
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                unsafe.finishConnect();
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
            //如果是链接或者读事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            //注意 如果是链接的事件,走的是 AbstractNioMessageChannel.read
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

2、bossGroup 创建socketChannel

// 接收一个nio的 socketChannel , 并将其封装成NioSocketChannel , 并设置为OP_READ

//注意 如果是链接的事件,走的是 AbstractNioMessageChannel.read

//AbstractNioMessageChannel.NioMessageUnsafe.read
 public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);
​
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        //在这里是接收了一个socketChannel 并加入到buf 
                        // SocketChannel ch = SocketUtils.accept(javaChannel());
                        // buf.add(new NioSocketChannel(this, ch));
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
​
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    //在这里触发 注册的
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();
​
                if (exception != null) {
                    closed = closeOnReadError(exception);
​
                    pipeline.fireExceptionCaught(exception);
                }
​
                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        } 

3、ServerBootstrapAcceptor注册到worker线程

ServerBootstrapAcceptor 这是一个关键的类 , 它将Boss监听到链接事件 ,注册到worker线程池

//ServerBootstrapAcceptor 
public void channelRead(ChannelHandlerContext ctx, Object msg) {
            //这个channel 是 第二步 封装的 NioSocketChannel对象
            final Channel child = (Channel) msg;
            //这是引导程序配置的handlers
            child.pipeline().addLast(childHandler);
            //设置参数
            setChannelOptions(child, childOptions, logger);
​
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
​
            try {
                //选择一个NioEventLoop 注册NioSocketChannel
                //next().register(channel)
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

4、workerGroup 将 socketChannel 注册到选择的NioEventLoop的selector

这块和启动流程的注册一样 ​ 注册一个 0事件(selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);)

5、workerGroup 注册读事件

DefaultChannelPipeline.HeadContext.read() , 在这里注册时间

 



这篇关于netty源码 之接收连接的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程