Submission
Blocking
Linking
Completion
Poll is not polling
io-wq is not wq
SYSCALL io_uring_enter
-> io_submit_sqes // under the mutex ctx->uring_lock
---
struct io_submit_state state, *statep = NULL;
struct io_kiocb *link = NULL;
int i, submitted = 0;
...
/* make sure SQ entry isn't read before tail */
nr = min3(nr, ctx->sq_entries, io_sqring_entries(ctx));
if (!percpu_ref_tryget_many(&ctx->refs, nr))
return -EAGAIN;
if (nr > IO_PLUG_THRESHOLD) { // IO_PLUG_THRESHOLD == 2
io_submit_state_start(&state, nr);
// blk_start_plug() is invoked here.
// Even on high-speed devices, plugging and merging are welcome,
// because the overhead of the IO path can be shared by multiple IOs.
statep = &state;
}
ctx->ring_fd = ring_fd;
ctx->ring_file = ring_file;
for (i = 0; i < nr; i++) {
const struct io_uring_sqe *sqe;
struct io_kiocb *req;
int err;
sqe = io_get_sqe(ctx);
...
req = io_alloc_req(ctx, statep);
// Try to allocate reqs in batch
err = io_init_req(ctx, req, sqe, statep);
io_consume_sqe(ctx); //only update ctx->cached_sq_head here
/* will complete beyond this point, count as submitted */
submitted++;
...
err = io_submit_sqe(req, sqe, &link);
}
...
if (statep)
io_submit_state_end(&state);
// blk_finish_plug
/* Commit SQ ring head once we've consumed and submitted all SQEs */
io_commit_sqring(ctx);
---
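For context, a minimal userspace sketch of what drives this path, using liburing's public API (the file path /tmp/data is just a placeholder): io_uring_submit() issues the io_uring_enter() syscall, which runs io_submit_sqes() above.
---
#include <liburing.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/uio.h>

int main(void)
{
        struct io_uring ring;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        char buf[4096];
        struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
        int fd;

        /* io_uring_setup(): allocates the SQ/CQ rings and the ctx */
        if (io_uring_queue_init(8, &ring, 0) < 0)
                return 1;

        fd = open("/tmp/data", O_RDONLY); /* placeholder file */
        if (fd < 0)
                return 1;

        /* fill one SQE in the SQ ring */
        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_readv(sqe, fd, &iov, 1, 0);

        /* io_uring_enter() -> io_submit_sqes() */
        io_uring_submit(&ring);

        /* sleep in io_cqring_wait() until the CQE is posted */
        io_uring_wait_cqe(&ring, &cqe);
        printf("read returned %d\n", cqe->res);
        io_uring_cqe_seen(&ring, cqe);

        io_uring_queue_exit(&ring);
        return 0;
}
---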
io_uring supports truly asynchronous IO for both direct IO and buffered IO.
// code handling links is ignored here; see the Linking section below
io_submit_sqe
-> io_queue_sqe
-> __io_queue_sqe
---
ret = io_issue_sqe(req, sqe, true);
-> io_read_prep()
-> io_prep_rw()
---
if (force_nonblock)
kiocb->ki_flags |= IOCB_NOWAIT;
if (ctx->flags & IORING_SETUP_IOPOLL) {
if (!(kiocb->ki_flags & IOCB_DIRECT) ||
!kiocb->ki_filp->f_op->iopoll)
return -EOPNOTSUPP;
kiocb->ki_flags |= IOCB_HIPRI; // do polling in block layer
kiocb->ki_complete = io_complete_rw_iopoll;
req->result = 0;
req->iopoll_completed = 0;
}
---
-> io_read() // may return -EAGAIN; the handling below is back in __io_queue_sqe()
/*
* We async punt it if the file wasn't marked NOWAIT, or if the file
* doesn't support non-blocking read/write attempts
*/
if (ret == -EAGAIN && (!(req->flags & REQ_F_NOWAIT) ||
(req->flags & REQ_F_MUST_PUNT))) {
if (io_arm_poll_handler(req)) {
if (linked_timeout)
io_queue_linked_timeout(linked_timeout);
goto exit;
}
punt:
io_req_init_async(req);
...
/*
* Queued up for async execution, worker will release
* submit reference when the iocb is actually submitted.
*/
io_queue_async_work(req);
goto exit;
}
---
There are two points to note here.
We get an -EAGAIN wherever the code would otherwise go to sleep, for example:
- generic_file_buffered_read
  Buffered read IO supports nowait in order to do a 'fast read', i.e. try the
  read and see whether it hits the page cache or not.
---
...
page = find_get_page(mapping, index);
if (!page) {
if (iocb->ki_flags & IOCB_NOWAIT)
goto would_block;
...
}
...
if (!PageUptodate(page)) {
if (iocb->ki_flags & IOCB_NOWAIT) {
put_page(page);
goto would_block;
}
/*
* See comment in do_read_cache_page on why
* wait_on_page_locked is used to avoid unnecessarily
* serialisations and why it's safe.
*/
error = wait_on_page_locked_killable(page);
...
}
...
would_block:
error = -EAGAIN;
...
---
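The same would-block behavior can be observed from plain userspace: preadv2() with RWF_NOWAIT only succeeds when the data is already in the page cache, and fails with EAGAIN otherwise (a sketch; the file path is a placeholder):
---
#define _GNU_SOURCE
#include <stdio.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/uio.h>

int main(void)
{
        char buf[4096];
        struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
        int fd = open("/tmp/data", O_RDONLY); /* placeholder file */
        ssize_t ret;

        if (fd < 0)
                return 1;

        /* IOCB_NOWAIT in the kernel: a cache miss takes the would_block path */
        ret = preadv2(fd, &iov, 1, 0, RWF_NOWAIT);
        if (ret < 0 && errno == EAGAIN)
                printf("page cache miss, the read would have blocked\n");
        else
                printf("fast read: %zd bytes straight from the page cache\n", ret);
        return 0;
}
---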
io_write
---
...
/* file path doesn't support NOWAIT for non-direct_IO */
if (force_nonblock && !(kiocb->ki_flags & IOCB_DIRECT) &&
(req->flags & REQ_F_ISREG))
goto copy_iov;
...
copy_iov:
ret = io_setup_async_rw(req, io_size, iovec,
inline_vecs, &iter);
...
/* any defer here is final, must blocking retry */
if (!(req->flags & REQ_F_NOWAIT) &&
!file_can_poll(req->file))
req->flags |= REQ_F_MUST_PUNT;
return -EAGAIN;
...
Buffered write IO doesn't support non-blocking submission at all!
See xfs_file_buffered_aio_write and ext4_buffered_write_iter, which return -EOPNOTSUPP for IOCB_NOWAIT.
For direct IO, -EAGAIN can also come from the block layer, when no request/tag can be allocated:
blk_mq_make_request
--
rq = __blk_mq_alloc_request(&data);
if (unlikely(!rq)) {
rq_qos_cleanup(q, bio);
if (bio->bi_opf & REQ_NOWAIT)
bio_wouldblock_error(bio);
---
bio_set_flag(bio, BIO_QUIET);
bio->bi_status = BLK_STS_AGAIN;
bio_endio(bio);
---
goto queue_exit;
}
--
Regarding the poll handler, refer to the 'Poll is not polling' section below.
Usually, the async work is punted to io-wq; io_wq_submit_work then issues it there.
io_wq_submit_work
---
/* if NO_CANCEL is set, we must still run the work */
if ((work->flags & (IO_WQ_WORK_CANCEL|IO_WQ_WORK_NO_CANCEL)) ==
IO_WQ_WORK_CANCEL) {
ret = -ECANCELED;
}
if (!ret) {
do {
ret = io_issue_sqe(req, NULL, false);
/*
* We can get EAGAIN for polled IO even though we're
* forcing a sync submission from here, since we can't
* wait for request slots on the block side.
*/
if (ret != -EAGAIN)
break;
cond_resched();
} while (1);
}
---
Linked sqes provide a way to describe dependencies between a sequence
of sqes within the greater submission ring, where each sqe execution
depends on the successful completion of the previous sqe.
submit
sqe(REQ_F_LINK) -> sqe(REQ_F_LINK) -> sqe(REQ_F_LINK) -> sqe
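From userspace, a chain is built by setting IOSQE_IO_LINK on every sqe except the last one. A sketch with liburing (the ring and the two file descriptors are assumed to be set up by the caller; short reads are not handled):
---
#include <liburing.h>
#include <sys/uio.h>

/*
 * Sketch: read 4KiB from infd, then write that buffer to outfd, as one
 * link chain. The write is only issued if the read completes successfully.
 */
static int queue_linked_copy(struct io_uring *ring, int infd, int outfd)
{
        static char buf[4096];
        struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
        struct io_uring_sqe *sqe;

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_readv(sqe, infd, &iov, 1, 0);
        /* IOSQE_IO_LINK becomes REQ_F_LINK on the in-kernel request */
        io_uring_sqe_set_flags(sqe, IOSQE_IO_LINK);

        /* last sqe of the chain: no IOSQE_IO_LINK */
        sqe = io_uring_get_sqe(ring);
        io_uring_prep_writev(sqe, outfd, &iov, 1, 0);

        /* both sqes are picked up by a single io_submit_sqes() pass */
        return io_uring_submit(ring);
}
---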
io_submit_sqe
---
if (*link) {
struct io_kiocb *head = *link;
...
if (io_alloc_async_ctx(req))
return -EAGAIN;
ret = io_req_defer_prep(req, sqe);
...
list_add_tail(&req->link_list, &head->link_list);
/* last request of a link, enqueue the link */
if (!(req->flags & (REQ_F_LINK | REQ_F_HARDLINK))) {
io_queue_link_head(head);
---
if (unlikely(req->flags & REQ_F_FAIL_LINK)) {
io_cqring_add_event(req, -ECANCELED);
io_double_put_req(req);
} else
io_queue_sqe(req, NULL);
---
*link = NULL;
}
} else {
...
if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) {
req->flags |= REQ_F_LINK_HEAD;
INIT_LIST_HEAD(&req->link_list);
if (io_alloc_async_ctx(req))
return -EAGAIN;
ret = io_req_defer_prep(req, sqe);
if (ret)
req->flags |= REQ_F_FAIL_LINK;
*link = req;
} else {
io_queue_sqe(req, sqe);
}
}
---
Completion
io_free_req()
---
io_req_find_next(req, &nxt);
__io_free_req(req);
if (nxt)
io_queue_async_work(nxt);
---
io_req_find_next()
-> io_fail_links() // REQ_F_FAIL_LINK
   fails all of the linked sqes
-> io_req_link_next()
   gets the next req and sets REQ_F_LINK_HEAD on it
io_do_iopoll()
-> io_iopoll_complete()
-> io_req_multi_free()
---
if ((req->flags & REQ_F_LINK_HEAD) || io_is_fallback_req(req))
return false;
---
-> io_free_req()
io_complete_rw()
---
io_complete_rw_common(kiocb, res);
io_put_req(req);
-> io_free_req()
---
How does the userland application know that its IO has been completed?
There are two paths here: the normal IRQ completion path and the polled one.
nvme_irq()
-> nvme_process_cq()
-> nvme_handle_cqe()
-> nvme_end_request()
-> blk_mq_complete_request()
blk_poll()
---
// For a device that doesn't support polling, just return 0 here.
// generic_make_request_checks() clears REQ_HIPRI if the device doesn't
// support polling, to ensure the bio is issued to the correct queue.
if (!blk_qc_t_valid(cookie) ||
!test_bit(QUEUE_FLAG_POLL, &q->queue_flags))
return 0;
if (current->plug)
blk_flush_plug_list(current->plug, false);
hctx = q->queue_hw_ctx[blk_qc_t_to_queue_num(cookie)];
...
state = current->state;
do {
int ret;
hctx->poll_invoked++;
ret = q->mq_ops->poll(hctx);
---
-> nvme_poll()
-> nvme_process_cq()
---
if (signal_pending_state(state, current))
__set_current_state(TASK_RUNNING);
if (current->state == TASK_RUNNING)
return 1;
if (ret < 0 || !spin)
break;
cpu_relax();
} while (!need_resched());
__set_current_state(TASK_RUNNING);
---
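As an aside, blk_poll() is not io_uring specific: a synchronous preadv2() with RWF_HIPRI on an O_DIRECT file also sets IOCB_HIPRI/REQ_HIPRI and busy-polls for the completion instead of sleeping. A sketch, assuming a polling-capable NVMe device (device path and error handling are simplified):
---
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/uio.h>

int main(void)
{
        struct iovec iov;
        int fd;
        ssize_t ret;

        /* O_DIRECT requires an aligned buffer */
        if (posix_memalign(&iov.iov_base, 4096, 4096))
                return 1;
        iov.iov_len = 4096;

        fd = open("/dev/nvme0n1", O_RDONLY | O_DIRECT); /* placeholder device */
        if (fd < 0)
                return 1;

        /* RWF_HIPRI -> IOCB_HIPRI -> REQ_HIPRI on the bio -> blk_poll() */
        ret = preadv2(fd, &iov, 1, 0, RWF_HIPRI);
        printf("polled read returned %zd\n", ret);
        return 0;
}
---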
For async direct IO:
blk_update_request()
-> bio_endio()
-> blkdev_bio_end_io()
-> dio->iocb->ki_complete()
io_complete_rw()
-> io_complete_rw_common()
For buffered IO:
io_issue_sqe()
-> io_read()
---
if (req->file->f_op->read_iter)
ret2 = call_read_iter(req->file, kiocb, &iter);
else
ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
/* Catch -EAGAIN return for forced non-blocking submission */
if (!force_nonblock || ret2 != -EAGAIN) {
kiocb_done(kiocb, ret2);
}
---
kiocb_done()
--
if (ret >= 0 && kiocb->ki_complete == io_complete_rw)
// For successful buffered IO, .read_iter() returns the number of
// bytes copied, so the request is completed inline here.
io_complete_rw(kiocb, ret, 0);
-> io_complete_rw_common()
else
io_rw_done(kiocb, ret);
--
io_rw_done()
---
switch (ret) {
// For successfully queued async (direct) IO, .read_iter() returns
// -EIOCBQUEUED; completion happens later via kiocb->ki_complete.
case -EIOCBQUEUED:
break;
...
default:
kiocb->ki_complete(kiocb, ret, 0);
}
---
io_complete_rw_common
-> __io_cqring_add_event()
---
spin_lock_irqsave(&ctx->completion_lock, flags);
__io_cqring_fill_event(req, res, cflags);
io_commit_cqring(ctx);
-> __io_commit_cqring()
---
/* order cqe stores with ring update */
smp_store_release(&rings->cq.tail, ctx->cached_cq_tail);
//Wake up the io_uring_poll
if (wq_has_sleeper(&ctx->cq_wait)) {
wake_up_interruptible(&ctx->cq_wait);
kill_fasync(&ctx->cq_fasync, SIGIO, POLL_IN);
}
---
spin_unlock_irqrestore(&ctx->completion_lock, flags);
io_cqring_ev_posted(ctx);
---
io_cqring_ev_posted()
---
// wake up tasks waiting in io_uring_enter() -> io_cqring_wait()
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
if (waitqueue_active(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
//The application could sleep on the event fd
if (io_should_trigger_evfd(ctx))
eventfd_signal(ctx->cq_ev_fd, 1);
---
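From the application's point of view, each of these notification paths ends with it reaping CQEs from the CQ ring. A sketch of the eventfd variant with liburing (the ring is assumed to be initialized and to have requests in flight):
---
#include <liburing.h>
#include <stdint.h>
#include <unistd.h>
#include <sys/eventfd.h>

/*
 * Sketch: get completion notifications through an eventfd instead of
 * sleeping in io_uring_enter(). io_should_trigger_evfd()/eventfd_signal()
 * above is what bumps the eventfd counter.
 */
static void reap_via_eventfd(struct io_uring *ring)
{
        struct io_uring_cqe *cqe;
        uint64_t cnt;
        int efd = eventfd(0, 0);

        io_uring_register_eventfd(ring, efd);

        /* blocks until eventfd_signal() fires for a posted CQE */
        if (read(efd, &cnt, sizeof(cnt)) != sizeof(cnt))
                return;

        /* drain whatever is already in the CQ ring, no extra syscall needed */
        while (io_uring_peek_cqe(ring, &cqe) == 0) {
                /* cqe->res carries the result of the corresponding sqe */
                io_uring_cqe_seen(ring, cqe);
        }
}
---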
With IORING_SETUP_IOPOLL, completions are reaped by io_iopoll_getevents(), which is
called from both:
- io_uring_enter
- io_sq_thread
io_iopoll_getevents()
---
while (!list_empty(&ctx->poll_list) && !need_resched()) {
int ret;
ret = io_do_iopoll(ctx, nr_events, min);
if (ret < 0)
return ret;
if (!min || *nr_events >= min)
return 0;
}
---
The ctx->poll_list is filled by
io_issue_sqe()
-> io_iopoll_req_issued() //IORING_SETUP_IOPOLL
---
/*
* For fast devices, IO may have already completed. If it has, add
* it to the front so we find it first.
*/
if (READ_ONCE(req->iopoll_completed))
list_add(&req->list, &ctx->poll_list);
else
list_add_tail(&req->list, &ctx->poll_list);
// Why do we need this? When io_sq_thread is set up, io_uring_enter won't issue
// the sqes by itself, so wake the sq thread to reap the polled completions.
if ((ctx->flags & IORING_SETUP_SQPOLL) &&
wq_has_sleeper(&ctx->sqo_wait))
wake_up(&ctx->sqo_wait);
---
io_do_iopoll()
---
/*
* Only spin for completions if we don't have multiple devices hanging
* off our complete list, and we're under the requested amount.
*/
spin = !ctx->poll_multi_file && *nr_events < min;
ret = 0;
list_for_each_entry_safe(req, tmp, &ctx->poll_list, list) {
struct kiocb *kiocb = &req->rw.kiocb;
// req->iopoll_completed is set by io_complete_rw_iopoll, which can be
// invoked from the blk_poll path or from the normal irq completion path.
if (READ_ONCE(req->iopoll_completed)) {
list_move_tail(&req->list, &done);
continue;
}
// Note: we only break out of the polling loop once 'done' is non-empty
if (!list_empty(&done))
break;
// 'spin' tells blk_poll() whether to keep spinning for a completion
ret = kiocb->ki_filp->f_op->iopoll(kiocb, spin);
...
if (ret && spin)
spin = false;
ret = 0;
}
if (!list_empty(&done))
io_iopoll_complete(ctx, nr_events, &done);
return ret;
}
io_iopoll_complete()
---
rb.to_free = rb.need_iter = 0;
while (!list_empty(done)) {
int cflags = 0;
req = list_first_entry(done, struct io_kiocb, list);
...
list_del(&req->list);
...
__io_cqring_fill_event(req, req->result, cflags);
(*nr_events)++;
if (refcount_dec_and_test(&req->refs) &&
!io_req_multi_free(&rb, req))
io_free_req(req);
}
io_commit_cqring(ctx);
// io_uring_enter() may be waiting for completions even when there is a sq thread
if (ctx->flags & IORING_SETUP_SQPOLL)
io_cqring_ev_posted(ctx);
---
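For completeness, this whole iopoll path only exists when the ring was created with IORING_SETUP_IOPOLL, and, as io_prep_rw() above enforces, it only accepts O_DIRECT files on devices that implement ->iopoll. A setup sketch with liburing (the device path is a placeholder):
---
#define _GNU_SOURCE
#include <liburing.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/uio.h>

static void iopoll_ring_example(void)
{
        struct io_uring ring;
        struct io_uring_sqe *sqe;
        struct io_uring_cqe *cqe;
        struct iovec iov;
        int fd;

        /* completions will be reaped by polling, not by irq + wakeup */
        if (io_uring_queue_init(8, &ring, IORING_SETUP_IOPOLL) < 0)
                return;

        /* must be O_DIRECT, otherwise io_prep_rw() returns -EOPNOTSUPP */
        fd = open("/dev/nvme0n1", O_RDONLY | O_DIRECT); /* placeholder device */
        if (fd < 0)
                return;

        if (posix_memalign(&iov.iov_base, 4096, 4096))
                return;
        iov.iov_len = 4096;

        sqe = io_uring_get_sqe(&ring);
        io_uring_prep_readv(sqe, fd, &iov, 1, 0);
        io_uring_submit(&ring);

        /* this io_uring_enter() call ends up in io_iopoll_getevents() */
        io_uring_wait_cqe(&ring, &cqe);
        io_uring_cqe_seen(&ring, cqe);
        io_uring_queue_exit(&ring);
}
---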
io_arm_poll_handler()
---
ipt.pt._qproc = io_async_queue_proc;
ret = __io_arm_poll_handler(req, &apoll->poll, &ipt, mask,
io_async_wake);
---
It installs a wait_queue_entry on the wait_queue_head_t of the underlying file,
with io_async_wake as the wakeup function.
tcp_poll
-> sock_poll_wait()
-> poll_wait(filp, &sock->wq.wait, p)
-> p->_qproc(filp, wait_address, p)
io_async_queue_proc()
-> __io_queue_proc()
---
pt->error = 0;
poll->head = head;
add_wait_queue(head, &poll->wait);
---
tcp_data_ready()
-> sk->sk_data_ready()
sock_def_readable
---
rcu_read_lock();
wq = rcu_dereference(sk->sk_wq);
if (skwq_has_sleeper(wq))
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
EPOLLRDNORM | EPOLLRDBAND);
...
rcu_read_unlock();
---
The wakeup callback is invoked at this point; in our case it is io_async_wake.
io_async_wake
-> __io_async_wake
---
list_del_init(&poll->wait.entry);
tsk = req->task;
req->result = mask;
init_task_work(&req->task_work, func);
ret = task_work_add(tsk, &req->task_work, true);
...
wake_up_process(tsk);
---
The interesting thing is that the retried work is still executed by the original task
that armed the poll handler: task_work_run() runs the queued task_work when that task
returns to user space.
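This is what happens when, for example, a recv is submitted on a TCP socket that has no data queued yet: instead of punting to an io-wq worker, the request arms this poll handler and is re-issued from task_work once the socket becomes readable. A userspace sketch (the connected socket is assumed to come from elsewhere):
---
#include <liburing.h>
#include <stddef.h>

/*
 * Sketch: submit a recv on a connected TCP socket. If no data is queued,
 * the non-blocking attempt gets -EAGAIN, io_arm_poll_handler() hooks
 * io_async_wake onto the socket's waitqueue, and the request is re-issued
 * via task_work when tcp_data_ready() fires.
 */
static int queue_recv(struct io_uring *ring, int sockfd, char *buf, size_t len)
{
        struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

        io_uring_prep_recv(sqe, sockfd, buf, len, 0);
        return io_uring_submit(ring);
}
---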
io-wq
Hashed work (from the LWN coverage of io-wq):
A call to io_wq_enqueue() adds the work to the queue for future execution.
The io_wq_enqueue_hashed() variant, instead, is one of the reasons for the
creation of new mechanism; it guarantees that no two jobs enqueued with the
same val will run concurrently. If an application submits multiple buffered
I/O requests for a single file, they should not be run concurrently or they
are likely to just block each other via lock contention. Buffered I/O on
different files can and should run concurrently, though. "Hashed" work entries
make it easy for io_uring to manage that concurrency in an optimal way.
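Concretely: if an application queues several buffered writes to the same file, io-wq hashes them (on the inode, as shown below) and runs them serially, while writes to different files may run on different workers in parallel. A liburing sketch (fds, iovec and the ring are assumed to be set up by the caller):
---
#include <liburing.h>
#include <sys/uio.h>

/*
 * Sketch: two buffered writes to fd_a share the same hash (the inode of
 * fd_a), so io-wq executes them one after another; the write to fd_b has a
 * different hash and may run on another worker concurrently.
 */
static int queue_buffered_writes(struct io_uring *ring, int fd_a, int fd_b,
                                 struct iovec *iov)
{
        struct io_uring_sqe *sqe;

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_writev(sqe, fd_a, iov, 1, 0);

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_writev(sqe, fd_a, iov, 1, 4096);

        sqe = io_uring_get_sqe(ring);
        io_uring_prep_writev(sqe, fd_b, iov, 1, 0);

        return io_uring_submit(ring);
}
---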
For comparison, the earlier async_list mechanism that was used to optimize sequential
buffered read/write IO is described in:
commit 31b515106428b9717d2b6475b6f6182cf231b1e6
Author: Jens Axboe
Let's look at how hashed work is implemented in io-wq.
io_prep_async_work()
---
if (req->flags & REQ_F_ISREG) {
if (def->hash_reg_file)
io_wq_hash_work(&req->work, file_inode(req->file));
} else {
if (def->unbound_nonreg_file)
req->work.flags |= IO_WQ_WORK_UNBOUND;
}
---
Refer to io_wqe_insert_work:
io_wqe->work_list:  [nw] - [h0-w0] - [h0-w1] - [h0-w2] - [nw] - [nw]
                                                  ^
                                                  |
                                hash_tail[h0] ----+
([nw] = non-hashed work, [h0-wN] = works hashed to bucket h0)
-> hash_tail[hash]  // the tail of a hash chain; every new work with that hash is inserted here
-> hash_map         // a bit is set while work from that hash bucket is running
The number of hash buckets is 64 on a 64-bit system.
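To make the picture above concrete, here is a simplified userspace model of the insertion logic (not the kernel code; the names mirror io_wqe_insert_work/hash_tail, but the types are invented for illustration):
---
#include <stddef.h>

#define NR_HASH_BUCKETS 64              /* one bit per bucket in hash_map on 64-bit */

struct work {
        struct work *next;
        int hash;                       /* -1 for non-hashed work */
};

struct wqe {
        struct work *head, *tail;                 /* the work_list */
        struct work *hash_tail[NR_HASH_BUCKETS];  /* last queued work per hash */
};

/*
 * Model of io_wqe_insert_work(): hashed work is chained right after the
 * current tail of its hash, so all works with the same hash stay contiguous
 * in the list and can be picked up (and executed) serially.
 */
static void insert_work(struct wqe *wqe, struct work *w)
{
        if (w->hash >= 0 && wqe->hash_tail[w->hash]) {
                struct work *prev = wqe->hash_tail[w->hash];

                wqe->hash_tail[w->hash] = w;
                w->next = prev->next;
                prev->next = w;
                if (wqe->tail == prev)
                        wqe->tail = w;
                return;
        }

        if (w->hash >= 0)
                wqe->hash_tail[w->hash] = w;

        /* non-hashed work, or the first work of its hash: append at the tail */
        w->next = NULL;
        if (wqe->tail)
                wqe->tail->next = w;
        else
                wqe->head = w;
        wqe->tail = w;
}
---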