网络编程IO多路复用-服务端代码
2021/5/12 20:55:21
本文主要是介绍网络编程IO多路复用-服务端代码,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
使用Java NIO完成服务端代码的编写,代码写的不完善,本文主要想体现多路复用的几种编程模型和思想。
一、单线程版本
使用单线程+NIO完成服务端代码的编写,并且使用一个Selector注册器。在一个线程中处理ServerSocketChannel的accept、SocketChannel的read、write。
- Server
创建ServerSocketChannel,并将其注册到Selector中。
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; /** * * 使用NIO多路复用处理与客户端的通信 * */ public class Server { private final Selector selector; private final ServerSocketChannel serverSocketChannel; public Server(int port) throws Exception{ selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } public void start() { while (!Thread.interrupted()) { try { if (selector.select()>0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); dispatch(selectionKey); iterator.remove(); } } } catch (IOException e) { try { selector.close(); } catch (Exception ee) { } e.printStackTrace(); } } } private void dispatch(SelectionKey selectionKey) { if (selectionKey.isAcceptable()) { new Acceptor(serverSocketChannel, selector).accept(); } else if (selectionKey.isReadable()) { new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey).read(); } else if (selectionKey.isWritable()) { new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey).write(); } } }
- Acceptor
接收客户端的连接。
import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; /** * * 负责接收客户端的连接 * */ public class Acceptor { private final ServerSocketChannel serverSocketChannel; private Selector selector; public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) { this.serverSocketChannel = serverSocketChannel; this.selector = selector; } public void accept() { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } catch (Exception e) { e.printStackTrace(); } } }
- ReadHandler
接收客户端发送的内容。
import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; /** * * 负责读取客户端数据,即read * */ public class ReadHandler { private final SocketChannel socketChannel; private final SelectionKey selectionKey; public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) { this.socketChannel = socketChannel; this.selectionKey = selectionKey; } public void read() { try { // 读取客户端数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+new String(byteBuffer.array())); // 注册写事件,给客户端回消息 selectionKey.interestOps(SelectionKey.OP_WRITE); } catch (Exception e) { // 这边如果发生了异常,要调用cancel方法,取消该selectionkey的监听 // 比如客户端端口连接,如果缺少此行代码,控制台会一直打印错误 selectionKey.cancel(); e.printStackTrace(); } } }
- WriteHandler
向客户端发送内容。
import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; /** * * 负责向客户端响应数据,即wirte * */ public class WriteHandler { private final SocketChannel socketChannel; private final SelectionKey selectionKey; public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) { this.socketChannel = socketChannel; this.selectionKey = selectionKey; } public void write() { try { // 发送数据给客户端 String msg = "你好,欢迎"+socketChannel.getRemoteAddress(); ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer); // 注册读事件,继续等待客户端的信息 selectionKey.interestOps(SelectionKey.OP_READ); } catch (Exception e) { selectionKey.cancel(); e.printStackTrace(); } } }
运行服务端:
/** * * 启动server服务 */ public class Main { public static void main(String[] args) throws Exception { Server server = new Server(8800); server.start(); } }
二、多线程线程池版本
使用线程池在不同的线程中处理ServerSocketChannel的accept、SocketChannel的read、write。仍然使用一个Selector。
- Server
43行中让当前线程暂停500,目的是让dispatch方法逻辑执行完成之后再执行iterator.remove()将当前的SelectionKey从集合中移除。因为dispatch中使用了线程池异步处理,可能会存在代码先执行了iterator.remove(),后执行dispatch逻辑,这样会导致错误。(这种sleep方式处理是存在问题的)
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * * 使用NIO多路复用处理与客户端的通信 * */ public class Server { private final Selector selector; private final ServerSocketChannel serverSocketChannel; private final ExecutorService executorService = Executors.newFixedThreadPool(1024); public Server(int port) throws Exception{ selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } public void start() { while (!Thread.interrupted()) { try { if (selector.select()>0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); dispatch(selectionKey); try { Thread.sleep(500); } catch (Exception e) {} iterator.remove(); } } } catch (IOException e) { try { selector.close(); } catch (Exception ee) { } e.printStackTrace(); } } } private void dispatch(SelectionKey selectionKey) { if (selectionKey.isAcceptable()) { executorService.execute(new Acceptor(serverSocketChannel, selector)); } else if (selectionKey.isReadable()) { executorService.execute(new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey)); } else if (selectionKey.isWritable()) { executorService.execute(new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey)); } } }
- Acceptor
实现Runnable接口,重写run方法。
import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; /** * * 负责接收客户端的连接 * */ public class Acceptor implements Runnable{ private final ServerSocketChannel serverSocketChannel; private Selector selector; public Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) { this.serverSocketChannel = serverSocketChannel; this.selector = selector; } public void accept() { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } catch (Exception e) { e.printStackTrace(); } } @Override public void run() { accept(); } }
- ReadHandler
实现Runnable接口,重写run方法。
import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; /** * * 负责读取客户端数据,即read * */ public class ReadHandler implements Runnable{ private final SocketChannel socketChannel; private final SelectionKey selectionKey; public ReadHandler(SocketChannel socketChannel, SelectionKey selectionKey) { this.socketChannel = socketChannel; this.selectionKey = selectionKey; } public void read() { try { // 读取客户端数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("客户端【"+socketChannel.getRemoteAddress()+"】发来信息:"+new String(byteBuffer.array())); // 注册写事件,给客户端回消息 selectionKey.interestOps(SelectionKey.OP_WRITE); } catch (Exception e) { // 这边如果发生了异常,要调用cancel方法,取消该selectionkey的监听 // 比如客户端端口连接,如果缺少此行代码,控制台会一直打印错误 selectionKey.cancel(); e.printStackTrace(); } } @Override public void run() { read(); } }
- WriteHandler
实现Runnable接口,重写run方法。
import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; /** * * 负责向客户端响应数据,即wirte * */ public class WriteHandler implements Runnable{ private final SocketChannel socketChannel; private final SelectionKey selectionKey; public WriteHandler(SocketChannel socketChannel, SelectionKey selectionKey) { this.socketChannel = socketChannel; this.selectionKey = selectionKey; } public void write() { try { // 发送数据给客户端 String msg = "你好,欢迎"+socketChannel.getRemoteAddress(); ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); socketChannel.write(buffer); // 注册读事件,继续等待客户端的信息 selectionKey.interestOps(SelectionKey.OP_READ); } catch (Exception e) { selectionKey.cancel(); e.printStackTrace(); } } @Override public void run() { write(); } }
三、多Selector(主从)
一个Selector负责接收客户端的连接(ServerSocketChannel#accept),多个Selector负责客户端的读写数据(SocketChannel#read、SokcetChannel#write)。
- Server
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * * 使用NIO多路复用处理与客户端的通信 * */ public class Server { /** * 主selector,负责监听accept事件,处理客户端的连接 */ private final Selector masterSelector; /** * 存放从selector集合 */ private Selector[] selectors; private final ServerSocketChannel serverSocketChannel; private final ExecutorService executorService = Executors.newFixedThreadPool(1024); public Server(int port) throws Exception{ masterSelector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(port)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(masterSelector, SelectionKey.OP_ACCEPT); // 创建两个从selector Selector subSelector1 = Selector.open(); Selector subSelector2 = Selector.open(); selectors = new Selector[]{subSelector1, subSelector2}; } /** * 每个Selector放进单独的线程的进行循环, * 避免select阻塞互相影响。 */ public void start() { executorService.execute(()->loop(masterSelector)); for (Selector selector : selectors) { executorService.execute(()->loop(selector)); } } /** * 因为Selector的select是阻塞方法,多个Selector在单线程中循环, * 会造成互相等待的影响,所以每个Selector都另起一个线程。 * @param selector */ private void loop(Selector selector) { while (!Thread.interrupted()) { try { // 这里最多阻塞1秒则直接返回 int select = selector.select(1000); if (select>0) { Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); dispatch(selectionKey); iterator.remove(); } } } catch (IOException e) { try { selector.close(); } catch (Exception ee) { } e.printStackTrace(); } } } private void dispatch(SelectionKey selectionKey) { if (selectionKey.isAcceptable()) { // 从子selector中随机取出一个作为参数 Random random = new Random(); int i = random.nextInt(selectors.length); Selector subSelector = selectors[i]; new Acceptor(serverSocketChannel, subSelector).accept(); } else if (selectionKey.isReadable()) { new ReadHandler((SocketChannel)selectionKey.channel(), selectionKey).read(); } else if (selectionKey.isWritable()) { new WriteHandler((SocketChannel)selectionKey.channel(), selectionKey).write(); } } }
一主二从,一个主Selector负责accept客户端连接。两个从Selector负责与客户端read、write。
其他类的代码同单线程版本。
这篇关于网络编程IO多路复用-服务端代码的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-26结对编程到底难不难?答案在这里
- 2024-06-19《2023版Java工程师》课程升级公告
- 2024-06-15matplotlib作图不显示3D图,怎么办?
- 2024-06-1503-Loki 日志监控
- 2024-06-1504-让LLM理解知识 -Prompt
- 2024-06-05做软件测试需要懂代码吗?
- 2024-06-0514-ShardingSphere的分布式主键实现
- 2024-06-03为什么以及如何要进行架构设计权衡?
- 2024-05-31全网首发第二弹!软考2024年5月《软件设计师》真题+解析+答案!(11-20题)
- 2024-05-31全网首发!软考2024年5月《软件设计师》真题+解析+答案!(21-30题)