author     Dylan Reid <dgreid@chromium.org>      2020-03-13 13:40:40 -0700
committer  Commit Bot <commit-bot@chromium.org>  2020-04-05 21:32:20 +0000
commit     a9a1d0227713d22df6262ca99dd622281e8dc9bf
tree       3f99d11aa72dc30d1b05e3e1b3d96d3391c5090b /io_uring/src/uring.rs
parent     252d5b3cf3fd7a48fe9d610b59e3d6da9f2c6fe9
Add io_uring interfaces
Provide a low-level interface for operating the kernel's new io_uring interface.

This is an unsafe interface with the basic operations of setting up the
ring, adding to it, removing from it, and polling it.

This will be followed by a safe interface layer on top of this code,
then by additions to the asynchronous executor that allow multiple file
operations to be performed asynchronously from one context.
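
As a rough sketch (adapted from the doc example in uring.rs; the fd and
user_data values are illustrative only), the intended usage looks like:

    use std::fs::File;
    use std::os::unix::io::AsRawFd;
    use sys_util::WatchingEvents;
    use io_uring::URingContext;

    let f = File::open("/dev/zero").unwrap();
    let mut uring = URingContext::new(16).unwrap();
    // Queue a one-shot poll op, tagged with user_data 454.
    uring
        .add_poll_fd(f.as_raw_fd(), WatchingEvents::empty().set_read(), 454)
        .unwrap();
    // Enter the ring and block until at least one completion is ready.
    let (user_data, res) = uring.wait().unwrap().next().unwrap();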

Change-Id: I71f7ffb04ce8cb4da470deda9aee768ab95d3d98
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2124009
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Reviewed-by: Zach Reizner <zachr@chromium.org>
Tested-by: Dylan Reid <dgreid@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
Diffstat (limited to 'io_uring/src/uring.rs')
-rw-r--r--  io_uring/src/uring.rs  772
1 file changed, 772 insertions, 0 deletions
diff --git a/io_uring/src/uring.rs b/io_uring/src/uring.rs
new file mode 100644
index 0000000..0a51df1
--- /dev/null
+++ b/io_uring/src/uring.rs
@@ -0,0 +1,772 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::fmt;
+use std::fs::File;
+use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
+use std::ptr::null_mut;
+use std::sync::atomic::{AtomicU32, Ordering};
+
+use sys_util::{MemoryMapping, WatchingEvents};
+
+use crate::bindings::*;
+use crate::syscalls::*;
+
+/// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
+/// for callers to identify each request.
+pub type UserData = u64;
+
+#[derive(Debug)]
+pub enum Error {
+    /// The call to `io_uring_enter` failed with the given errno.
+    RingEnter(libc::c_int),
+    /// The call to `io_uring_setup` failed with the given errno.
+    Setup(libc::c_int),
+    /// Failed to map the completion ring.
+    MappingCompleteRing(sys_util::MmapError),
+    /// Failed to map the submit ring.
+    MappingSubmitRing(sys_util::MmapError),
+    /// Failed to map submit entries.
+    MappingSubmitEntries(sys_util::MmapError),
+    /// Too many ops are already queued.
+    NoSpace,
+}
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+
+        match self {
+            RingEnter(e) => write!(f, "Failed to enter io uring {}", e),
+            Setup(e) => write!(f, "Failed to setup io uring {}", e),
+            MappingCompleteRing(e) => write!(f, "Failed to mmap completion ring {}", e),
+            MappingSubmitRing(e) => write!(f, "Failed to mmap submit ring {}", e),
+            MappingSubmitEntries(e) => write!(f, "Failed to mmap submit entries {}", e),
+            NoSpace => write!(
+                f,
+                "No space for more ring entries, try increasing the size passed to `new`",
+            ),
+        }
+    }
+}
+
+/// Basic statistics about the operations that have been submitted to the uring.
+#[derive(Default)]
+pub struct URingStats {
+    total_enter_calls: u64, // Number of times the uring has been entered.
+    total_ops: u64,         // Total ops submitted to io_uring.
+    total_complete: u64,    // Total ops completed by io_uring.
+}
+
+/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
+/// to the kernel and asynchronously handling the completion of these operations.
+/// Use the various `add_*` functions to configure operations, then call `wait` to start
+/// the operations and get any completed results. Each op is given a u64 user_data argument that is
+/// used to identify the result when returned in the iterator provided by `wait`.
+///
+/// # Example polling an FD for readable status.
+///
+/// ```
+/// # use std::fs::File;
+/// # use std::os::unix::io::AsRawFd;
+/// # use std::path::Path;
+/// # use sys_util::WatchingEvents;
+/// # use io_uring::{URingContext, UserData};
+/// let f = File::open(Path::new("/dev/zero")).unwrap();
+/// let mut uring = URingContext::new(16).unwrap();
+/// uring
+///     .add_poll_fd(f.as_raw_fd(), WatchingEvents::empty().set_read(), 454)
+///     .unwrap();
+/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
+/// assert_eq!(user_data, 454 as UserData);
+/// assert_eq!(res.unwrap(), 1 as i32);
+/// ```
+pub struct URingContext {
+    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
+    submit_ring: SubmitQueueState,
+    submit_queue_entries: SubmitQueueEntries,
+    complete_ring: CompleteQueueState,
+    io_vecs: Vec<libc::iovec>,
+    in_flight: usize, // The number of pending operations.
+    added: usize,     // The number of ops added since the last call to `io_uring_enter`.
+    stats: URingStats,
+}
+
+impl URingContext {
+    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
+    /// simultaneous operations.
+    pub fn new(num_entries: usize) -> Result<URingContext> {
+        let ring_params = io_uring_params::default();
+        // The below unsafe block isolates the creation of the URingContext. Each step on its own
+        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
+        // base addresses maintains safety guarantees assuming the kernel API guarantees are
+        // trusted.
+        unsafe {
+            // Safe because the kernel is trusted to only modify params and `File` is created with
+            // an FD that it takes complete ownership of.
+            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
+            let ring_file = File::from_raw_fd(fd);
+
+            // Mmap the submit and completion queues.
+            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
+            // is checked.
+            let submit_ring = SubmitQueueState::new(
+                MemoryMapping::from_fd_offset_populate(
+                    &ring_file,
+                    ring_params.sq_off.array as usize
+                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
+                    u64::from(IORING_OFF_SQ_RING),
+                )
+                .map_err(Error::MappingSubmitRing)?,
+                &ring_params,
+            );
+
+            let submit_queue_entries = SubmitQueueEntries {
+                mmap: MemoryMapping::from_fd_offset_populate(
+                    &ring_file,
+                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
+                    u64::from(IORING_OFF_SQES),
+                )
+                .map_err(Error::MappingSubmitEntries)?,
+                len: ring_params.sq_entries as usize,
+            };
+
+            let complete_ring = CompleteQueueState::new(
+                MemoryMapping::from_fd_offset_populate(
+                    &ring_file,
+                    ring_params.cq_off.cqes as usize
+                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
+                    u64::from(IORING_OFF_CQ_RING),
+                )
+                .map_err(Error::MappingCompleteRing)?,
+                &ring_params,
+            );
+
+            Ok(URingContext {
+                ring_file,
+                submit_ring,
+                submit_queue_entries,
+                complete_ring,
+                io_vecs: vec![
+                    libc::iovec {
+                        iov_base: null_mut(),
+                        iov_len: 0
+                    };
+                    num_entries
+                ],
+                added: 0,
+                in_flight: 0,
+                stats: Default::default(),
+            })
+        }
+    }
+
+    // Call `f` with the next available sqe or return an error if none are available.
+    // After `f` returns, the sqe is appended to the kernel's queue.
+    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
+    where
+        F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
+    {
+        // Find the next free submission entry in the submit ring and fill it with an iovec.
+        // The below raw pointer derefs are safe because the memory the pointers use lives as long
+        // as the mmap in self.
+        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
+        let next_tail = tail.wrapping_add(1);
+        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
+            return Err(Error::NoSpace);
+        }
+        // `tail` is the next sqe to use.
+        let index = (tail & self.submit_ring.ring_mask) as usize;
+        let sqe = self.submit_queue_entries.get_mut(index).unwrap();
+
+        f(sqe, &mut self.io_vecs[index]);
+
+        // Tells the kernel to use the new index when processing the entry at that index.
+        self.submit_ring.set_array_entry(index, index as u32);
+        // Ensure the above writes to sqe are seen before the tail is updated.
+        // set_tail uses Release ordering when storing to the ring.
+        self.submit_ring.pointers.set_tail(next_tail);
+
+        self.added += 1;
+
+        Ok(())
+    }
+
+    unsafe fn add_rw_op(
+        &mut self,
+        ptr: *const u8,
+        len: usize,
+        fd: RawFd,
+        offset: u64,
+        user_data: UserData,
+        op: u8,
+    ) -> Result<()> {
+        self.prep_next_sqe(|sqe, iovec| {
+            iovec.iov_base = ptr as *const libc::c_void as *mut _;
+            iovec.iov_len = len;
+            sqe.opcode = op;
+            sqe.addr = iovec as *const _ as *const libc::c_void as u64;
+            sqe.len = 1;
+            sqe.__bindgen_anon_1.off = offset;
+            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+            sqe.ioprio = 0;
+            sqe.user_data = user_data;
+            sqe.flags = 0;
+            sqe.fd = fd;
+        })?;
+
+        Ok(())
+    }
+
+    /// Asynchronously writes to `fd` from the address given in `ptr`.
+    /// # Safety
+    /// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
+    /// only safe if the caller guarantees that the memory lives until the transaction is complete
+    /// and that completion has been returned from the `wait` function. In addition there must not
+    /// be other references to the data pointed to by `ptr` until the operation completes.  Ensure
+    /// that the fd remains open until the op completes as well.
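+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of queueing a write; the file path, buffer, and `user_data` value are
+    /// illustrative only.
+    ///
+    /// ```no_run
+    /// # use std::fs::OpenOptions;
+    /// # use std::os::unix::io::AsRawFd;
+    /// # use io_uring::URingContext;
+    /// let f = OpenOptions::new().write(true).open("/tmp/example").unwrap();
+    /// let mut uring = URingContext::new(16).unwrap();
+    /// let buf = [0u8; 4096];
+    /// // Safe because `buf` and `f` outlive the `wait` call that returns this op's completion.
+    /// unsafe {
+    ///     uring
+    ///         .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
+    ///         .unwrap();
+    /// }
+    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
+    /// ```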
+    pub unsafe fn add_write(
+        &mut self,
+        ptr: *const u8,
+        len: usize,
+        fd: RawFd,
+        offset: u64,
+        user_data: UserData,
+    ) -> Result<()> {
+        self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_WRITEV as u8)
+    }
+
+    /// Asynchronously reads from `fd` to the address given in `ptr`.
+    /// # Safety
+    /// `add_read` will write up to `len` bytes of data to the address given by `ptr`. This is only
+    /// safe if the caller guarantees there are no other references to that memory and that the
+    /// memory lives until the transaction is complete and that completion has been returned from
+    /// the `wait` function.  In addition there must not be any mutable references to the data
+    /// pointed to by `ptr` until the operation completes.  Ensure that the fd remains open until
+    /// the op completes as well.
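+    ///
+    /// # Example
+    ///
+    /// A minimal sketch of queueing a read; the buffer size and `user_data` value are
+    /// illustrative only.
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use std::os::unix::io::AsRawFd;
+    /// # use io_uring::URingContext;
+    /// let f = File::open("/dev/zero").unwrap();
+    /// let mut uring = URingContext::new(16).unwrap();
+    /// let mut buf = [0u8; 4096];
+    /// // Safe because `buf` and `f` outlive the `wait` call that returns this op's completion
+    /// // and no other references to `buf` are made until then.
+    /// unsafe {
+    ///     uring
+    ///         .add_read(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 77)
+    ///         .unwrap();
+    /// }
+    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
+    /// ```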
+    pub unsafe fn add_read(
+        &mut self,
+        ptr: *mut u8,
+        len: usize,
+        fd: RawFd,
+        offset: u64,
+        user_data: UserData,
+    ) -> Result<()> {
+        self.add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
+    }
+
+    /// Syncs all completed operations. The ordering with respect to in-flight async ops is not
+    /// defined.
+    pub fn add_fsync(&mut self, fd: RawFd, user_data: UserData) -> Result<()> {
+        self.prep_next_sqe(|sqe, _iovec| {
+            sqe.opcode = IORING_OP_FSYNC as u8;
+            sqe.fd = fd;
+            sqe.user_data = user_data;
+
+            sqe.addr = 0;
+            sqe.len = 0;
+            sqe.__bindgen_anon_1.off = 0;
+            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+            sqe.__bindgen_anon_2.rw_flags = 0;
+            sqe.ioprio = 0;
+            sqe.flags = 0;
+        })
+    }
+
+    /// See the usage of `fallocate`; this asynchronously performs the same operation.
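+    ///
+    /// A minimal sketch of allocating the first 4096 bytes of a file (the path, length, and
+    /// `user_data` value are illustrative only):
+    ///
+    /// ```no_run
+    /// # use std::fs::OpenOptions;
+    /// # use std::os::unix::io::AsRawFd;
+    /// # use io_uring::URingContext;
+    /// let f = OpenOptions::new().write(true).create(true).open("/tmp/example").unwrap();
+    /// let mut uring = URingContext::new(16).unwrap();
+    /// // Mode 0 is the default fallocate behavior: allocate the given byte range.
+    /// uring.add_fallocate(f.as_raw_fd(), 0, 4096, 0, 7).unwrap();
+    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
+    /// ```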
+    pub fn add_fallocate(
+        &mut self,
+        fd: RawFd,
+        offset: u64,
+        len: usize,
+        mode: u64,
+        user_data: UserData,
+    ) -> Result<()> {
+        // Note that len for fallocate is passed in the addr field of the sqe and the mode is
+        // passed in the len field.
+        self.prep_next_sqe(|sqe, _iovec| {
+            sqe.opcode = IORING_OP_FALLOCATE as u8;
+
+            sqe.fd = fd;
+            sqe.addr = len as u64;
+            sqe.len = mode as u32;
+            sqe.__bindgen_anon_1.off = offset;
+            sqe.user_data = user_data;
+
+            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+            sqe.__bindgen_anon_2.rw_flags = 0;
+            sqe.ioprio = 0;
+            sqe.flags = 0;
+        })
+    }
+
+    /// Adds an FD to be polled based on the given events.
+    /// The user must keep the FD open until the operation completion is returned from
+    /// `wait`.
+    /// Note that io_uring is always a one shot poll. After the fd is returned, it must be re-added
+    /// to get future events.
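+    ///
+    /// A sketch of re-arming a one-shot poll (the fd and `user_data` value are illustrative
+    /// only):
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use std::os::unix::io::AsRawFd;
+    /// # use sys_util::WatchingEvents;
+    /// # use io_uring::URingContext;
+    /// let f = File::open("/dev/zero").unwrap();
+    /// let mut uring = URingContext::new(16).unwrap();
+    /// uring
+    ///     .add_poll_fd(f.as_raw_fd(), WatchingEvents::empty().set_read(), 1)
+    ///     .unwrap();
+    /// let _ = uring.wait().unwrap().next().unwrap();
+    /// // The poll was one shot; add the fd again to get notified of future events.
+    /// uring
+    ///     .add_poll_fd(f.as_raw_fd(), WatchingEvents::empty().set_read(), 1)
+    ///     .unwrap();
+    /// ```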
+    pub fn add_poll_fd(
+        &mut self,
+        fd: RawFd,
+        events: WatchingEvents,
+        user_data: UserData,
+    ) -> Result<()> {
+        self.prep_next_sqe(|sqe, _iovec| {
+            sqe.opcode = IORING_OP_POLL_ADD as u8;
+            sqe.fd = fd;
+            sqe.user_data = user_data;
+            sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;
+
+            sqe.addr = 0;
+            sqe.len = 0;
+            sqe.__bindgen_anon_1.off = 0;
+            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+            sqe.ioprio = 0;
+            sqe.flags = 0;
+        })
+    }
+
+    /// Removes an FD that was previously added with `add_poll_fd`.
+    pub fn remove_poll_fd(
+        &mut self,
+        fd: RawFd,
+        events: WatchingEvents,
+        user_data: UserData,
+    ) -> Result<()> {
+        self.prep_next_sqe(|sqe, _iovec| {
+            sqe.opcode = IORING_OP_POLL_REMOVE as u8;
+            sqe.fd = fd;
+            sqe.user_data = user_data;
+            sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;
+
+            sqe.addr = 0;
+            sqe.len = 0;
+            sqe.__bindgen_anon_1.off = 0;
+            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
+            sqe.ioprio = 0;
+            sqe.flags = 0;
+        })
+    }
+
+    /// Sends operations added with the `add_*` functions to the kernel.
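+    ///
+    /// A sketch of submitting without blocking and then polling the ring FD for completion
+    /// readiness, as the tests below do (the fd and `user_data` value are illustrative only):
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use std::os::unix::io::AsRawFd;
+    /// # use sys_util::PollContext;
+    /// # use io_uring::URingContext;
+    /// let f = File::open("/dev/zero").unwrap();
+    /// let mut uring = URingContext::new(16).unwrap();
+    /// uring.add_fsync(f.as_raw_fd(), 9).unwrap();
+    /// uring.submit().unwrap();
+    /// // The uring context FD becomes readable when completions are available.
+    /// let ctx: PollContext<u64> = PollContext::build_with(&[(&uring, 1)]).unwrap();
+    /// let _events = ctx.wait().unwrap();
+    /// let (user_data, res) = uring.wait().unwrap().next().unwrap();
+    /// ```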
+    pub fn submit(&mut self) -> Result<()> {
+        self.in_flight += self.added;
+        self.stats.total_ops = self.stats.total_ops.wrapping_add(self.added as u64);
+        if self.added > 0 {
+            self.stats.total_enter_calls = self.stats.total_enter_calls.wrapping_add(1);
+            unsafe {
+                // Safe because the only memory modified is in the completion queue.
+                io_uring_enter(self.ring_file.as_raw_fd(), self.added as u64, 1, 0)
+                    .map_err(Error::RingEnter)?;
+            }
+        }
+        self.added = 0;
+
+        Ok(())
+    }
+
+    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator over
+    /// any completed operations. `wait` blocks until at least one completion is ready. If called
+    /// without any new events added, this simply waits for any existing events to complete and
+    /// returns as soon as one or more are ready.
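+    ///
+    /// A sketch of draining completions with the returned iterator (the fd and `user_data`
+    /// value are illustrative only):
+    ///
+    /// ```no_run
+    /// # use std::fs::File;
+    /// # use std::os::unix::io::AsRawFd;
+    /// # use io_uring::URingContext;
+    /// let f = File::open("/dev/zero").unwrap();
+    /// let mut uring = URingContext::new(16).unwrap();
+    /// uring.add_fsync(f.as_raw_fd(), 42).unwrap();
+    /// for (user_data, res) in uring.wait().unwrap() {
+    ///     println!("op {} completed with {:?}", user_data, res);
+    /// }
+    /// ```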
+    pub fn wait<'a>(
+        &'a mut self,
+    ) -> Result<impl Iterator<Item = (UserData, std::io::Result<i32>)> + 'a> {
+        let completed = self.complete_ring.num_completed();
+        self.stats.total_complete = self.stats.total_complete.wrapping_add(completed as u64);
+        self.in_flight -= completed;
+        self.in_flight += self.added;
+        self.stats.total_ops = self.stats.total_ops.wrapping_add(self.added as u64);
+        if self.in_flight > 0 {
+            unsafe {
+                self.stats.total_enter_calls = self.stats.total_enter_calls.wrapping_add(1);
+                // Safe because the only memory modified is in the completion queue.
+                io_uring_enter(
+                    self.ring_file.as_raw_fd(),
+                    self.added as u64,
+                    1,
+                    IORING_ENTER_GETEVENTS,
+                )
+                .map_err(Error::RingEnter)?;
+            }
+        }
+        self.added = 0;
+
+        // The CompletionQueue will iterate all completed ops.
+        Ok(&mut self.complete_ring)
+    }
+}
+
+impl AsRawFd for URingContext {
+    fn as_raw_fd(&self) -> RawFd {
+        self.ring_file.as_raw_fd()
+    }
+}
+
+struct SubmitQueueEntries {
+    mmap: MemoryMapping,
+    len: usize,
+}
+
+impl SubmitQueueEntries {
+    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
+        if index >= self.len {
+            return None;
+        }
+        unsafe {
+            // Safe because the mut borrow of self restricts access to one mutable reference at a
+            // time and we trust that the kernel has returned enough memory in io_uring_setup and
+            // mmap.
+            Some(&mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index))
+        }
+    }
+}
+
+struct SubmitQueueState {
+    _mmap: MemoryMapping,
+    pointers: QueuePointers,
+    ring_mask: u32,
+    array: *mut u32,
+}
+
+impl SubmitQueueState {
+    // # Safety
+    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
+    // the params struct passed to io_uring_setup.
+    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
+        let ptr = mmap.as_ptr();
+        // These casts are safe because a u32 access is atomic on all supported architectures and
+        // the pointers will live until after self is dropped because the mmap is owned.
+        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
+        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
+        // This offset is guaranteed to be within the mmap so unwrap the result.
+        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
+        let array = ptr.add(params.sq_off.array as usize) as *mut u32;
+        SubmitQueueState {
+            _mmap: mmap,
+            pointers: QueuePointers { head, tail },
+            ring_mask,
+            array,
+        }
+    }
+
+    // Sets the kernel's array entry at the given `index` to `value`.
+    fn set_array_entry(&self, index: usize, value: u32) {
+        // Safe because self being constructed from the correct mmap guarantees that the memory is
+        // valid to write to.
+        unsafe {
+            std::ptr::write_volatile(self.array.add(index), value as u32);
+        }
+    }
+}
+
+struct CompleteQueueState {
+    mmap: MemoryMapping,
+    pointers: QueuePointers,
+    ring_mask: u32,
+    cqes_offset: u32,
+    completed: usize,
+}
+
+impl CompleteQueueState {
+    /// # Safety
+    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
+    /// the params struct passed to io_uring_setup.
+    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
+        let ptr = mmap.as_ptr();
+        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
+        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
+        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
+        CompleteQueueState {
+            mmap,
+            pointers: QueuePointers { head, tail },
+            ring_mask,
+            cqes_offset: params.cq_off.cqes,
+            completed: 0,
+        }
+    }
+
+    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
+        unsafe {
+            // Safe because we trust that the kernel has returned enough memory in io_uring_setup
+            // and mmap and index is checked within range by the ring_mask.
+            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
+                as *const io_uring_cqe;
+
+            let index = head & self.ring_mask;
+
+            &*cqes.add(index as usize)
+        }
+    }
+
+    fn num_completed(&mut self) -> usize {
+        std::mem::replace(&mut self.completed, 0)
+    }
+}
+
+// Returns the completed ops with their results.
+impl Iterator for CompleteQueueState {
+    type Item = (UserData, std::io::Result<i32>);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Safe because the pointers to the atomics are valid and the cqe must be in range
+        // because the kernel provided mask is applied to the index.
+        let head = self.pointers.head(Ordering::Relaxed);
+
+        // Synchronize the read of tail after the read of head.
+        if head == self.pointers.tail(Ordering::Acquire) {
+            return None;
+        }
+
+        self.completed += 1;
+
+        let cqe = self.get_cqe(head);
+        let user_data = cqe.user_data;
+        let res = cqe.res;
+
+        // Store the new head and ensure the reads above complete before the kernel sees the
+        // update to head; `set_head` uses `Release` ordering.
+        let new_head = head.wrapping_add(1);
+        self.pointers.set_head(new_head);
+
+        let io_res = match res {
+            r if r < 0 => Err(std::io::Error::from_raw_os_error(r)),
+            r => Ok(r),
+        };
+        Some((user_data, io_res))
+    }
+}
+
+struct QueuePointers {
+    head: *const AtomicU32,
+    tail: *const AtomicU32,
+}
+
+impl QueuePointers {
+    // Loads the tail pointer atomically with the given ordering.
+    fn tail(&self, ordering: Ordering) -> u32 {
+        // Safe because self being constructed from the correct mmap guarantees that the memory is
+        // valid to read.
+        unsafe { (*self.tail).load(ordering) }
+    }
+
+    // Stores the new value of the tail in the submit queue. This allows the kernel to start
+    // processing entries that have been added up until the given tail pointer.
+    // Always stores with release ordering as that is the only valid way to use the pointer.
+    fn set_tail(&self, next_tail: u32) {
+        // Safe because self being constructed from the correct mmap guarantees that the memory is
+        // valid to read and it's used as an atomic to cover mutability concerns.
+        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
+    }
+
+    // Loads the head pointer atomically with the given ordering.
+    fn head(&self, ordering: Ordering) -> u32 {
+        // Safe because self being constructed from the correct mmap guarantees that the memory is
+        // valid to read.
+        unsafe { (*self.head).load(ordering) }
+    }
+
+    // Stores the new value of the head in the completion queue. This tells the kernel that the
+    // completion entries up to the given head have been consumed and their slots can be reused.
+    // Always stores with release ordering as that is the only valid way to use the pointer.
+    fn set_head(&self, next_head: u32) {
+        // Safe because self being constructed from the correct mmap guarantees that the memory is
+        // valid to read and it's used as an atomic to cover mutability concerns.
+        unsafe { (*self.head).store(next_head, Ordering::Release) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::fs::OpenOptions;
+    use std::io::Write;
+    use std::path::{Path, PathBuf};
+    use std::time::Duration;
+
+    use sys_util::PollContext;
+    use tempfile::TempDir;
+
+    use super::*;
+
+    fn append_file_name(path: &Path, name: &str) -> PathBuf {
+        let mut joined = path.to_path_buf();
+        joined.push(name);
+        joined
+    }
+
+    #[test]
+    fn read_one_block_at_a_time() {
+        let tempdir = TempDir::new().unwrap();
+        let file_path = append_file_name(tempdir.path(), "test");
+        let queue_size = 128;
+
+        let mut uring = URingContext::new(queue_size).unwrap();
+        let mut buf = [0u8; 0x1000];
+        let f = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&file_path)
+            .unwrap();
+        f.set_len(0x1000 * 2).unwrap();
+
+        for i in 0..queue_size * 2 {
+            unsafe {
+                // Safe because the `wait` call waits until the kernel is done with `buf`.
+                let index = i as u64;
+                uring
+                    .add_read(
+                        buf.as_mut_ptr(),
+                        buf.len(),
+                        f.as_raw_fd(),
+                        (index % 2) * 0x1000,
+                        index,
+                    )
+                    .unwrap();
+                let (user_data, res) = uring.wait().unwrap().next().unwrap();
+                assert_eq!(user_data, i as UserData);
+                assert_eq!(res.unwrap(), buf.len() as i32);
+            }
+        }
+    }
+
+    #[test]
+    fn write_one_block() {
+        let tempdir = TempDir::new().unwrap();
+        let file_path = append_file_name(tempdir.path(), "test");
+
+        let mut uring = URingContext::new(16).unwrap();
+        let mut buf = [0u8; 4096];
+        let mut f = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&file_path)
+            .unwrap();
+        f.write(&buf).unwrap();
+        f.write(&buf).unwrap();
+
+        unsafe {
+            // Safe because the `wait` call waits until the kernel is done reading `buf`.
+            uring
+                .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
+                .unwrap();
+            let (user_data, res) = uring.wait().unwrap().next().unwrap();
+            assert_eq!(user_data, 55 as UserData);
+            assert_eq!(res.unwrap(), buf.len() as i32);
+        }
+    }
+
+    #[test]
+    fn write_one_submit_poll() {
+        let tempdir = TempDir::new().unwrap();
+        let file_path = append_file_name(tempdir.path(), "test");
+
+        let mut uring = URingContext::new(16).unwrap();
+        let mut buf = [0u8; 4096];
+        let mut f = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(true)
+            .open(&file_path)
+            .unwrap();
+        f.write(&buf).unwrap();
+        f.write(&buf).unwrap();
+
+        let ctx: PollContext<u64> = PollContext::build_with(&[(&uring, 1)]).unwrap();
+        {
+            // Test that the uring context isn't readable before any events are complete.
+            let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
+            assert!(events.iter_readable().next().is_none());
+        }
+
+        unsafe {
+            // Safe because the `wait` call waits until the kernel is done reading `buf`.
+            uring
+                .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
+                .unwrap();
+            uring.submit().unwrap();
+            // Poll for completion with epoll.
+            let events = ctx.wait().unwrap();
+            let event = events.iter_readable().next().unwrap();
+            assert_eq!(event.token(), 1);
+            let (user_data, res) = uring.wait().unwrap().next().unwrap();
+            assert_eq!(user_data, 55 as UserData);
+            assert_eq!(res.unwrap(), buf.len() as i32);
+        }
+    }
+
+    #[test]
+    fn fallocate_fsync() {
+        let tempdir = TempDir::new().unwrap();
+        let file_path = append_file_name(tempdir.path(), "test");
+
+        {
+            let buf = [0u8; 4096];
+            let mut f = OpenOptions::new()
+                .read(true)
+                .write(true)
+                .create(true)
+                .truncate(true)
+                .open(&file_path)
+                .unwrap();
+            f.write(&buf).unwrap();
+        }
+
+        let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
+        let set_size = init_size + 1024 * 1024 * 50;
+        let f = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .open(&file_path)
+            .unwrap();
+
+        let mut uring = URingContext::new(16).unwrap();
+        uring
+            .add_fallocate(f.as_raw_fd(), 0, set_size, 0, 66)
+            .unwrap();
+        let (user_data, res) = uring.wait().unwrap().next().unwrap();
+        assert_eq!(user_data, 66 as UserData);
+        assert_eq!(res.unwrap(), 0 as i32);
+
+        uring.add_fsync(f.as_raw_fd(), 67).unwrap();
+        let (user_data, res) = uring.wait().unwrap().next().unwrap();
+        assert_eq!(user_data, 67 as UserData);
+        assert_eq!(res.unwrap(), 0 as i32);
+
+        uring
+            .add_fallocate(
+                f.as_raw_fd(),
+                init_size as u64,
+                set_size - init_size,
+                (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u64,
+                68,
+            )
+            .unwrap();
+        let (user_data, res) = uring.wait().unwrap().next().unwrap();
+        assert_eq!(user_data, 68 as UserData);
+        assert_eq!(res.unwrap(), 0 as i32);
+
+        drop(f); // Close to ensure directory entries for metadata are updated.
+
+        let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
+        assert_eq!(new_size, set_size);
+    }
+
+    #[test]
+    fn dev_zero_readable() {
+        let f = File::open(Path::new("/dev/zero")).unwrap();
+        let mut uring = URingContext::new(16).unwrap();
+        uring
+            .add_poll_fd(f.as_raw_fd(), WatchingEvents::empty().set_read(), 454)
+            .unwrap();
+        let (user_data, res) = uring.wait().unwrap().next().unwrap();
+        assert_eq!(user_data, 454 as UserData);
+        assert_eq!(res.unwrap(), 1 as i32);
+    }
+}