diff options
author | Dylan Reid <dgreid@chromium.org> | 2020-05-05 04:13:44 +0000 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2020-05-07 22:39:10 +0000 |
commit | 0bb7fa603d6250079ac818289b5c887175ea35b3 (patch) | |
tree | 08a0bb766fff02188eebe62ef05e42f4c1f55cd4 /cros_async | |
parent | 882e2cea3bdeb6341b1e38b04e93ac6ede5a493d (diff) | |
download | crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.gz crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.bz2 crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.lz crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.xz crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.zst crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.zip |
cros_async: Hide the details of fd_executor
The type of the executor leaked from the cros_async crate. That was fine until the desire to add a new executor arose. Hide the fd_executor so that a uring_executor can be substituted on newer kernels. Change-Id: I8dd309fd47e1b4a6e16da274abbb8431c80474af Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2182042 Reviewed-by: Stephen Barber <smbarber@chromium.org> Reviewed-by: Chirantan Ekbote <chirantan@chromium.org> Tested-by: kokoro <noreply+kokoro@google.com> Tested-by: Dylan Reid <dgreid@chromium.org> Commit-Queue: Dylan Reid <dgreid@chromium.org>
Diffstat (limited to 'cros_async')
-rw-r--r-- | cros_async/src/executor.rs | 3 | ||||
-rw-r--r-- | cros_async/src/fd_executor.rs | 38 | ||||
-rw-r--r-- | cros_async/src/lib.rs | 105 |
3 files changed, 109 insertions, 37 deletions
diff --git a/cros_async/src/executor.rs b/cros_async/src/executor.rs index 6641ec7..85d3f70 100644 --- a/cros_async/src/executor.rs +++ b/cros_async/src/executor.rs @@ -26,6 +26,9 @@ pub trait Executor { fn run(&mut self) -> Self::Output; } +/// A token returned from `add_waker` that can be used to cancel the waker before it completes. +pub struct WakerToken(pub(crate) u64); + // Tracks if a future needs to be polled and the waker to use. pub(crate) struct FutureState { pub needs_poll: Rc<Cell<bool>>, diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs index cc67793..1ab9ca8 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/fd_executor.rs @@ -8,23 +8,6 @@ //! //! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and //! utility functions to combine futures. -//! -//! # Example of starting the framework and running a future: -//! -//! ``` -//! # use std::rc::Rc; -//! # use std::cell::RefCell; -//! use cros_async::Executor; -//! async fn my_async(mut x: Rc<RefCell<u64>>) { -//! x.replace(4); -//! } -//! -//! let mut ex = cros_async::empty_executor().expect("Failed creating executor"); -//! let x = Rc::new(RefCell::new(0)); -//! cros_async::fd_executor::add_future(Box::pin(my_async(x.clone()))); -//! ex.run(); -//! assert_eq!(*x.borrow(), 4); -//! ``` use std::cell::RefCell; use std::collections::{BTreeMap, VecDeque}; @@ -38,7 +21,7 @@ use std::task::Waker; use sys_util::{PollContext, WatchingEvents}; -use crate::executor::{ExecutableFuture, Executor, FutureList}; +use crate::executor::{ExecutableFuture, Executor, FutureList, WakerToken}; #[derive(Debug, PartialEq)] pub enum Error { @@ -80,9 +63,6 @@ impl Display for Error { // Tracks active wakers and the futures they are associated with. thread_local!(static STATE: RefCell<Option<FdWakerState>> = RefCell::new(None)); -/// A token returned from `add_waker` that can be used to cancel the waker before it completes. -pub struct WakerToken(u64); - fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> { STATE.with(|state| { let mut state = state.borrow_mut(); @@ -276,8 +256,10 @@ unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> { #[cfg(test)] mod test { + use std::cell::RefCell; use std::future::Future; use std::os::unix::io::AsRawFd; + use std::rc::Rc; use std::task::{Context, Poll}; use futures::future::Either; @@ -338,4 +320,18 @@ mod test { assert!(state.as_ref().unwrap().token_map.is_empty()); }); } + + #[test] + fn run() { + // Example of starting the framework and running a future: + async fn my_async(x: Rc<RefCell<u64>>) { + x.replace(4); + } + + let mut ex = FdExecutor::new(crate::UnitFutures::new()).expect("Failed creating executor"); + let x = Rc::new(RefCell::new(0)); + crate::fd_executor::add_future(Box::pin(my_async(x.clone()))).unwrap(); + ex.run().unwrap(); + assert_eq!(*x.borrow(), 4); + } } diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 00109a8..53f480d 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -41,8 +41,8 @@ //! //! # Implementing new FD-based futures. //! -//! When building futures to be run in an `FdExecutor` framework, use the following helper -//! functions to perform common tasks: +//! When building futures to be run in an `Executor` framework, use the following helper functions +//! to perform common tasks: //! //! [`add_read_waker`](fn.add_read_waker.html) - Used to associate a provided FD becoming readable //! with the future being woken. Used before returning Poll::Pending from a future that waits until @@ -57,16 +57,38 @@ mod complete; mod executor; -pub mod fd_executor; +mod fd_executor; mod select; mod waker; -pub use executor::Executor; +pub use executor::{Executor, WakerToken}; pub use select::SelectResult; use executor::{RunOne, UnitFutures}; -use fd_executor::{FdExecutor, Result}; +use fd_executor::FdExecutor; + +use std::fmt::{self, Display}; use std::future::Future; +use std::os::unix::io::RawFd; +use std::pin::Pin; +use std::task::Waker; + +#[derive(Debug, PartialEq)] +pub enum Error { + /// Error from the FD executor. + FdExecutor(fd_executor::Error), +} +pub type Result<T> = std::result::Result<T, Error>; + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use self::Error::*; + + match self { + FdExecutor(e) => write!(f, "Failure in the FD executor: {}", e), + } + } +} /// Creates an empty FdExecutor that can have futures returning `()` added via /// [`add_future`](fn.add_future.html). @@ -85,7 +107,7 @@ use std::future::Future; /// ex.run(); /// ``` pub fn empty_executor() -> Result<impl Executor> { - FdExecutor::new(UnitFutures::new()) + FdExecutor::new(UnitFutures::new()).map_err(Error::FdExecutor) } /// Creates a FdExecutor that runs one future to completion. @@ -99,7 +121,9 @@ pub fn empty_executor() -> Result<impl Executor> { /// assert_eq!(Ok(55),run_one(Box::pin(fut))); /// ``` pub fn run_one<F: Future + Unpin>(fut: F) -> Result<F::Output> { - FdExecutor::new(RunOne::new(fut)).and_then(|mut ex| ex.run()) + FdExecutor::new(RunOne::new(fut)) + .and_then(|mut ex| ex.run()) + .map_err(Error::FdExecutor) } // Select helpers to run until any future completes. @@ -128,7 +152,9 @@ pub fn select2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> Result<(SelectResult<F1>, SelectResult<F2>)> { - FdExecutor::new(select::Select2::new(f1, f2)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select2::new(f1, f2)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the three given futures until one or more completes, returning a @@ -160,7 +186,9 @@ pub fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f2: F2, f3: F3, ) -> Result<(SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)> { - FdExecutor::new(select::Select3::new(f1, f2, f3)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select3::new(f1, f2, f3)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the four given futures until one or more completes, returning a @@ -199,7 +227,9 @@ pub fn select4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: F SelectResult<F3>, SelectResult<F4>, )> { - FdExecutor::new(select::Select4::new(f1, f2, f3, f4)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select4::new(f1, f2, f3, f4)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the five given futures until one or more completes, returning a @@ -249,7 +279,9 @@ pub fn select5< SelectResult<F4>, SelectResult<F5>, )> { - FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the six given futures until one or more completes, returning a @@ -304,7 +336,9 @@ pub fn select6< SelectResult<F5>, SelectResult<F6>, )> { - FdExecutor::new(select::Select6::new(f1, f2, f3, f4, f5, f6)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select6::new(f1, f2, f3, f4, f5, f6)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } // Combination helpers to run until all futures are complete. @@ -328,7 +362,9 @@ pub fn complete2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> Result<(F1::Output, F2::Output)> { - FdExecutor::new(complete::Complete2::new(f1, f2)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete2::new(f1, f2)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the three given futures to completion, returning a tuple of the @@ -353,7 +389,9 @@ pub fn complete3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f2: F2, f3: F3, ) -> Result<(F1::Output, F2::Output, F3::Output)> { - FdExecutor::new(complete::Complete3::new(f1, f2, f3)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete3::new(f1, f2, f3)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the four given futures to completion, returning a tuple of the @@ -381,7 +419,9 @@ pub fn complete4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: f3: F3, f4: F4, ) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output)> { - FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the five given futures to completion, returning a tuple of the @@ -419,5 +459,38 @@ pub fn complete5< f4: F4, f5: F5, ) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)> { - FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) +} + +// Functions to be used by `Future` implementations + +/// Tells the waking system to wake `waker` when `fd` becomes readable. +/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the +/// next time the future is polled. If the fd is closed, there is a race where another FD can be +/// opened on top of it causing the next poll to access the new target file. +/// Returns a `WakerToken` that can be used to cancel the waker before it completes. +pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> { + fd_executor::add_read_waker(fd, waker).map_err(Error::FdExecutor) +} + +/// Tells the waking system to wake `waker` when `fd` becomes writable. +/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the +/// next time the future is polled. If the fd is closed, there is a race where another FD can be +/// opened on top of it causing the next poll to access the new target file. +/// Returns a `WakerToken` that can be used to cancel the waker before it completes. +pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> { + fd_executor::add_write_waker(fd, waker).map_err(Error::FdExecutor) +} + +/// Cancels the waker that returned the given token if the waker hasn't yet fired. +pub fn cancel_waker(token: WakerToken) -> Result<()> { + fd_executor::cancel_waker(token).map_err(Error::FdExecutor) +} + +/// Adds a new top level future to the Executor. +/// These futures must return `()`, indicating they are intended to create side-effects only. +pub fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> { + fd_executor::add_future(future).map_err(Error::FdExecutor) } |