Linux IO多路复用

2022/9/9 5:23:01

本文主要是介绍Linux IO多路复用,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

https://segmentfault.com/a/1190000003063859

\ select poll epoll
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到rdllist里面。时间复杂度O(1)
最大连接数 1024(x86)或 2048(x64) 无上限 无上限
fd拷贝 每次调用select,都需要把fd集合从用户态拷贝到内核态 每次调用poll,都需要把fd集合从用户态拷贝到内核态 调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

select(pselect)

select 直接操纵多个文件描述符的集合 fd_set

流程:

  1. 创建文件描述符的集合 fd_set
  2. 将监听的socket和客户端socket加入 fd_set
  3. select()
  4. 用 FD_ISSET 判断哪个 fd 有事件
    1. 监听的socket有事件,表示有新客户端连接请求
    2. 客户端socket有事件,有数据或连接断开

select 只有“水平触发”模式,如果报告了fd后事件没有被处理或数据没有被全部读取,那么下次select时会再次报告该fd

select函数的缺点

  1. bitmap默认大小为1024,虽然可以调整但还是有限度的
  2. 需要遍历所有描述符
  3. rset每次循环都必须重新置位为0,不可重复使用
  4. 将rset从用户态拷贝到内核态,由内核态直接判断文件描述符是否有数据的操作,这样比直接用户态判断要快。
    尽管将rset从用户态拷贝到内核态并由内核态判断是否有数据,但还是有拷贝的开销
#include <sys/types.h>
#include <sys/time.h>

// 初始化空集合
void FD_ZERO(fd_set *fdset);

// 从集合中清除fd
void FD_CLR(int fd, fd_set *fdset);

// 添加fd到集合
void FD_SET(int fd, fd_set *fdset);

// 判断是否在set中,在 非零值,不在 零
int FD_ISSET(int fd, fd_set *fdset);

// 超时值
struct timeval {
    time_t tv_sec;  // seconds
    long tv_usec;   // microseconds
}

/*
用于测试fd_set中是否由fd处于可读或可写或错误状态
当__readfds中有可读fd,__writefds中有科协fd,__exceptfds中有错误fd
成功返回状态发生变化的fd总数,失败返回-1并设置errno
*/
extern int select (int __nfds,           // 需要测试的fd数目
                   fd_set * __readfds,   
                   fd_set * __writefds,
                   fd_set * __exceptfds,
                   struct timeval * __timeout); // Linux在退出时会将此选项清空,故每次进入select前重新设置此选项
select 示例代码
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

int main(int argc, char const *argv[])
{
    int server_sockfd, client_sockfd;
    int server_len, client_len;
    struct sockaddr_in server_address;
    struct sockaddr_in client_address;
    int result;
    fd_set readfds, testfds;

    server_sockfd = socket(AF_INET, SOCK_STREAM, 0);
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = htonl(INADDR_ANY);
    server_address.sin_port = htons(9734);
    server_len = sizeof(server_address);

    bind(server_sockfd, (struct sockaddr *)&server_address, server_len);

    listen(server_sockfd, 5);

    FD_ZERO(&readfds);
    FD_SET(server_sockfd, &readfds);

    while (1) {
        char ch;
        int fd;
        int nread;

        testfds = readfds;

        printf("server waiting\n");
        result = select(FD_SETSIZE, &testfds, NULL, NULL, 0);
        if (result < 1) {
            perror("select failed");
            exit(1);
        }

        for (fd = 0; fd < FD_SETSIZE; fd++) {
            if (FD_ISSET(fd, &testfds)) {
                if (fd == server_sockfd) {
                    client_len = sizeof(client_address);
                    client_sockfd = accept(server_sockfd, 
                                           (struct sockaddr*) &client_address, 
                                           &client_len);
                    FD_SET(client_sockfd, &readfds);
                    printf("adding client on fd %d\n", client_sockfd);
                } else {
                    ioctl(fd, FIONREAD, &nread);

                    if (nread == 0) {
                        close(fd);
                        FD_CLR(fd, &readfds);
                        printf("removing client on fd %d\n", fd);
                    } else {
                        read(fd, &ch, 1);
                        printf("serving client on fd %d\n", fd);
                        write(fd, &ch, 1);
                    }
                }
            }
        }
    }
    return 0;
}

poll(ppoll)

与select没有本质差别,管理多个文件描述符也是使用轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量限制。

select采用了bitmap,poll采用了数组。

poll与select相同的缺点:文件描述符的数组或位图被整体复制于用户态和内核态之间,不论这些文件描述符是否有事件,它的开销随着文件描述符的增加而线性增加。二者在返回后都需要遍历整个描述符的数组。

内核将用户的fds结构体数组拷贝到内核中,当有事件发生时内核再将所有时间都返回到fds数组中,

