From 34c00465d5eba814b9eeebf8971f4667e7f1f75d Mon Sep 17 00:00:00 2001 From: Dylan Reid Date: Fri, 24 Apr 2020 05:54:23 +0000 Subject: async_core: eventfd: implement a future for the next value It is useful to have the ability to get a future for only the next value. This is helpful when borrowing the EventFd for the duration of a loop is not feasible. It is also helpful for situations where the future might be dropped. Because dropping a polled eventfd future can leak an FD, and there is no way to implement a custom `Drop` for the future returned by stream, using the new `read_next` is the only way to ensure there aren't any FD leaks if the future might be dropped before completion. TEST=added a unit test that makes use of the new feature and mirrors the existing stream test. cargo test eventfd_write_read Change-Id: I9b20c89be561e4a1ca43f2befc66c16188a91d4b Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/2173973 Tested-by: Dylan Reid Tested-by: kokoro Commit-Queue: Dylan Reid Reviewed-by: Stephen Barber Reviewed-by: Chirantan Ekbote --- async_core/src/eventfd.rs | 103 ++++++++++++++++++++++++++++++---------------- 1 file changed, 67 insertions(+), 36 deletions(-) (limited to 'async_core') diff --git a/async_core/src/eventfd.rs b/async_core/src/eventfd.rs index fdff83e..271e8a1 100644 --- a/async_core/src/eventfd.rs +++ b/async_core/src/eventfd.rs @@ -2,19 +2,18 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. -use futures::Stream; use std::convert::TryFrom; use std::fmt::{self, Display}; +use std::future::Future; use std::os::unix::io::AsRawFd; use std::pin::Pin; use std::task::{Context, Poll}; use libc::{EWOULDBLOCK, O_NONBLOCK}; +use cros_async::fd_executor::{self, add_read_waker, cancel_waker, WakerToken}; use sys_util::{self, add_fd_flags}; -use cros_async::fd_executor::{self, add_read_waker}; - /// Errors generated while polling for events. #[derive(Debug)] pub enum Error { @@ -50,21 +49,20 @@ impl Display for Error { } } -/// Asynchronous version of `sys_util::EventFd`. Provides an implementation of `futures::Stream` so -/// that events can be consumed in an async context. +/// Asynchronous version of `sys_util::EventFd`. Provides asynchronous values that complete when the +/// next event can be read from the eventfd. /// /// # Example /// /// ``` /// use std::convert::TryInto; /// -/// use async_core::{EventFd }; -/// use futures::StreamExt; +/// use async_core::{EventFd}; /// use sys_util::{self}; /// /// async fn process_events() -> std::result::Result<(), Box> { /// let mut async_events: EventFd = sys_util::EventFd::new()?.try_into()?; -/// while let Some(e) = async_events.next().await { +/// while let Ok(e) = async_events.read_next().await { /// // Handle event here. /// } /// Ok(()) @@ -72,13 +70,32 @@ impl Display for Error { /// ``` pub struct EventFd { inner: sys_util::EventFd, - done: bool, } impl EventFd { pub fn new() -> Result { Self::try_from(sys_util::EventFd::new().map_err(Error::EventFdCreate)?) } + + /// Asynchronously read the next value from the eventfd. + /// Returns a Future that can be `awaited` for the next value. + /// + /// # Example + /// + /// ``` + /// use async_core::EventFd; + /// async fn print_events(mut event_fd: EventFd) { + /// loop { + /// match event_fd.read_next().await { + /// Ok(e) => println!("Got event: {}", e), + /// Err(e) => break, + /// } + /// } + /// } + /// ``` + pub fn read_next(&mut self) -> NextValFuture { + NextValFuture::new(self) + } } impl TryFrom for EventFd { @@ -87,58 +104,72 @@ impl TryFrom for EventFd { fn try_from(eventfd: sys_util::EventFd) -> Result { let fd = eventfd.as_raw_fd(); add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?; - Ok(EventFd { - inner: eventfd, - done: false, - }) + Ok(EventFd { inner: eventfd }) + } +} + +/// A Future that yields the next value from the eventfd when it is ready. +pub struct NextValFuture<'a> { + eventfd: &'a mut EventFd, + waker_token: Option, +} + +impl<'a> NextValFuture<'a> { + fn new(eventfd: &'a mut EventFd) -> NextValFuture<'a> { + NextValFuture { + eventfd, + waker_token: None, + } } } -impl Stream for EventFd { - type Item = Result; +impl<'a> Future for NextValFuture<'a> { + type Output = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if self.done { - return Poll::Ready(None); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + if let Some(token) = self.waker_token.take() { + let _ = cancel_waker(token); } - let res = self - .inner - .read() - .map(|v| Poll::Ready(Some(Ok(v)))) - .or_else(|e| { + match self.eventfd.inner.read() { + Ok(v) => Poll::Ready(Ok(v)), + Err(e) => { if e.errno() == EWOULDBLOCK { - add_read_waker(self.inner.as_raw_fd(), cx.waker().clone()) - .map(|_token| Poll::Pending) - .map_err(Error::AddingWaker) + match add_read_waker(self.eventfd.inner.as_raw_fd(), cx.waker().clone()) { + Ok(token) => { + self.waker_token = Some(token); + Poll::Pending + } + Err(e) => Poll::Ready(Err(Error::AddingWaker(e))), + } } else { - Err(Error::EventFdRead(e)) + Poll::Ready(Err(Error::EventFdRead(e))) } - }); - - match res { - Ok(v) => v, - Err(e) => { - self.done = true; - Poll::Ready(Some(Err(e))) } } } } +impl<'a> Drop for NextValFuture<'a> { + fn drop(&mut self) { + if let Some(token) = self.waker_token.take() { + let _ = cancel_waker(token); + } + } +} + #[cfg(test)] mod tests { use super::*; use cros_async::{select2, SelectResult}; use futures::future::pending; use futures::pin_mut; - use futures::stream::StreamExt; #[test] fn eventfd_write_read() { let evt = EventFd::new().unwrap(); async fn read_one(mut evt: EventFd) -> u64 { - if let Some(Ok(e)) = evt.next().await { + if let Ok(e) = evt.read_next().await { e } else { 66 -- cgit 1.4.1