// Copyright 2020 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. use std::convert::TryFrom; use std::fmt::{self, Display}; use std::future::Future; use std::os::unix::io::AsRawFd; use std::pin::Pin; use std::task::{Context, Poll}; use libc::{EWOULDBLOCK, O_NONBLOCK}; use cros_async::{self, add_read_waker, cancel_waker, WakerToken}; use sys_util::{self, add_fd_flags}; /// Errors generated while polling for events. #[derive(Debug)] pub enum Error { /// An error occurred attempting to register a waker with the executor. AddingWaker(cros_async::Error), /// Failure creating the event FD. EventFdCreate(sys_util::Error), /// An error occurred when reading the event FD. EventFdRead(sys_util::Error), /// An error occurred when setting the event FD non-blocking. SettingNonBlocking(sys_util::Error), } pub type Result = std::result::Result; impl std::error::Error for Error {} impl Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { use self::Error::*; match self { AddingWaker(e) => write!( f, "An error occurred attempting to register a waker with the executor: {}.", e ), EventFdCreate(e) => write!(f, "An error occurred when creating the event FD: {}.", e), EventFdRead(e) => write!(f, "An error occurred when reading the event FD: {}.", e), SettingNonBlocking(e) => { write!(f, "An error occurred setting the FD non-blocking: {}.", e) } } } } /// Asynchronous version of `sys_util::EventFd`. Provides asynchronous values that complete when the /// next event can be read from the eventfd. /// /// # Example /// /// ``` /// use std::convert::TryInto; /// /// use async_core::{EventFd}; /// use sys_util::{self}; /// /// async fn process_events() -> std::result::Result<(), Box> { /// let mut async_events: EventFd = sys_util::EventFd::new()?.try_into()?; /// while let Ok(e) = async_events.read_next().await { /// // Handle event here. /// } /// Ok(()) /// } /// ``` pub struct EventFd { inner: sys_util::EventFd, } impl EventFd { pub fn new() -> Result { Self::try_from(sys_util::EventFd::new().map_err(Error::EventFdCreate)?) } /// Asynchronously read the next value from the eventfd. /// Returns a Future that can be `awaited` for the next value. /// /// # Example /// /// ``` /// use async_core::EventFd; /// async fn print_events(mut event_fd: EventFd) { /// loop { /// match event_fd.read_next().await { /// Ok(e) => println!("Got event: {}", e), /// Err(e) => break, /// } /// } /// } /// ``` pub fn read_next(&mut self) -> NextValFuture { NextValFuture::new(self) } } impl TryFrom for EventFd { type Error = crate::eventfd::Error; fn try_from(eventfd: sys_util::EventFd) -> Result { let fd = eventfd.as_raw_fd(); add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?; Ok(EventFd { inner: eventfd }) } } /// A Future that yields the next value from the eventfd when it is ready. pub struct NextValFuture<'a> { eventfd: &'a mut EventFd, waker_token: Option, } impl<'a> NextValFuture<'a> { fn new(eventfd: &'a mut EventFd) -> NextValFuture<'a> { NextValFuture { eventfd, waker_token: None, } } } impl<'a> Future for NextValFuture<'a> { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { if let Some(token) = self.waker_token.take() { let _ = cancel_waker(token); } match self.eventfd.inner.read() { Ok(v) => Poll::Ready(Ok(v)), Err(e) => { if e.errno() == EWOULDBLOCK { match add_read_waker(self.eventfd.inner.as_raw_fd(), cx.waker().clone()) { Ok(token) => { self.waker_token = Some(token); Poll::Pending } Err(e) => Poll::Ready(Err(Error::AddingWaker(e))), } } else { Poll::Ready(Err(Error::EventFdRead(e))) } } } } } impl<'a> Drop for NextValFuture<'a> { fn drop(&mut self) { if let Some(token) = self.waker_token.take() { let _ = cancel_waker(token); } } } #[cfg(test)] mod tests { use super::*; use cros_async::{select2, SelectResult}; use futures::future::pending; use futures::pin_mut; #[test] fn eventfd_write_read() { let evt = EventFd::new().unwrap(); async fn read_one(mut evt: EventFd) -> u64 { if let Ok(e) = evt.read_next().await { e } else { 66 } } async fn write_pend(evt: sys_util::EventFd) { evt.write(55).unwrap(); let () = pending().await; } let write_evt = evt.inner.try_clone().unwrap(); let r = read_one(evt); pin_mut!(r); let w = write_pend(write_evt); pin_mut!(w); if let Ok((SelectResult::Finished(read_res), SelectResult::Pending(_pend_fut))) = select2(r, w) { assert_eq!(read_res, 55); } else { panic!("wrong futures returned from select2"); } } }