IO

2022/3/5 6:15:35

本文主要是介绍IO,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

目录
  • 网络IO
    • 1. 网络IO
      • 1.1 什么是IO流以及IO流的作用
      • 1.2 IO流的分类
    • 2. IO流的数据来源及操作的API
      • 2.1 File类简介
      • 2.2 基于文件的输入输出流
      • 2.3 缓冲流
      • 2.4 转换流
      • 2.5 对象流
    • 3. 本地IO和网络IO
      • 3.0 本地I/O操作实例
      • 3.1 Socket和ServerSocket
      • 3.2 网络通讯协议分析
      • 3.3 网络通信原理
        • 理解阻塞过程
      • 3.4 手写RPCDemo
      • 3.4 NIO
        • 五种IO模型
    • 4. 深入分析NIO
      • 4.1 NIO的新特性
      • 4.2 核心组件
      • 4.3 零拷贝
      • 4.4 Selector
    • 5. Reactor模式
  • 回顾
    • IO

网络IO

1. 网络IO

1.1 什么是IO流以及IO流的作用

I/O实际上是Input和Output,也就是输入和输出。而流其实是一种抽象的概念,它表示的是数据的无结构化传递。会被当成无结构的字节序列或字符序列。流可以当作是磁盘与内存之间的一个管道。

1.2 IO流的分类

在Java中I/O流操作很多,但是核心体系实际上就只有File(文件流)、InputStream(字节输入流)、OutputStream(字节输出流)、Reader(字符输入流)、Writer(字符输出流)。

image-20220301161506677

  • 字节流:操作的数据单元是8位的字节。InputStream、OutputStream作为抽象基类。可以处理所有的数据文件。
  • 字符流:操作的数据单元是字符。以Writer、Reader作为抽象基类。只限于处理文本的数据文件。
  • 访问管道处理流,是用来去完成管道的读写操作,用于线程间的通讯
  • 访问数组处理流,是针对内存的操作
  • 缓冲流是提供一个缓冲区,对于缓冲区的一个处理流,避免每次与磁盘的交互,提高输入输出的一个效率
  • 对象流,主要用在序列化这个机制上,将一个对象序列化后转换成一个可存储可传输的对象,传输时用到的流。
  • 转换流:将字符流转换成字节流
  • 打印流

image-20220301162730551

2. IO流的数据来源及操作的API

  • 硬盘
  • 内存
  • 键盘
  • 网络

2.1 File类简介

File类是Java中为文件进行创建、删除、重命名、移动等操作而设计的一个类

  • File(File parent, String child):根据parent抽象路径名和child路径名字符串创建一个新的File实例。
  • File(String pathname):将指定路径名转化为抽象路径名创建一个新的File实例。
  • File(String parent, String child):根据parent路径名和child路径名创建一个File实例。
  • File(URI uri):指定URI转化为抽象路径名。

2.2 基于文件的输入输出流

