summary refs log tree commit diff
path: root/cros_async
diff options
context:
space:
mode:
authorDylan Reid <dgreid@chromium.org>2020-05-05 04:13:44 +0000
committerCommit Bot <commit-bot@chromium.org>2020-05-07 22:39:10 +0000
commit0bb7fa603d6250079ac818289b5c887175ea35b3 (patch)
tree08a0bb766fff02188eebe62ef05e42f4c1f55cd4 /cros_async
parent882e2cea3bdeb6341b1e38b04e93ac6ede5a493d (diff)
downloadcrosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar
crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.gz
crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.bz2
crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.lz
crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.xz
crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.tar.zst
crosvm-0bb7fa603d6250079ac818289b5c887175ea35b3.zip
cros_async: Hide the details of fd_executor
The type of the executor leaked from the cros_async crate. That was fine
until the desire to add a new executor arose. Hide the fd_executor so
that a uring_executor can be substituted on newer kernels.

Change-Id: I8dd309fd47e1b4a6e16da274abbb8431c80474af
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2182042
Reviewed-by: Stephen Barber <smbarber@chromium.org>
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Tested-by: Dylan Reid <dgreid@chromium.org>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
Diffstat (limited to 'cros_async')
-rw-r--r--cros_async/src/executor.rs3
-rw-r--r--cros_async/src/fd_executor.rs38
-rw-r--r--cros_async/src/lib.rs105
3 files changed, 109 insertions, 37 deletions
diff --git a/cros_async/src/executor.rs b/cros_async/src/executor.rs
index 6641ec7..85d3f70 100644
--- a/cros_async/src/executor.rs
+++ b/cros_async/src/executor.rs
@@ -26,6 +26,9 @@ pub trait Executor {
     fn run(&mut self) -> Self::Output;
 }
 
