文章

2026-1-14-io_uring原理及其应用

2026-1-14-io_uring原理及其应用

什么是异步IO

我们在对IO进行操作,如读写IO的时候,如有read,write,recv,send这四个函数都是同步的。意思是read(fd,buffer,length)函数执行时在读请求和返回数据是一起的,没有做到分开,write,recv,send同理。

异步IO就是要把这两个步骤拆开来做,io_uring就是这样的一个工具。

异步IO原理图

如何设计这样的异步IO

频繁copy在设计这样的队列时将有以下问题需要考虑,在将submit queue中的节点(及task)打包放入的过程中会出现频繁copy的过程。解决方案就是用mmap的方式将队列分配的内存映射出来。 我们需要将队列做的尽量高效,在多线程中,需要做到线程安全,加锁就会导致效率降低,故需要做到无锁解决方案就是做成环形队列,逻辑上的环形队列。

什么是IO_URING

在2019年为其增加了三个系统调用:

  1. io_uring_setup
  2. io_uring_enter
  3. io_uring_register

由于这三个系统调用不太好理解,故使用liburing这个库来封装了前面三个系统调用。

如何安装liburing库

方法一:版本Ubuntu22.04

直接运行

1
sudo apt install liburing-dev

方法二:版本Ubuntu20.04及以下

1
2
3
4
5
git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install

若需检查是否安装成功

1
find / -name "liburing.so*" 2>/dev/null

实现异步处理IO的TCP服务器

main函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    // 初始化服务器,设置端口
    unsigned short port = 9999;
    int server_fd = init_server(port);
    /*这里初始化参数是为了使用io_uring的一些高级特性如:
    	SQ / CQ 的 flags
		是否启用 IOPOLL
		是否使用 SQPOLL(内核线程轮询)
		CQ ring 的大小等
    */
    struct io_uring_params params;
    // 这里没有使用特性,则将其初始化为0。
    memset(&params, 0, sizeof(params));
    /*ring是创建用户态的io_uring对象,里面保存了:
    	SQ(submit queue)
    	CQ(complete queue)
    	mmap 得到的 ring buffer
    	一些 bookkeeping 信息
    */
    struct io_uring ring;
    //这里将队列大小初始化为ENTRIES_LENGTH。
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);
    
    // 这里创建客户端的地址信息
    struct sockaddr_in clientaddr;
    // 最大buffer长度
    socklen_t len = sizeof(clientaddr);
    // 这里是在为submit_queue里添加一个接收(accept)事件
    set_event_accept(&ring, server_fd, (struct sockaddr*)&clientaddr, &len, 0);
    char buf[BUFFER_LENGTH] = {0};
	while(1){
	   //把SQE的东西都提交到内核中。
	   io_uring_submit(&ring);
	   //创建完成队列实例。
	   struct io_uring_cqe *cqe;
	   //等待至少有一个entity完成。
	   io_uring_wait_cqe(&ring, &cqe);
	   //获取所有已完成的事件
	   struct io_uring_cqe *cqes[128];
	   //nready是已完成的数量
	   int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
	   int i = 0;
	   //遍历这些事件。
	   for(i = 0; i < nready; i++){
	       struct io_uring_cqe *entries = cqes[i];
	       struct conn_info res_info;
	       memcpy(&res_info, &entries->user_data, sizeof(res_info));
	       //然后判断是什么事件,对应处理即可。
	       if(res_info.event == EVENT_ACCEPT){
	           set_event_accept(&ring, server_fd, (struct sockaddr*)&clientaddr, &len, 0);
	           printf("accept a new connection fd=%d\n", entries->res);
	           int connfd = entries->res;
	           //等待对应fd发数据
	           set_event_recv(&ring, connfd, buf, BUFFER_LENGTH, 0);
	       }else if(res_info.event == EVENT_READ){
	           int ret = entries->res;
	           printf("read data from fd=%d, buffer:%s\n", ret, buf);
	           if (ret == 0){
	               close(ret);
	           }else if(entries->res > 0){
	                set_event_send(&ring, res_info.fd, buf, ret, 0);
	           }
	       }else if(res_info.event == EVENT_WRITE){
	           int ret = entries->res;
	           printf("set_event_send: %d, %s\n",ret, buf);
			   set_event_recv(&ring, res_info.fd, buf, BUFFER_LENGTH, 0);
	       }
	   }
	   //表示nready的CQE处理完了,可以释放。
	   io_uring_cq_advance(&ring, nready);
	}

init_server()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int init_server(unsigned short port){
    int sockfd = socket(AF_INET, SOCK_STREAM, 0); 
    //存储服务器地址与端口号的结构体
    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);// 0.0.0.0
    servaddr.sin_port = htons(port);
    if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){
        printf("bind failed: %s\n",strerror(errno));
        return -1;
    }

    listen(sockfd,10);

    return sockfd;
}

set_event_accept()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct conn_info{
    int fd;
    int event;
};
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
                socklen_t *addrlen, int flags){
    //向对应uring申请一个空槽,实例化一个submit_queue中的结点。
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    //sqe中user_data变量为以下内容,fd与事件
    struct conn_info accept_info = {
        .fd = sockfd,
        .event = EVENT_ACCEPT
    };
    //向队列中添加一个节点,并将其推入进工作内核中,已经接收好了,如果有客户端连接的话。
    io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
    memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

set_event_recv()函数

1
2
3
4
5
6
7
8
9
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    struct conn_info recv_info = {
        .fd = sockfd,
        .event = EVENT_READ
    };
    io_uring_prep_recv(sqe, sockfd, buf, len, flags);
    memcpy(&sqe->user_data, &recv_info, sizeof(struct conn_info));
}

set_event_send()函数

1
2
3
4
5
6
7
8
9
int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    struct conn_info send_info = {
        .fd = sockfd,
        .event = EVENT_WRITE
    };
    io_uring_prep_send(sqe, sockfd, buf, len, flags);
    memcpy(&sqe->user_data, &send_info, sizeof(struct conn_info));
}

上面三个函数逻辑都是一样的,在queue里实例化后交给工作队列。

下一篇文章我将介绍如何对上述服务端进行测试

本文由作者按照 CC BY 4.0 进行授权

热门标签