summary refs log tree commit diff
diff options
context:
space:
mode:
authorDylan Reid <dgreid@chromium.org>2019-11-22 16:52:03 -0800
committerCommit Bot <commit-bot@chromium.org>2020-02-10 23:46:12 +0000
commiteed7020fba02ecaae74d9563fc264412c7d2a7d8 (patch)
tree192a107f43f9e1a12fc2b133c93d29f363521798
parent2cc138341dc601f3dfd3ebd4233a99b75ddb6bd0 (diff)
downloadcrosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.tar
crosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.tar.gz
crosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.tar.bz2
crosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.tar.lz
crosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.tar.xz
crosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.tar.zst
crosvm-eed7020fba02ecaae74d9563fc264412c7d2a7d8.zip
Add async_core crate
This crate will provide asynchronous helpers wrapping primitives
provided by sys_util. To start EventFDs and MsgReceivers are provided.

Change-Id: Ia8862adafca995a3e3ab56582acc166a37fc8d2c
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/1955046
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Tested-by: Dylan Reid <dgreid@chromium.org>
Commit-Queue: Dylan Reid <dgreid@chromium.org>
-rw-r--r--Cargo.toml1
-rw-r--r--async_core/Cargo.toml15
-rw-r--r--async_core/src/eventfd.rs166
-rw-r--r--async_core/src/lib.rs11
4 files changed, 193 insertions, 0 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 009a09f..e626ae0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,6 +19,7 @@ overflow-checks = true
 members = ["qcow_utils"]
 exclude = [
     "assertions",
+    "async_core",
     "cros_async",
     "data_model",
     "rand_ish",
diff --git a/async_core/Cargo.toml b/async_core/Cargo.toml
new file mode 100644
index 0000000..1555bd9
--- /dev/null
+++ b/async_core/Cargo.toml
@@ -0,0 +1,15 @@
+[package]
+name = "async_core"
+version = "0.1.0"
+authors = ["The Chromium OS Authors"]
+edition = "2018"
+
+[dependencies]
+libc = "*"
+cros_async = { path = "../cros_async" }
+sys_util = { path = "../sys_util" }
+syscall_defines = { path = "../syscall_defines" }
+
+[dependencies.futures]
+version = "*"
+default-features = false
diff --git a/async_core/src/eventfd.rs b/async_core/src/eventfd.rs
new file mode 100644
index 0000000..4030fb8
--- /dev/null
+++ b/async_core/src/eventfd.rs
@@ -0,0 +1,166 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use futures::Stream;
+use std::convert::TryFrom;
+use std::fmt::{self, Display};
+use std::os::unix::io::{AsRawFd};
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+use libc::{EWOULDBLOCK, O_NONBLOCK};
+
+use sys_util::{self, add_fd_flags};
+
+use cros_async::fd_executor::{self, add_read_waker};
+
+/// Errors generated while polling for events.
+#[derive(Debug)]
+pub enum Error {
+    /// An error occurred attempting to register a waker with the executor.
+    AddingWaker(fd_executor::Error),
+    /// Failure creating the event FD.
+    EventFdCreate(sys_util::Error),
+    /// An error occurred when reading the event FD.
+    EventFdRead(sys_util::Error),
+    /// An error occurred when setting the event FD non-blocking.
+    SettingNonBlocking(sys_util::Error),
+}
+pub type Result<T> = std::result::Result<T, Error>;
+
+impl std::error::Error for Error {}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+
+        match self {
+            AddingWaker(e) => write!(
+                f,
+                "An error occurred attempting to register a waker with the executor: {}.",
+                e
+            ),
+            EventFdCreate(e) => write!(f, "An error occurred when creating the event FD: {}.", e),
+            EventFdRead(e) => write!(f, "An error occurred when reading the event FD: {}.", e),
+            SettingNonBlocking(e) => {
+                write!(f, "An error occurred setting the FD non-blocking: {}.", e)
+            }
+        }
+    }
+}
+
+/// Asynchronous version of `sys_util::EventFd`. Provides an implementation of `futures::Stream` so
+/// that events can be consumed in an async context.
+///
+/// # Example
+///
+/// ```
+/// use std::convert::TryInto;
+///
+/// use async_core::{EventFd };
+/// use futures::StreamExt;
+/// use sys_util::{self};
+///
+/// async fn process_events() -> std::result::Result<(), Box<dyn std::error::Error>> {
+///     let mut async_events: EventFd = sys_util::EventFd::new()?.try_into()?;
+///     while let Some(e) = async_events.next().await {
+///         // Handle event here.
+///     }
+///     Ok(())
+/// }
+/// ```
+pub struct EventFd {
+    inner: sys_util::EventFd,
+    done: bool,
+}
+
+impl EventFd {
+    pub fn new() -> Result<EventFd> {
+        Self::try_from(sys_util::EventFd::new().map_err(Error::EventFdCreate)?)
+    }
+}
+
+impl TryFrom<sys_util::EventFd> for EventFd {
+    type Error = crate::eventfd::Error;
+
+    fn try_from(eventfd: sys_util::EventFd) -> Result<EventFd> {
+        let fd = eventfd.as_raw_fd();
+        add_fd_flags(fd, O_NONBLOCK).map_err(Error::SettingNonBlocking)?;
+        Ok(EventFd {
+            inner: eventfd,
+            done: false,
+        })
+    }
+}
+
+impl Stream for EventFd {
+    type Item = Result<u64>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+        if self.done {
+            return Poll::Ready(None);
+        }
+
+        let res = self
+            .inner
+            .read()
+            .map(|v| Poll::Ready(Some(Ok(v))))
+            .or_else(|e| {
+                if e.errno() == EWOULDBLOCK {
+                    add_read_waker(self.inner.as_raw_fd(), cx.waker().clone())
+                        .map(|()| Poll::Pending)
+                        .map_err(Error::AddingWaker)
+                } else {
+                    Err(Error::EventFdRead(e))
+                }
+            });
+
+        match res {
+            Ok(v) => v,
+            Err(e) => {
+                self.done = true;
+                Poll::Ready(Some(Err(e)))
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use cros_async::{select2, SelectResult};
+    use futures::future::pending;
+    use futures::pin_mut;
+    use futures::stream::StreamExt;
+
+    #[test]
+    fn eventfd_write_read() {
+        let evt = EventFd::new().unwrap();
+        async fn read_one(mut evt: EventFd) -> u64 {
+            if let Some(Ok(e)) = evt.next().await {
+                e
+            } else {
+                66
+            }
+        }
+        async fn write_pend(evt: sys_util::EventFd) {
+            evt.write(55).unwrap();
+            let () = pending().await;
+        }
+        let write_evt = evt.inner.try_clone().unwrap();
+
+        let r = read_one(evt);
+        pin_mut!(r);
+        let w = write_pend(write_evt);
+        pin_mut!(w);
+
+        if let Ok((SelectResult::Finished(read_res), SelectResult::Pending(_pend_fut))) =
+            select2(r, w)
+        {
+            assert_eq!(read_res, 55);
+        } else {
+            panic!("wrong futures returned from select2");
+        }
+    }
+}
diff --git a/async_core/src/lib.rs b/async_core/src/lib.rs
new file mode 100644
index 0000000..96cdcf1
--- /dev/null
+++ b/async_core/src/lib.rs
@@ -0,0 +1,11 @@
+// Copyright 2019 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! Extensions using cros_async and futures-rs to add asynchronous operations to sys_util features.
+//! Provides basic `Futures` implementations for some of the interfaces provided by the `sys_util`
+//! crate.
+
+mod eventfd;
+
+pub use eventfd::EventFd;