diff options
author | Jingkui Wang <jkwang@google.com> | 2019-03-07 13:43:33 -0800 |
---|---|---|
committer | chrome-bot <chrome-bot@chromium.org> | 2019-03-13 21:04:57 -0700 |
commit | 415ee63e043119aecdc2907d83f18a538698ad2d (patch) | |
tree | 22dc366816cac02349908e038d1e5a7c4ce4487e /devices/src/utils/event_loop.rs | |
parent | ea75bd164aedca9c429f1a9421ba944d8c786a50 (diff) | |
download | crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.gz crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.bz2 crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.lz crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.xz crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.tar.zst crosvm-415ee63e043119aecdc2907d83f18a538698ad2d.zip |
add utils for device implementations
event_loop: event loop based on poll context. async_job_queue: queue a job, it will be invoked on event loop. This could be used to invoke a function without holding any locks. BUG=chromium:831850 TEST=local build Change-Id: Iab61ac43221bf5d635a0138073d7f88401e5ab07 Reviewed-on: https://chromium-review.googlesource.com/1509852 Commit-Ready: Jingkui Wang <jkwang@google.com> Tested-by: Jingkui Wang <jkwang@google.com> Tested-by: kokoro <noreply+kokoro@google.com> Reviewed-by: Zach Reizner <zachr@chromium.org>
Diffstat (limited to 'devices/src/utils/event_loop.rs')
-rw-r--r-- | devices/src/utils/event_loop.rs | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/devices/src/utils/event_loop.rs b/devices/src/utils/event_loop.rs new file mode 100644 index 0000000..f27c19c --- /dev/null +++ b/devices/src/utils/event_loop.rs @@ -0,0 +1,242 @@ +// Copyright 2018 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use super::error::{Error, Result}; +use std::collections::BTreeMap; +use std::mem::drop; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::sync::{Arc, Weak}; +use std::thread; +use sync::Mutex; +use sys_util::{EpollContext, EpollEvents, EventFd, PollToken, WatchingEvents}; + +/// A fail handle will do the clean up when we cannot recover from some error. +pub trait FailHandle: Send + Sync { + /// Fail the code. + fn fail(&self); + /// Returns true if already failed. + fn failed(&self) -> bool; +} + +impl FailHandle for Option<Arc<FailHandle>> { + fn fail(&self) { + match *self { + Some(ref handle) => handle.fail(), + None => error!("event loop trying to fail without a fail handle"), + } + } + + fn failed(&self) -> bool { + match &self { + Some(ref handle) => handle.failed(), + None => false, + } + } +} + +/// Fd is a wrapper of RawFd. It implements AsRawFd trait and PollToken trait for RawFd. +/// It does not own the fd, thus won't close the fd when dropped. +struct Fd(pub RawFd); +impl AsRawFd for Fd { + fn as_raw_fd(&self) -> RawFd { + self.0 + } +} + +impl PollToken for Fd { + fn as_raw_token(&self) -> u64 { + self.0 as u64 + } + + fn from_raw_token(data: u64) -> Self { + Fd(data as RawFd) + } +} + +/// EpollEventLoop is an event loop blocked on a set of fds. When a monitered events is triggered, +/// event loop will invoke the mapped handler. +pub struct EventLoop { + fail_handle: Option<Arc<FailHandle>>, + poll_ctx: Arc<EpollContext<Fd>>, + handlers: Arc<Mutex<BTreeMap<RawFd, Weak<EventHandler>>>>, + stop_evt: EventFd, +} + +/// Interface for event handler. +pub trait EventHandler: Send + Sync { + fn on_event(&self, fd: RawFd) -> std::result::Result<(), ()>; +} + +impl EventLoop { + /// Start an event loop. An optional fail handle could be passed to the event loop. + pub fn start( + fail_handle: Option<Arc<FailHandle>>, + ) -> Result<(EventLoop, thread::JoinHandle<()>)> { + let (self_stop_evt, stop_evt) = EventFd::new() + .and_then(|e| Ok((e.try_clone()?, e))) + .map_err(Error::CreateEventFd)?; + + let fd_callbacks: Arc<Mutex<BTreeMap<RawFd, Weak<EventHandler>>>> = + Arc::new(Mutex::new(BTreeMap::new())); + let poll_ctx: EpollContext<Fd> = EpollContext::new() + .and_then(|pc| pc.add(&stop_evt, Fd(stop_evt.as_raw_fd())).and(Ok(pc))) + .map_err(Error::CreatePollContext)?; + + let poll_ctx = Arc::new(poll_ctx); + let event_loop = EventLoop { + fail_handle: fail_handle.clone(), + poll_ctx: poll_ctx.clone(), + handlers: fd_callbacks.clone(), + stop_evt: self_stop_evt, + }; + + let handle = thread::spawn(move || { + let event_loop = EpollEvents::new(); + loop { + if fail_handle.failed() { + error!("xhci controller already failed, stopping event ring"); + return; + } + let events = match poll_ctx.wait(&event_loop) { + Ok(events) => events, + Err(e) => { + error!("cannot poll {:?}", e); + fail_handle.fail(); + return; + } + }; + for event in events.iter() { + if event.token().as_raw_fd() == stop_evt.as_raw_fd() { + return; + } else { + let fd = event.token().as_raw_fd(); + let mut locked = fd_callbacks.lock(); + let weak_handler = match locked.get(&fd) { + Some(cb) => cb.clone(), + None => { + warn!("callback for fd {} already removed", fd); + continue; + } + }; + match weak_handler.upgrade() { + Some(handler) => { + // Drop lock before triggering the event. + drop(locked); + match handler.on_event(fd) { + Ok(()) => {} + Err(_) => { + error!("event loop stopping due to handle event error"); + fail_handle.fail(); + return; + } + }; + } + // If the handler is already gone, we remove the fd. + None => { + let _ = poll_ctx.delete(&Fd(fd)); + if locked.remove(&fd).is_none() { + error!("fail to remove handler for file descriptor {}", fd); + } + } + }; + } + } + } + }); + + Ok((event_loop, handle)) + } + + /// Add a new event to event loop. The event handler will be invoked when `event` happens on + /// `fd`. + /// + /// If the same `fd` is added multiple times, the old handler will be replaced. + /// EventLoop will not keep `handler` alive, if handler is dropped when `event` is triggered, + /// the event will be removed. + pub fn add_event( + &self, + fd: &AsRawFd, + events: WatchingEvents, + handler: Weak<EventHandler>, + ) -> Result<()> { + if self.fail_handle.failed() { + return Err(Error::EventLoopAlreadyFailed); + } + self.handlers.lock().insert(fd.as_raw_fd(), handler); + // This might fail due to epoll syscall. Check epoll_ctl(2). + self.poll_ctx + .add_fd_with_events(fd, events, Fd(fd.as_raw_fd())) + .map_err(Error::PollContextAddFd) + } + + /// Removes event for this `fd`. This function returns false if it fails. + /// + /// EventLoop does not guarantee all events for `fd` is handled. + pub fn remove_event_for_fd(&self, fd: &AsRawFd) -> Result<()> { + if self.fail_handle.failed() { + return Err(Error::EventLoopAlreadyFailed); + } + // This might fail due to epoll syscall. Check epoll_ctl(2). + self.poll_ctx + .delete(fd) + .map_err(Error::PollContextDeleteFd)?; + self.handlers.lock().remove(&fd.as_raw_fd()); + Ok(()) + } + + /// Stops this event loop asynchronously. Previous events might not be handled. + pub fn stop(&self) { + match self.stop_evt.write(1) { + Ok(_) => {} + Err(_) => { + error!("fail to send event loop stop event, it might already stopped"); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; + use std::sync::{Arc, Condvar, Mutex}; + use sys_util::EventFd; + + struct EventLoopTestHandler { + val: Mutex<u8>, + cvar: Condvar, + } + + impl EventHandler for EventLoopTestHandler { + fn on_event(&self, fd: RawFd) -> std::result::Result<(), ()> { + let _ = unsafe { EventFd::from_raw_fd(fd).read() }; + *self.val.lock().unwrap() += 1; + self.cvar.notify_one(); + Ok(()) + } + } + + #[test] + fn event_loop_test() { + let (l, j) = EventLoop::start(None).unwrap(); + let (self_evt, evt) = match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) { + Ok(v) => v, + Err(e) => { + error!("failed creating EventFd pair: {:?}", e); + return; + } + }; + let h = Arc::new(EventLoopTestHandler { + val: Mutex::new(0), + cvar: Condvar::new(), + }); + let t: Arc<EventHandler> = h.clone(); + l.add_event(&evt, WatchingEvents::empty().set_read(), Arc::downgrade(&t)); + self_evt.write(1).unwrap(); + let _ = h.cvar.wait(h.val.lock().unwrap()).unwrap(); + l.stop(); + j.join().unwrap(); + assert_eq!(*(h.val.lock().unwrap()), 1); + } +} |