2026-1-14-io_uring原理及其应用
2026-1-14-io_uring原理及其应用
什么是异步IO
我们在对IO进行操作,如读写IO的时候,如有read,write,recv,send这四个函数都是同步的。意思是read(fd,buffer,length)函数执行时在读请求和返回数据是一起的,没有做到分开,write,recv,send同理。
异步IO就是要把这两个步骤拆开来做,io_uring就是这样的一个工具。
如何设计这样的异步IO
频繁copy在设计这样的队列时将有以下问题需要考虑,在将submit queue中的节点(及task)打包放入的过程中会出现频繁copy的过程。解决方案就是用mmap的方式将队列分配的内存映射出来。 我们需要将队列做的尽量高效,在多线程中,需要做到线程安全,加锁就会导致效率降低,故需要做到无锁解决方案就是做成环形队列,逻辑上的环形队列。
什么是IO_URING
在2019年为其增加了三个系统调用:
- io_uring_setup
- io_uring_enter
- io_uring_register
由于这三个系统调用不太好理解,故使用liburing这个库来封装了前面三个系统调用。
如何安装liburing库
方法一:版本Ubuntu22.04
直接运行
1
sudo apt install liburing-dev
方法二:版本Ubuntu20.04及以下
1
2
3
4
5
git clone https://github.com/axboe/liburing.git
cd liburing
./configure
make
sudo make install
若需检查是否安装成功
1
find / -name "liburing.so*" 2>/dev/null
实现异步处理IO的TCP服务器
main函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// 初始化服务器,设置端口
unsigned short port = 9999;
int server_fd = init_server(port);
/*这里初始化参数是为了使用io_uring的一些高级特性如:
SQ / CQ 的 flags
是否启用 IOPOLL
是否使用 SQPOLL(内核线程轮询)
CQ ring 的大小等
*/
struct io_uring_params params;
// 这里没有使用特性,则将其初始化为0。
memset(¶ms, 0, sizeof(params));
/*ring是创建用户态的io_uring对象,里面保存了:
SQ(submit queue)
CQ(complete queue)
mmap 得到的 ring buffer
一些 bookkeeping 信息
*/
struct io_uring ring;
//这里将队列大小初始化为ENTRIES_LENGTH。
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
// 这里创建客户端的地址信息
struct sockaddr_in clientaddr;
// 最大buffer长度
socklen_t len = sizeof(clientaddr);
// 这里是在为submit_queue里添加一个接收(accept)事件
set_event_accept(&ring, server_fd, (struct sockaddr*)&clientaddr, &len, 0);
char buf[BUFFER_LENGTH] = {0};
while(1){
//把SQE的东西都提交到内核中。
io_uring_submit(&ring);
//创建完成队列实例。
struct io_uring_cqe *cqe;
//等待至少有一个entity完成。
io_uring_wait_cqe(&ring, &cqe);
//获取所有已完成的事件
struct io_uring_cqe *cqes[128];
//nready是已完成的数量
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
int i = 0;
//遍历这些事件。
for(i = 0; i < nready; i++){
struct io_uring_cqe *entries = cqes[i];
struct conn_info res_info;
memcpy(&res_info, &entries->user_data, sizeof(res_info));
//然后判断是什么事件,对应处理即可。
if(res_info.event == EVENT_ACCEPT){
set_event_accept(&ring, server_fd, (struct sockaddr*)&clientaddr, &len, 0);
printf("accept a new connection fd=%d\n", entries->res);
int connfd = entries->res;
//等待对应fd发数据
set_event_recv(&ring, connfd, buf, BUFFER_LENGTH, 0);
}else if(res_info.event == EVENT_READ){
int ret = entries->res;
printf("read data from fd=%d, buffer:%s\n", ret, buf);
if (ret == 0){
close(ret);
}else if(entries->res > 0){
set_event_send(&ring, res_info.fd, buf, ret, 0);
}
}else if(res_info.event == EVENT_WRITE){
int ret = entries->res;
printf("set_event_send: %d, %s\n",ret, buf);
set_event_recv(&ring, res_info.fd, buf, BUFFER_LENGTH, 0);
}
}
//表示nready的CQE处理完了,可以释放。
io_uring_cq_advance(&ring, nready);
}
init_server()函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int init_server(unsigned short port){
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
//存储服务器地址与端口号的结构体
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);// 0.0.0.0
servaddr.sin_port = htons(port);
if(-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))){
printf("bind failed: %s\n",strerror(errno));
return -1;
}
listen(sockfd,10);
return sockfd;
}
set_event_accept()函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct conn_info{
int fd;
int event;
};
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
socklen_t *addrlen, int flags){
//向对应uring申请一个空槽,实例化一个submit_queue中的结点。
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
//sqe中user_data变量为以下内容,fd与事件
struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_ACCEPT
};
//向队列中添加一个节点,并将其推入进工作内核中,已经接收好了,如果有客户端连接的话。
io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
set_event_recv()函数
1
2
3
4
5
6
7
8
9
int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info recv_info = {
.fd = sockfd,
.event = EVENT_READ
};
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &recv_info, sizeof(struct conn_info));
}
set_event_send()函数
1
2
3
4
5
6
7
8
9
int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
struct conn_info send_info = {
.fd = sockfd,
.event = EVENT_WRITE
};
io_uring_prep_send(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &send_info, sizeof(struct conn_info));
}
上面三个函数逻辑都是一样的,在queue里实例化后交给工作队列。
下一篇文章我将介绍如何对上述服务端进行测试
本文由作者按照 CC BY 4.0 进行授权