+/// A token returned from `add_waker` that can be used to cancel the waker before it completes.
+pub struct WakerToken(pub(crate) u64);
+
 // Tracks if a future needs to be polled and the waker to use.
 pub(crate) struct FutureState {
     pub needs_poll: Rc<Cell<bool>>,
diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs
index cc67793..1ab9ca8 100644
--- a/cros_async/src/fd_executor.rs
+++ b/cros_async/src/fd_executor.rs
@@ -8,23 +8,6 @@
 //!
 //! `FdExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
 //! utility functions to combine futures.
-//!
-//! # Example of starting the framework and running a future:
-//!
-//! ```
-//! # use std::rc::Rc;
-//! # use std::cell::RefCell;
-//! use cros_async::Executor;
-//! async fn my_async(mut x: Rc<RefCell<u64>>) {
-//!     x.replace(4);
-//! }
-//!
-//! let mut ex = cros_async::empty_executor().expect("Failed creating executor");
-//! let x = Rc::new(RefCell::new(0));
-//! cros_async::fd_executor::add_future(Box::pin(my_async(x.clone())));
-//! ex.run();
-//! assert_eq!(*x.borrow(), 4);
-//! ```
 
 use std::cell::RefCell;
 use std::collections::{BTreeMap, VecDeque};
@@ -38,7 +21,7 @@ use std::task::Waker;
 
 use sys_util::{PollContext, WatchingEvents};
 
-use crate::executor::{ExecutableFuture, Executor, FutureList};
+use crate::executor::{ExecutableFuture, Executor, FutureList, WakerToken};
 
 #[derive(Debug, PartialEq)]
 pub enum Error {
@@ -80,9 +63,6 @@ impl Display for Error {
 // Tracks active wakers and the futures they are associated with.
 thread_local!(static STATE: RefCell<Option<FdWakerState>> = RefCell::new(None));
 
-/// A token returned from `add_waker` that can be used to cancel the waker before it completes.
-pub struct WakerToken(u64);
-
 fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> {
     STATE.with(|state| {
         let mut state = state.borrow_mut();
@@ -276,8 +256,10 @@ unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
 
 #[cfg(test)]
 mod test {
+    use std::cell::RefCell;
     use std::future::Future;
     use std::os::unix::io::AsRawFd;
+    use std::rc::Rc;
     use std::task::{Context, Poll};
 
     use futures::future::Either;
@@ -338,4 +320,18 @@ mod test {
             assert!(state.as_ref().unwrap().token_map.is_empty());
         });
     }
+
+    #[test]
+    fn run() {
+        // Example of starting the framework and running a future:
+        async fn my_async(x: Rc<RefCell<u64>>) {
+            x.replace(4);
+        }
+
+        let mut ex = FdExecutor::new(crate::UnitFutures::new()).expect("Failed creating executor");
+        let x = Rc::new(RefCell::new(0));
+        crate::fd_executor::add_future(Box::pin(my_async(x.clone()))).unwrap();
+        ex.run().unwrap();
+        assert_eq!(*x.borrow(), 4);
+    }
 }
diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs
index 00109a8..53f480d 100644
--- a/cros_async/src/lib.rs
+++ b/cros_async/src/lib.rs
@@ -41,8 +41,8 @@
 //!
 //! # Implementing new FD-based futures.
 //!
-//! When building futures to be run in an `FdExecutor` framework, use the following helper
-//! functions to perform common tasks:
+//! When building futures to be run in an `Executor` framework, use the following helper functions
+//! to perform common tasks:
 //!
 //! [`add_read_waker`](fn.add_read_waker.html) - Used to associate a provided FD becoming readable
 //! with the future being woken. Used before returning Poll::Pending from a future that waits until
@@ -57,16 +57,38 @@
 
 mod complete;
 mod executor;
-pub mod fd_executor;
+mod fd_executor;
 mod select;
 mod waker;
 
-pub use executor::Executor;
+pub use executor::{Executor, WakerToken};
 pub use select::SelectResult;
 
 use executor::{RunOne, UnitFutures};
-use fd_executor::{FdExecutor, Result};
+use fd_executor::FdExecutor;
+
+use std::fmt::{self, Display};
 use std::future::Future;
+use std::os::unix::io::RawFd;
+use std::pin::Pin;
+use std::task::Waker;
+
+#[derive(Debug, PartialEq)]
+pub enum Error {
+    /// Error from the FD executor.
+    FdExecutor(fd_executor::Error),
+}
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+
+        match self {
+            FdExecutor(e) => write!(f, "Failure in the FD executor: {}", e),
+        }
+    }
+}
 
 /// Creates an empty FdExecutor that can have futures returning `()` added via
 /// [`add_future`](fn.add_future.html).
@@ -85,7 +107,7 @@ use std::future::Future;
 ///    ex.run();
 ///    ```
 pub fn empty_executor() -> Result<impl Executor> {
-    FdExecutor::new(UnitFutures::new())
+    FdExecutor::new(UnitFutures::new()).map_err(Error::FdExecutor)
 }
 
 /// Creates a FdExecutor that runs one future to completion.
@@ -99,7 +121,9 @@ pub fn empty_executor() -> Result<impl Executor> {
 ///    assert_eq!(Ok(55),run_one(Box::pin(fut)));
 ///    ```
 pub fn run_one<F: Future + Unpin>(fut: F) -> Result<F::Output> {
-    FdExecutor::new(RunOne::new(fut)).and_then(|mut ex| ex.run())
+    FdExecutor::new(RunOne::new(fut))
+        .and_then(|mut ex| ex.run())
+        .map_err(Error::FdExecutor)
 }
 
 // Select helpers to run until any future completes.
@@ -128,7 +152,9 @@ pub fn select2<F1: Future + Unpin, F2: Future + Unpin>(
     f1: F1,
     f2: F2,
 ) -> Result<(SelectResult<F1>, SelectResult<F2>)> {
-    FdExecutor::new(select::Select2::new(f1, f2)).and_then(|mut f| f.run())
+    FdExecutor::new(select::Select2::new(f1, f2))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the three given futures until one or more completes, returning a
@@ -160,7 +186,9 @@ pub fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
     f2: F2,
     f3: F3,
 ) -> Result<(SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)> {
-    FdExecutor::new(select::Select3::new(f1, f2, f3)).and_then(|mut f| f.run())
+    FdExecutor::new(select::Select3::new(f1, f2, f3))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the four given futures until one or more completes, returning a
@@ -199,7 +227,9 @@ pub fn select4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: F
     SelectResult<F3>,
     SelectResult<F4>,
 )> {
-    FdExecutor::new(select::Select4::new(f1, f2, f3, f4)).and_then(|mut f| f.run())
+    FdExecutor::new(select::Select4::new(f1, f2, f3, f4))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the five given futures until one or more completes, returning a
@@ -249,7 +279,9 @@ pub fn select5<
     SelectResult<F4>,
     SelectResult<F5>,
 )> {
-    FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run())
+    FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the six given futures until one or more completes, returning a
@@ -304,7 +336,9 @@ pub fn select6<
     SelectResult<F5>,
     SelectResult<F6>,
 )> {
-    FdExecutor::new(select::Select6::new(f1, f2, f3, f4, f5, f6)).and_then(|mut f| f.run())
+    FdExecutor::new(select::Select6::new(f1, f2, f3, f4, f5, f6))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 // Combination helpers to run until all futures are complete.
@@ -328,7 +362,9 @@ pub fn complete2<F1: Future + Unpin, F2: Future + Unpin>(
     f1: F1,
     f2: F2,
 ) -> Result<(F1::Output, F2::Output)> {
-    FdExecutor::new(complete::Complete2::new(f1, f2)).and_then(|mut f| f.run())
+    FdExecutor::new(complete::Complete2::new(f1, f2))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the three given futures to completion, returning a tuple of the
@@ -353,7 +389,9 @@ pub fn complete3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>(
     f2: F2,
     f3: F3,
 ) -> Result<(F1::Output, F2::Output, F3::Output)> {
-    FdExecutor::new(complete::Complete3::new(f1, f2, f3)).and_then(|mut f| f.run())
+    FdExecutor::new(complete::Complete3::new(f1, f2, f3))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the four given futures to completion, returning a tuple of the
@@ -381,7 +419,9 @@ pub fn complete4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4:
     f3: F3,
     f4: F4,
 ) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output)> {
-    FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)).and_then(|mut f| f.run())
+    FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
 }
 
 /// Creates an executor that runs the five given futures to completion, returning a tuple of the
@@ -419,5 +459,38 @@ pub fn complete5<
     f4: F4,
     f5: F5,
 ) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)> {
-    FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run())
+    FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5))
+        .and_then(|mut f| f.run())
+        .map_err(Error::FdExecutor)
+}
+
+// Functions to be used by `Future` implementations
+
+/// Tells the waking system to wake `waker` when `fd` becomes readable.
+/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
+/// next time the future is polled. If the fd is closed, there is a race where another FD can be
+/// opened on top of it causing the next poll to access the new target file.
+/// Returns a `WakerToken` that can be used to cancel the waker before it completes.
+pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> {
+    fd_executor::add_read_waker(fd, waker).map_err(Error::FdExecutor)
+}
+
+/// Tells the waking system to wake `waker` when `fd` becomes writable.
+/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
+/// next time the future is polled. If the fd is closed, there is a race where another FD can be
+/// opened on top of it causing the next poll to access the new target file.
+/// Returns a `WakerToken` that can be used to cancel the waker before it completes.
+pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> {
+    fd_executor::add_write_waker(fd, waker).map_err(Error::FdExecutor)
+}
+
+/// Cancels the waker that returned the given token if the waker hasn't yet fired.
+pub fn cancel_waker(token: WakerToken) -> Result<()> {
+    fd_executor::cancel_waker(token).map_err(Error::FdExecutor)
+}
+
+/// Adds a new top level future to the Executor.
+/// These futures must return `()`, indicating they are intended to create side-effects only.
+pub fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> {
+    fd_executor::add_future(future).map_err(Error::FdExecutor)
 }