summary refs log tree commit diff
diff options
context:
space:
mode:
authorJingkui Wang <jkwang@google.com>2019-03-07 13:43:33 -0800
committerchrome-bot <chrome-bot@chromium.org>2019-03-13 21:04:57 -0700
commit415ee63e043119aecdc2907d83f18a538698ad2d (patch)
tree22dc366816cac02349908e038d1e5a7c4ce4487e
parentea75bd164aedca9c429f1a9421ba944d8c786a50 (diff)
downloadcrosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar
crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.gz
crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.bz2
crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.lz
crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.xz
crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.zst
crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.zip
add utils for device implementations
event_loop: event loop based on poll context.

async_job_queue: queue a job, it will be invoked on event loop. This
could be used to invoke a function without holding any locks.

BUG=chromium:831850
TEST=local build

Change-Id: Iab61ac43221bf5d635a0138073d7f88401e5ab07
Reviewed-on: https://chromium-review.googlesource.com/1509852
Commit-Ready: Jingkui Wang <jkwang@google.com>
Tested-by: Jingkui Wang <jkwang@google.com>
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Zach Reizner <zachr@chromium.org>
-rw-r--r--devices/src/lib.rs1
-rw-r--r--devices/src/utils/async_job_queue.rs60
-rw-r--r--devices/src/utils/error.rs35
-rw-r--r--devices/src/utils/event_loop.rs242
-rw-r--r--devices/src/utils/mod.rs11
5 files changed, 349 insertions, 0 deletions
diff --git a/devices/src/lib.rs b/devices/src/lib.rs
index 1df1435..bf631c5 100644
--- a/devices/src/lib.rs
+++ b/devices/src/lib.rs
@@ -36,6 +36,7 @@ pub mod pl030;
 mod proxy;
 mod register_space;
 mod serial;
+mod utils;
 pub mod virtio;
 
 pub use self::bus::Error as BusError;
