summary refs log tree commit diff
path: root/async_core
diff options
context:
space:
mode:
authorDylan Reid <dgreid@chromium.org>2020-04-24 05:54:23 +0000
committerCommit Bot <commit-bot@chromium.org>2020-05-05 03:35:11 +0000
commit34c00465d5eba814b9eeebf8971f4667e7f1f75d (patch)
tree4d396a3b9369e4a6122465b36b37d643806b0036 /async_core
parent4381d04dd99956a9d95fe8735b665cb9e8750fae (diff)
downloadcrosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.tar
crosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.tar.gz
crosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.tar.bz2
crosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.tar.lz
crosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.tar.xz
crosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.tar.zst
crosvm-34c00465d5eba814b9eeebf8971f4667e7f1f75d.zip
async_core: eventfd: implement a future for the next value
It is useful to have the ability to get a future for only the next
value. This is helpful when borrowing the EventFd for the duration of a
loop is not feasible. It is also helpful for situations where the future
might be dropped. Because dropping a polled eventfd future can leak an
FD, and there is no way to implement a custom `Drop` for the future
returned by stream, using the new `read_next` is the only way to ensure
there aren't any FD leaks if the future might be dropped before
completion.

TEST=added a unit test that makes use of the new feature and mirrors the
existing stream test.
cargo test eventfd_write_read

Change-Id: I9b20c89be561e4a1ca43f2befc66c16188a91d4b
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2173973
Tested-by: Dylan Reid <dgreid@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
Reviewed-by: Stephen Barber <smbarber@chromium.org>
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Diffstat (limited to 'async_core')
-rw-r--r--async_core/src/eventfd.rs103
1 files changed, 67 insertions, 36 deletions
diff --git a/async_core/src/eventfd.rs b/async_core/src/eventfd.rs
index fdff83e..271e8a1 100644
--- a/async_core/src/eventfd.rs
+++ b/async_core/src/eventfd.rs
@@ -2,19 +2,18 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use futures::Stream;
 use std::convert::TryFrom;
 use std::fmt::{self, Display};
+use std::future::Future;
 use std::os::unix::io::AsRawFd;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
 use libc::{EWOULDBLOCK, O_NONBLOCK};
 
+use cros_async::fd_executor::{self, add_read_waker, cancel_waker, WakerToken};
 use sys_util::{self, add_fd_flags};
 
-use cros_async::fd_executor::{self, add_read_waker};
-
 /// Errors generated while polling for events.
 #[derive(Debug)]
 pub enum Error {
@@ -50,21 +49,20 @@ impl Display for Error {
     }
 }
 
-/// Asynchronous version of `sys_util::EventFd`. Provides an implementation of `futures::Stream` so
-/// that events can be consumed in an async context.
+/// Asynchronous version of `sys_util::EventFd`. Provides asynchronous values that complete when the
+/// next event can be read from the eventfd.
 ///
 /// # Example
 ///
 /// ```
 /// use std::convert::TryInto;
 ///
-/// use async_core::{EventFd };
-/// use futures::StreamExt;
+/// use async_core::{EventFd};
 /// use sys_util::{self};
 ///
 /// async fn process_events() -> std::result::Result<(), Box<dyn std::error::Error>> {
 ///     let mut async_events: EventFd = sys_util::EventFd::new()?.try_into()?;
-///     while let Some(e) = async_events.next().await {
+///     while let Ok(e) = async_events.read_next().await {
 ///         // Handle event here.
 ///     }
 ///     Ok(())
@@ -72,13 +70,32 @@ impl Display for Error {
 /// ```
 pub struct EventFd {
     inner: sys_util::EventFd,
-    done: bool,
 }
 
 impl EventFd {
     pub fn new() -> Result<EventFd> {
         Self::try_from(sys_util::EventFd::new().map_err(Error::EventFdCreate)?)
     }
+
+    /// Asynchronously read the next value from the eventfd.
+    /// Returns a Future that can be `awaited` for the next value.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// use async_core::EventFd;
+    /// async fn print_events(mut event_fd: EventFd) {
+    ///     loop {
+    ///         match event_fd.read_next().await {
+    ///             Ok(e) => println!("Got event: {}", e),
+    ///             Err(e) => break,
+    ///         }
+    ///     }
+    /// }
+    /// ```
+    pub fn read_next(&mut self) -> NextValFuture {
+        NextValFuture::new(self)
+    }
 }
 
 impl TryFrom<sys_util::EventFd> for EventFd {
@@ -87,58 +104,72 @@ impl TryFrom<sys_util::EventFd> for EventFd {
     fn try_from(eventfd: sys_util::EventFd) -> Result<EventFd> {
         let fd = eventfd.as_raw_fd();
         add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
-        Ok(EventFd {
-            inner: eventfd,
-            done: false,
-        })
+        Ok(EventFd { inner: eventfd })
+    }
+}
+
+/// A Future that yields the next value from the eventfd when it is ready.
+pub struct NextValFuture<'a> {
+    eventfd: &'a mut EventFd,
+    waker_token: Option<WakerToken>,
+}
+
+impl<'a> NextValFuture<'a> {
+    fn new(eventfd: &'a mut EventFd) -> NextValFuture<'a> {
+        NextValFuture {
+            eventfd,
+            waker_token: None,
+        }
     }
 }
 
-impl Stream for EventFd {
-    type Item = Result<u64>;
+impl<'a> Future for NextValFuture<'a> {
+    type Output = Result<u64>;
 
-    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        if self.done {
-            return Poll::Ready(None);
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        if let Some(token) = self.waker_token.take() {
+            let _ = cancel_waker(token);
         }
 
-        let res = self
-            .inner
-            .read()
-            .map(|v| Poll::Ready(Some(Ok(v))))
-            .or_else(|e| {
+        match self.eventfd.inner.read() {
+            Ok(v) => Poll::Ready(Ok(v)),
+            Err(e) => {
                 if e.errno() == EWOULDBLOCK {
-                    add_read_waker(self.inner.as_raw_fd(), cx.waker().clone())
-                        .map(|_token| Poll::Pending)
-                        .map_err(Error::AddingWaker)
+                    match add_read_waker(self.eventfd.inner.as_raw_fd(), cx.waker().clone()) {
+                        Ok(token) => {
+                            self.waker_token = Some(token);
+                            Poll::Pending
+                        }
+                        Err(e) => Poll::Ready(Err(Error::AddingWaker(e))),
+                    }
                 } else {
-                    Err(Error::EventFdRead(e))
+                    Poll::Ready(Err(Error::EventFdRead(e)))
                 }
-            });
-
-        match res {
-            Ok(v) => v,
-            Err(e) => {
-                self.done = true;
-                Poll::Ready(Some(Err(e)))
             }
         }
     }
 }
 
+impl<'a> Drop for NextValFuture<'a> {
+    fn drop(&mut self) {
+        if let Some(token) = self.waker_token.take() {
+            let _ = cancel_waker(token);
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use cros_async::{select2, SelectResult};
     use futures::future::pending;
     use futures::pin_mut;
-    use futures::stream::StreamExt;
 
     #[test]
     fn eventfd_write_read() {
         let evt = EventFd::new().unwrap();
         async fn read_one(mut evt: EventFd) -> u64 {
-            if let Some(Ok(e)) = evt.next().await {
+            if let Ok(e) = evt.read_next().await {
                 e
             } else {
                 66