io_uring 是 Linux 下高性能的異步 IO 框架,網上很多相關資料,我之前也初步分析了一下它的實現,有興趣的可以查看 https://zhuanlan.zhihu.com/p/387620810。
Libuv 中最近加入了對 io_uring 的支持,那么為什么要把它引入 Libuv 呢?因為 epoll 不支持普通文件的 Poll 能力,所以在 Libuv 中,異步文件 IO 操作需要通過線程池來實現,具體來說就是當用戶發起一個異步文件 IO 操作時,Libuv 會把這個操作放到線程池中,當子線程處理這個任務時,會執行一個阻塞式的系統調用,這個系統調用會引起線程阻塞,從而導致這個線程被消耗掉了,當 IO 操作完成后,子線程就會被喚醒,子線程再通過主線程去執行用戶的回調。在 Libuv 早期的實現中,如果執行比較慢的任務過多就會把線程池中的線程消耗完,從而導致執行比較快的 IO 操作需要等待很長時間,一個例子就是 DNS 解析會阻塞文件 IO 任務。而 io_uring 可以支持普通文件 IO(當然能力不僅于此),不再需要借助線程池的能力,目前 Libuv 中部分異步文件 IO 操作已經替換成 io_uring(需要通過環境變量開啟),下面來看看它的實現。
原生 io_uring 的使用比較復雜,通常需要借助 liburing 庫,但是 Libuv 中可能為了減少對第三方庫的依賴,實現上使用原生的方式。
io_uring 初始化
在 Libuv 初始化時會進行 io_uring 的初始化。
uv__iou_init(loop->backend_fd, &lfields->iou, 64, UV__IORING_SETUP_SQPOLL);
lfields->iou 為 io_uring 核心結構體,UVIORING_SETUP_SQPOLL 設置內核創建線程輪詢是否有任務需要處理(用戶層設置),接著看看 uviou_init。
static void uv__iou_init(int epollfd,struct uv__iou* iou,uint32_t entries,uint32_t flags) {struct uv__io_uring_params params;struct epoll_event e;size_t cqlen;size_t sqlen;size_t maxlen;size_t sqelen;uint32_t i;char* sq;char* sqe;int ringfd;memset(?ms, 0, sizeof(params));params.flags = flags;// UV__IORING_SETUP_SQPOLL 模式下,設置多久沒有任務提交則內核線程進入 sleep 狀態if (flags & UV__IORING_SETUP_SQPOLL)params.sq_thread_idle = 10; /* milliseconds /// 調用系統調用初始化 io_uringringfd = uv__io_uring_setup(entries, ?ms);// 映射到內核發送 / 完成隊列的內存,用戶層和內核可以共同操作這個隊列sq = mmap(0,maxlen,PROT_READ | PROT_WRITE,MAP_SHARED | MAP_POPULATE,ringfd,0); /sqe = mmap(0,sqelen,PROT_READ | PROT_WRITE,MAP_SHARED | MAP_POPULATE,ringfd,0x10000000ull); /* IORING_OFF_SQES */memset(&e, 0, sizeof(e));e.events = POLLIN;e.data.fd = ringfd;// 注冊等待可讀事件,io_uring 中有任務完成后就會通過 epollepoll_ctl(epollfd, EPOLL_CTL_ADD, ringfd, &e);// 初始化 io_uring 結構體iou->sqhead = (uint32_t*) (sq + params.sq_off.head);iou->sqtail = (uint32_t*) (sq + params.sq_off.tail);iou->sqmask = (uint32_t) (sq + params.sq_off.ring_mask);iou->sqarray = (uint32_t*) (sq + params.sq_off.array);iou->sqflags = (uint32_t*) (sq + params.sq_off.flags);iou->cqhead = (uint32_t*) (sq + params.cq_off.head);iou->cqtail = (uint32_t*) (sq + params.cq_off.tail);iou->cqmask = (uint32_t) (sq + params.cq_off.ring_mask);iou->sq = sq;iou->cqe = sq + params.cq_off.cqes;iou->sqe = sqe;iou->sqlen = sqlen;iou->cqlen = cqlen;iou->maxlen = maxlen;iou->sqelen = sqelen;iou->ringfd = ringfd;iou->in_flight = 0;iou->flags = 0;}
uv__iou_init 完成了 io_uring 的初始化,并且把 io_uring 對應的 fd 注冊到 epoll,當 io_uring 有任務完成時,就可以通過 epoll 感知到。接著就可以使用 io_uring 了。
下面看一個異步文件 IO 的操作。
int uv_fs_open(uv_loop_t* loop,uv_fs_t* req,const char* path,int flags,int mode,uv_fs_cb cb) {INIT(OPEN);PATH;req->flags = flags;req->mode = mode;if (cb != NULL)if (uv__iou_fs_open(loop, req))return 0;POST;}
uv_fs_open 可以以異步的方式打開一個文件,之前時通過線程池實現的,加入 io_uring 后,就會多了一層攔截,來看看 uv__iou_fs_open。
int uv__iou_fs_open(uv_loop_t* loop, uv_fs_t* req) {struct uv__io_uring_sqe* sqe;struct uv__iou* iou;// 獲取 io_uring 結構體iou = &uv__get_internal_fields(loop)->iou;// 獲取一個任務節點,任務節點會和 req 互相關聯,回調時會用到sqe = uv__iou_get_sqe(iou, loop, req);// 設置操作上下文sqe->addr = (uintptr_t) req->path;sqe->fd = AT_FDCWD;sqe->len = req->mode;// 設置操作類型sqe->opcode = UV__IORING_OP_OPENAT;sqe->open_flags = req->flags | O_CLOEXEC;// 提交任務uv__iou_submit(iou);return 1;}
uviou_fs_open 中有兩個核心邏輯 uviou_get_sqe 和 uviou_submit,首先來看 uviou_get_sqe。
static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,uv_loop_t* loop,uv_fs_t* req) {struct uv__io_uring_sqe* sqe;uint32_t head;uint32_t tail;uint32_t mask;uint32_t slot;if (iou->ringfd == -1)return NULL;head = atomic_load_explicit((_Atomic uint32_t*) iou->sqhead,memory_order_acquire);tail = *iou->sqtail;mask = iou->sqmask;slot = tail & mask;sqe = iou->sqe;// 從請求隊列中獲取一個節點sqe = &sqe[slot];memset(sqe, 0, sizeof(*sqe));// 任務節點關聯到 req,回調時需要使用sqe->user_data = (uintptr_t) req;req->work_req.loop = loop;req->work_req.work = NULL;req->work_req.done = NULL;uv__queue_init(&req->work_req.wq);uv__req_register(loop, req);iou->in_flight++;return sqe;}
uviou_get_sqe 主要是從任務隊列中獲取一個空閑節點并關聯上請求上下文結構體,uviou_get_sqe 的調用方需要設置操作上下文,比如操作類型,操作的 fd 等。通過 uviou_get_sqe 獲取任務節點并設置了操作上下文后,這個任務就會自動被操作系統感知。因為 Libuv 是使用了 UVIORING_SETUP_SQPOLL 模式,所以還需要判斷這時候內核輪訓線程是否處于睡眠狀態,這就是 uv__iou_submit 的邏輯。
static void uv__iou_submit(struct uv__iou* iou) {uint32_t flags;atomic_store_explicit((_Atomic uint32_t*) iou->sqtail,*iou->sqtail + 1,memory_order_release);flags = atomic_load_explicit((_Atomic uint32_t*) iou->sqflags,memory_order_acquire);// 判斷內核線程是否處于睡眠狀態if (flags & UV__IORING_SQ_NEED_WAKEUP)// 喚醒內核線程,說明有任務需要處理if (uv__io_uring_enter(iou->ringfd, 0, 0, UV__IORING_ENTER_SQ_WAKEUP))if (errno != EOWNERDEAD) /* Kernel bug. Harmless, ignore. /perror("libuv: io_uring_enter(wakeup)"); /
這樣就完成了任務的提交。
任務完成后,io_uring 對應的 fd 就會變成可讀,從而 epoll 就會感知到,來看看 epoll 的處理。下面是 epoll 處理就緒 fd 時的一段邏輯。
if(fd == iou->ringfd) {uv__poll_io_uring(loop, iou);have_iou_events = 1;continue;}
如果是 io_uring 的 fd 可讀,則執行 uv__poll_io_uring。
static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {struct uv__io_uring_cqe* cqe;struct uv__io_uring_cqe* e;uv_fs_t* req;uint32_t head;uint32_t tail;uint32_t mask;uint32_t i;uint32_t flags;int nevents;int rc;// 完成隊列頭/尾節點head = iou->cqhead;tail = atomic_load_explicit((_Atomic uint32_t) iou->cqtail,memory_order_acquire);mask = iou->cqmask;cqe = iou->cqe;nevents = 0;// 遍歷完成隊列for (i = head; i != tail; i++) {e = &cqe[i & mask];// 拿到操作關聯的請求結構體req = (uv_fs_t*) (uintptr_t) e->user_data;uv__req_unregister(loop, req);iou->in_flight--;// 操作返回值,表示操作是否成功req->result = e->res;// 執行回調req->cb(req);}
uv__poll_io_uring 的邏輯很簡單,就是遍歷完成隊列,然后拿到對應的請求上下文結構體,最后執行它的回調。
現代軟件中大多數使用的 IO 模型是 epoll,隨著 io_uring 的發展和成熟,io_uring 將會出現在更多的軟件中,之前我也體驗了一下 io_uring,有興趣的可以體驗下 https://github.com/theanarkh/nodejs_io_uring。
本文鏈接:http://www.tebozhan.com/showinfo-26-14326-0.html聊聊 Libuv 最近引入的 io_uring
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com