public static void main(String[] args) {
    File file = new File("D:\\appdata\\IODemo\\Capture001.png");
    try (
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\appdata\\IODemo\\Capture002.png");
        FileInputStream fileInputStream = new FileInputStream(file)) { // 1.7之后,将流写入try()中,代码执行完毕后,会自动关闭流
        int len = 0;
        byte[] buffer = new byte[1024];
        long start = System.currentTimeMillis();
        while ((len = fileInputStream.read(buffer)) != -1) {
            fileOutputStream.write(buffer, 0, len);
        }
        long end = System.currentTimeMillis();
        System.out.println((end - start) / 1000);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

流一定要关闭,否则当前线程没执行完会一直使其被进程占用。

try (FileReader reader = new FileReader("/appdata/IODemo/IODemo");
     FileWriter writer = new FileWriter("/appdata/IODemo/IODemo.txt")) {
    int i = 0;
    char[] chars = new char[1];
    while ((i = reader.read(chars)) != -1) {
        writer.write(new String(chars, 0, i));
    }
} catch (Exception e) {
    e.printStackTrace();
}

2.3 缓冲流

缓冲流是带缓冲区的处理流,他会提供一个缓冲区,缓冲区的作用主要目的是:避免每次和硬盘打交道,能够提高输入/输出的执行效率。

BufferedInputStream

private static int DEFAULT_BUFFER_SIZE = 8192; // 默认8Kb的缓冲区
private static int MAX_BUFFER_SIZE = Integer.MAX_VALUE - 8; // 最大缓冲区大小
// 每次读取的8Kb size的字节会存储在buf[]数组中
//每次调用read()方法时,会首先去尝试从这个数组中读取,如果读取失败,会从数据源(磁盘上)去读取
protected volatile byte buf[];

// 两种构造方法最终调用该方法,带int参数的会覆盖默认的8Kb size
public BufferedInputStream(InputStream in, int size) {
    super(in);
    if (size <= 0) {
        throw new IllegalArgumentException("Buffer size <= 0");
    }
    buf = new byte[size];
}

其实缓冲流原理上是帮我们封装了8Kb大小的数据,先从磁盘读8Kb到我们内存,后由我们自己去操作这8Kb的数据,当处理完8Kb缓冲区没有了,再加载数据到缓冲区,再读到内存去处理。当我们用普通流去处理文件,将buffer[]设置的稍微大一点,一样可以达到提高效率的结果。

public static void main(String[] args) {

    try (BufferedInputStream bufferedInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/IODemo"));
         BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/IODemo.txt"))) {
        int len = 0;
        byte[] bytes = new byte[1024];
        while ((len = bufferedInputStream.read(bytes)) != -1) {
            // System.out.println(new String(bytes, 0, len));
            bufferedOutputStream.write(bytes, 0, len);
            bufferedOutputStream.flush();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }

}

将创建InputStream写入到try()中,可以帮我们实现close()关闭流的操作,这个close中包含了buffred的flush操作,如果没有关闭流,又没有手动flush(),将会丢失数据。

public void close() throws IOException {
    try (OutputStream ostream = out) {
        flush();
    }
}
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("/appdata/IODemo/IODemo"), StandardCharsets.UTF_8))) {
    String str;
    while ((str = reader.readLine()) != null) {
        System.out.println(str);
    }
} catch (Exception e) {
    e.printStackTrace();
}

2.4 转换流

try (InputStream inputStream = new FileInputStream("/appdata/IODemo/IODemo");
     InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
    char[] chars = new char[1024];
    int i;
    while ((i = reader.read(chars)) != -1) {
        System.out.println(new String(chars, 0, i));
    }
} catch (Exception e) {
    e.printStackTrace();
}

在这个转换流中,时可以指定字符集编码的。

2.5 对象流

关于序列化和反序列化这个问题,我在18年参加工作的时候,遇到过一个项目,之后就再没有用过了。当时架构还是分布式dubbo+zookeeper,但是传输报文竟然用到这个我是没想到的。

什么是序列化和反序列化?

  • 序列化是把对象的状态信息转化为可存储或传输的形式的过程,也就是把对象转化为字节序列的过程成为对象的序列化
  • 反序列化是序列化的逆向过程,把字节数组反序列化为对象。
public class UserSerializable implements Serializable {

    private static final long serialVersionUID = 8160464260217334369L;

    private String name;

    private int age;

    public void setName(String name) {
        this.name = name;
    }

