diff options
Diffstat (limited to 'cros_async/src/lib.rs')
-rw-r--r-- | cros_async/src/lib.rs | 105 |
1 files changed, 89 insertions, 16 deletions
diff --git a/cros_async/src/lib.rs b/cros_async/src/lib.rs index 00109a8..53f480d 100644 --- a/cros_async/src/lib.rs +++ b/cros_async/src/lib.rs @@ -41,8 +41,8 @@ //! //! # Implementing new FD-based futures. //! -//! When building futures to be run in an `FdExecutor` framework, use the following helper -//! functions to perform common tasks: +//! When building futures to be run in an `Executor` framework, use the following helper functions +//! to perform common tasks: //! //! [`add_read_waker`](fn.add_read_waker.html) - Used to associate a provided FD becoming readable //! with the future being woken. Used before returning Poll::Pending from a future that waits until @@ -57,16 +57,38 @@ mod complete; mod executor; -pub mod fd_executor; +mod fd_executor; mod select; mod waker; -pub use executor::Executor; +pub use executor::{Executor, WakerToken}; pub use select::SelectResult; use executor::{RunOne, UnitFutures}; -use fd_executor::{FdExecutor, Result}; +use fd_executor::FdExecutor; + +use std::fmt::{self, Display}; use std::future::Future; +use std::os::unix::io::RawFd; +use std::pin::Pin; +use std::task::Waker; + +#[derive(Debug, PartialEq)] +pub enum Error { + /// Error from the FD executor. + FdExecutor(fd_executor::Error), +} +pub type Result<T> = std::result::Result<T, Error>; + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use self::Error::*; + + match self { + FdExecutor(e) => write!(f, "Failure in the FD executor: {}", e), + } + } +} /// Creates an empty FdExecutor that can have futures returning `()` added via /// [`add_future`](fn.add_future.html). @@ -85,7 +107,7 @@ use std::future::Future; /// ex.run(); /// ``` pub fn empty_executor() -> Result<impl Executor> { - FdExecutor::new(UnitFutures::new()) + FdExecutor::new(UnitFutures::new()).map_err(Error::FdExecutor) } /// Creates a FdExecutor that runs one future to completion. @@ -99,7 +121,9 @@ pub fn empty_executor() -> Result<impl Executor> { /// assert_eq!(Ok(55),run_one(Box::pin(fut))); /// ``` pub fn run_one<F: Future + Unpin>(fut: F) -> Result<F::Output> { - FdExecutor::new(RunOne::new(fut)).and_then(|mut ex| ex.run()) + FdExecutor::new(RunOne::new(fut)) + .and_then(|mut ex| ex.run()) + .map_err(Error::FdExecutor) } // Select helpers to run until any future completes. @@ -128,7 +152,9 @@ pub fn select2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> Result<(SelectResult<F1>, SelectResult<F2>)> { - FdExecutor::new(select::Select2::new(f1, f2)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select2::new(f1, f2)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the three given futures until one or more completes, returning a @@ -160,7 +186,9 @@ pub fn select3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f2: F2, f3: F3, ) -> Result<(SelectResult<F1>, SelectResult<F2>, SelectResult<F3>)> { - FdExecutor::new(select::Select3::new(f1, f2, f3)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select3::new(f1, f2, f3)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the four given futures until one or more completes, returning a @@ -199,7 +227,9 @@ pub fn select4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: F SelectResult<F3>, SelectResult<F4>, )> { - FdExecutor::new(select::Select4::new(f1, f2, f3, f4)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select4::new(f1, f2, f3, f4)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the five given futures until one or more completes, returning a @@ -249,7 +279,9 @@ pub fn select5< SelectResult<F4>, SelectResult<F5>, )> { - FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select5::new(f1, f2, f3, f4, f5)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the six given futures until one or more completes, returning a @@ -304,7 +336,9 @@ pub fn select6< SelectResult<F5>, SelectResult<F6>, )> { - FdExecutor::new(select::Select6::new(f1, f2, f3, f4, f5, f6)).and_then(|mut f| f.run()) + FdExecutor::new(select::Select6::new(f1, f2, f3, f4, f5, f6)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } // Combination helpers to run until all futures are complete. @@ -328,7 +362,9 @@ pub fn complete2<F1: Future + Unpin, F2: Future + Unpin>( f1: F1, f2: F2, ) -> Result<(F1::Output, F2::Output)> { - FdExecutor::new(complete::Complete2::new(f1, f2)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete2::new(f1, f2)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the three given futures to completion, returning a tuple of the @@ -353,7 +389,9 @@ pub fn complete3<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin>( f2: F2, f3: F3, ) -> Result<(F1::Output, F2::Output, F3::Output)> { - FdExecutor::new(complete::Complete3::new(f1, f2, f3)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete3::new(f1, f2, f3)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the four given futures to completion, returning a tuple of the @@ -381,7 +419,9 @@ pub fn complete4<F1: Future + Unpin, F2: Future + Unpin, F3: Future + Unpin, F4: f3: F3, f4: F4, ) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output)> { - FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete4::new(f1, f2, f3, f4)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) } /// Creates an executor that runs the five given futures to completion, returning a tuple of the @@ -419,5 +459,38 @@ pub fn complete5< f4: F4, f5: F5, ) -> Result<(F1::Output, F2::Output, F3::Output, F4::Output, F5::Output)> { - FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)).and_then(|mut f| f.run()) + FdExecutor::new(complete::Complete5::new(f1, f2, f3, f4, f5)) + .and_then(|mut f| f.run()) + .map_err(Error::FdExecutor) +} + +// Functions to be used by `Future` implementations + +/// Tells the waking system to wake `waker` when `fd` becomes readable. +/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the +/// next time the future is polled. If the fd is closed, there is a race where another FD can be +/// opened on top of it causing the next poll to access the new target file. +/// Returns a `WakerToken` that can be used to cancel the waker before it completes. +pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> { + fd_executor::add_read_waker(fd, waker).map_err(Error::FdExecutor) +} + +/// Tells the waking system to wake `waker` when `fd` becomes writable. +/// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the +/// next time the future is polled. If the fd is closed, there is a race where another FD can be +/// opened on top of it causing the next poll to access the new target file. +/// Returns a `WakerToken` that can be used to cancel the waker before it completes. +pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> { + fd_executor::add_write_waker(fd, waker).map_err(Error::FdExecutor) +} + +/// Cancels the waker that returned the given token if the waker hasn't yet fired. +pub fn cancel_waker(token: WakerToken) -> Result<()> { + fd_executor::cancel_waker(token).map_err(Error::FdExecutor) +} + +/// Adds a new top level future to the Executor. +/// These futures must return `()`, indicating they are intended to create side-effects only. +pub fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> { + fd_executor::add_future(future).map_err(Error::FdExecutor) } |