summary refs log tree commit diff
diff options
context:
space:
mode:
authorDylan Reid <dgreid@chromium.org>2019-11-22 16:41:01 -0800
committerCommit Bot <commit-bot@chromium.org>2020-02-10 23:46:11 +0000
commit2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0 (patch)
tree21c6a9fefed7104a2bcdb9121c99111ffd3bf729
parent5c51e052820f6f6a2e65d1bd02ff8eee6a3241a2 (diff)
downloadcrosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.tar
crosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.tar.gz
crosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.tar.bz2
crosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.tar.lz
crosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.tar.xz
crosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.tar.zst
crosvm-2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0.zip
Add a cros_async crate.
This crate will house code using the new async/await features to be used
by other parts of crosvm.

Start the crate with a Future executor that runs tasks in a single
thread and allows futures that block on system file descriptors.

Change-Id: If77778ac056210dabbfc6e6e1e63df1c1b904a7f
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/1955045
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Tested-by: Dylan Reid <dgreid@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
-rw-r--r--Cargo.toml1
-rw-r--r--cros_async/Cargo.toml16
-rw-r--r--cros_async/src/complete.rs101
-rw-r--r--cros_async/src/executor.rs179
-rw-r--r--cros_async/src/fd_executor.rs250
-rw-r--r--cros_async/src/lib.rs354
-rw-r--r--cros_async/src/select.rs110
-rw-r--r--cros_async/src/waker.rs42
8 files changed, 1053 insertions, 0 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 231e532..009a09f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@ overflow-checks = true
 members = ["qcow_utils"]
 exclude = [
     "assertions",
+    "cros_async",
     "data_model",
     "rand_ish",
     "sync",
diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml
new file mode 100644
index 0000000..4e62732
--- /dev/null
+++ b/cros_async/Cargo.toml
@@ -0,0 +1,16 @@
+[package]
+name = "cros_async"
+version = "0.1.0"
+authors = ["The Chromium OS Authors"]
+edition = "2018"
+
+[dependencies]
+libc = "*"
+paste = "*"
+pin-utils = "0.1.0-alpha.4"
+sys_util = { path = "../sys_util" }
+syscall_defines = { path = "../syscall_defines" }
+
+[dependencies.futures]
+version = "*"
+default-features = false
diff --git a/cros_async/src/complete.rs b/cros_async/src/complete.rs
new file mode 100644
index 0000000..8455066
--- /dev/null
+++ b/cros_async/src/complete.rs
@@ -0,0 +1,101 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Need non-snake case so the macro can re-use type names for variables.
+#![allow(non_snake_case)]
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Context;
+
+use futures::future::{maybe_done, FutureExt, MaybeDone};
+
+use crate::executor::{FutureList, FutureState, UnitFutures};
+
+// Macro-generate future combinators to allow for running different numbers of top-level futures in
+// this FutureList. Generates the implementation of `FutureList` for the completion types. For an
+// explicit example this is modeled after, see `UnitFutures`.
+macro_rules! generate {
+    ($(
+        $(#[$doc:meta])*
+        ($Complete:ident, <$($Fut:ident),*>),
+    )*) => ($(
+        $(#[$doc])*
+        #[must_use = "Combinations of futures don't do anything unless run in an executor."]
+        paste::item! {
+            pub(crate) struct $Complete<$($Fut: Future + Unpin),*> {
+                added_futures: UnitFutures,
+                $($Fut: MaybeDone<$Fut>,)*
+                $([<$Fut _state>]: FutureState,)*
+            }
+        }
+
+        impl<$($Fut: Future + Unpin),*> $Complete<$($Fut),*> {
+            paste::item! {
+                pub(crate) fn new($($Fut: $Fut),*) -> $Complete<$($Fut),*> {
+                    $Complete {
+                        added_futures: UnitFutures::new(),
+                        $($Fut: maybe_done($Fut),)*
+                        $([<$Fut _state>]: FutureState::new(),)*
+                    }
+                }
+            }
+        }
+
+        impl<$($Fut: Future + Unpin),*> FutureList for $Complete<$($Fut),*> {
+            type Output = ($($Fut::Output),*);
+
+            fn futures_mut(&mut self) -> &mut UnitFutures {
+                &mut self.added_futures
+            }
+
+            paste::item! {
+                fn poll_results(&mut self) -> Option<Self::Output> {
+                    let _ = self.added_futures.poll_results();
+
+                    let mut complete = true;
+                    $(
+                        if self.[<$Fut _state>].needs_poll.replace(false) {
+                            let mut ctx = Context::from_waker(&self.[<$Fut _state>].waker);
+                            // The future impls `Unpin`, use `poll_unpin` to avoid wrapping it in
+                            // `Pin` to call `poll`.
+                            complete &= self.$Fut.poll_unpin(&mut ctx).is_ready();
+                        }
+                    )*
+
+                    if complete {
+                        $(
+                            let $Fut = Pin::new(&mut self.$Fut);
+                        )*
+                        Some(($($Fut.take_output().unwrap()), *))
+                    } else {
+                        None
+                    }
+                }
+
+                fn any_ready(&self) -> bool {
+                    let mut ready = self.added_futures.any_ready();
+                    $(
+                        ready |= self.[<$Fut _state>].needs_poll.get();
+                    )*
+                    ready
+                }
+            }
+        }
+    )*)
+}
+
+generate! {
+    /// _Future for the [`complete2`] function.
+    (Complete2, <_Fut1, _Fut2>),
+
+    /// _Future for the [`complete3`] function.
+    (Complete3, <_Fut1, _Fut2, _Fut3>),
+
+    /// _Future for the [`complete4`] function.
+    (Complete4, <_Fut1, _Fut2, _Fut3, _Fut4>),
+
+    /// _Future for the [`complete5`] function.
+    (Complete5, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5>),
+}
diff --git a/cros_async/src/executor.rs b/cros_async/src/executor.rs
new file mode 100644
index 0000000..ac788f1
--- /dev/null
+++ b/cros_async/src/executor.rs
@@ -0,0 +1,179 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::cell::Cell;
+use std::collections::VecDeque;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Waker;
+use std::task::{Context, Poll};
+
+use crate::waker::create_waker;
+
+/// Represents a future executor that can be run. Implementers of the trait will take a list of
+/// futures and poll them until completed.
+pub trait Executor {
+    /// The type returned by the executor. This is normally `()` or a combination of the output the
+    /// futures produce.
+    type Output;
+
+    /// Run the executor, this will return once the exit criteria is met. The exit criteria is
+    /// specified when the executor is created, for example running until all futures are complete.
+    fn run(&mut self) -> Self::Output;
+}
+
+// Tracks if a future needs to be polled and the waker to use.
+pub(crate) struct FutureState {
+    pub needs_poll: Rc<Cell<bool>>,
+    pub waker: Waker,
+}
+
+impl FutureState {
+    pub fn new() -> FutureState {
+        let needs_poll = Rc::new(Cell::new(true));
+        // Safe because a valid pointer is passed to `create_waker` and the valid result is
+        // passed to `Waker::from_raw`. And because the reference count to needs_poll is
+        // incremented by cloning it so it can't be dropped before the waker.
+        let waker = unsafe {
+            let clone = needs_poll.clone();
+            let raw_waker = create_waker(Rc::into_raw(clone) as *const _);
+            Waker::from_raw(raw_waker)
+        };
+        FutureState { needs_poll, waker }
+    }
+}
+
+// Couples a future owned by the executor with a flag that indicates the future is ready to be
+// polled. Futures will start with the flag set. After blocking by returning `Poll::Pending`, the
+// flag will be false until the waker is triggered and sets the flag to true, signalling the
+// executor to poll the future again.
+pub(crate) struct ExecutableFuture<T> {
+    future: Pin<Box<dyn Future<Output = T>>>,
+    state: FutureState,
+}
+
+impl<T> ExecutableFuture<T> {
+    // Creates an `ExecutableFuture` from the future. The returned struct is used to track when the
+    // future should be polled again.
+    pub fn new(future: Pin<Box<dyn Future<Output = T>>>) -> ExecutableFuture<T> {
+        ExecutableFuture {
+            future,
+            state: FutureState::new(),
+        }
+    }
+
+    // Polls the future if needed and returns the result.
+    // Covers setting up the waker and context before calling the future.
+    fn poll(&mut self) -> Poll<T> {
+        let mut ctx = Context::from_waker(&self.state.waker);
+        let f = self.future.as_mut();
+        f.poll(&mut ctx)
+    }
+}
+
+// Private trait used to allow one executor to behave differently.  Using FutureList allows the
+// executor code to be common across different collections of crates and different termination
+// behavior. For example, one list can decide to exit after the first trait completes, others can
+// wait until all are complete.
+pub(crate) trait FutureList {
+    type Output;
+
+    // Return a mutable reference to the list of futures that can be added or removed from this
+    // List.
+    fn futures_mut(&mut self) -> &mut UnitFutures;
+    // Polls all futures that are ready. Returns the results if this list has completed.
+    fn poll_results(&mut self) -> Option<Self::Output>;
+    // Returns true if any future in the list is ready to be polled.
+    fn any_ready(&self) -> bool;
+}
+
+// `UnitFutures` is the simplest implementor of `FutureList`. It runs all futures added to it until
+// there are none left to poll. The futures must all return `()`.
+pub(crate) struct UnitFutures {
+    futures: VecDeque<ExecutableFuture<()>>,
+}
+
+impl UnitFutures {
+    // Creates a new, empty list of futures.
+    pub fn new() -> UnitFutures {
+        UnitFutures {
+            futures: VecDeque::new(),
+        }
+    }
+
+    // Adds a future to the list of futures to be polled.
+    pub fn append(&mut self, futures: &mut VecDeque<ExecutableFuture<()>>) {
+        self.futures.append(futures);
+    }
+
+    // Polls all futures that are ready to be polled. Removes any futures that indicate they are
+    // completed.
+    pub fn poll_all(&mut self) {
+        let mut i = 0;
+        while i < self.futures.len() {
+            let fut = &mut self.futures[i];
+            let remove = if fut.state.needs_poll.replace(false) {
+                fut.poll().is_ready()
+            } else {
+                false
+            };
+            if remove {
+                self.futures.remove(i);
+            } else {
+                i += 1;
+            }
+        }
+    }
+}
+
+impl FutureList for UnitFutures {
+    type Output = ();
+
+    fn futures_mut(&mut self) -> &mut UnitFutures {
+        self
+    }
+
+    fn poll_results(&mut self) -> Option<Self::Output> {
+        self.poll_all();
+        if self.futures.is_empty() {
+            Some(())
+        } else {
+            None
+        }
+    }
+
+    fn any_ready(&self) -> bool {
+        self.futures.iter().any(|fut| fut.state.needs_poll.get())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    #[test]
+    fn basic_run() {
+        async fn f(called: Rc<AtomicUsize>) {
+            called.fetch_add(1, Ordering::Relaxed);
+        }
+
+        let f1_called = Rc::new(AtomicUsize::new(0));
+        let f2_called = Rc::new(AtomicUsize::new(0));
+
+        let fut1 = Box::pin(f(f1_called.clone()));
+        let fut2 = Box::pin(f(f2_called.clone()));
+
+        let mut futures = VecDeque::new();
+        futures.push_back(ExecutableFuture::new(fut1));
+        futures.push_back(ExecutableFuture::new(fut2));
+
+        let mut uf = UnitFutures::new();
+        uf.append(&mut futures);
+        assert!(uf.poll_results().is_some());
+        assert_eq!(f1_called.load(Ordering::Relaxed), 1);
+        assert_eq!(f2_called.load(Ordering::Relaxed), 1);
+    }
+}
diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs
new file mode 100644
index 0000000..58d6013
--- /dev/null
+++ b/cros_async/src/fd_executor.rs
@@ -0,0 +1,250 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! The executor runs all given futures to completion. Futures register wakers associated with file
+//! descriptors. The wakers will be called when the FD becomes readable or writable depending on
+//! the situation.
+//!
+//! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
+//! utility functions to combine futures.
+//!
+//! # Example of starting the framework and running a future:
+//!
+//! ```
+//! # use std::rc::Rc;
+//! # use std::cell::RefCell;
+//! use cros_async::Executor;
+//! async fn my_async(mut x: Rc<RefCell<u64>>) {
+//!     x.replace(4);
+//! }
+//!
+//! let mut ex = cros_async::empty_executor().expect("Failed creating executor");
+//! let x = Rc::new(RefCell::new(0));
+//! cros_async::fd_executor::add_future(Box::pin(my_async(x.clone())));
+//! ex.run();
+//! assert_eq!(*x.borrow(), 4);
+//! ```
+
+use std::cell::RefCell;
+use std::collections::{BTreeMap, VecDeque};
+use std::fmt::{self, Display};
+use std::fs::File;
+use std::future::Future;
+use std::os::unix::io::FromRawFd;
+use std::os::unix::io::RawFd;
+use std::pin::Pin;
+use std::task::Waker;
+
+use sys_util::{PollContext, WatchingEvents};
+
+use crate::executor::{ExecutableFuture, Executor, FutureList};
+
+#[derive(Debug, PartialEq)]
+pub enum Error {
+    /// Attempts to create two Executors on the same thread fail.
+    AttemptedDuplicateExecutor,
+    /// Failed to copy the FD for the polling context.
+    DuplicatingFd(sys_util::Error),
+    /// Failed accessing the thread local storage for wakers.
+    InvalidContext,
+    /// Creating a context to wait on FDs failed.
+    CreatingContext(sys_util::Error),
+    /// PollContext failure.
+    PollContextError(sys_util::Error),
+    /// Failed to submit the waker to the polling context.
+    SubmittingWaker(sys_util::Error),
+}
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+
+        match self {
+            AttemptedDuplicateExecutor => write!(f, "Cannot have two executors on one thread."),
+            DuplicatingFd(e) => write!(f, "Failed to copy the FD for the polling context: {}", e),
+            InvalidContext => write!(
+                f,
+                "Invalid context, was the Fd executor created successfully?"
+            ),
+            CreatingContext(e) => write!(f, "An error creating the fd waiting context: {}.", e),
+            PollContextError(e) => write!(f, "PollContext failure: {}", e),
+            SubmittingWaker(e) => write!(f, "An error adding to the Aio context: {}.", e),
+        }
+    }
+}
+
+// Temporary vectors of new additions to the executor.
+
+// Tracks active wakers and the futures they are associated with.
+thread_local!(static STATE: RefCell<Option<FdWakerState>> = RefCell::new(None));
+
+fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> {
+    STATE.with(|state| {
+        let mut state = state.borrow_mut();
+        if let Some(state) = state.as_mut() {
+            state.add_waker(fd, waker, events)
+        } else {
+            Err(Error::InvalidContext)
+        }
+    })
+}
+
+/// Tells the waking system to wake `waker` when `fd` becomes readable.
+/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
+/// next time the future is polled. If the fd is closed, there is a race where another FD can be
+/// opened on top of it causing the next poll to access the new target file.
+pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<()> {
+    add_waker(fd, waker, WatchingEvents::empty().set_read())
+}
+
+/// Tells the waking system to wake `waker` when `fd` becomes writable.
+/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
+/// next time the future is polled. If the fd is closed, there is a race where another FD can be
+/// opened on top of it causing the next poll to access the new target file.
+pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<()> {
+    add_waker(fd, waker, WatchingEvents::empty().set_write())
+}
+
+/// Adds a new top level future to the Executor.
+/// These futures must return `()`, indicating they are intended to create side-effects only.
+pub fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> {
+    STATE.with(|state| {
+        let mut state = state.borrow_mut();
+        if let Some(state) = state.as_mut() {
+            state.new_futures.push_back(ExecutableFuture::new(future));
+            Ok(())
+        } else {
+            Err(Error::InvalidContext)
+        }
+    })
+}
+
+// Tracks active wakers and associates wakers with the futures that registered them.
+struct FdWakerState {
+    poll_ctx: PollContext<u64>,
+    token_map: BTreeMap<u64, (File, Waker)>,
+    next_token: u64, // Next token for adding to the context.
+    new_futures: VecDeque<ExecutableFuture<()>>,
+}
+
+impl FdWakerState {
+    fn new() -> Result<Self> {
+        Ok(FdWakerState {
+            poll_ctx: PollContext::new().map_err(Error::CreatingContext)?,
+            token_map: BTreeMap::new(),
+            next_token: 0,
+            new_futures: VecDeque::new(),
+        })
+    }
+
+    // Adds an fd that, when signaled, will trigger the given waker.
+    fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> {
+        let duped_fd = unsafe {
+            // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
+            // will only be added to the poll loop.
+            File::from_raw_fd(dup_fd(fd)?)
+        };
+        self.poll_ctx
+            .add_fd_with_events(&duped_fd, events, self.next_token)
+            .map_err(Error::SubmittingWaker)?;
+        let next_token = self.next_token;
+        self.token_map.insert(next_token, (duped_fd, waker));
+        self.next_token += 1;
+        Ok(())
+    }
+
+    // Waits until one of the FDs is readable and wakes the associated waker.
+    fn wait_wake_event(&mut self) -> Result<()> {
+        let events = self.poll_ctx.wait().map_err(Error::PollContextError)?;
+        for e in events.iter() {
+            if let Some((fd, waker)) = self.token_map.remove(&e.token()) {
+                self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?;
+                waker.wake_by_ref();
+            }
+        }
+        Ok(())
+    }
+}
+
+/// Runs futures to completion on a single thread. Futures are allowed to block on file descriptors
+/// only. Futures can only block on FDs becoming readable or writable. `FdExecutor` is meant to be
+/// used where a poll or select loop would be used otherwise.
+pub(crate) struct FdExecutor<T: FutureList> {
+    futures: T,
+}
+
+impl<T: FutureList> Executor for FdExecutor<T> {
+    type Output = Result<T::Output>;
+
+    fn run(&mut self) -> Self::Output {
+        self.append_futures();
+
+        loop {
+            if let Some(output) = self.futures.poll_results() {
+                return Ok(output);
+            }
+
+            self.append_futures();
+
+            // If no futures are ready, sleep until a waker is signaled.
+            if !self.futures.any_ready() {
+                STATE.with(|state| {
+                    let mut state = state.borrow_mut();
+                    if let Some(state) = state.as_mut() {
+                        state.wait_wake_event()?;
+                    } else {
+                        unreachable!("Can't get here without a context being created");
+                    }
+                    Ok(())
+                })?;
+            }
+        }
+    }
+}
+
+impl<T: FutureList> FdExecutor<T> {
+    /// Create a new executor.
+    pub fn new(futures: T) -> Result<FdExecutor<T>> {
+        STATE.with(|state| {
+            if state.borrow().is_some() {
+                return Err(Error::AttemptedDuplicateExecutor);
+            }
+            state.replace(Some(FdWakerState::new()?));
+            Ok(())
+        })?;
+        Ok(FdExecutor { futures })
+    }
+
+    // Add any new futures and wakers to the lists.
+    fn append_futures(&mut self) {
+        STATE.with(|state| {
+            let mut state = state.borrow_mut();
+            if let Some(state) = state.as_mut() {
+                self.futures.futures_mut().append(&mut state.new_futures);
+            } else {
+                unreachable!("Can't get here without a context being created");
+            }
+        });
+    }
+}
+
+impl<T: FutureList> Drop for FdExecutor<T> {
+    fn drop(&mut self) {
+        STATE.with(|state| {
+            state.replace(None);
+        });
+    }
+}
+
+// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
+// waiting in TLS to be added to the main polling context.
+unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
+    let ret = libc::dup(fd);
+    if ret < 0 {
+        Err(Error::DuplicatingFd(sys_util::Error::last()))
+    } else {
+        Ok(ret)
+    }
+}
diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs
new file mode 100644
index 0000000..f671a0c
--- /dev/null
+++ b/cros_async/src/lib.rs
@@ -0,0 +1,354 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! An Executor and future combinators based on operations that block on file descriptors.
+//!
+//! This crate is meant to be used with the `futures-rs` crate that provides further combinators
+//! and utility functions to combine and manage futures. All futures will run until they block on a
+//! file descriptor becoming readable or writable. Facilities are provided to register future
+//! wakers based on such events.
+//!
+//! # Running top-level futures.
+//!
+//! Use helper functions based the desired behavior of your application.
+//!
+//! ## Completing one of several futures.
+//!
+//! If there are several top level tasks that should run until any one completes, use the "select"
+//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
+//! function will return when the first future completes. The uncompleted futures will also be
+//! returned so they can be run further or otherwise cleaned up. These functions are inspired by
+//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and
+//! to poll only when necessary. See the docs for [`select2`](fn.select2.html),
+//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html).
+//!
+//! ## Completing all of several futures.
+//!
+//! If there are several top level tasks that all need to be completed, use the "complete" family
+//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run`
+//! function will return only once all the futures passed to it have completed. These functions are
+//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based
+//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html),
+//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and
+//! [`complete5`](fn.complete5.html).
+//!
+//! ## Many futures all returning `()`
+//!
+//! It there are futures that produce side effects and return `()`, the
+//! [`empty_executor`](fn.empty_executor.html) function provides an Executor that runs futures
+//! returning `()`. Futures are added using the [`add_future`](fn.add_future.html) function.
+//!
+//! # Implementing new FD-based futures.
+//!
+//! When building futures to be run in an `FdExecutor` framework, use the following helper
+//! functions to perform common tasks:
+//!
+//! [`add_read_waker`](fn.add_read_waker.html) - Used to associate a provided FD becoming readable
+//! with the future being woken. Used before returning Poll::Pending from a future that waits until
+//! an FD is writable.
+//!
+//! [`add_write_waker`](fn.add_write_waker.html) - Used to associate a provided FD becoming
+//! writable with the future being woken. Used before returning Poll::Pending from a future that
+//! waits until an FD is readable.
+//!
+//! [`add_future`](fn.add_future.html) - Used to add a new future to the top-level list of running
+//! futures.
+
+mod complete;
+mod executor;
+pub mod fd_executor;
+mod select;
+mod waker;
+
+pub use executor::Executor;
+pub use select::SelectResult;
+
+use executor::UnitFutures;
+use fd_executor::{FdExecutor, Result};
+use std::future::Future;
+
+/// Creates an empty FdExecutor that can have futures returning `()` added via
+/// [`add_future`](fn.add_future.html).
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor};
+///    use cros_async::fd_executor::add_future;
+///    use futures::future::pending;
+///
+///    let fut = async { () };
+///    let mut ex = empty_executor().expect("Failed to create executor");
+///
+///    add_future(Box::pin(fut));
+///    ex.run();
+///    ```
+pub fn empty_executor() -> Result<impl Executor> {
+    FdExecutor::new(UnitFutures::new())
+}
+
+// Select helpers to run until any future completes.
+
+/// Creates an executor that runs the two given futures until one completes, returning a tuple
+/// containing the result of the finished future and the still pending future.
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, select2, SelectResult};
+///    use cros_async::fd_executor::add_future;
+///    use futures::future::pending;
+///    use futures::pin_mut;
+///
+///    let first = async {5};
+///    let second = async {let () = pending().await;};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    match select2(first, second) {
+///        Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (),
+///        _ => panic!("Select didn't return the first future"),
+///    };
+///    ```
+pub fn select2<F1: Future + Unpin, F2: Future + Unpin>(
+    f1: F1,
+    f2: F2,
+) -> Result<(SelectResult<F1>, SelectResult<F2>)> {
+    FdExecutor::new(select::Select2::new(f1, f2)).and_then(|mut f| f.run())
+}
+
+/// Creates an executor that runs the three given futures until one or more completes, returning a
+/// tuple containing the result of the finished future(s) and the still pending future(s).
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, select3, SelectResult};
+///    use cros_async::fd_executor::add_future;
+///    use futures::future::pending;
+///    use futures::pin_mut;
+///
+///    let first = async {4};
+///    let second = async {let () = pending().await;};
+///    let third = async {5};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    pin_mut!(third);
+///    match select3(first, second, third) {
+///        Ok((SelectResult::Finished(4),
+///            SelectResult::Pending(_second),
+///            SelectResult::Finished(5))) => (),
+///        _ => panic!("Select didn't return the futures"),
+///    };
+///    ```
+pub fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
+    f1: F1,
+    f2: F2,
+    f3: F3,
+) -> Result<(SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)> {
+    FdExecutor::new(select::Select3::new(f1, f2, f3)).and_then(|mut f| f.run())
+}
+
+/// Creates an executor that runs the four given futures until one or more completes, returning a
+/// tuple containing the result of the finished future(s) and the still pending future(s).
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, select4, SelectResult};
+///    use cros_async::fd_executor::add_future;
+///    use futures::future::pending;
+///    use futures::pin_mut;
+///
+///    let first = async {4};
+///    let second = async {let () = pending().await;};
+///    let third = async {5};
+///    let fourth = async {let () = pending().await;};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    pin_mut!(third);
+///    pin_mut!(fourth);
+///    match select4(first, second, third, fourth) {
+///        Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
+///            SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (),
+///        _ => panic!("Select didn't return the futures"),
+///    };
+///    ```
+pub fn select4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin>(
+    f1: F1,
+    f2: F2,
+    f3: F3,
+    f4: F4,
+) -> Result<(
+    SelectResult<F1>,
+    SelectResult<F2>,
+    SelectResult<F3>,
+    SelectResult<F4>,
+)> {
+    FdExecutor::new(select::Select4::new(f1, f2, f3, f4)).and_then(|mut f| f.run())
+}
+
+/// Creates an executor that runs the five given futures until one or more completes, returning a
+/// tuple containing the result of the finished future(s) and the still pending future(s).
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, select5, SelectResult};
+///    use cros_async::fd_executor::add_future;
+///    use futures::future::pending;
+///    use futures::pin_mut;
+///
+///    let first = async {4};
+///    let second = async {let () = pending().await;};
+///    let third = async {5};
+///    let fourth = async {let () = pending().await;};
+///    let fifth = async {6};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    pin_mut!(third);
+///    pin_mut!(fourth);
+///    pin_mut!(fifth);
+///    match select5(first, second, third, fourth, fifth) {
+///        Ok((SelectResult::Finished(4), SelectResult::Pending(_second),
+///            SelectResult::Finished(5), SelectResult::Pending(_fourth),
+///            SelectResult::Finished(6))) => (),
+///        _ => panic!("Select didn't return the futures"),
+///    };
+///    ```
+pub fn select5<
+    F1: Future + Unpin,
+    F2: Future + Unpin,
+    F3: Future + Unpin,
+    F4: Future + Unpin,
+    F5: Future + Unpin,
+>(
+    f1: F1,
+    f2: F2,
+    f3: F3,
+    f4: F4,
+    f5: F5,
+) -> Result<(
+    SelectResult<F1>,
+    SelectResult<F2>,
+    SelectResult<F3>,
+    SelectResult<F4>,
+    SelectResult<F5>,
+)> {
+    FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run())
+}
+
+// Combination helpers to run until all futures are complete.
+
+/// Creates an executor that runs the two given futures to completion, returning a tuple of the
+/// outputs each yields.
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, complete2};
+///    use futures::pin_mut;
+///
+///    let first = async {5};
+///    let second = async {6};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    assert_eq!(complete2(first, second).unwrap_or((0,0)), (5,6));
+///    ```
+pub fn complete2<F1: Future + Unpin, F2: Future + Unpin>(
+    f1: F1,
+    f2: F2,
+) -> Result<(F1::Output, F2::Output)> {
+    FdExecutor::new(complete::Complete2::new(f1, f2)).and_then(|mut f| f.run())
+}
+
+/// Creates an executor that runs the three given futures to completion, returning a tuple of the
+/// outputs each yields.
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, complete3};
+///    use futures::pin_mut;
+///
+///    let first = async {5};
+///    let second = async {6};
+///    let third = async {7};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    pin_mut!(third);
+///    assert_eq!(complete3(first, second, third).unwrap_or((0,0,0)), (5,6,7));
+///    ```
+pub fn complete3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
+    f1: F1,
+    f2: F2,
+    f3: F3,
+) -> Result<(F1::Output, F2::Output, F3::Output)> {
+    FdExecutor::new(complete::Complete3::new(f1, f2, f3)).and_then(|mut f| f.run())
+}
+
+/// Creates an executor that runs the four given futures to completion, returning a tuple of the
+/// outputs each yields.
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, complete4};
+///    use futures::pin_mut;
+///
+///    let first = async {5};
+///    let second = async {6};
+///    let third = async {7};
+///    let fourth = async {8};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    pin_mut!(third);
+///    pin_mut!(fourth);
+///    assert_eq!(complete4(first, second, third, fourth).unwrap_or((0,0,0,0)), (5,6,7,8));
+///    ```
+pub fn complete4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: Future + Unpin>(
+    f1: F1,
+    f2: F2,
+    f3: F3,
+    f4: F4,
+) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output)> {
+    FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)).and_then(|mut f| f.run())
+}
+
+/// Creates an executor that runs the five given futures to completion, returning a tuple of the
+/// outputs each yields.
+///
+///  # Example
+///
+///    ```
+///    use cros_async::{empty_executor, Executor, complete5};
+///    use futures::pin_mut;
+///
+///    let first = async {5};
+///    let second = async {6};
+///    let third = async {7};
+///    let fourth = async {8};
+///    let fifth = async {9};
+///    pin_mut!(first);
+///    pin_mut!(second);
+///    pin_mut!(third);
+///    pin_mut!(fourth);
+///    pin_mut!(fifth);
+///    assert_eq!(complete5(first, second, third, fourth, fifth).unwrap_or((0,0,0,0,0)),
+///               (5,6,7,8,9));
+///    ```
+pub fn complete5<
+    F1: Future + Unpin,
+    F2: Future + Unpin,
+    F3: Future + Unpin,
+    F4: Future + Unpin,
+    F5: Future + Unpin,
+>(
+    f1: F1,
+    f2: F2,
+    f3: F3,
+    f4: F4,
+    f5: F5,
+) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)> {
+    FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run())
+}
diff --git a/cros_async/src/select.rs b/cros_async/src/select.rs
new file mode 100644
index 0000000..8eef317
--- /dev/null
+++ b/cros_async/src/select.rs
@@ -0,0 +1,110 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+// Need non-snake case so the macro can re-use type names for variables.
+#![allow(non_snake_case)]
+
+use std::future::Future;
+use std::pin::Pin;
+use std::task::Context;
+
+use futures::future::{maybe_done, FutureExt, MaybeDone};
+
+use crate::executor::{FutureList, FutureState, UnitFutures};
+
+pub enum SelectResult<F: Future> {
+    Pending(F),
+    Finished(F::Output),
+}
+
+// Macro-generate future combinators to allow for running different numbers of top-level futures in
+// this FutureList. Generates the implementation of `FutureList` for the select types. For an
+// explicit example this is modeled after, see `UnitFutures`.
+macro_rules! generate {
+    ($(
+        $(#[$doc:meta])*
+        ($Select:ident, <$($Fut:ident),*>),
+    )*) => ($(
+        $(#[$doc])*
+        #[must_use = "Combinations of futures don't do anything unless run in an executor."]
+        paste::item! {
+            pub(crate) struct $Select<$($Fut: Future + Unpin),*> {
+                added_futures: UnitFutures,
+                $($Fut: MaybeDone<$Fut>,)*
+                $([<$Fut _state>]: FutureState,)*
+            }
+        }
+
+        impl<$($Fut: Future + Unpin),*> $Select<$($Fut),*> {
+            paste::item! {
+                pub(crate) fn new($($Fut: $Fut),*) -> $Select<$($Fut),*> {
+                    $Select {
+                        added_futures: UnitFutures::new(),
+                        $($Fut: maybe_done($Fut),)*
+                        $([<$Fut _state>]: FutureState::new(),)*
+                    }
+                }
+            }
+        }
+
+        impl<$($Fut: Future + Unpin),*> FutureList for $Select<$($Fut),*> {
+            type Output = ($(SelectResult<$Fut>),*);
+
+            fn futures_mut(&mut self) -> &mut UnitFutures {
+                &mut self.added_futures
+            }
+
+            paste::item! {
+                fn poll_results(&mut self) -> Option<Self::Output> {
+                    let _ = self.added_futures.poll_results();
+
+                    let mut complete = false;
+                    $(
+                        let $Fut = Pin::new(&mut self.$Fut);
+                        if self.[<$Fut _state>].needs_poll.replace(false) {
+                            let mut ctx = Context::from_waker(&self.[<$Fut _state>].waker);
+                            // The future impls `Unpin`, use `poll_unpin` to avoid wrapping it in
+                            // `Pin` to call `poll`.
+                            complete |= self.$Fut.poll_unpin(&mut ctx).is_ready();
+                        }
+                    )*
+
+                    if complete {
+                        Some(($(
+                                    match std::mem::replace(&mut self.$Fut, MaybeDone::Gone) {
+                                        MaybeDone::Future(f) => SelectResult::Pending(f),
+                                        MaybeDone::Done(o) => SelectResult::Finished(o),
+                                        MaybeDone::Gone=>unreachable!(),
+                                    }
+                               ), *))
+                    } else {
+                        None
+                    }
+                }
+
+                fn any_ready(&self) -> bool {
+                    let mut ready = self.added_futures.any_ready();
+                    $(
+                        ready |= self.[<$Fut _state>].needs_poll.get();
+                    )*
+                    ready
+                }
+            }
+        }
+    )*)
+}
+
+generate! {
+    /// _Future for the [`select2`] function.
+    (Select2, <_Fut1, _Fut2>),
+
+    /// _Future for the [`select3`] function.
+    (Select3, <_Fut1, _Fut2, _Fut3>),
+
+    /// _Future for the [`select4`] function.
+    (Select4, <_Fut1, _Fut2, _Fut3, _Fut4>),
+
+    /// _Future for the [`select5`] function.
+    (Select5, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5>),
+}
diff --git a/cros_async/src/waker.rs b/cros_async/src/waker.rs
new file mode 100644
index 0000000..f0dac0f
--- /dev/null
+++ b/cros_async/src/waker.rs
@@ -0,0 +1,42 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::rc::Rc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::task::{RawWaker, RawWakerVTable};
+
+// Boiler-plate for creating a waker with function pointers.
+// This waker sets the atomic bool it is passed to true.
+// The bool will be used by the executor to know which futures to poll
+
+// Convert the pointer back to the Rc it was created from and drop it.
+unsafe fn waker_drop(data_ptr: *const ()) {
+    // from_raw, then drop
+    let _rc_bool = Rc::<AtomicBool>::from_raw(data_ptr as *const _);
+}
+
+unsafe fn waker_wake(_: *const ()) {}
+
+// Called when the bool should be set to true to wake the waker.
+unsafe fn waker_wake_by_ref(data_ptr: *const ()) {
+    let bool_atomic_ptr = data_ptr as *const AtomicBool;
+    let bool_atomic_ref = bool_atomic_ptr.as_ref().unwrap();
+    bool_atomic_ref.store(true, Ordering::Relaxed);
+}
+
+// The data_ptr will be a pointer to an Rc<AtomicBool>.
+unsafe fn waker_clone(data_ptr: *const ()) -> RawWaker {
+    let rc_bool = Rc::<AtomicBool>::from_raw(data_ptr as *const _);
+    let new_ptr = rc_bool.clone();
+    Rc::into_raw(rc_bool); // Don't decrement the ref count of the original, so back to raw.
+    create_waker(Rc::into_raw(new_ptr) as *const _)
+}
+
+static WAKER_VTABLE: RawWakerVTable =
+    RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop);
+
+/// To use safely, data_ptr must be from Rc<AtomicBool>::from_raw().
+pub unsafe fn create_waker(data_ptr: *const ()) -> RawWaker {
+    RawWaker::new(data_ptr, &WAKER_VTABLE)
+}