From 2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0 Mon Sep 17 00:00:00 2001 From: Dylan Reid Date: Fri, 22 Nov 2019 16:41:01 -0800 Subject: Add a cros_async crate. This crate will house code using the new async/await features to be used by other parts of crosvm. Start the crate with a Future executor that runs tasks in a single thread and allows futures that block on system file descriptors. Change-Id: If77778ac056210dabbfc6e6e1e63df1c1b904a7f Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/1955045 Reviewed-by: Dylan Reid Tested-by: Dylan Reid Tested-by: kokoro Commit-Queue: Dylan Reid --- Cargo.toml | 1 + cros_async/Cargo.toml | 16 ++ cros_async/src/complete.rs | 101 ++++++++++++ cros_async/src/executor.rs | 179 +++++++++++++++++++++ cros_async/src/fd_executor.rs | 250 +++++++++++++++++++++++++++++ cros_async/src/lib.rs | 354 ++++++++++++++++++++++++++++++++++++++++++ cros_async/src/select.rs | 110 +++++++++++++ cros_async/src/waker.rs | 42 +++++ 8 files changed, 1053 insertions(+) create mode 100644 cros_async/Cargo.toml create mode 100644 cros_async/src/complete.rs create mode 100644 cros_async/src/executor.rs create mode 100644 cros_async/src/fd_executor.rs create mode 100644 cros_async/src/lib.rs create mode 100644 cros_async/src/select.rs create mode 100644 cros_async/src/waker.rs diff --git a/Cargo.toml b/Cargo.toml index 231e532..009a09f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ overflow-checks = true members = ["qcow_utils"] exclude = [ "assertions", + "cros_async", "data_model", "rand_ish", "sync", diff --git a/cros_async/Cargo.toml b/cros_async/Cargo.toml new file mode 100644 index 0000000..4e62732 --- /dev/null +++ b/cros_async/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "cros_async" +version = "0.1.0" +authors = ["The Chromium OS Authors"] +edition = "2018" + +[dependencies] +libc = "*" +paste = "*" +pin-utils = "0.1.0-alpha.4" +sys_util = { path = "../sys_util" } +syscall_defines = { path = "../syscall_defines" } + +[dependencies.futures] +version = "*" +default-features = false diff --git a/cros_async/src/complete.rs b/cros_async/src/complete.rs new file mode 100644 index 0000000..8455066 --- /dev/null +++ b/cros_async/src/complete.rs @@ -0,0 +1,101 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// Need non-snake case so the macro can re-use type names for variables. +#![allow(non_snake_case)] + +use std::future::Future; +use std::pin::Pin; +use std::task::Context; + +use futures::future::{maybe_done, FutureExt, MaybeDone}; + +use crate::executor::{FutureList, FutureState, UnitFutures}; + +// Macro-generate future combinators to allow for running different numbers of top-level futures in +// this FutureList. Generates the implementation of `FutureList` for the completion types. For an +// explicit example this is modeled after, see `UnitFutures`. +macro_rules! generate { + ($( + $(#[$doc:meta])* + ($Complete:ident, <$($Fut:ident),*>), + )*) => ($( + $(#[$doc])* + #[must_use = "Combinations of futures don't do anything unless run in an executor."] + paste::item! { + pub(crate) struct $Complete<$($Fut: Future + Unpin),*> { + added_futures: UnitFutures, + $($Fut: MaybeDone<$Fut>,)* + $([<$Fut _state>]: FutureState,)* + } + } + + impl<$($Fut: Future + Unpin),*> $Complete<$($Fut),*> { + paste::item! { + pub(crate) fn new($($Fut: $Fut),*) -> $Complete<$($Fut),*> { + $Complete { + added_futures: UnitFutures::new(), + $($Fut: maybe_done($Fut),)* + $([<$Fut _state>]: FutureState::new(),)* + } + } + } + } + + impl<$($Fut: Future + Unpin),*> FutureList for $Complete<$($Fut),*> { + type Output = ($($Fut::Output),*); + + fn futures_mut(&mut self) -> &mut UnitFutures { + &mut self.added_futures + } + + paste::item! { + fn poll_results(&mut self) -> Option { + let _ = self.added_futures.poll_results(); + + let mut complete = true; + $( + if self.[<$Fut _state>].needs_poll.replace(false) { + let mut ctx = Context::from_waker(&self.[<$Fut _state>].waker); + // The future impls `Unpin`, use `poll_unpin` to avoid wrapping it in + // `Pin` to call `poll`. + complete &= self.$Fut.poll_unpin(&mut ctx).is_ready(); + } + )* + + if complete { + $( + let $Fut = Pin::new(&mut self.$Fut); + )* + Some(($($Fut.take_output().unwrap()), *)) + } else { + None + } + } + + fn any_ready(&self) -> bool { + let mut ready = self.added_futures.any_ready(); + $( + ready |= self.[<$Fut _state>].needs_poll.get(); + )* + ready + } + } + } + )*) +} + +generate! { + /// _Future for the [`complete2`] function. + (Complete2, <_Fut1, _Fut2>), + + /// _Future for the [`complete3`] function. + (Complete3, <_Fut1, _Fut2, _Fut3>), + + /// _Future for the [`complete4`] function. + (Complete4, <_Fut1, _Fut2, _Fut3, _Fut4>), + + /// _Future for the [`complete5`] function. + (Complete5, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5>), +} diff --git a/cros_async/src/executor.rs b/cros_async/src/executor.rs new file mode 100644 index 0000000..ac788f1 --- /dev/null +++ b/cros_async/src/executor.rs @@ -0,0 +1,179 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::cell::Cell; +use std::collections::VecDeque; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Waker; +use std::task::{Context, Poll}; + +use crate::waker::create_waker; + +/// Represents a future executor that can be run. Implementers of the trait will take a list of +/// futures and poll them until completed. +pub trait Executor { + /// The type returned by the executor. This is normally `()` or a combination of the output the + /// futures produce. + type Output; + + /// Run the executor, this will return once the exit criteria is met. The exit criteria is + /// specified when the executor is created, for example running until all futures are complete. + fn run(&mut self) -> Self::Output; +} + +// Tracks if a future needs to be polled and the waker to use. +pub(crate) struct FutureState { + pub needs_poll: Rc>, + pub waker: Waker, +} + +impl FutureState { + pub fn new() -> FutureState { + let needs_poll = Rc::new(Cell::new(true)); + // Safe because a valid pointer is passed to `create_waker` and the valid result is + // passed to `Waker::from_raw`. And because the reference count to needs_poll is + // incremented by cloning it so it can't be dropped before the waker. + let waker = unsafe { + let clone = needs_poll.clone(); + let raw_waker = create_waker(Rc::into_raw(clone) as *const _); + Waker::from_raw(raw_waker) + }; + FutureState { needs_poll, waker } + } +} + +// Couples a future owned by the executor with a flag that indicates the future is ready to be +// polled. Futures will start with the flag set. After blocking by returning `Poll::Pending`, the +// flag will be false until the waker is triggered and sets the flag to true, signalling the +// executor to poll the future again. +pub(crate) struct ExecutableFuture { + future: Pin>>, + state: FutureState, +} + +impl ExecutableFuture { + // Creates an `ExecutableFuture` from the future. The returned struct is used to track when the + // future should be polled again. + pub fn new(future: Pin>>) -> ExecutableFuture { + ExecutableFuture { + future, + state: FutureState::new(), + } + } + + // Polls the future if needed and returns the result. + // Covers setting up the waker and context before calling the future. + fn poll(&mut self) -> Poll { + let mut ctx = Context::from_waker(&self.state.waker); + let f = self.future.as_mut(); + f.poll(&mut ctx) + } +} + +// Private trait used to allow one executor to behave differently. Using FutureList allows the +// executor code to be common across different collections of crates and different termination +// behavior. For example, one list can decide to exit after the first trait completes, others can +// wait until all are complete. +pub(crate) trait FutureList { + type Output; + + // Return a mutable reference to the list of futures that can be added or removed from this + // List. + fn futures_mut(&mut self) -> &mut UnitFutures; + // Polls all futures that are ready. Returns the results if this list has completed. + fn poll_results(&mut self) -> Option; + // Returns true if any future in the list is ready to be polled. + fn any_ready(&self) -> bool; +} + +// `UnitFutures` is the simplest implementor of `FutureList`. It runs all futures added to it until +// there are none left to poll. The futures must all return `()`. +pub(crate) struct UnitFutures { + futures: VecDeque>, +} + +impl UnitFutures { + // Creates a new, empty list of futures. + pub fn new() -> UnitFutures { + UnitFutures { + futures: VecDeque::new(), + } + } + + // Adds a future to the list of futures to be polled. + pub fn append(&mut self, futures: &mut VecDeque>) { + self.futures.append(futures); + } + + // Polls all futures that are ready to be polled. Removes any futures that indicate they are + // completed. + pub fn poll_all(&mut self) { + let mut i = 0; + while i < self.futures.len() { + let fut = &mut self.futures[i]; + let remove = if fut.state.needs_poll.replace(false) { + fut.poll().is_ready() + } else { + false + }; + if remove { + self.futures.remove(i); + } else { + i += 1; + } + } + } +} + +impl FutureList for UnitFutures { + type Output = (); + + fn futures_mut(&mut self) -> &mut UnitFutures { + self + } + + fn poll_results(&mut self) -> Option { + self.poll_all(); + if self.futures.is_empty() { + Some(()) + } else { + None + } + } + + fn any_ready(&self) -> bool { + self.futures.iter().any(|fut| fut.state.needs_poll.get()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + #[test] + fn basic_run() { + async fn f(called: Rc) { + called.fetch_add(1, Ordering::Relaxed); + } + + let f1_called = Rc::new(AtomicUsize::new(0)); + let f2_called = Rc::new(AtomicUsize::new(0)); + + let fut1 = Box::pin(f(f1_called.clone())); + let fut2 = Box::pin(f(f2_called.clone())); + + let mut futures = VecDeque::new(); + futures.push_back(ExecutableFuture::new(fut1)); + futures.push_back(ExecutableFuture::new(fut2)); + + let mut uf = UnitFutures::new(); + uf.append(&mut futures); + assert!(uf.poll_results().is_some()); + assert_eq!(f1_called.load(Ordering::Relaxed), 1); + assert_eq!(f2_called.load(Ordering::Relaxed), 1); + } +} diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs new file mode 100644 index 0000000..58d6013 --- /dev/null +++ b/cros_async/src/fd_executor.rs @@ -0,0 +1,250 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +//! The executor runs all given futures to completion. Futures register wakers associated with file +//! descriptors. The wakers will be called when the FD becomes readable or writable depending on +//! the situation. +//! +//! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and +//! utility functions to combine futures. +//! +//! # Example of starting the framework and running a future: +//! +//! ``` +//! # use std::rc::Rc; +//! # use std::cell::RefCell; +//! use cros_async::Executor; +//! async fn my_async(mut x: Rc>) { +//! x.replace(4); +//! } +//! +//! let mut ex = cros_async::empty_executor().expect("Failed creating executor"); +//! let x = Rc::new(RefCell::new(0)); +//! cros_async::fd_executor::add_future(Box::pin(my_async(x.clone()))); +//! ex.run(); +//! assert_eq!(*x.borrow(), 4); +//! ``` + +use std::cell::RefCell; +use std::collections::{BTreeMap, VecDeque}; +use std::fmt::{self, Display}; +use std::fs::File; +use std::future::Future; +use std::os::unix::io::FromRawFd; +use std::os::unix::io::RawFd; +use std::pin::Pin; +use std::task::Waker; + +use sys_util::{PollContext, WatchingEvents}; + +use crate::executor::{ExecutableFuture, Executor, FutureList}; + +#[derive(Debug, PartialEq)] +pub enum Error { + /// Attempts to create two Executors on the same thread fail. + AttemptedDuplicateExecutor, + /// Failed to copy the FD for the polling context. + DuplicatingFd(sys_util::Error), + /// Failed accessing the thread local storage for wakers. + InvalidContext, + /// Creating a context to wait on FDs failed. + CreatingContext(sys_util::Error), + /// PollContext failure. + PollContextError(sys_util::Error), + /// Failed to submit the waker to the polling context. + SubmittingWaker(sys_util::Error), +} +pub type Result = std::result::Result; + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use self::Error::*; + + match self { + AttemptedDuplicateExecutor => write!(f, "Cannot have two executors on one thread."), + DuplicatingFd(e) => write!(f, "Failed to copy the FD for the polling context: {}", e), + InvalidContext => write!( + f, + "Invalid context, was the Fd executor created successfully?" + ), + CreatingContext(e) => write!(f, "An error creating the fd waiting context: {}.", e), + PollContextError(e) => write!(f, "PollContext failure: {}", e), + SubmittingWaker(e) => write!(f, "An error adding to the Aio context: {}.", e), + } + } +} + +// Temporary vectors of new additions to the executor. + +// Tracks active wakers and the futures they are associated with. +thread_local!(static STATE: RefCell> = RefCell::new(None)); + +fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> { + STATE.with(|state| { + let mut state = state.borrow_mut(); + if let Some(state) = state.as_mut() { + state.add_waker(fd, waker, events) + } else { + Err(Error::InvalidContext) + } + }) +} + +/// Tells the waking system to wake `waker` when `fd` becomes readable. +/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the +/// next time the future is polled. If the fd is closed, there is a race where another FD can be +/// opened on top of it causing the next poll to access the new target file. +pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<()> { + add_waker(fd, waker, WatchingEvents::empty().set_read()) +} + +/// Tells the waking system to wake `waker` when `fd` becomes writable. +/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the +/// next time the future is polled. If the fd is closed, there is a race where another FD can be +/// opened on top of it causing the next poll to access the new target file. +pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<()> { + add_waker(fd, waker, WatchingEvents::empty().set_write()) +} + +/// Adds a new top level future to the Executor. +/// These futures must return `()`, indicating they are intended to create side-effects only. +pub fn add_future(future: Pin>>) -> Result<()> { + STATE.with(|state| { + let mut state = state.borrow_mut(); + if let Some(state) = state.as_mut() { + state.new_futures.push_back(ExecutableFuture::new(future)); + Ok(()) + } else { + Err(Error::InvalidContext) + } + }) +} + +// Tracks active wakers and associates wakers with the futures that registered them. +struct FdWakerState { + poll_ctx: PollContext, + token_map: BTreeMap, + next_token: u64, // Next token for adding to the context. + new_futures: VecDeque>, +} + +impl FdWakerState { + fn new() -> Result { + Ok(FdWakerState { + poll_ctx: PollContext::new().map_err(Error::CreatingContext)?, + token_map: BTreeMap::new(), + next_token: 0, + new_futures: VecDeque::new(), + }) + } + + // Adds an fd that, when signaled, will trigger the given waker. + fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> { + let duped_fd = unsafe { + // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD + // will only be added to the poll loop. + File::from_raw_fd(dup_fd(fd)?) + }; + self.poll_ctx + .add_fd_with_events(&duped_fd, events, self.next_token) + .map_err(Error::SubmittingWaker)?; + let next_token = self.next_token; + self.token_map.insert(next_token, (duped_fd, waker)); + self.next_token += 1; + Ok(()) + } + + // Waits until one of the FDs is readable and wakes the associated waker. + fn wait_wake_event(&mut self) -> Result<()> { + let events = self.poll_ctx.wait().map_err(Error::PollContextError)?; + for e in events.iter() { + if let Some((fd, waker)) = self.token_map.remove(&e.token()) { + self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?; + waker.wake_by_ref(); + } + } + Ok(()) + } +} + +/// Runs futures to completion on a single thread. Futures are allowed to block on file descriptors +/// only. Futures can only block on FDs becoming readable or writable. `FdExecutor` is meant to be +/// used where a poll or select loop would be used otherwise. +pub(crate) struct FdExecutor { + futures: T, +} + +impl Executor for FdExecutor { + type Output = Result; + + fn run(&mut self) -> Self::Output { + self.append_futures(); + + loop { + if let Some(output) = self.futures.poll_results() { + return Ok(output); + } + + self.append_futures(); + + // If no futures are ready, sleep until a waker is signaled. + if !self.futures.any_ready() { + STATE.with(|state| { + let mut state = state.borrow_mut(); + if let Some(state) = state.as_mut() { + state.wait_wake_event()?; + } else { + unreachable!("Can't get here without a context being created"); + } + Ok(()) + })?; + } + } + } +} + +impl FdExecutor { + /// Create a new executor. + pub fn new(futures: T) -> Result> { + STATE.with(|state| { + if state.borrow().is_some() { + return Err(Error::AttemptedDuplicateExecutor); + } + state.replace(Some(FdWakerState::new()?)); + Ok(()) + })?; + Ok(FdExecutor { futures }) + } + + // Add any new futures and wakers to the lists. + fn append_futures(&mut self) { + STATE.with(|state| { + let mut state = state.borrow_mut(); + if let Some(state) = state.as_mut() { + self.futures.futures_mut().append(&mut state.new_futures); + } else { + unreachable!("Can't get here without a context being created"); + } + }); + } +} + +impl Drop for FdExecutor { + fn drop(&mut self) { + STATE.with(|state| { + state.replace(None); + }); + } +} + +// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while +// waiting in TLS to be added to the main polling context. +unsafe fn dup_fd(fd: RawFd) -> Result { + let ret = libc::dup(fd); + if ret < 0 { + Err(Error::DuplicatingFd(sys_util::Error::last())) + } else { + Ok(ret) + } +} diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs new file mode 100644 index 0000000..f671a0c --- /dev/null +++ b/cros_async/src/lib.rs @@ -0,0 +1,354 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +//! An Executor and future combinators based on operations that block on file descriptors. +//! +//! This crate is meant to be used with the `futures-rs` crate that provides further combinators +//! and utility functions to combine and manage futures. All futures will run until they block on a +//! file descriptor becoming readable or writable. Facilities are provided to register future +//! wakers based on such events. +//! +//! # Running top-level futures. +//! +//! Use helper functions based the desired behavior of your application. +//! +//! ## Completing one of several futures. +//! +//! If there are several top level tasks that should run until any one completes, use the "select" +//! family of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run` +//! function will return when the first future completes. The uncompleted futures will also be +//! returned so they can be run further or otherwise cleaned up. These functions are inspired by +//! the `select_all` function from futures-rs, but built to be run inside an FD based executor and +//! to poll only when necessary. See the docs for [`select2`](fn.select2.html), +//! [`select3`](fn.select3.html), [`select4`](fn.select4.html), and [`select5`](fn.select5.html). +//! +//! ## Completing all of several futures. +//! +//! If there are several top level tasks that all need to be completed, use the "complete" family +//! of executor constructors. These return an [`Executor`](trait.Executor.html) whose `run` +//! function will return only once all the futures passed to it have completed. These functions are +//! inspired by the `join_all` function from futures-rs, but built to be run inside an FD based +//! executor and to poll only when necessary. See the docs for [`complete2`](fn.complete2.html), +//! [`complete3`](fn.complete3.html), [`complete4`](fn.complete4.html), and +//! [`complete5`](fn.complete5.html). +//! +//! ## Many futures all returning `()` +//! +//! It there are futures that produce side effects and return `()`, the +//! [`empty_executor`](fn.empty_executor.html) function provides an Executor that runs futures +//! returning `()`. Futures are added using the [`add_future`](fn.add_future.html) function. +//! +//! # Implementing new FD-based futures. +//! +//! When building futures to be run in an `FdExecutor` framework, use the following helper +//! functions to perform common tasks: +//! +//! [`add_read_waker`](fn.add_read_waker.html) - Used to associate a provided FD becoming readable +//! with the future being woken. Used before returning Poll::Pending from a future that waits until +//! an FD is writable. +//! +//! [`add_write_waker`](fn.add_write_waker.html) - Used to associate a provided FD becoming +//! writable with the future being woken. Used before returning Poll::Pending from a future that +//! waits until an FD is readable. +//! +//! [`add_future`](fn.add_future.html) - Used to add a new future to the top-level list of running +//! futures. + +mod complete; +mod executor; +pub mod fd_executor; +mod select; +mod waker; + +pub use executor::Executor; +pub use select::SelectResult; + +use executor::UnitFutures; +use fd_executor::{FdExecutor, Result}; +use std::future::Future; + +/// Creates an empty FdExecutor that can have futures returning `()` added via +/// [`add_future`](fn.add_future.html). +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor}; +/// use cros_async::fd_executor::add_future; +/// use futures::future::pending; +/// +/// let fut = async { () }; +/// let mut ex = empty_executor().expect("Failed to create executor"); +/// +/// add_future(Box::pin(fut)); +/// ex.run(); +/// ``` +pub fn empty_executor() -> Result { + FdExecutor::new(UnitFutures::new()) +} + +// Select helpers to run until any future completes. + +/// Creates an executor that runs the two given futures until one completes, returning a tuple +/// containing the result of the finished future and the still pending future. +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, select2, SelectResult}; +/// use cros_async::fd_executor::add_future; +/// use futures::future::pending; +/// use futures::pin_mut; +/// +/// let first = async {5}; +/// let second = async {let () = pending().await;}; +/// pin_mut!(first); +/// pin_mut!(second); +/// match select2(first, second) { +/// Ok((SelectResult::Finished(5), SelectResult::Pending(_second))) => (), +/// _ => panic!("Select didn't return the first future"), +/// }; +/// ``` +pub fn select2( + f1: F1, + f2: F2, +) -> Result<(SelectResult, SelectResult)> { + FdExecutor::new(select::Select2::new(f1, f2)).and_then(|mut f| f.run()) +} + +/// Creates an executor that runs the three given futures until one or more completes, returning a +/// tuple containing the result of the finished future(s) and the still pending future(s). +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, select3, SelectResult}; +/// use cros_async::fd_executor::add_future; +/// use futures::future::pending; +/// use futures::pin_mut; +/// +/// let first = async {4}; +/// let second = async {let () = pending().await;}; +/// let third = async {5}; +/// pin_mut!(first); +/// pin_mut!(second); +/// pin_mut!(third); +/// match select3(first, second, third) { +/// Ok((SelectResult::Finished(4), +/// SelectResult::Pending(_second), +/// SelectResult::Finished(5))) => (), +/// _ => panic!("Select didn't return the futures"), +/// }; +/// ``` +pub fn select3( + f1: F1, + f2: F2, + f3: F3, +) -> Result<(SelectResult, SelectResult, SelectResult)> { + FdExecutor::new(select::Select3::new(f1, f2, f3)).and_then(|mut f| f.run()) +} + +/// Creates an executor that runs the four given futures until one or more completes, returning a +/// tuple containing the result of the finished future(s) and the still pending future(s). +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, select4, SelectResult}; +/// use cros_async::fd_executor::add_future; +/// use futures::future::pending; +/// use futures::pin_mut; +/// +/// let first = async {4}; +/// let second = async {let () = pending().await;}; +/// let third = async {5}; +/// let fourth = async {let () = pending().await;}; +/// pin_mut!(first); +/// pin_mut!(second); +/// pin_mut!(third); +/// pin_mut!(fourth); +/// match select4(first, second, third, fourth) { +/// Ok((SelectResult::Finished(4), SelectResult::Pending(_second), +/// SelectResult::Finished(5), SelectResult::Pending(_fourth))) => (), +/// _ => panic!("Select didn't return the futures"), +/// }; +/// ``` +pub fn select4( + f1: F1, + f2: F2, + f3: F3, + f4: F4, +) -> Result<( + SelectResult, + SelectResult, + SelectResult, + SelectResult, +)> { + FdExecutor::new(select::Select4::new(f1, f2, f3, f4)).and_then(|mut f| f.run()) +} + +/// Creates an executor that runs the five given futures until one or more completes, returning a +/// tuple containing the result of the finished future(s) and the still pending future(s). +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, select5, SelectResult}; +/// use cros_async::fd_executor::add_future; +/// use futures::future::pending; +/// use futures::pin_mut; +/// +/// let first = async {4}; +/// let second = async {let () = pending().await;}; +/// let third = async {5}; +/// let fourth = async {let () = pending().await;}; +/// let fifth = async {6}; +/// pin_mut!(first); +/// pin_mut!(second); +/// pin_mut!(third); +/// pin_mut!(fourth); +/// pin_mut!(fifth); +/// match select5(first, second, third, fourth, fifth) { +/// Ok((SelectResult::Finished(4), SelectResult::Pending(_second), +/// SelectResult::Finished(5), SelectResult::Pending(_fourth), +/// SelectResult::Finished(6))) => (), +/// _ => panic!("Select didn't return the futures"), +/// }; +/// ``` +pub fn select5< + F1: Future + Unpin, + F2: Future + Unpin, + F3: Future + Unpin, + F4: Future + Unpin, + F5: Future + Unpin, +>( + f1: F1, + f2: F2, + f3: F3, + f4: F4, + f5: F5, +) -> Result<( + SelectResult, + SelectResult, + SelectResult, + SelectResult, + SelectResult, +)> { + FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run()) +} + +// Combination helpers to run until all futures are complete. + +/// Creates an executor that runs the two given futures to completion, returning a tuple of the +/// outputs each yields. +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, complete2}; +/// use futures::pin_mut; +/// +/// let first = async {5}; +/// let second = async {6}; +/// pin_mut!(first); +/// pin_mut!(second); +/// assert_eq!(complete2(first, second).unwrap_or((0,0)), (5,6)); +/// ``` +pub fn complete2( + f1: F1, + f2: F2, +) -> Result<(F1::Output, F2::Output)> { + FdExecutor::new(complete::Complete2::new(f1, f2)).and_then(|mut f| f.run()) +} + +/// Creates an executor that runs the three given futures to completion, returning a tuple of the +/// outputs each yields. +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, complete3}; +/// use futures::pin_mut; +/// +/// let first = async {5}; +/// let second = async {6}; +/// let third = async {7}; +/// pin_mut!(first); +/// pin_mut!(second); +/// pin_mut!(third); +/// assert_eq!(complete3(first, second, third).unwrap_or((0,0,0)), (5,6,7)); +/// ``` +pub fn complete3( + f1: F1, + f2: F2, + f3: F3, +) -> Result<(F1::Output, F2::Output, F3::Output)> { + FdExecutor::new(complete::Complete3::new(f1, f2, f3)).and_then(|mut f| f.run()) +} + +/// Creates an executor that runs the four given futures to completion, returning a tuple of the +/// outputs each yields. +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, complete4}; +/// use futures::pin_mut; +/// +/// let first = async {5}; +/// let second = async {6}; +/// let third = async {7}; +/// let fourth = async {8}; +/// pin_mut!(first); +/// pin_mut!(second); +/// pin_mut!(third); +/// pin_mut!(fourth); +/// assert_eq!(complete4(first, second, third, fourth).unwrap_or((0,0,0,0)), (5,6,7,8)); +/// ``` +pub fn complete4( + f1: F1, + f2: F2, + f3: F3, + f4: F4, +) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output)> { + FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)).and_then(|mut f| f.run()) +} + +/// Creates an executor that runs the five given futures to completion, returning a tuple of the +/// outputs each yields. +/// +/// # Example +/// +/// ``` +/// use cros_async::{empty_executor, Executor, complete5}; +/// use futures::pin_mut; +/// +/// let first = async {5}; +/// let second = async {6}; +/// let third = async {7}; +/// let fourth = async {8}; +/// let fifth = async {9}; +/// pin_mut!(first); +/// pin_mut!(second); +/// pin_mut!(third); +/// pin_mut!(fourth); +/// pin_mut!(fifth); +/// assert_eq!(complete5(first, second, third, fourth, fifth).unwrap_or((0,0,0,0,0)), +/// (5,6,7,8,9)); +/// ``` +pub fn complete5< + F1: Future + Unpin, + F2: Future + Unpin, + F3: Future + Unpin, + F4: Future + Unpin, + F5: Future + Unpin, +>( + f1: F1, + f2: F2, + f3: F3, + f4: F4, + f5: F5, +) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)> { + FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run()) +} diff --git a/cros_async/src/select.rs b/cros_async/src/select.rs new file mode 100644 index 0000000..8eef317 --- /dev/null +++ b/cros_async/src/select.rs @@ -0,0 +1,110 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// Need non-snake case so the macro can re-use type names for variables. +#![allow(non_snake_case)] + +use std::future::Future; +use std::pin::Pin; +use std::task::Context; + +use futures::future::{maybe_done, FutureExt, MaybeDone}; + +use crate::executor::{FutureList, FutureState, UnitFutures}; + +pub enum SelectResult { + Pending(F), + Finished(F::Output), +} + +// Macro-generate future combinators to allow for running different numbers of top-level futures in +// this FutureList. Generates the implementation of `FutureList` for the select types. For an +// explicit example this is modeled after, see `UnitFutures`. +macro_rules! generate { + ($( + $(#[$doc:meta])* + ($Select:ident, <$($Fut:ident),*>), + )*) => ($( + $(#[$doc])* + #[must_use = "Combinations of futures don't do anything unless run in an executor."] + paste::item! { + pub(crate) struct $Select<$($Fut: Future + Unpin),*> { + added_futures: UnitFutures, + $($Fut: MaybeDone<$Fut>,)* + $([<$Fut _state>]: FutureState,)* + } + } + + impl<$($Fut: Future + Unpin),*> $Select<$($Fut),*> { + paste::item! { + pub(crate) fn new($($Fut: $Fut),*) -> $Select<$($Fut),*> { + $Select { + added_futures: UnitFutures::new(), + $($Fut: maybe_done($Fut),)* + $([<$Fut _state>]: FutureState::new(),)* + } + } + } + } + + impl<$($Fut: Future + Unpin),*> FutureList for $Select<$($Fut),*> { + type Output = ($(SelectResult<$Fut>),*); + + fn futures_mut(&mut self) -> &mut UnitFutures { + &mut self.added_futures + } + + paste::item! { + fn poll_results(&mut self) -> Option { + let _ = self.added_futures.poll_results(); + + let mut complete = false; + $( + let $Fut = Pin::new(&mut self.$Fut); + if self.[<$Fut _state>].needs_poll.replace(false) { + let mut ctx = Context::from_waker(&self.[<$Fut _state>].waker); + // The future impls `Unpin`, use `poll_unpin` to avoid wrapping it in + // `Pin` to call `poll`. + complete |= self.$Fut.poll_unpin(&mut ctx).is_ready(); + } + )* + + if complete { + Some(($( + match std::mem::replace(&mut self.$Fut, MaybeDone::Gone) { + MaybeDone::Future(f) => SelectResult::Pending(f), + MaybeDone::Done(o) => SelectResult::Finished(o), + MaybeDone::Gone=>unreachable!(), + } + ), *)) + } else { + None + } + } + + fn any_ready(&self) -> bool { + let mut ready = self.added_futures.any_ready(); + $( + ready |= self.[<$Fut _state>].needs_poll.get(); + )* + ready + } + } + } + )*) +} + +generate! { + /// _Future for the [`select2`] function. + (Select2, <_Fut1, _Fut2>), + + /// _Future for the [`select3`] function. + (Select3, <_Fut1, _Fut2, _Fut3>), + + /// _Future for the [`select4`] function. + (Select4, <_Fut1, _Fut2, _Fut3, _Fut4>), + + /// _Future for the [`select5`] function. + (Select5, <_Fut1, _Fut2, _Fut3, _Fut4, _Fut5>), +} diff --git a/cros_async/src/waker.rs b/cros_async/src/waker.rs new file mode 100644 index 0000000..f0dac0f --- /dev/null +++ b/cros_async/src/waker.rs @@ -0,0 +1,42 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::rc::Rc; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::task::{RawWaker, RawWakerVTable}; + +// Boiler-plate for creating a waker with function pointers. +// This waker sets the atomic bool it is passed to true. +// The bool will be used by the executor to know which futures to poll + +// Convert the pointer back to the Rc it was created from and drop it. +unsafe fn waker_drop(data_ptr: *const ()) { + // from_raw, then drop + let _rc_bool = Rc::::from_raw(data_ptr as *const _); +} + +unsafe fn waker_wake(_: *const ()) {} + +// Called when the bool should be set to true to wake the waker. +unsafe fn waker_wake_by_ref(data_ptr: *const ()) { + let bool_atomic_ptr = data_ptr as *const AtomicBool; + let bool_atomic_ref = bool_atomic_ptr.as_ref().unwrap(); + bool_atomic_ref.store(true, Ordering::Relaxed); +} + +// The data_ptr will be a pointer to an Rc. +unsafe fn waker_clone(data_ptr: *const ()) -> RawWaker { + let rc_bool = Rc::::from_raw(data_ptr as *const _); + let new_ptr = rc_bool.clone(); + Rc::into_raw(rc_bool); // Don't decrement the ref count of the original, so back to raw. + create_waker(Rc::into_raw(new_ptr) as *const _) +} + +static WAKER_VTABLE: RawWakerVTable = + RawWakerVTable::new(waker_clone, waker_wake, waker_wake_by_ref, waker_drop); + +/// To use safely, data_ptr must be from Rc::from_raw(). +pub unsafe fn create_waker(data_ptr: *const ()) -> RawWaker { + RawWaker::new(data_ptr, &WAKER_VTABLE) +} -- cgit 1.4.1