struct pollfd {
    int   fd;         /* file descriptor */
    short events;     /* requested events */
    short revents;    /* returned events */
};

int poll(struct poll_fd *fds,  // 数组指针
         nfds_t  nfds,         // 数组大小
         int timeout);         // 超时时间

events 和 revents 的取值:

poll 使用并不方便,代码比select和epoll都复杂,但性能不如epoll

poll 代码
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <netinet/in.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define NFDS 100  // fds数组的大小

// 创建一个用于监听的socket
int CreateSocket() {
  int listenfd = socket(AF_INET, SOCK_STREAM, 0);
  assert(-1 != listenfd);

  struct sockaddr_in ser;
  memset(&ser, 0, sizeof(ser));
  ser.sin_family = AF_INET;
  ser.sin_port = htons(6000);
  ser.sin_addr.s_addr = inet_addr("127.0.0.1");

  int res = bind(listenfd, (struct sockaddr *)&ser, sizeof(ser));
  assert(-1 != res);

  listen(listenfd, 5);

  return listenfd;
}

// 初始化fds结构体数组
void InitFds(struct pollfd *fds) {
  int i = 0;
  for (; i < NFDS; ++i) {
    fds[i].fd = -1;
    fds[i].events = 0;
    fds[i].revents = 0;
  }
}

// 向fds结构体数组中插入一个文件描述符
void InsertFd(
    struct pollfd *fds, int fd,
    int flag)  //此处flag是为了判断是文件描述符c,还是listenfd,来设置events
{
  int i = 0;
  for (; i < NFDS; ++i) {
    if (fds[i].fd == -1) {
      fds[i].fd = fd;
      fds[i].events |= POLLIN;
      if (flag) {
        fds[i].events |= POLLRDHUP;
      }

      break;
    }
  }
}

// 从fds结构体数组中删除一个文件描述符
void DeleteFd(struct pollfd *fds, int fd) {
  int i = 0;
  for (; i < NFDS; ++i) {
    if (fds[i].fd == fd) {
      fds[i].fd = -1;
      fds[i].events = 0;
      break;
    }
  }
}

// 获取一个已完成三次握手的连接
void GetClientLink(int fd, struct pollfd *fds) {
  struct sockaddr_in cli;
  socklen_t len = sizeof(cli);
  int c = accept(fd, (struct sockaddr *)&cli, &len);
  assert(c != -1);

  printf("one client link success\n");

  InsertFd(fds, c, 1);
}

// 断开一个用户连接
void UnlinkClient(int fd, struct pollfd *fds) {
  close(fd);
  DeleteFd(fds, fd);
  printf("one client unlink\n");
}

// 处理客户端发送来的数据
void DealClientData(int fd, struct pollfd *fds) {
  char buff[128] = {0};

  int n = recv(fd, buff, 127, 0);
  if (n <= 0) {
    UnlinkClient(fd, fds);
    return;
  }

  printf("%s\n", buff);

  send(fd, "ok", 2, 0);
}

// poll返回后,处理就绪的文件描述符
void DealFinishFd(struct pollfd *fds, int listenfd) {
  int i = 0;
  for (; i < NFDS; ++i) {
    if (fds[i].fd == -1) {
      continue;
    }

    int fd = fds[i].fd;
    if (fd == listenfd && fds[i].revents & POLLIN) {
      GetClientLink(fd, fds);
      //获取连接
    } else if (fds[i].revents & POLLRDHUP) {
      UnlinkClient(fd, fds);
      //断开连接
    } else if (fds[i].revents & POLLIN) {
      DealClientData(fd, fds);
      //处理客户端数据
    }
  }
}

int main() {
  int listenfd = CreateSocket();

  struct pollfd *fds = (struct pollfd *)malloc(sizeof(struct pollfd) * NFDS);
  // malloc一个fds结构体数组
  assert(NULL != fds);

  InitFds(fds);
  //初始化fds结构体数组

  InsertFd(fds, listenfd, 0);
  //插入文件描述符listenfd

  while (1) {
    int n = poll(fds, NFDS, -1);
    if (n <= 0) {
      printf("poll error\n");
      continue;
    }

    DealFinishFd(fds, listenfd);
    //处理就绪的文件描述符
  }

  free(fds);
}

优点:

  1. poll() 不要求开发者计算最大文件描述符加一的大小。
  2. poll() 在应付大数目的文件描述符的时候速度更快,相比于select。
  3. 它没有最大连接数的限制,原因是它是基于链表来存储的。
  4. 在调用函数时,只需要对参数进行一次设置就好了

缺点:

  1. 大量的fd的数组被整体复制于用户态和内核地址空间之间,而不管这样的复制是不是有意义(epoll可以解决此问题)
  2. 与select一样,poll返回后,需要轮询pollfd来获取就绪的描述符,这样会使性能下降
  3. 同时连接的大量客户端在一时刻可能只有很少的就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降

