summary refs log tree commit diff
path: root/cros_async
diff options
context:
space:
mode:
authorDylan Reid <dgreid@chromium.org>2020-04-24 03:51:19 +0000
committerCommit Bot <commit-bot@chromium.org>2020-04-29 03:04:36 +0000
commit887289e5d455d2cd026a2b178002ed009ea8bdd4 (patch)
tree029605b6be138ae09bc2d00b2167c4a5c1f596c0 /cros_async
parent7b8f776cb807ce1174899e567b2b00a040fe409f (diff)
downloadcrosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar
crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.gz
crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.bz2
crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.lz
crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.xz
crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.tar.zst
crosvm-887289e5d455d2cd026a2b178002ed009ea8bdd4.zip
cros_async: allow wakers to be canceled
Allowing for wakers to be canceled will allow futures that register
wakers properly implement `Drop`. As it is, they won't ever fire but the
saved FD will leak.

TEST=added 'cancel' unit test to fd_executor:
"cargo test cancel" from the cros_async directory.
Change-Id: Iab5bea6aac0cc689392997745f5dcc8c285200d9

Change-Id: I1df1a04897e8d2c5e9c414d84998084607209fb9
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2164074
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
Tested-by: Dylan Reid <dgreid@chromium.org>
Diffstat (limited to 'cros_async')
-rw-r--r--cros_async/src/fd_executor.rs102
1 files changed, 97 insertions, 5 deletions
diff --git a/cros_async/src/fd_executor.rs b/cros_async/src/fd_executor.rs
index 58d6013..349cc42 100644
--- a/cros_async/src/fd_executor.rs
+++ b/cros_async/src/fd_executor.rs
@@ -80,7 +80,10 @@ impl Display for Error {
 // Tracks active wakers and the futures they are associated with.
 thread_local!(static STATE: RefCell<Option<FdWakerState>> = RefCell::new(None));
 
-fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> {
+/// A token returned from `add_waker` that can be used to cancel the waker before it completes.
+pub struct WakerToken(u64);
+
+fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> {
     STATE.with(|state| {
         let mut state = state.borrow_mut();
         if let Some(state) = state.as_mut() {
@@ -95,7 +98,8 @@ fn add_waker(fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> {
 /// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
 /// next time the future is polled. If the fd is closed, there is a race where another FD can be
 /// opened on top of it causing the next poll to access the new target file.
-pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<()> {
+/// Returns a `WakerToken` that can be used to cancel the waker before it completes.
+pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> {
     add_waker(fd, waker, WatchingEvents::empty().set_read())
 }
 
@@ -103,10 +107,23 @@ pub fn add_read_waker(fd: RawFd, waker: Waker) -> Result<()> {
 /// The 'fd' must be fully owned by the future adding the waker, and must not be closed until the
 /// next time the future is polled. If the fd is closed, there is a race where another FD can be
 /// opened on top of it causing the next poll to access the new target file.
-pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<()> {
+/// Returns a `WakerToken` that can be used to cancel the waker before it completes.
+pub fn add_write_waker(fd: RawFd, waker: Waker) -> Result<WakerToken> {
     add_waker(fd, waker, WatchingEvents::empty().set_write())
 }
 
+/// Cancels the waker that returned the given token if the waker hasn't yet fired.
+pub fn cancel_waker(token: WakerToken) -> Result<()> {
+    STATE.with(|state| {
+        let mut state = state.borrow_mut();
+        if let Some(state) = state.as_mut() {
+            state.cancel_waker(token)
+        } else {
+            Err(Error::InvalidContext)
+        }
+    })
+}
+
 /// Adds a new top level future to the Executor.
 /// These futures must return `()`, indicating they are intended to create side-effects only.
 pub fn add_future(future: Pin<Box<dyn Future<Output = ()>>>) -> Result<()> {
@@ -140,7 +157,7 @@ impl FdWakerState {
     }
 
     // Adds an fd that, when signaled, will trigger the given waker.
-    fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<()> {
+    fn add_waker(&mut self, fd: RawFd, waker: Waker, events: WatchingEvents) -> Result<WakerToken> {
         let duped_fd = unsafe {
             // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
             // will only be added to the poll loop.
@@ -152,7 +169,7 @@ impl FdWakerState {
         let next_token = self.next_token;
         self.token_map.insert(next_token, (duped_fd, waker));
         self.next_token += 1;
-        Ok(())
+        Ok(WakerToken(next_token))
     }
 
     // Waits until one of the FDs is readable and wakes the associated waker.
@@ -166,6 +183,15 @@ impl FdWakerState {
         }
         Ok(())
     }
+
+    // Remove the waker for the given token if it hasn't fired yet.
+    fn cancel_waker(&mut self, token: WakerToken) -> Result<()> {
+        if let Some((fd, waker)) = self.token_map.remove(&token.0) {
+            self.poll_ctx.delete(&fd).map_err(Error::PollContextError)?;
+            waker.wake_by_ref();
+        }
+        Ok(())
+    }
 }
 
 /// Runs futures to completion on a single thread. Futures are allowed to block on file descriptors
@@ -248,3 +274,69 @@ unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
         Ok(ret)
     }
 }
+
+#[cfg(test)]
+mod test {
+    use std::future::Future;
+    use std::os::unix::io::AsRawFd;
+    use std::task::{Context, Poll};
+
+    use futures::future::Either;
+    use futures::pin_mut;
+
+    use super::*;
+
+    struct TestFut {
+        f: File,
+        token: Option<WakerToken>,
+    }
+
+    impl TestFut {
+        fn new(f: File) -> TestFut {
+            TestFut { f, token: None }
+        }
+    }
+
+    impl Future for TestFut {
+        type Output = u64;
+        fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+            if self.token.is_none() {
+                self.token = Some(add_read_waker(self.f.as_raw_fd(), cx.waker().clone()).unwrap());
+            }
+            Poll::Pending
+        }
+    }
+
+    impl Drop for TestFut {
+        fn drop(&mut self) {
+            if let Some(token) = self.token.take() {
+                cancel_waker(token).unwrap();
+            }
+        }
+    }
+
+    #[test]
+    fn cancel() {
+        async fn do_test() {
+            let (r, _w) = sys_util::pipe(true).unwrap();
+            let done = async { 5usize };
+            let pending = TestFut::new(r);
+            pin_mut!(done);
+            pin_mut!(pending);
+            match futures::future::select(pending, done).await {
+                Either::Right((5, _pending)) => (),
+                _ => panic!("unexpected select result"),
+            }
+        }
+
+        let fut = do_test();
+
+        let mut ex = FdExecutor::new(crate::UnitFutures::new()).expect("Failed creating executor");
+        add_future(Box::pin(fut)).unwrap();
+        ex.run().unwrap();
+        STATE.with(|state| {
+            let state = state.borrow_mut();
+            assert!(state.as_ref().unwrap().token_map.is_empty());
+        });
+    }
+}