    public void setAge(int age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "UserSerializable{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public static void main(String[] args) {
        UserSerializable user = new UserSerializable();
        user.setAge(26);
        user.setName("Elian");
        String fileName = "/appdata/IODemo/User";
        try (FileInputStream fileInputStream = new FileInputStream(fileName);
             FileOutputStream fileOutputStream = new FileOutputStream(fileName);
             ObjectOutputStream outputStream = new ObjectOutputStream(fileOutputStream);
             ObjectInputStream objectInputStream = new ObjectInputStream(fileInputStream)
        ) {
            outputStream.writeObject(user);
            outputStream.flush();
            UserSerializable newUser = (UserSerializable) objectInputStream.readObject();
            System.out.println(newUser);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3. 本地IO和网络IO

3.0 本地I/O操作实例

public class NIOFirstDemo {
    public static void main(String[] args) {
        bio();
        bufferBio();
        nio();
        mmap();
        zeroCopy();
    }

    private static void bio() {
        try (FileInputStream bioInputStream = new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM");
             FileOutputStream bioOutputStream = new FileOutputStream("/appdata/IODemo/jdk_bio.CHM")) {
            // bio实现copy
            long bioStart = System.currentTimeMillis();
            int len = 0;
            byte[] buffer = new byte[1024];
            while ((len = bioInputStream.read(buffer)) != -1) {
                bioOutputStream.write(buffer, 0, len);
            }
            bioOutputStream.flush();
            System.out.println(System.currentTimeMillis() - bioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void bufferBio() {
        try (BufferedInputStream bioInputStream = new BufferedInputStream(new FileInputStream("/appdata/IODemo/jdk api 1.8_google.CHM"));
             BufferedOutputStream bioOutputStream = new BufferedOutputStream(new FileOutputStream("/appdata/IODemo/jdk_bufferBio.CHM"))) {
            // bio实现copy
            long bioStart = System.currentTimeMillis();
            int len = 0;
            byte[] buffer = new byte[1024];
            while ((len = bioInputStream.read(buffer)) != -1) {
                bioOutputStream.write(buffer, 0, len);
            }
            bioOutputStream.flush();
            System.out.println(System.currentTimeMillis() - bioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void nio() {
        try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_nio.CHM"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            // nio 实现copy
            long nioStart = System.currentTimeMillis();
            int len = 0;
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while ((len = inChannel.read(buffer)) != -1) {
                buffer.flip();
                outChannel.write(buffer);
                buffer.clear();
            }
            System.out.println(System.currentTimeMillis() - nioStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 依然将用户空间的
    private static void mmap() {
        try (FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
             FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdb_mmap.CHM"),
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            long mmapStart = System.currentTimeMillis();
            MappedByteBuffer inMappedBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
            MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
            byte[] bytes = new byte[inMappedBuffer.limit()];
            inMappedBuffer.get(bytes);
            outMappedBuffer.put(bytes);
            System.out.println("mmap:" + (System.currentTimeMillis() - mmapStart));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void zeroCopy() {
        try(FileChannel inChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk api 1.8_google.CHM"), StandardOpenOption.READ);
            FileChannel outChannel = FileChannel.open(Paths.get("/appdata/IODemo/jdk_zeroCopy.CHM"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);) {
            long zeroCopyStart = System.currentTimeMillis();
            inChannel.transferTo(0, inChannel.size(), outChannel);
            System.out.println(System.currentTimeMillis() - zeroCopyStart);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

实验顺序(速度由快到慢排序)

zeroCopy(零拷贝) > mmap(内存映射) > bufferedInputStream > bio(基于channle) ~= nio

zerCopy无需将文件映射到内存,mmap会将buffer读进内存,关于Buffer继续往下看4.2。

3.1 Socket和ServerSocket

// 服务端
final int DEFAULT_PORT = 9090;
try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
    Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接
    System.out.println("Client port:" + socket.getPort() + " has been connected!");
    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
    String str = bufferedReader.readLine();
    System.out.println("Client Content:" + str);
    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    writer.write(str + "\n"); // 如果不换行,客户端会一直等待读取完
    writer.flush();
    bufferedReader.close();
    writer.close();
} catch (Exception e) {
    e.printStackTrace();
}

try (Socket socket = new Socket("localhost", 9090)) {
    OutputStream outputStream = socket.getOutputStream();
    outputStream.write("Hello Elian\n".getBytes(StandardCharsets.UTF_8)); // 不换行服务端会一直等待读取完,进入阻塞
    BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    System.out.println(reader.readLine());
} catch (Exception e) {
    e.printStackTrace();
}

3.2 网络通讯协议分析

image-20220302125049178

image-20220302125308839

客户端是怎样找到目标服务的呢?

客户端发起请求的时候,在不同的层去增加不同的协议头,在数据链路层组装目标机器的Mac地址,这个地址是通过ARP协议,我们已知目标的IP,需要获得目标的Mac地址,会发送一个广播消息,会在网段内去询问这个IP是谁,目标地址会发送自己Mac地址给到当前这个发送端,就可以去组装目标的Mac地址。那么在数据发送过程中,进入IP广播后,某个网卡就会发现,对应Mac的网卡就会把数据包收进来。

3.3 网络通信原理

本地磁盘IO通信:

image-20220302142120134

网络磁盘通信:

image-20220302142142094

两者不同在于:本地磁盘要通过DMA(直接存储访问器)将磁盘上的内容读取到内核空间缓冲区,再从内核空间缓冲区读到用户空间缓冲区进行操作。而网络IO是通过网卡中的缓冲区读取到系统内核缓冲区,如果应用进程一直没有调用socket的read()方法读取数据将数据copy到用户缓冲区,数据会一直被缓存在内核缓冲区里面。

理解阻塞过程

image-20220302143625641

accept()每次只能接收一个并处理一个socket,这样只能等上一个socket处理完才能继续处理下一个请求。BIO每次阻塞两个位置,第一个阻塞位置是accept过程,另一个阻塞过程是I/O流读写的过程。

解决办法:通过线程池进行处理。

public static void main(String[] args) {
    final int DEFAULT_PORT = 9090;
    try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        while (true) {
            Socket socket = serverSocket.accept();// 阻塞操作,等待客户端的连接
            executorService.submit(new ServerSocketThread(socket));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

public class ServerSocketThread implements Runnable {

    private Socket socket;

    public ServerSocketThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        System.out.println("Client port:" + socket.getPort() + " has been connected");
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            BufferedWriter writer=new BufferedWriter((new OutputStreamWriter(socket.getOutputStream())))){
            String clinetStr = reader.readLine();
            System.out.println("Client resived message: " + clinetStr);
            Thread.sleep(15000);
            writer.write("OK.\n");
        } catch(Exception e){
            e.printStackTrace();
        }
    }
}

现在还有一个缺点:

线程数取决于计算机本身的线程数,但是线程数设置太大,又会造成线程之间切换造成的资源消耗。

3.4 手写RPCDemo

RPC(Remote Procedure Call) 远程过程调用,是一种通过网络从计算机程序上请求服务,而不需要了解底层网络技术的协议。一般用来实现部署在不同机器上的系统之间的方法调用,使得程序能够像访问本地系统资源一样,通过网络传输去访问远端系统资源。

image-20220302164640587

// 1. 公共类
// 接口
public interface IHelloWorld {
    String sayHello(String content);
}

// Request
public class RpcRequest implements Serializable {
    
    private static final long serialVersionUID = -7922155162004878476L;
    
    private String className;
    private String methodName;

    private Object[] parameters;
    private Class[]  types;

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object[] getParameters() {
        return parameters;
    }

    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    public Class[] getTypes() {
        return types;
    }

    public void setTypes(Class[] types) {
        this.types = types;
    }
}

// 2. provider
// impl
public class HelloWorldImpl implements IHelloWorld {
    @Override
    public String sayHello(String content) {
        return "Hello " + content;
    }
}

// Server
public class RpcProxyServer {
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    public void publisher (int port) {
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                final Socket socket = server.accept();
                executorService.execute(new ProcessorHandler(socket));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class ProcessorHandler implements Runnable {
    private final Socket socket;
    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
            RpcRequest request = (RpcRequest)objectInputStream.readObject();
            Object object = invoke(request);
            objectOutputStream.writeObject(object);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object invoke(RpcRequest request) throws Exception {
        Class<?> clazz = Class.forName(request.getClassName());
        Method method = clazz.getMethod(request.getMethodName(), request.getTypes());
        if (request.getClassName().substring(request.getClassName().lastIndexOf('.') + 1).equals("IHelloWorld"))
            return method.invoke(new HelloWorldImpl(), request.getParameters());
        else
            return null;
    }
}

// 3.consumer
// client
public class App 
{
    public static void main( String[] args )
    {
        RpcProxyClient client = new RpcProxyClient();
        IHelloWorld iHelloWorld = client.clientProxy(IHelloWorld.class, "localhost", 9090);

        System.out.println(iHelloWorld.sayHello("Elian"));
    }
}

// Client
public class RpcProxyClient {
    public <T> T clientProxy(final Class<T> interfaceCls, final String host, final int port) {
        return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(), new Class<?>[]{interfaceCls}, new RemoteInvocationHandler(host, port));
    }
}
// 动态代理类
public class RemoteInvocationHandler implements InvocationHandler {

    private String host;
    private int port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameters(args);
        request.setTypes(method.getParameterTypes());

        RpcNetTransport rpcNetTransport = new RpcNetTransport(host, port);
        Object object = rpcNetTransport.send(request);
        return object;
    }
}
// reader读取返回报文
public class RpcNetTransport {
    private String host;
    private int port;

    public RpcNetTransport(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object send( RpcRequest request ) {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())) {
            objectOutputStream.writeObject(request);
            objectOutputStream.flush();
            return objectInputStream.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

3.4 NIO

五种IO模型

阻塞IO,非阻塞IO,IO复用,信号驱动IO,异步IO,无论哪种IO,都是为了能够提高服务端能够并行处理的连接数量。

  1. 阻塞IO

    image-20220302231856640

应用进程调用accept()时,触发系统把数据从网卡缓冲区复制到内核空间,再从内核空间复制到用户空间,如果这个过程中,数据没有准备好,到数据返回或发生错误返回之前,用户进程一直处于阻塞状态,这个就是阻塞IO。

  1. 非阻塞IO

image-20220302232932498

非阻塞是指用户进程调用accept()后,如果数据没有准备好,会返回一个EWOULDBLOCK状态,并创建一个线程出来不断轮询返回结果。由此可见会增加CPU的消耗。

  1. IO复用

image-20220302233336221

  • select/poll

单个进程可以同时处理多个客户端的网络IO链接,我们可以把所有链接过来的客户端注册到select/poll复用器上,用一个线程或者进程来调用这个select/poll,调用这个select的时候会阻塞,阻塞的时候,内核会去监视所有select/poll所负责的socket,当其中一个socket准备好的时候,那么这个select/poll就会返回,如果再次调用这个select的时候,就会把数据从内核拷贝到用户空间。

select/poll模型最大的缺点是,他只能线性的轮询1024个链接,当然这1024个链接只有少数处于活跃状态,会导致网络的延迟。jdk1.5之前的NIO是使用这种模型。

这种模型处理的情况是:多个不同的监听,而且只是提高了并发连接数,并不是提高单个线程处理性能。连接数少的情况下,不一定比BIO效率更高。

  • epoll

对select/poll进行的优化:

  • 对单个进程所打开的连接数没有限制;
  • 利用每个文件描述符fd上的callback函数来实现异步回调,不需要轮询了;
  • mmap,可以通过内核和用户空间映射同一块内存来减少内存复制。

image-20220302235611165

image-20220304180148962

  1. 信号驱动

image-20220303000101291

  1. 异步IO

image-20220303001110759

总结:

  1. BIO是指accept()过程和读写过程会被阻塞,每个线程只能同时处理一个链接,这个时候线程是不能做别的事情的,我们通过将获取的Socket丢进线程池,来解决能够处理下个监听到的Socket的能力。如果连接数量足够多,这时候性能就会下降,会有其他连接在等待被accept()到,并把它获取的socket丢进线程池。
  2. NIO是一种非阻塞IO,当线程在某个复用器通道读取数据没有读取到,可以进行其他事情的处理,不需要等待连接。

4. 深入分析NIO

4.1 NIO的新特性

image-20220303003151530

相比较老的IO来说,所有操作都是基于Channel和Buffer来说的,可以将Channel看成是InputStream/OutputStream,应用程序与磁盘/网络缓冲区之间的一个通道,而所有数据操作都是通过缓冲区来实现的。

4.2 核心组件

通道(Channel):Java NIO数据来源,可以是网络,也可以是本地磁盘

缓冲区(Buffer):数据读写的中转区

选择器(selectors):异步IO的核心类,可以实现异步非阻塞IO,一个selectors可以管理多个通道Channel

  • Channle

FileChannle:从文件中读取数据

DatagramChannel:通过UDP协议读写网络中的数据

SocketChannel:通过TCP协议读写网络中的数据

ServerSocketChannel:监听一个TCP连接,对于每一个新的客户端连接都会创建一个SocketChannel

  • Buffer

缓冲区本质上是一块可以写入的数据,以及从中读取数据的内存,实际上也是一个byte[]数据,只是在NIO中被封装成了NIO Buffer对象,并提供了一组方法来访问这个内存块,要理解buffer的工作原理,需要知道几个属性:

private int position = 0; // 下一个位置
private int limit;	// 
private int capacity; // 容量,buffer数组初始化的最大容量
private int mark; // 标记
  • 读:position=0; limit = capacity = [size];当要添加的数据byte[].lenth > limit - position时都可以成功。

  • flip():limit=position; position=0,防止多余数据的写出

  • 写:position遍历到limit的过程

  • get():有4个重载方法,get()获取一个单字节,get(int) 获取特定位置的字节,get(byte[]) ,get(byte[], int, int)获取一段字节

  • put():有5个重载,put(byte),put(int, byte),put(ByteBuffer),put(byte[], int, int),put(byte[])

  • 堆内内存:由JVM控制的内存,堆外内存不数据JVM运行时内存,而是用的系统内存,但GC会触发回收。ByteBuffer有两个子类:HeapByteBuffer和DirectByteBuffer

    • HeapByteBuffer:JVM堆内存

    • DirectByteBuffer:堆外本地内存

    • MappedByteBuffer:mmap的内存映射,读写性能极高

      • MappedByteBuffer将文件直接映射到内存。可以映射整个文件,如果文件比较大的话可以考虑分段进行映射,只要指定文件的感兴趣部分就可以。

      • 由于MappedByteBuffer申请的是直接内存,因此不受Minor GC控制,只能在发生Full GC时才能被回收,因此Java提供了DirectByteBuffer类来改善这一情况。它是MappedByteBuffer类的子类,同时它实现了DirectBuffer接口,维护一个Cleaner对象来完成内存回收。因此它既可以通过Full GC来回收内存,也可以调用clean()方法来进行回收

      • FileChannel提供了map方法来把文件映射为内存对象:

        MappedByteBuffer outMappedBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
        

使用堆外内存的原因

  1. 对垃圾回收停顿的改善。因为full gc时,垃圾收集器会对所有分配的堆内内存进行扫描,垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。如果使用堆外内存的话,堆外内存是直接受操作系统管理。这样做的结果就是能保持一个较小的JVM堆内存,以减少垃圾收集对应用的影响。(full gc时会触发堆外空闲内存的回收。)

  2. 减少了数据从JVM拷贝到native堆的次数,在某些场景下可以提升程序I/O的性能。

  3. 可以突破JVM内存限制,操作更多的物理内存。

使用堆外内存的问题

  1. 堆外内存难以控制,如果内存泄漏,那么很难排查(VisualVM可以通过安装插件来监控堆外内存)。

  2. 堆外内存只能通过序列化和反序列化来存储,保存对象速度比堆内存慢,不适合存储很复杂的对象。一般简单的对象或者扁平化的比较适合。

  3. 直接内存的访问速度(读写方面)会快于堆内存。在申请内存空间时,堆内存速度高于直接内存。

  4. 当直接内存不足时会触发full gc,排查full gc的时候,一定要考虑。

ByteBuffer模型

初始

image-20220304225349332

read(), put()

position = n;
limit = capacity = 8;
mark = -1;

flip()

limit = position; // 用来设置限制
position = 0;
mark = -1;

mark()

mark = postion; // 标记

reset()

position = mark;

clear()实际上数据还在

position = 0;
limit = capacity;
mark = -1;

4.3 零拷贝

  • 正常情况下,将一个文件发送给另一台服务器,需要四次拷贝,首先用户进程调用cpu,从用户空间切换到内核空间,内核空间调用DMA,从硬盘/网卡copy数据到内核空间,然后cpu系统调用,从内核空间copy到用户空间,然后再从用户空间通过cpu系统调用将数据copy到内核空间,再从内核空间通过DMAcopy到网卡缓冲区/硬盘。

image-20220303151649071

  • 零拷贝:不要用户空间了,省略了内核缓冲区拷贝到用户缓冲区。(目前理解是只针对客户端)

image-20220303151849935

Linux支持的零拷贝方式:

  • mmap内存映射:MappedByteBuffer channle.map();的方式
  • sendfile:transferTo/transferFrom
  • sendfile with DMA Scatter/Gather Copy
  • splice

server

public class ZeroCopyServer {
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             RandomAccessFile writeFile = new RandomAccessFile("/appdata/IODemo/Capture001_zerCopy.png", "rw");
             FileChannel fileChannel = writeFile.getChannel();
        ) {
            long start = System.currentTimeMillis();
            serverSocketChannel.bind(new InetSocketAddress(9090));
            SocketChannel socketChannel = serverSocketChannel.accept();
            ByteBuffer buffer = ByteBuffer.allocate(8192);
            int i = 0;
            int j = 0;
            /*while ((i = socketChannel.read(buffer)) != -1) {
                buffer.flip();
                fileChannel.map(FileChannel.MapMode.READ_WRITE, j, i ).put(buffer);
                buffer.clear();
                j += i;
            }*/ // 2527ms mmap()方式
            while ((i = socketChannel.read(buffer)) != -1) {
                buffer.flip();
                fileChannel.write(buffer);
                buffer.clear();
                j += i;
            } // 4462ms 普通写
            System.out.println("传输大小:" + j + ";时间:" + (System.currentTimeMillis() - start));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

client

public class ZeroCopyClient {
    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open();
             FileChannel fileChannel = FileChannel.open(Paths.get("/appdata/IODemo/Capture001.png"))) {
            socketChannel.connect(new InetSocketAddress("localhost", 9090));
            int position = 0;
            long size=fileChannel.size();
            while (size > 0) {
                long transfer = fileChannel.transferTo(position, fileChannel.size(), socketChannel); // 零拷贝,只从File Copy到缓冲区
                position += transfer;
                size -= transfer;
            }
            System.out.println("上传文件大小:" + position);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4.4 Selector

Selector(选择器,多路复用器)是Java NIO中能够检测一到多个NIO通道,是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。

image-20220303163415475

服务端处理过程:

  1. Selector.open()开启一个多路复用器,将ServerSocketChannel注册到selector上,这个ServerSocketServer必然不能是阻塞的,一个Channel会以4种状态注册到selector上:

    • SelectionKey.OP_ACCEPT:可接收
    • SelectionKey.OP_CONNECT:可连接
    • SelectionKey.OP_READ:可读
    • SelectionKey.OP_WRITE:可写
  2. 通过Selector的select()方法可以阻塞selection的操作,当通道中有已准备好进行I/O操作的SelectionKey,会返回这些准备好的SelectionKey的个数,下面是select()的重载方法:

    • int select():阻塞到至少有一个通道在你注册的事件上就绪了。

    • int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。

    • int selectNow():非阻塞,只要有通道就绪就立刻返回。

    select()方法返回的int值表示有多少通道已经就绪,是自上次调用select()方法后有多少通道变成就绪状态。之前在select()调用时进入就绪的通道不会在本次调用中被记入,而在前一次select()调用进入就绪但现在已经不在处于就绪的通道也不会被记入。例如:首次调用select()方法,如果有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。

  3. 当selector.select()返回了准备好的连接数量后,可以通过selector.selectedKeys()获取所有已就绪的Channel的描述符selectedKey,这个selectKyes中包含了它所对应的selector和channel,并且能获取到当前这个selectedKey对应的channel的状态(key.isAcceptable()等)

  4. 如果当前selectedKey描述的是一个isAcceptable(),可以从当前selectedKey中将其对应的ServerSocketChannel也就是我们最初注册进来的channel获取出来,并建立accept()监听,进入阻塞(其实已经不用阻塞了,肯定是个准备好的channel,拿到SocketChannel后,将其设置为非阻塞,通过SelectionKey.OP_READ状态注册到selector中去,最后将其在selectKeys中移除。

    注册事件状态时,可用 | 连接,比如SelectionKey.OP_READ | SelectionKey.OP_ACCEPT

    image-20220304172521562

    // selector.open();
    private native int epollCreate();
    // serverSocketChannel/socketChannle.register(selector, SelectionKey.OP_ACCEPT)
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    // selector.select()
    private native int epollwait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
    
  5. 再次进行selector.select(),这时会返回刚刚readable的SelectionKey,通过selector.selectionKeys()拿到后,判断其状态为isReadable(),就可以对其进行读写操作了,最后也要将其描述符移除掉

客户端处理过程:

  1. 创建SocketChannel设置为非阻塞,通过SelectionKey.OP_CONNECT状态注册到一个selector上。
  2. 通过selector的select()方法阻塞selection操作。
  3. 然后通过selector.seletedKeys()获取所有就绪的SelectionKey描述符。
  4. 如果当前描述符为isConnectable(),获取当前描述符对应的channel,判断当前channel是否已启动连接操作,但是并没有通过finishConnect()完成连接,如果为true,执行channel.finishConnect(),并将channel设置为非阻塞,写数据后以SelectionKey.OP_READ状态重新注册到selector中,最后将其在selectKeys中移除。
  5. 再次进行selector.select()操作,如果在轮询过程中获得了isReadable()的描述符selectionKey,对其进行读取,完成处理过程,最后也要将其描述符移除掉

server

public class NIOSelectorServer {
    public static void main(String[] args) {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.configureBlocking(false); // 在多路复用器中,这个必须设置为非阻塞
            serverSocketChannel.bind(new InetSocketAddress(9090));
            // 监听连接事件
            // 将serverSocketChannel注册到selector上
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            while (true) {
                // 参数可以带时间:0:阻塞;有时间:设置一个超时时间
                selector.select(); // 阻塞所有注册到多路复用器上的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 对于连接的SocketChannel的selectKey的集合
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove(); // 避免重复处理
                    // socket两种状态:listen    通信R/W
                    if (selectionKey.isAcceptable()) {  // 是一个连接事件
                        acceptHandler(selectionKey);
                    } else if (selectionKey.isReadable()) { // 是一个读事件
                        readHandler(selectionKey);
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void acceptHandler(SelectionKey key) {
        try {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            SocketChannel client = ssc.accept(); // 目的是调用accept接收客户端,例如fd7
            client.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            client.register(key.selector(), SelectionKey.OP_READ, buffer);
            System.out.println("-------------------------------------------");
            System.out.println("新客户端:" + client.getRemoteAddress());
            System.out.println("-------------------------------------------");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void readHandler(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        try {
            channel.read(buffer);
            buffer.flip();
            System.out.println("Client Info: "+new String(buffer.array()));
            buffer.clear();
            buffer.put("Hello Client, i'm Server".getBytes());
            buffer.flip();
            channel.write(buffer);
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

client

public class NIOSelectorClient {
    public static void main(String[] args) {
        try (Selector selector = Selector.open()) {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost", 9090));
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> selectionKeyIterator = selectionKeySet.iterator();
                while (selectionKeyIterator.hasNext()) {
                    SelectionKey selectionKey = selectionKeyIterator.next();
                    selectionKeyIterator.remove();
                    if (selectionKey.isConnectable()) {
                        connectHandler(selector, selectionKey);
                    } else if (selectionKey.isReadable()) {
                        readHandler(selectionKey);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void connectHandler(Selector selector, SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        if (channel.isConnectionPending()) {
            channel.finishConnect();
        }
        channel.configureBlocking(false);
        channel.write(ByteBuffer.wrap("Hello Server, I'm NIO Client".getBytes()));
        channel.register(selector, SelectionKey.OP_READ);
    }

    private static void readHandler(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        channel.read(byteBuffer);
        byteBuffer.flip();
        System.out.println("client receive message: " + new String(byteBuffer.array()));
        channel.close();
    }
}

5. Reactor模式

  • 5.1 传统阻塞式
accpet() + new Thread(() -> {

	// 业务处理

}).start();
  • 5.2 单reactor单线程处理
new SocketServerChannel().registror(selector, SelectionKey.OP_ACCEPT);

while(true) {

	selector.select();

	seletor.selectedKeys().iterator();

	while (iterator.hasnext()) {

		// 业务处理

	}

}
  • 5.3 单reactor多线程处理

在上面业务处理部分加入多线程。

  • 5.4 多reactor多线程处理

Netty,两个Grop,一个处理accept,一个处理业务

回顾

IO

  • inputStream/outputStream, reader/writer
  • BufferedIutStream/BufferedReader(readLine()) 需要flush(), FileInputStream/FileReader
  • RandomAccessFile(File/路径, "rw"),File,FileChannle.open(Paths, StrandardOpenOption)
  • Socket -> BIO + 多线程
  • NewIO(No Block IO),基于Buffer + Channel
  • ServerSocketChannle.open().bind().configurBlocking(false)
    • Selector.open()
    • serverSocketChannle.registor(selector, SelectionKey.ACCEPT)
    • selector.select()
    • selector.selectionKeys()
  • 阻塞(I/O阻塞,连接阻塞)
  • epool(多路[多个Channle注册到selector上] 复用[一个或少量的线程])
  • Netty框架,后续维护
  • 零拷贝(Kafka, rocketMQ),内存映射(mmap)


这篇关于IO的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程