diff --git a/devices/src/utils/async_job_queue.rs b/devices/src/utils/async_job_queue.rs
new file mode 100644
index 0000000..bc99c05
--- /dev/null
+++ b/devices/src/utils/async_job_queue.rs
@@ -0,0 +1,60 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use super::{Error, Result};
+use super::{EventHandler, EventLoop};
+use std::mem;
+use std::os::unix::io::RawFd;
+use std::sync::Arc;
+use sync::Mutex;
+use sys_util::{EventFd, WatchingEvents};
+
+/// Async Job Queue can schedule async jobs.
+pub struct AsyncJobQueue {
+    jobs: Mutex<Vec<Box<FnMut() + 'static + Send>>>,
+    evt: EventFd,
+}
+
+impl AsyncJobQueue {
+    /// Init job queue on event loop.
+    pub fn init(event_loop: &EventLoop) -> Result<Arc<AsyncJobQueue>> {
+        let evt = EventFd::new().map_err(Error::CreateEventFd)?;
+        let queue = Arc::new(AsyncJobQueue {
+            jobs: Mutex::new(Vec::new()),
+            evt,
+        });
+        let handler: Arc<EventHandler> = queue.clone();
+        event_loop.add_event(
+            &queue.evt,
+            WatchingEvents::empty().set_read(),
+            Arc::downgrade(&handler),
+        );
+        Ok(queue)
+    }
+
+    /// Queue a new job. It will be invoked on event loop.
+    pub fn queue_job<T: Fn() + 'static + Send>(&self, cb: T) -> Result<()> {
+        self.jobs.lock().push(Box::new(cb));
+        self.evt.write(1).map_err(Error::WriteEventFd)
+    }
+}
+
+impl EventHandler for AsyncJobQueue {
+    fn on_event(&self, _fd: RawFd) -> std::result::Result<(), ()> {
+        // We want to read out the event, but the value is not important.
+        match self.evt.read() {
+            Ok(_) => {}
+            Err(e) => {
+                error!("read event fd failed {}", e);
+                return Err(());
+            }
+        }
+
+        let jobs = mem::replace(&mut *self.jobs.lock(), Vec::new());
+        for mut cb in jobs {
+            cb();
+        }
+        Ok(())
+    }
+}
diff --git a/devices/src/utils/error.rs b/devices/src/utils/error.rs
new file mode 100644
index 0000000..3d5fcc7
--- /dev/null
+++ b/devices/src/utils/error.rs
@@ -0,0 +1,35 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::fmt::{self, Display};
+use sys_util::Error as SysError;
+
+#[derive(Debug)]
+pub enum Error {
+    EventLoopAlreadyFailed,
+    CreateEventFd(SysError),
+    ReadEventFd(SysError),
+    WriteEventFd(SysError),
+    CreatePollContext(SysError),
+    PollContextAddFd(SysError),
+    PollContextDeleteFd(SysError),
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        use self::Error::*;
+
+        match self {
+            EventLoopAlreadyFailed => write!(f, "event loop already failed due to previous errors"),
+            CreateEventFd(e) => write!(f, "failed to create event fd: {}", e),
+            ReadEventFd(e) => write!(f, "failed to read event fd: {}", e),
+            WriteEventFd(e) => write!(f, "failed to write event fd: {}", e),
+            CreatePollContext(e) => write!(f, "failed to create poll context: {}", e),
+            PollContextAddFd(e) => write!(f, "failed to add fd to poll context: {}", e),
+            PollContextDeleteFd(e) => write!(f, "failed to delete fd from poll context: {}", e),
+        }
+    }
+}
+
+pub type Result<T> = std::result::Result<T, Error>;
diff --git a/devices/src/utils/event_loop.rs b/devices/src/utils/event_loop.rs
new file mode 100644
index 0000000..f27c19c
--- /dev/null
+++ b/devices/src/utils/event_loop.rs
@@ -0,0 +1,242 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use super::error::{Error, Result};
+use std::collections::BTreeMap;
+use std::mem::drop;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::{Arc, Weak};
+use std::thread;
+use sync::Mutex;
+use sys_util::{EpollContext, EpollEvents, EventFd, PollToken, WatchingEvents};
+
+/// A fail handle will do the clean up when we cannot recover from some error.
+pub trait FailHandle: Send + Sync {
+    /// Fail the code.
+    fn fail(&self);
+    /// Returns true if already failed.
+    fn failed(&self) -> bool;
+}
+
+impl FailHandle for Option<Arc<FailHandle>> {
+    fn fail(&self) {
+        match *self {
+            Some(ref handle) => handle.fail(),
+            None => error!("event loop trying to fail without a fail handle"),
+        }
+    }
+
+    fn failed(&self) -> bool {
+        match &self {
+            Some(ref handle) => handle.failed(),
+            None => false,
+        }
+    }
+}
+
+/// Fd is a wrapper of RawFd. It implements AsRawFd trait and PollToken trait for RawFd.
+/// It does not own the fd, thus won't close the fd when dropped.
+struct Fd(pub RawFd);
+impl AsRawFd for Fd {
+    fn as_raw_fd(&self) -> RawFd {
+        self.0
+    }
+}
+
+impl PollToken for Fd {
+    fn as_raw_token(&self) -> u64 {
+        self.0 as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        Fd(data as RawFd)
+    }
+}
+
+/// EpollEventLoop is an event loop blocked on a set of fds. When a monitered events is triggered,
+/// event loop will invoke the mapped handler.
+pub struct EventLoop {
+    fail_handle: Option<Arc<FailHandle>>,
+    poll_ctx: Arc<EpollContext<Fd>>,
+    handlers: Arc<Mutex<BTreeMap<RawFd, Weak<EventHandler>>>>,
+    stop_evt: EventFd,
+}
+
+/// Interface for event handler.
+pub trait EventHandler: Send + Sync {
+    fn on_event(&self, fd: RawFd) -> std::result::Result<(), ()>;
+}
+
+impl EventLoop {
+    /// Start an event loop. An optional fail handle could be passed to the event loop.
+    pub fn start(
+        fail_handle: Option<Arc<FailHandle>>,
+    ) -> Result<(EventLoop, thread::JoinHandle<()>)> {
+        let (self_stop_evt, stop_evt) = EventFd::new()
+            .and_then(|e| Ok((e.try_clone()?, e)))
+            .map_err(Error::CreateEventFd)?;
+
+        let fd_callbacks: Arc<Mutex<BTreeMap<RawFd, Weak<EventHandler>>>> =
+            Arc::new(Mutex::new(BTreeMap::new()));
+        let poll_ctx: EpollContext<Fd> = EpollContext::new()
+            .and_then(|pc| pc.add(&stop_evt, Fd(stop_evt.as_raw_fd())).and(Ok(pc)))
+            .map_err(Error::CreatePollContext)?;
+
+        let poll_ctx = Arc::new(poll_ctx);
+        let event_loop = EventLoop {
+            fail_handle: fail_handle.clone(),
+            poll_ctx: poll_ctx.clone(),
+            handlers: fd_callbacks.clone(),
+            stop_evt: self_stop_evt,
+        };
+
+        let handle = thread::spawn(move || {
+            let event_loop = EpollEvents::new();
+            loop {
+                if fail_handle.failed() {
+                    error!("xhci controller already failed, stopping event ring");
+                    return;
+                }
+                let events = match poll_ctx.wait(&event_loop) {
+                    Ok(events) => events,
+                    Err(e) => {
+                        error!("cannot poll {:?}", e);
+                        fail_handle.fail();
+                        return;
+                    }
+                };
+                for event in events.iter() {
+                    if event.token().as_raw_fd() == stop_evt.as_raw_fd() {
+                        return;
+                    } else {
+                        let fd = event.token().as_raw_fd();
+                        let mut locked = fd_callbacks.lock();
+                        let weak_handler = match locked.get(&fd) {
+                            Some(cb) => cb.clone(),
+                            None => {
+                                warn!("callback for fd {} already removed", fd);
+                                continue;
+                            }
+                        };
+                        match weak_handler.upgrade() {
+                            Some(handler) => {
+                                // Drop lock before triggering the event.
+                                drop(locked);
+                                match handler.on_event(fd) {
+                                    Ok(()) => {}
+                                    Err(_) => {
+                                        error!("event loop stopping due to handle event error");
+                                        fail_handle.fail();
+                                        return;
+                                    }
+                                };
+                            }
+                            // If the handler is already gone, we remove the fd.
+                            None => {
+                                let _ = poll_ctx.delete(&Fd(fd));
+                                if locked.remove(&fd).is_none() {
+                                    error!("fail to remove handler for file descriptor {}", fd);
+                                }
+                            }
+                        };
+                    }
+                }
+            }
+        });
+
+        Ok((event_loop, handle))
+    }
+
+    /// Add a new event to event loop. The event handler will be invoked when `event` happens on
+    /// `fd`.
+    ///
+    /// If the same `fd` is added multiple times, the old handler will be replaced.
+    /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered,
+    /// the event will be removed.
+    pub fn add_event(
+        &self,
+        fd: &AsRawFd,
+        events: WatchingEvents,
+        handler: Weak<EventHandler>,
+    ) -> Result<()> {
+        if self.fail_handle.failed() {
+            return Err(Error::EventLoopAlreadyFailed);
+        }
+        self.handlers.lock().insert(fd.as_raw_fd(), handler);
+        // This might fail due to epoll syscall. Check epoll_ctl(2).
+        self.poll_ctx
+            .add_fd_with_events(fd, events, Fd(fd.as_raw_fd()))
+            .map_err(Error::PollContextAddFd)
+    }
+
+    /// Removes event for this `fd`. This function returns false if it fails.
+    ///
+    /// EventLoop does not guarantee all events for `fd` is handled.
+    pub fn remove_event_for_fd(&self, fd: &AsRawFd) -> Result<()> {
+        if self.fail_handle.failed() {
+            return Err(Error::EventLoopAlreadyFailed);
+        }
+        // This might fail due to epoll syscall. Check epoll_ctl(2).
+        self.poll_ctx
+            .delete(fd)
+            .map_err(Error::PollContextDeleteFd)?;
+        self.handlers.lock().remove(&fd.as_raw_fd());
+        Ok(())
+    }
+
+    /// Stops this event loop asynchronously. Previous events might not be handled.
+    pub fn stop(&self) {
+        match self.stop_evt.write(1) {
+            Ok(_) => {}
+            Err(_) => {
+                error!("fail to send event loop stop event, it might already stopped");
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
+    use std::sync::{Arc, Condvar, Mutex};
+    use sys_util::EventFd;
+
+    struct EventLoopTestHandler {
+        val: Mutex<u8>,
+        cvar: Condvar,
+    }
+
+    impl EventHandler for EventLoopTestHandler {
+        fn on_event(&self, fd: RawFd) -> std::result::Result<(), ()> {
+            let _ = unsafe { EventFd::from_raw_fd(fd).read() };
+            *self.val.lock().unwrap() += 1;
+            self.cvar.notify_one();
+            Ok(())
+        }
+    }
+
+    #[test]
+    fn event_loop_test() {
+        let (l, j) = EventLoop::start(None).unwrap();
+        let (self_evt, evt) = match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) {
+            Ok(v) => v,
+            Err(e) => {
+                error!("failed creating EventFd pair: {:?}", e);
+                return;
+            }
+        };
+        let h = Arc::new(EventLoopTestHandler {
+            val: Mutex::new(0),
+            cvar: Condvar::new(),
+        });
+        let t: Arc<EventHandler> = h.clone();
+        l.add_event(&evt, WatchingEvents::empty().set_read(), Arc::downgrade(&t));
+        self_evt.write(1).unwrap();
+        let _ = h.cvar.wait(h.val.lock().unwrap()).unwrap();
+        l.stop();
+        j.join().unwrap();
+        assert_eq!(*(h.val.lock().unwrap()), 1);
+    }
+}
diff --git a/devices/src/utils/mod.rs b/devices/src/utils/mod.rs
new file mode 100644
index 0000000..6f35025
--- /dev/null
+++ b/devices/src/utils/mod.rs
@@ -0,0 +1,11 @@
+// Copyright 2018 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+mod async_job_queue;
+mod error;
+mod event_loop;
+
+pub use self::async_job_queue::*;
+pub use self::error::*;
+pub use self::event_loop::*;