epoll(epoll_pwait)

解决了fd_set拷贝轮询的问题。内核每次返回的都是已就绪的文件描述符。

  1. 创建epoll句柄,它本身就是一个fd,需要关闭
    int epoll_create(int size); // 返回一个文件描述符,参数在新版本中被忽略,但是要给一个大于0的数
  2. 注册需要监视的fd和事件
    int epoll_ctl(int epfd,
                  int op,   // 选项为3个宏:添加 EPOLL_CTL_ADD,删除 EPOLL_CTL_DEL,修改 EPOLL_CTL_MOD
                  int fd,   // 需要监听的fd(文件描述符)
                  struct epoll_event *event  // 需要监听什么事
                  );
    
    struct epoll_event {
      __uint32_t events;  /* Epoll events */
      epoll_data_t data;  /* User data variable */
    };
    
    //events可以是以下几个宏的集合:
    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
    EPOLLOUT:表示对应的文件描述符可以写;
    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
    
    typedef union epoll_data {
        void *ptr;
        int fd;
        _uint32_t u32;
        _uint64_t u64;
    }epoll_data_t;
  3. 等待事件发生
    // 返回就绪事件的个数,失败-1,超时0
    int epoll_wait(int epfd,
                   struct epoll_event * events, // 用于接收内核返回的就绪事件的数组
                   int maxevents, // 一次最多能处理的事件个数
                   int timeout    // 超时时间,为0则立即返回,-1永不超时
    );
epoll_LT 示例模式
 #include <arpa/inet.h>
#include <netinet/in.h>
#include <signal.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <iostream>

const int max_events = 128;

int server_socket;
int epoll_fd;

void sig_handler(int signo) {
  close(server_socket);
  close(epoll_fd);
  std::cout << "recv SIGTERM, exit process." << std::endl;
  exit(EXIT_SUCCESS);
}

int main(int argc, char const *argv[]) {
  struct sigaction term_action;
  sigset_t all_sig;
  sigfillset(&all_sig);
  term_action.sa_mask = all_sig;
  term_action.sa_handler = sig_handler;
  sigaction(SIGTERM, &term_action, nullptr);

  server_socket = socket(AF_INET, SOCK_STREAM, 0);

  sockaddr_in server_addr;
  server_addr.sin_family = AF_INET;
  server_addr.sin_port = htons(8080);
  server_addr.sin_addr.s_addr = inet_addr("0.0.0.0");

  bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
  listen(server_socket, 128);

  epoll_fd = epoll_create(1);
  epoll_event server_socket_event;
  server_socket_event.events = EPOLLIN;
  server_socket_event.data.fd = server_socket;
  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_socket, &server_socket_event);

  epoll_event events[max_events];
  while (true) {

    int n = epoll_wait(epoll_fd, events, max_events, -1);
    for (int i = 0; i < n; ++i) {
      if (events[i].data.fd == server_socket) {
        int client_socket = accept(server_socket, nullptr, nullptr);
        std::cout << "accept new client: " << client_socket << std::endl;
        epoll_event client_socket_event;
        client_socket_event.events = EPOLLIN;
        client_socket_event.data.fd = client_socket;
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_socket, &client_socket_event);

      } else if (events[i].events & EPOLLRDHUP) {
        // 理论上 EPOLLRDHUP 信号是对方挂断后发出,但实际上可能没有这个信号
        std::cout << events[i].data.fd << " disconnect" << std::endl;
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);

      } else if (events[i].events & EPOLLIN) {
        char buf[BUFSIZ];
        memset(buf, 0, sizeof(buf));
        int client_socket = events[i].data.fd;
        int nrecv = recv(client_socket, buf, BUFSIZ, 0);
        std::cout << "recv from " << client_socket << " :" << buf << std::endl;
        if (nrecv == 0) {
          std::cout << events[i].data.fd << " disconnect" << std::endl;
          epoll_ctl(epoll_fd, EPOLL_CTL_DEL, events[i].data.fd, nullptr);
        }
      }
    }
  }

  close(epoll_fd);
  close(server_socket);
  return 0;
}

epoll 工作模式:

  • 水平触发 level trigger:(默认,支持 block socket 和 non-block socket)
    若报告了fd事件后没有被处理或数据没有被全部读取,epoll还会再报告该事件
  • 边缘触发 edge trigger:(仅支持非阻塞)
    若报告了fd事件后没有被处理或数据没有被全部读取,epoll不会再报告该事件。如果不立即处理,数据会丢失。

ET模式在很大程度上减少了epoll事件被重复触发的次数,因此效率要比LT模式高。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。

 

 

 

 

 

 



这篇关于Linux IO多路复用的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程