玩命加载中 . . .

4.4-IO多路复用


IO多路复用

IO多路复用使得程序能同时监听多个文件描述符,能够提高程序的性能,Linux下实现IO多路复用的系统调用主要有selectpollepoll

BIO阻塞模型

缺点:

  1. 线程或进程会消耗资源
  2. 线程或进程调度消耗CPU资源

NIO非阻塞模型

非阻塞,忙轮询,每次循环内$O(n)$的系统调用

  • 优点:提高了程序的执行效率
  • 缺点:需要占用更多的CPU和系统资源

select

  1. 构造一个关于文件描述符的列表,将要监听的文件描述符添加到列表中
  2. 调用系统函数,监听该列表中的文件描述符,直到有文件描述符进行IO调用时才返回,阻塞,由内核完成
  3. 返回时,会告知进程哪些文件描述符要进行IO操作
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
- nfds: 委托内核检测的最大文件描述符的值+1
- readfds: 委托内核检测哪些文件描述符的读的属性,一般检测读操作,接受数据,传入传出参数
- writedfs: 写操作的集合,是否还有缓冲区可以写数据
- exceptfds: 检测异常
- timeout: 设置的超时时间
    - NULL: 永久阻塞,直到文件描述符有变化

struct timeval {
    long    tv_sec;         /* 秒 */
    long    tv_usec;        /* 微秒 */
};
// 成功返回检测的集合中发生变化的文件描述符数量,失败返回-1
===================================

void FD_CLR(int fd, fd_set *set);
// 将文件描述符对应的标志位置0
===================================

int  FD_ISSET(int fd, fd_set *set);
// 判断某个fd是0还是1
===================================

void FD_SET(int fd, fd_set *set);
// 设置某个fd为标志位为1
===================================

void FD_ZERO(fd_set *set);
// fd集合全部初始化为0
#include <sys/time.h>
#include <sys/select.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <stdio.h>

int main() {
    int lfd = socket(PF_INET, SOCK_STREAM, 0);
    struct sockaddr_in saddr;
    saddr.sin_port = htons(9999);
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
    listen(lfd, 8);

    // 创建一个fd_set集合
    fd_set rdset, tmp;
    FD_ZERO(&rdset);    // 初始化
    FD_SET(lfd, &rdset);// 将监听的lfd添加到集合中
    int maxfd = lfd;
    while (1) {
        tmp = rdset;
        // 调用select系统函数,看是否有数据
        int ret = select(maxfd + 1, &tmp, NULL, NULL, NULL);  // 永久阻塞
        if (ret == -1) {
            perror("select");
            exit(-1);
        } else if (ret == 0) {
            continue;
        } else if (ret > 0) {   // 有文件描述符对应的缓冲区发生了变化
            if (FD_ISSET(lfd, &tmp)) {  // 先看lfd是否有客户端连接
                struct sockaddr_in cliaddr;
                int len = sizeof(cliaddr);
                int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &len);
                // 将新的文件描述符加入集合中
                FD_SET(cfd, &rdset);
                // 更新最大的文件描述符
                maxfd = maxfd > cfd ? maxfd : cfd;
            }
            for (int i = lfd + 1; i <= maxfd; i++) {// 再看其他的是否有数据
                if (FD_ISSET(i, &tmp)) {    // 说明对应的客户端发来了数据
                    char buf[1024] = {0};
                    int len = read(i, buf, sizeof(buf));
                    if (len == -1) {
                        perror("read");
                        exit(-1);
                    }
                    else if (len == 0) {
                        printf("client close\n");
                        close(i);   // 关闭文件描述符
                        FD_CLR(i, &rdset);  // 从集合中移除
                    }
                    else if (len > 0) {
                        printf("read buf: %s\n", buf);
                        write(i, buf, strlen(buf) + 1);
                    }
                }
            }
        }
    }
    close(lfd);
    return 0;
}

缺点:

  1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
  2. 每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
  3. select支持的文件描述符数量太少,默认是1024
  4. fd集合不能重用,每次都需要重置

poll

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
- fds: 需要检测的文件描述符集合

struct pollfd {
    int   fd;         /* 委托内核检测的文件描述符*/
    short events;     /* 检测什么事件 */
    short revents;    /* 文件描述符实际发生的事情 */
};

- events
    - POLLIN: 读事件
    - POLLOUT: 写事件
- nfds: 最大文件描述符的下标 + 1
- timeout: 阻塞时长,0表示不阻塞,-1表示阻塞,检测到fd有变化解除阻塞,大于0表示阻塞的时长
// 返回值:成功返回n个文件描述符发生变化,失败返回-1

epoll

#include <sys/epoll.h>

int epoll_create(int size);
// 创建了一个epoll实例,在内核中创建了一个数据,一个是需要检测的文件描述符的信息(红黑树)
// 一个是就绪列表,存放检测到数据发送的文件描述符信息(双向链表)
- size: 现在不用了,给个非0值就行
// 返回值:成功返回大于0的文件描述符,操作epoll实例,失败返回-1
======================================================

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 对epoll实例进行管理,添加、删除、修改文件描述符信息
- epfd: epoll实例对应的文件描述符
- op: 对应的操作
    - EPOLL_CTL_ADD: 添加
    - EPOLL_CTL_MOD: 修改
    - EPOLL_CTL_DEL: 删除
- fd: 要检测的文件描述符
- event: 检测文件描述符的哪个事件

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

- events 检测事件:
    - EPOLLIN
    - EPOLLOUT
    - EPOLLERR
    - EPOLLET   边沿触发
    - EPOLLONESHOT

