diff options
author | Jingkui Wang <jkwang@google.com> | 2019-03-07 13:43:33 -0800 |
---|---|---|
committer | chrome-bot <chrome-bot@chromium.org> | 2019-03-13 21:04:57 -0700 |
commit | 415ee63e043119aecdc2907d83f18a538698ad2d (patch) | |
tree | 22dc366816cac02349908e038d1e5a7c4ce4487e | |
parent | ea75bd164aedca9c429f1a9421ba944d8c786a50 (diff) | |
download | crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.gz crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.bz2 crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.lz crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.xz crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.zst crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.zip |
add utils for device implementations
event_loop: event loop based on poll context. async_job_queue: queue a job, it will be invoked on event loop. This could be used to invoke a function without holding any locks. BUG=chromium:831850 TEST=local build Change-Id: Iab61ac43221bf5d635a0138073d7f88401e5ab07 Reviewed-on: https://chromium-review.googlesource.com/1509852 Commit-Ready: Jingkui Wang <jkwang@google.com> Tested-by: Jingkui Wang <jkwang@google.com> Tested-by: kokoro <noreply+kokoro@google.com> Reviewed-by: Zach Reizner <zachr@chromium.org>
-rw-r--r-- | devices/src/lib.rs | 1 | ||||
-rw-r--r-- | devices/src/utils/async_job_queue.rs | 60 | ||||
-rw-r--r-- | devices/src/utils/error.rs | 35 | ||||
-rw-r--r-- | devices/src/utils/event_loop.rs | 242 | ||||
-rw-r--r-- | devices/src/utils/mod.rs | 11 |
5 files changed, 349 insertions, 0 deletions
diff --git a/devices/src/lib.rs b/devices/src/lib.rs index 1df1435..bf631c5 100644 --- a/devices/src/lib.rs +++ b/devices/src/lib.rs @@ -36,6 +36,7 @@ pub mod pl030; mod proxy; mod register_space; mod serial; +mod utils; pub mod virtio; pub use self::bus::Error as BusError; diff --git a/devices/src/utils/async_job_queue.rs b/devices/src/utils/async_job_queue.rs new file mode 100644 index 0000000..bc99c05 --- /dev/null +++ b/devices/src/utils/async_job_queue.rs @@ -0,0 +1,60 @@ +// Copyright 2018 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use super::{Error, Result}; +use super::{EventHandler, EventLoop}; +use std::mem; +use std::os::unix::io::RawFd; +use std::sync::Arc; +use sync::Mutex; +use sys_util::{EventFd, WatchingEvents}; + +/// Async Job Queue can schedule async jobs. +pub struct AsyncJobQueue { + jobs: Mutex<Vec<Box<FnMut() + 'static + Send>>>, + evt: EventFd, +} + +impl AsyncJobQueue { + /// Init job queue on event loop. + pub fn init(event_loop: &EventLoop) -> Result<Arc<AsyncJobQueue>> { + let evt = EventFd::new().map_err(Error::CreateEventFd)?; + let queue = Arc::new(AsyncJobQueue { + jobs: Mutex::new(Vec::new()), + evt, + }); + let handler: Arc<EventHandler> = queue.clone(); + event_loop.add_event( + &queue.evt, + WatchingEvents::empty().set_read(), + Arc::downgrade(&handler), + ); + Ok(queue) + } + + /// Queue a new job. It will be invoked on event loop. + pub fn queue_job<T: Fn() + 'static + Send>(&self, cb: T) -> Result<()> { + self.jobs.lock().push(Box::new(cb)); + self.evt.write(1).map_err(Error::WriteEventFd) + } +} + +impl EventHandler for AsyncJobQueue { + fn on_event(&self, _fd: RawFd) -> std::result::Result<(), ()> { + // We want to read out the event, but the value is not important. + match self.evt.read() { + Ok(_) => {} + Err(e) => { + error!("read event fd failed {}", e); + return Err(()); + } + } + + let jobs = mem::replace(&mut *self.jobs.lock(), Vec::new()); + for mut cb in jobs { + cb(); + } + Ok(()) + } +} diff --git a/devices/src/utils/error.rs b/devices/src/utils/error.rs new file mode 100644 index 0000000..3d5fcc7 --- /dev/null +++ b/devices/src/utils/error.rs @@ -0,0 +1,35 @@ +// Copyright 2018 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::fmt::{self, Display}; +use sys_util::Error as SysError; + +#[derive(Debug)] +pub enum Error { + EventLoopAlreadyFailed, + CreateEventFd(SysError), + ReadEventFd(SysError), + WriteEventFd(SysError), + CreatePollContext(SysError), + PollContextAddFd(SysError), + PollContextDeleteFd(SysError), +} + +impl Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use self::Error::*; + + match self { + EventLoopAlreadyFailed => write!(f, "event loop already failed due to previous errors"), + CreateEventFd(e) => write!(f, "failed to create event fd: {}", e), + ReadEventFd(e) => write!(f, "failed to read event fd: {}", e), + WriteEventFd(e) => write!(f, "failed to write event fd: {}", e), + CreatePollContext(e) => write!(f, "failed to create poll context: {}", e), + PollContextAddFd(e) => write!(f, "failed to add fd to poll context: {}", e), + PollContextDeleteFd(e) => write!(f, "failed to delete fd from poll context: {}", e), + } + } +} + +pub type Result<T> = std::result::Result<T, Error>; diff --git a/devices/src/utils/event_loop.rs b/devices/src/utils/event_loop.rs new file mode 100644 index 0000000..f27c19c --- /dev/null +++ b/devices/src/utils/event_loop.rs @@ -0,0 +1,242 @@ +// Copyright 2018 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use super::error::{Error, Result}; +use std::collections::BTreeMap; +use std::mem::drop; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::{Arc, Weak}; +use std::thread; +use sync::Mutex; +use sys_util::{EpollContext, EpollEvents, EventFd, PollToken, WatchingEvents}; + +/// A fail handle will do the clean up when we cannot recover from some error. +pub trait FailHandle: Send + Sync { + /// Fail the code. + fn fail(&self); + /// Returns true if already failed. + fn failed(&self) -> bool; +} + +impl FailHandle for Option<Arc<FailHandle>> { + fn fail(&self) { + match *self { + Some(ref handle) => handle.fail(), + None => error!("event loop trying to fail without a fail handle"), + } + } + + fn failed(&self) -> bool { + match &self { + Some(ref handle) => handle.failed(), + None => false, + } + } +} + +/// Fd is a wrapper of RawFd. It implements AsRawFd trait and PollToken trait for RawFd. +/// It does not own the fd, thus won't close the fd when dropped. +struct Fd(pub RawFd); +impl AsRawFd for Fd { + fn as_raw_fd(&self) -> RawFd { + self.0 + } +} + +impl PollToken for Fd { + fn as_raw_token(&self) -> u64 { + self.0 as u64 + } + + fn from_raw_token(data: u64) -> Self { + Fd(data as RawFd) + } +} + +/// EpollEventLoop is an event loop blocked on a set of fds. When a monitered events is triggered, +/// event loop will invoke the mapped handler. +pub struct EventLoop { + fail_handle: Option<Arc<FailHandle>>, + poll_ctx: Arc<EpollContext<Fd>>, + handlers: Arc<Mutex<BTreeMap<RawFd, Weak<EventHandler>>>>, + stop_evt: EventFd, +} + +/// Interface for event handler. +pub trait EventHandler: Send + Sync { + fn on_event(&self, fd: RawFd) -> std::result::Result<(), ()>; +} + +impl EventLoop { + /// Start an event loop. An optional fail handle could be passed to the event loop. + pub fn start( + fail_handle: Option<Arc<FailHandle>>, + ) -> Result<(EventLoop, thread::JoinHandle<()>)> { + let (self_stop_evt, stop_evt) = EventFd::new() + .and_then(|e| Ok((e.try_clone()?, e))) + .map_err(Error::CreateEventFd)?; + + let fd_callbacks: Arc<Mutex<BTreeMap<RawFd, Weak<EventHandler>>>> = + Arc::new(Mutex::new(BTreeMap::new())); + let poll_ctx: EpollContext<Fd> = EpollContext::new() + .and_then(|pc| pc.add(&stop_evt, Fd(stop_evt.as_raw_fd())).and(Ok(pc))) + .map_err(Error::CreatePollContext)?; + + let poll_ctx = Arc::new(poll_ctx); + let event_loop = EventLoop { + fail_handle: fail_handle.clone(), + poll_ctx: poll_ctx.clone(), + handlers: fd_callbacks.clone(), + stop_evt: self_stop_evt, + }; + + let handle = thread::spawn(move || { + let event_loop = EpollEvents::new(); + loop { + if fail_handle.failed() { + error!("xhci controller already failed, stopping event ring"); + return; + } + let events = match poll_ctx.wait(&event_loop) { + Ok(events) => events, + Err(e) => { + error!("cannot poll {:?}", e); + fail_handle.fail(); + return; + } + }; + for event in events.iter() { + if event.token().as_raw_fd() == stop_evt.as_raw_fd() { + return; + } else { + let fd = event.token().as_raw_fd(); + let mut locked = fd_callbacks.lock(); + let weak_handler = match locked.get(&fd) { + Some(cb) => cb.clone(), + None => { + warn!("callback for fd {} already removed", fd); + continue; + } + }; + match weak_handler.upgrade() { + Some(handler) => { + // Drop lock before triggering the event. + drop(locked); + match handler.on_event(fd) { + Ok(()) => {} + Err(_) => { + error!("event loop stopping due to handle event error"); + fail_handle.fail(); + return; + } + }; + } + // If the handler is already gone, we remove the fd. + None => { + let _ = poll_ctx.delete(&Fd(fd)); + if locked.remove(&fd).is_none() { + error!("fail to remove handler for file descriptor {}", fd); + } + } + }; + } + } + } + }); + + Ok((event_loop, handle)) + } + + /// Add a new event to event loop. The event handler will be invoked when `event` happens on + /// `fd`. + /// + /// If the same `fd` is added multiple times, the old handler will be replaced. + /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered, + /// the event will be removed. + pub fn add_event( + &self, + fd: &AsRawFd, + events: WatchingEvents, + handler: Weak<EventHandler>, + ) -> Result<()> { + if self.fail_handle.failed() { + return Err(Error::EventLoopAlreadyFailed); + } + self.handlers.lock().insert(fd.as_raw_fd(), handler); + // This might fail due to epoll syscall. Check epoll_ctl(2). + self.poll_ctx + .add_fd_with_events(fd, events, Fd(fd.as_raw_fd())) + .map_err(Error::PollContextAddFd) + } + + /// Removes event for this `fd`. This function returns false if it fails. + /// + /// EventLoop does not guarantee all events for `fd` is handled. + pub fn remove_event_for_fd(&self, fd: &AsRawFd) -> Result<()> { + if self.fail_handle.failed() { + return Err(Error::EventLoopAlreadyFailed); + } + // This might fail due to epoll syscall. Check epoll_ctl(2). + self.poll_ctx + .delete(fd) + .map_err(Error::PollContextDeleteFd)?; + self.handlers.lock().remove(&fd.as_raw_fd()); + Ok(()) + } + + /// Stops this event loop asynchronously. Previous events might not be handled. + pub fn stop(&self) { + match self.stop_evt.write(1) { + Ok(_) => {} + Err(_) => { + error!("fail to send event loop stop event, it might already stopped"); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; + use std::sync::{Arc, Condvar, Mutex}; + use sys_util::EventFd; + + struct EventLoopTestHandler { + val: Mutex<u8>, + cvar: Condvar, + } + + impl EventHandler for EventLoopTestHandler { + fn on_event(&self, fd: RawFd) -> std::result::Result<(), ()> { + let _ = unsafe { EventFd::from_raw_fd(fd).read() }; + *self.val.lock().unwrap() += 1; + self.cvar.notify_one(); + Ok(()) + } + } + + #[test] + fn event_loop_test() { + let (l, j) = EventLoop::start(None).unwrap(); + let (self_evt, evt) = match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) { + Ok(v) => v, + Err(e) => { + error!("failed creating EventFd pair: {:?}", e); + return; + } + }; + let h = Arc::new(EventLoopTestHandler { + val: Mutex::new(0), + cvar: Condvar::new(), + }); + let t: Arc<EventHandler> = h.clone(); + l.add_event(&evt, WatchingEvents::empty().set_read(), Arc::downgrade(&t)); + self_evt.write(1).unwrap(); + let _ = h.cvar.wait(h.val.lock().unwrap()).unwrap(); + l.stop(); + j.join().unwrap(); + assert_eq!(*(h.val.lock().unwrap()), 1); + } +} diff --git a/devices/src/utils/mod.rs b/devices/src/utils/mod.rs new file mode 100644 index 0000000..6f35025 --- /dev/null +++ b/devices/src/utils/mod.rs @@ -0,0 +1,11 @@ +// Copyright 2018 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +mod async_job_queue; +mod error; +mod event_loop; + +pub use self::async_job_queue::*; +pub use self::error::*; +pub use self::event_loop::*; |