Socket网络编程学习笔记 (9)TCP数据发送与接收并行
2022/2/27 20:52:34
本文主要是介绍Socket网络编程学习笔记 (9)TCP数据发送与接收并行,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
主要实现:
- 多线程收发并行
- TCP多线程收发协作
1. TCP 服务端收发并行重构
1.1 启动main方法重构
原有的main逻辑如下:
重构后如下:
/** * @ClassName Server * @Description TODO * @Author wushaopei * @Date 2022/2/27 13:00 * @Version 1.0 */ public class Server { public static void main(String[] args) throws IOException { TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER); boolean isSucceed = tcpServer.start(); if(!isSucceed){ System.out.println("Start TCP server failed."); } UDPProvider.start(TCPConstants.PORT_SERVER); // 键盘输入: BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); String str; do { str = bufferedReader.readLine(); tcpServer.broadcast(str); } while (!"00bye00".equalsIgnoreCase(str)); UDPProvider.stop(); tcpServer.stop(); } }
重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。
1.2 重构分离收发消息的操作
创建 ClientHandler.java 重构收发消息操作:
/** * @ClassName ClientHandler * @Description TODO * @Author wushaopei * @Date 2022/2/27 16:44 * @Version 1.0 */ public class ClientHandler { private final Socket socket; private final ClientReadHandler readHandler; private final ClientWriteHandler writeHandler; private final CloseNotiry closeNotiry; public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException { this.socket = socket; this.readHandler = new ClientReadHandler(socket.getInputStream()); this.writeHandler = new ClientWriteHandler(socket.getOutputStream()); this.closeNotiry = closeNotiry; System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort()); } }
重构接收消息的操作:
/** * 接收数据 */ class ClientReadHandler extends Thread { private boolean done = false; private final InputStream inputStream; ClientReadHandler(InputStream inputStream){ this.inputStream = inputStream; } @Override public void run(){ super.run(); try { // 得到输入流,用于接收数据 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { // 客户端拿到一条数据 String str = socketInput.readLine(); if(str == null){ System.out.println("客户端已无法读取数据!"); // 退出当前客户端 ClientHandler.this.exitBySelf(); break; } // 打印到屏幕 System.out.println(str); }while (!done); socketInput.close(); }catch (IOException e){ if(!done){ System.out.println("连接异常断开"); ClientHandler.this.exitBySelf(); } }finally { // 连接关闭 CloseUtils.close(inputStream); } } void exit(){ done = true; CloseUtils.close(inputStream); } }
创建一个单独的线程进行接收消息,该线程不需要关闭。
重构发送消息:
/** * 发送数据 */ class ClientWriteHandler { private boolean done = false; private final PrintStream printStream; private final ExecutorService executorService; ClientWriteHandler(OutputStream outputStream) { this.printStream = new PrintStream(outputStream); // 发送消息使用线程池来实现 this.executorService = Executors.newSingleThreadExecutor(); } void exit(){ done = true; CloseUtils.close(printStream); executorService.shutdown(); } void send(String str) { executorService.execute(new WriteRunnable(str)); } class WriteRunnable implements Runnable{ private final String msg; WriteRunnable(String msg){ this.msg = msg; } @Override public void run(){ if(ClientWriteHandler.this.done){ return; } try { ClientWriteHandler.this.printStream.println(msg); }catch (Exception e){ e.printStackTrace(); } } } }
TCPServer调用发送消息的逻辑:
public void broadcast(String str) { for (ClientHandler client : clientHandlerList){ // 发送消息 client.send(str); } }
1.3 监听客户端链接逻辑重构
private List<ClientHandler> clientHandlerList = new ArrayList<>(); /** * 监听客户端链接 */ private class ClientListener extends Thread { private ServerSocket server; private boolean done = false; private ClientListener(int port) throws IOException { server = new ServerSocket(port); System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort()); } @Override public void run(){ super.run(); System.out.println("服务器准备就绪~"); // 等待客户端连接 do{ // 得到客户端 Socket client; try { client = server.accept(); }catch (Exception e){ continue; } try { // 客户端构建异步线程 ClientHandler clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler)); // 启动线程 clientHandler.readToPrint(); clientHandlerList.add(clientHandler); } catch (IOException e) { e.printStackTrace(); System.out.println("客户端连接异常: " + e.getMessage()); } }while (!done); System.out.println("服务器已关闭!"); } void exit(){ done = true; try { server.close(); }catch (IOException e){ e.printStackTrace(); } } }
clientHandlerList作为类变量,用于管理当前用户的信息。接收与发送都使用该变量。
1.4 Socket、流的退出与关闭:
/** * 退出、关闭流 */ public void exit(){ readHandler.exit(); writeHandler.exit(); CloseUtils.close(socket); System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort()); } /** * 发送消息 * @param str */ public void send(String str){ writeHandler.send(str); } /** * 接收消息 */ public void readToPrint() { readHandler.exit(); } /** * 接收、发送消息异常,自动关闭 */ private void exitBySelf() { exit(); closeNotiry.onSelfClosed(this); } /** * 关闭流 */ public interface CloseNotiry{ void onSelfClosed(ClientHandler handler); }
2. TCP 客户端收发并行重构
2.1 客户端 main函数重构
public static void main(String[] args) { // 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机 ServerInfo info = UDPSearcher.searchServer(10000); System.out.println("Server:" + info); if( info != null){ try { TCPClient.linkWith(info); }catch (IOException e){ e.printStackTrace(); } } }
2.2 客户端接收消息重构
static class ReadHandler extends Thread{ private boolean done = false; private final InputStream inputStream; ReadHandler(InputStream inputStream){ this.inputStream = inputStream; } @Override public void run(){ try { // 得到输入流,用于接收数据 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { // 客户端拿到一条数据 String str = null; try { str = socketInput.readLine(); }catch (SocketTimeoutException e){ } if(str == null){ System.out.println("连接已关闭,无法读取数据!"); break; } // 打印到屏幕 System.out.println(str); }while (!done); socketInput.close(); }catch (IOException e){ if(!done){ System.out.println("连接异常断开:" + e.getMessage()); } }finally { // 连接关闭 CloseUtils.close(inputStream); } } void exit(){ done = true; CloseUtils.close(inputStream); } }
创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。
2.3 客户端发送消息重构
private static void write(Socket client) throws IOException { // 构建键盘输入流 InputStream in = System.in; BufferedReader input = new BufferedReader(new InputStreamReader(in)); // 得到Socket输出流,并转换为打印流 OutputStream outputStream = client.getOutputStream(); PrintStream socketPrintStream = new PrintStream(outputStream); boolean flag = true; do { // 键盘读取一行 String str = input.readLine(); // 发送到服务器 socketPrintStream.println(str); // 从服务器读取一行 if("00bye00".equalsIgnoreCase(str)){ break; } }while(flag); // 资源释放 socketPrintStream.close(); }
在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。
2.4 客户端 linkWith 主方法重构
public static void linkWith(ServerInfo info) throws IOException { Socket socket = new Socket(); // 超时时间 socket.setSoTimeout(3000); // 端口2000;超时时间300ms socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));// System.out.println("已发起服务器连接,并进入后续流程~"); System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort()); System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort()); try { ReadHandler readHandler = new ReadHandler(socket.getInputStream()); readHandler.start(); // 发送接收数据 write(socket); }catch (Exception e){ System.out.println("异常关闭"); } // 释放资源 socket.close(); System.out.println("客户端已退出~"); }
原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。
3. TCP 收发并行重构测试:
服务端重构后日志:
J:\folder\JDK1.8\bin\java -ja…… - Channel\out\production\classes" server.Server 服务器信息: 0.0.0.0/0.0.0.0 P:30401 服务器准备就绪~ UDDProvider Started. ServerProvider receive from ip:169.254.178.74 port:169.254.178.74 port:49878 dataValid:true ServerProvider response to:169.254.178.74 port:30202 dataLen: 50 新客户链接: /169.254.178.74 P:51094 ping pong 00bye0000bye00 客户端已无法读取数据! 客户端已退出:/169.254.178.74 P:51094
客户端重构后执行日志:
J:\folder\JDK1.8\bin\java -javaagent:J:\……- Channel\out\production\classes" client.Client UDPSearcher Started. UDPSearcher start listen. UDPSearcher sendBroadcast started. UDPSearcher sendBroadcast finished. UDPSearch receive form ip:169.254.178.74 port:30201 dataValid:true UDPSearcher Finished. UDPSearcher listener finished. Server:ServerInfo{sn='4cd6143b-205f-4e80-9b22-a6eae8e85cdd', port=30401, address='169.254.178.74'} 已发起服务器连接,并进入后续流程~ 客户端信息: /169.254.178.74 P:51094 服务器信息:/169.254.178.74 P:30401 ping pong 00bye00 客户端已退出~ Process finished with exit code 0
这篇关于Socket网络编程学习笔记 (9)TCP数据发送与接收并行的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2025-01-05Easysearch 可搜索快照功能,看这篇就够了
- 2025-01-04BOT+EPC模式在基础设施项目中的应用与优势
- 2025-01-03用LangChain构建会检索和搜索的智能聊天机器人指南
- 2025-01-03图像文字理解,OCR、大模型还是多模态模型?PalliGema2在QLoRA技术上的微调与应用
- 2025-01-03混合搜索:用LanceDB实现语义和关键词结合的搜索技术(应用于实际项目)
- 2025-01-03停止思考数据管道,开始构建数据平台:介绍Analytics Engineering Framework
- 2025-01-03如果 Azure-Samples/aks-store-demo 使用了 Score 会怎样?
- 2025-01-03Apache Flink概述:实时数据处理的利器
- 2025-01-01使用 SVN合并操作时,怎么解决冲突的情况?-icode9专业技术文章分享
- 2025-01-01告别Anaconda?试试这些替代品吧