typedef union epoll_data {
    void        *ptr;
    int          fd;    // 要检测的文件描述符
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;
======================================================

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
// 在一段超时时间内等待一组文件描述符上的事件
- epfd: epoll实例的文件描述符
- events: 传出参数,保存发生变化的文件描述符信息
- maxevents: 第二个参数结构体数组的大小
- timeout: 阻塞时间,0表示不阻塞,-1表示永久阻塞,大于0表示阻塞的时长(毫秒)
// 返回值:成功返回就绪的文件描述符的个数,失败返回-1

因为epoll只返回就绪的文件描述符,所以我们只要遍历这些文件描述符就可以了

int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
for (int i = 0; i < ret; i++) {
    int sockfd = events[i].data.fd; // 获得就绪的文件描述符
    // socket肯定就绪,直接处理
}

使用epoll编写服务器

默认是水平触发

#include <sys/time.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <sys/epoll.h>

int main() {
    // 1. 创建socket
    int lfd = socket(PF_INET, SOCK_STREAM, 0);
    struct sockaddr_in saddr;
    saddr.sin_port = htons(9999);
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;

    // 2. 绑定
    bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));

    // 3. 监听
    listen(lfd, 8);

    // 4. 创建epoll实例
    int epfd = epoll_create(1);

    // 将要监听的文件描述符添加到文件描述符中
    struct epoll_event epev;
    epev.events = EPOLLIN;  // 检测读事件
    epev.data.fd = lfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &epev);

    struct epoll_event epevs[1024];
    while (1) {
        int ret = epoll_wait(epfd, epevs, 1024, -1);    // 检测哪些fd有数据

        printf("ret: %d\n", ret);
        for (int i = 0; i < ret; i++) {
            int curfd = epevs[i].data.fd;
            if (curfd == lfd) { // 监听到有客户端连接
                struct sockaddr_in cliaddr;
                int len = sizeof(cliaddr);
                int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &len);
                epev.events = EPOLLIN;
                epev.data.fd = cfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &epev);
            } else if (epevs[i].events & EPOLLIN) {  // 否则就是通信的socket
                char buf[1024] = {0};// 接收客户端数据
                int len = read(curfd, buf, sizeof(buf));
                if (len == -1) {
                    perror("read");
                    exit(-1);
                }
                else if (len == 0) {    // 客户端关闭
                    printf("client close\n");
                    epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
                    close(curfd);
                }
                else if (len > 0) {
                    printf("read buf: %s\n", buf);
                    write(curfd, buf, strlen(buf) + 1);
                }
            } else {
                printf("something else happened\n");
            }
        }
    }

    close(lfd);
    close(epfd);

    return 0;
}

epoll的工作模式

  • LT模式(水平触发,默认)

支持阻塞和非阻塞,内核告诉你一个文件描述符是否就绪了,就可以对fd进行IO操作,如果不进行任何操作,还是会继续通知

  • ET模式(边沿触发)

非阻塞,当描述符从未就绪变为就绪时,内核会告知,并且不会再发送更多的就绪通知,直到文件描述符不再为就绪状态

ET模式在很大程度上减少了epoll事件被重复触发的次数,效率要比LT模式高,epoll工作在ET模式时,必须使用非阻塞套接字,以避免由于一个文件描述符的阻塞读写操作把处理多个文件描述符的任务饿死

ET模式

对于采用ET工作模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序应该立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件

所以每次读数据要循环读取全部数据,并且要把read设置为非阻塞以跳出循环

每个使用ET模式的文件描述符都应该是非阻塞的

#include <stdio.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>

int main() {
    // 创建socket
    int lfd = socket(PF_INET, SOCK_STREAM, 0);
    struct sockaddr_in saddr;
    saddr.sin_port = htons(9999);
    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr *)&saddr, sizeof(saddr));// 绑定
    listen(lfd, 8);// 监听

    int epfd = epoll_create(100);// 创建一个epoll实例

    // 将监听的文件描述符相关的检测信息添加到epoll实例中
    struct epoll_event epev;
    epev.events = EPOLLIN;
    epev.data.fd = lfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &epev);

    struct epoll_event epevs[1024];

    while(1) {
        int ret = epoll_wait(epfd, epevs, 1024, -1);

        printf("ret = %d\n", ret);

        for(int i = 0; i < ret; i++) {
            int curfd = epevs[i].data.fd;
            if(curfd == lfd) {  // 监听的文件描述符有数据达到,有客户端连接
                struct sockaddr_in cliaddr;
                int len = sizeof(cliaddr);
                int cfd = accept(lfd, (struct sockaddr *)&cliaddr, &len);

                int flag = fcntl(cfd, F_GETFL); // 获取cfd文件属性
                flag |= O_NONBLOCK;     // 设置为非阻塞
                fcntl(cfd, F_SETFL, flag);

                epev.events = EPOLLIN | EPOLLET;    // 设置边沿触发
                epev.data.fd = cfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &epev);
            } else if (epevs[i].events & EPOLLIN) {
                char buf[5];
                int len = 0;
                // 循环读取出所有数据
                while( (len = read(curfd, buf, sizeof(buf))) > 0) {
                    printf("recv data : %s\n", buf);// 打印数据
                    write(curfd, buf, len);
                }
                if(len == 0) {
                    printf("client closed....");
                } else if(len == -1) {
                    if(errno == EAGAIN) {   // 数据读完可能返回-1,这种情况不能退出
                        printf("data over.....");
                    } else {
                        perror("read");
                        exit(-1);
                    }
                }
            }
        }
    }

    close(lfd);
    close(epfd);
    return 0;
}

EPOLLONESHOT事件

对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件,这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。
但反过来,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket上的EPOLLONESHOT事件,以确保这个socket下一次可读时,其EPOLLIN事件能被触发,进而让其他工作线程有机会继续处理这个socket


文章作者: kunpeng
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 kunpeng !
  目录