diff options
author | Dylan Reid <dgreid@chromium.org> | 2020-04-24 03:51:19 +0000 |
---|---|---|
committer | Commit Bot <commit-bot@chromium.org> | 2020-04-29 03:04:36 +0000 |
commit | 887289e5d455d2cd026a2b178002ed009ea8bdd4 (patch) | |
tree | 029605b6be138ae09bc2d00b2167c4a5c1f596c0 /cros_async | |
parent | 7b8f776cb807ce1174899e567b2b00a040fe409f (diff) | |
download | crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.gz crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.bz2 crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.lz crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.xz crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.zst crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.zip |
cros_async: allow wakers to be canceled
Allowing for wakers to be canceled will allow futures that register wakers properly implement `Drop`. As it is, they won't ever fire but the saved FD will leak. TEST=added 'cancel' unit test to fd_executor: "cargo test cancel" from the cros_async directory. Change-Id: Iab5bea6aac0cc689392997745f5dcc8c285200d9 Change-Id: I1df1a04897e8d2c5e9c414d84998084607209fb9 Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2164074 Reviewed-by: Chirantan Ekbote <chirantan@chromium.org> Commit-Queue: Dylan Reid <dgreid@chromium.org> Tested-by: Dylan Reid <dgreid@chromium.org>
Diffstat (limited to 'cros_async')
-rw-r--r-- | cros_async/src/fd_executor.rs | 102 |
1 files changed, 97 insertions, 5 deletions
diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs index 58d6013..349cc42 100644 --- a/cros_async/src/fd_executor.rs +++ b/cros_async/src/fd_executor.rs @@ -80,7 +80,10 @@ impl Display for Error { // Tracks active wakers and the futures they are associated with. thread_local!(static STATE: RefCell<Option<FdWakerState>> = RefCell::new(None)); -fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> { +/// A token returned from `add_waker` that can be used to cancel the waker before it completes. +pub struct WakerToken(u64); + +fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> { STATE.with(|state| { let mut state = state.borrow_mut(); if let Some(state) = state.as_mut() { @@ -95,7 +98,8 @@ fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> { /// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the /// next time the future is polled. If the fd is closed, there is a race where another FD can be /// opened on top of it causing the next poll to access the new target file. -pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<()> { +/// Returns a `WakerToken` that can be used to cancel the waker before it completes. +pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> { add_waker(fd, waker, WatchingEvents::empty().set_read()) } @@ -103,10 +107,23 @@ pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<()> { /// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the /// next time the future is polled. If the fd is closed, there is a race where another FD can be /// opened on top of it causing the next poll to access the new target file. -pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<()> { +/// Returns a `WakerToken` that can be used to cancel the waker before it completes. +pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> { add_waker(fd, waker, WatchingEvents::empty().set_write()) } +/// Cancels the waker that returned the given token if the waker hasn't yet fired. +pub fn cancel_waker(token: WakerToken) -> Result<()> { + STATE.with(|state| { + let mut state = state.borrow_mut(); + if let Some(state) = state.as_mut() { + state.cancel_waker(token) + } else { + Err(Error::InvalidContext) + } + }) +} + /// Adds a new top level future to the Executor. /// These futures must return `()`, indicating they are intended to create side-effects only. pub fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> { @@ -140,7 +157,7 @@ impl FdWakerState { } // Adds an fd that, when signaled, will trigger the given waker. - fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> { + fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> { let duped_fd = unsafe { // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD // will only be added to the poll loop. @@ -152,7 +169,7 @@ impl FdWakerState { let next_token = self.next_token; self.token_map.insert(next_token, (duped_fd, waker)); self.next_token += 1; - Ok(()) + Ok(WakerToken(next_token)) } // Waits until one of the FDs is readable and wakes the associated waker. @@ -166,6 +183,15 @@ impl FdWakerState { } Ok(()) } + + // Remove the waker for the given token if it hasn't fired yet. + fn cancel_waker(&mut self, token: WakerToken) -> Result<()> { + if let Some((fd, waker)) = self.token_map.remove(&token.0) { + self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?; + waker.wake_by_ref(); + } + Ok(()) + } } /// Runs futures to completion on a single thread. Futures are allowed to block on file descriptors @@ -248,3 +274,69 @@ unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> { Ok(ret) } } + +#[cfg(test)] +mod test { + use std::future::Future; + use std::os::unix::io::AsRawFd; + use std::task::{Context, Poll}; + + use futures::future::Either; + use futures::pin_mut; + + use super::*; + + struct TestFut { + f: File, + token: Option<WakerToken>, + } + + impl TestFut { + fn new(f: File) -> TestFut { + TestFut { f, token: None } + } + } + + impl Future for TestFut { + type Output = u64; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { + if self.token.is_none() { + self.token = Some(add_read_waker(self.f.as_raw_fd(), cx.waker().clone()).unwrap()); + } + Poll::Pending + } + } + + impl Drop for TestFut { + fn drop(&mut self) { + if let Some(token) = self.token.take() { + cancel_waker(token).unwrap(); + } + } + } + + #[test] + fn cancel() { + async fn do_test() { + let (r, _w) = sys_util::pipe(true).unwrap(); + let done = async { 5usize }; + let pending = TestFut::new(r); + pin_mut!(done); + pin_mut!(pending); + match futures::future::select(pending, done).await { + Either::Right((5, _pending)) => (), + _ => panic!("unexpected select result"), + } + } + + let fut = do_test(); + + let mut ex = FdExecutor::new(crate::UnitFutures::new()).expect("Failed creating executor"); + add_future(Box::pin(fut)).unwrap(); + ex.run().unwrap(); + STATE.with(|state| { + let state = state.borrow_mut(); + assert!(state.as_ref().unwrap().token_map.is_empty()); + }); + } +} |