summary refs log tree commit diff
path: root/sys_util/src/poll.rs
diff options
context:
space:
mode:
authorZach Reizner <zachr@google.com>2018-02-22 22:05:55 -0800
committerchrome-bot <chrome-bot@chromium.org>2018-03-07 16:54:45 -0800
commita5358e8ffd53d04750dc2c3376abc3fb5df92659 (patch)
tree9a69ba015ec288d6853317060dc3b453b4f955be /sys_util/src/poll.rs
parent5b6b192e34311924388715f1981e9ffca7c4b53c (diff)
downloadcrosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.tar
crosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.tar.gz
crosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.tar.bz2
crosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.tar.lz
crosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.tar.xz
crosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.tar.zst
crosvm-a5358e8ffd53d04750dc2c3376abc3fb5df92659.zip
sys_util: add PollContext interface for using epoll
A common cause of silent 100% CPU usage on otherwise idle VMs is because
some poll loop is waiting on sockets that were hung up on. An unrelated
issue is that using the Poller interface requires dynamic allocation on
every poll call for dynamically sized poll lists.

The PollContext struct detects and warns about the first problem at runtime
and solves the latter problem.

TEST=cargo test -p sys_util
BUG=chromium:816692

Change-Id: I42a9c961db07191d25bcba77c5136f5729400ec9
Reviewed-on: https://chromium-review.googlesource.com/933870
Commit-Ready: Zach Reizner <zachr@chromium.org>
Tested-by: Zach Reizner <zachr@chromium.org>
Reviewed-by: Chirantan Ekbote <chirantan@chromium.org>
Diffstat (limited to 'sys_util/src/poll.rs')
-rw-r--r--sys_util/src/poll.rs496
1 files changed, 492 insertions, 4 deletions
diff --git a/sys_util/src/poll.rs b/sys_util/src/poll.rs
index b6f3a1b..0a6da7b 100644
--- a/sys_util/src/poll.rs
+++ b/sys_util/src/poll.rs
@@ -2,15 +2,27 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file.
 
-use std::time::Duration;
-use std::ptr::null;
-use std::os::unix::io::{AsRawFd, RawFd};
+use std::cell::{RefCell, Cell, Ref};
+use std::cmp::min;
+use std::fs::File;
+use std::i32;
+use std::i64;
+use std::marker::PhantomData;
+use std::os::unix::io::{RawFd, AsRawFd, IntoRawFd, FromRawFd};
 use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::ptr::{null, null_mut};
+use std::slice;
+use std::thread;
+use std::time::Duration;
 
-use libc::{c_int, c_long, timespec, time_t, nfds_t, sigset_t, pollfd, syscall, SYS_ppoll, POLLIN};
+use libc::{c_int, c_long, timespec, time_t, nfds_t, sigset_t, pollfd, syscall, SYS_ppoll, POLLIN,
+           EPOLL_CLOEXEC, EPOLLIN, EPOLLHUP, EPOLL_CTL_ADD, EPOLL_CTL_MOD, EPOLL_CTL_DEL,
+           epoll_create1, epoll_ctl, epoll_wait, epoll_event};
 
 use {Result, errno_result};
 
+const POLL_CONTEXT_MAX_EVENTS: usize = 16;
+
 // The libc wrapper suppresses the kernel's changes to timeout, so we use the syscall directly.
 unsafe fn ppoll(fds: *mut pollfd,
                 nfds: nfds_t,
@@ -137,9 +149,409 @@ impl Poller {
     }
 }
 
+/// Trait for a token that can be associated with an `fd` in a `PollContext`.
+pub trait PollToken {
+    /// Converts this token into a u64 that can be turned back into a token via `from_raw_token`.
+    fn as_raw_token(&self) -> u64;
+
+    /// Converts a raw token as returned from `as_raw_token` back into a token.
+    ///
+    /// It is invalid to give a raw token that was not returned via `as_raw_token` from the same
+    /// `Self`. The implementation can expect that this will never happen as a result of its usage
+    /// in `PollContext`.
+    fn from_raw_token(data: u64) -> Self;
+}
+
+impl PollToken for usize {
+    fn as_raw_token(&self) -> u64 {
+        *self as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        data as Self
+    }
+}
+
+impl PollToken for u64 {
+    fn as_raw_token(&self) -> u64 {
+        *self as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        data as Self
+    }
+}
+
+impl PollToken for u32 {
+    fn as_raw_token(&self) -> u64 {
+        *self as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        data as Self
+    }
+}
+
+impl PollToken for u16 {
+    fn as_raw_token(&self) -> u64 {
+        *self as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        data as Self
+    }
+}
+
+impl PollToken for u8 {
+    fn as_raw_token(&self) -> u64 {
+        *self as u64
+    }
+
+    fn from_raw_token(data: u64) -> Self {
+        data as Self
+    }
+}
+
+impl PollToken for () {
+    fn as_raw_token(&self) -> u64 {
+        0
+    }
+
+    fn from_raw_token(_data: u64) -> Self {}
+}
+
+/// An event returned by `PollContext::wait`.
+pub struct PollEvent<'a, T> {
+    event: &'a epoll_event,
+    token: PhantomData<T>, // Needed to satisfy usage of T
+}
+
+impl<'a, T: PollToken> PollEvent<'a, T> {
+    /// Gets the token associated in `PollContext::add` with this event.
+    pub fn token(&self) -> T {
+        T::from_raw_token(self.event.u64)
+    }
+
+    /// True if the `fd` associated with this token in `PollContext::add` is readable.
+    pub fn readable(&self) -> bool {
+        self.event.events & (EPOLLIN as u32) != 0
+    }
+
+    /// True if the `fd` associated with this token in `PollContext::add` has been hungup on.
+    pub fn hungup(&self) -> bool {
+        self.event.events & (EPOLLHUP as u32) != 0
+    }
+}
+
+/// An iterator over some (sub)set of events returned by `PollContext::wait`.
+pub struct PollEventIter<'a, I, T>
+    where I: Iterator<Item = &'a epoll_event>
+{
+    mask: u32,
+    iter: I,
+    tokens: PhantomData<[T]>, // Needed to satisfy usage of T
+}
+
+impl<'a, I, T> Iterator for PollEventIter<'a, I, T>
+    where I: Iterator<Item = &'a epoll_event>,
+          T: PollToken
+{
+    type Item = PollEvent<'a, T>;
+    fn next(&mut self) -> Option<Self::Item> {
+        let mask = self.mask;
+        self.iter
+            .find(|event| (event.events & mask) != 0)
+            .map(|event| {
+                     PollEvent {
+                         event,
+                         token: PhantomData,
+                     }
+                 })
+    }
+}
+
+/// The list of event returned by `PollContext::wait`.
+pub struct PollEvents<'a, T> {
+    count: usize,
+    events: Ref<'a, [epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
+    tokens: PhantomData<[T]>, // Needed to satisfy usage of T
+}
+
+impl<'a, T: PollToken> PollEvents<'a, T> {
+    /// Iterates over each event.
+    pub fn iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
+        PollEventIter {
+            mask: 0xffffffff,
+            iter: self.events[..self.count].iter(),
+            tokens: PhantomData,
+        }
+    }
+
+    /// Iterates over each readable event.
+    pub fn iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
+        PollEventIter {
+            mask: EPOLLIN as u32,
+            iter: self.events[..self.count].iter(),
+            tokens: PhantomData,
+        }
+    }
+
+    /// Iterates over each hungup event.
+    pub fn iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T> {
+        PollEventIter {
+            mask: EPOLLHUP as u32,
+            iter: self.events[..self.count].iter(),
+            tokens: PhantomData,
+        }
+    }
+}
+
+/// Used to poll multiple objects that have file descriptors.
+///
+/// # Example
+///
+/// ```
+/// # use sys_util::{Result, EventFd, PollContext, PollEvents};
+/// # fn test() -> Result<()> {
+///     let evt1 = EventFd::new()?;
+///     let evt2 = EventFd::new()?;
+///     evt2.write(1)?;
+///
+///     let ctx: PollContext<u32> = PollContext::new()?;
+///     ctx.add(&evt1, 1)?;
+///     ctx.add(&evt2, 2)?;
+///
+///     let pollevents: PollEvents<u32> = ctx.wait()?;
+///     let tokens: Vec<u32> = pollevents.iter_readable().map(|e| e.token()).collect();
+///     assert_eq!(&tokens[..], &[2]);
+/// #   Ok(())
+/// # }
+/// ```
+pub struct PollContext<T> {
+    epoll_ctx: File,
+
+    // We use a RefCell here so that the `wait` method only requires an immutable self reference
+    // while returning the events (encapsulated by PollEvents). Without the RefCell, `wait` would
+    // hold a mutable reference that lives as long as its returned reference (i.e. the PollEvents),
+    // even though that reference is immutable. This is terribly inconvenient for the caller because
+    // the borrow checking would prevent them from using `delete` and `add` while the events are in
+    // scope.
+    events: RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>,
+
+    // Hangup busy loop detection variables. See `check_for_hungup_busy_loop`.
+    hangups: Cell<usize>,
+    max_hangups: Cell<usize>,
+
+    // Needed to satisfy usage of T
+    tokens: PhantomData<[T]>,
+}
+
+impl<T: PollToken> PollContext<T> {
+    /// Creates a new `PollContext`.
+    pub fn new() -> Result<PollContext<T>> {
+        // Safe because we check the return value.
+        let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) };
+        if epoll_fd < 0 {
+            return errno_result();
+        }
+        Ok(PollContext {
+               epoll_ctx: unsafe { File::from_raw_fd(epoll_fd) },
+               events: RefCell::new([epoll_event { events: 0, u64: 0 }; POLL_CONTEXT_MAX_EVENTS]),
+               hangups: Cell::new(0),
+               max_hangups: Cell::new(0),
+               // Safe because the `epoll_fd` is valid and we hold unique ownership.
+               tokens: PhantomData,
+           })
+    }
+
+    /// Adds the given `fd` to this context and associates the given `token` with the `fd`'s events.
+    ///
+    /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and
+    /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different
+    /// FD number) added to this context, events will not be reported by `wait` anymore.
+    pub fn add(&self, fd: &AsRawFd, token: T) -> Result<()> {
+        let mut evt = epoll_event {
+            events: EPOLLIN as u32,
+            u64: token.as_raw_token(),
+        };
+        // Safe because we give a valid epoll FD and FD to watch, as well as a valid epoll_event
+        // structure. Then we check the return value.
+        let ret = unsafe {
+            epoll_ctl(self.epoll_ctx.as_raw_fd(),
+                      EPOLL_CTL_ADD,
+                      fd.as_raw_fd(),
+                      &mut evt)
+        };
+        if ret < 0 {
+            return errno_result();
+        };
+
+        // Used to detect busy loop waits caused by unhandled hangup events.
+        self.hangups.set(0);
+        self.max_hangups.set(self.max_hangups.get() + 1);
+        Ok(())
+    }
+
+    /// If `fd` was previously added to this context, the token associated with it will be replaced
+    /// with the given `token`.
+    pub fn modify(&self, fd: &AsRawFd, token: T) -> Result<()> {
+        let mut evt = epoll_event {
+            events: EPOLLIN as u32,
+            u64: token.as_raw_token(),
+        };
+        // Safe because we give a valid epoll FD and FD to modify, as well as a valid epoll_event
+        // structure. Then we check the return value.
+        let ret = unsafe {
+            epoll_ctl(self.epoll_ctx.as_raw_fd(),
+                      EPOLL_CTL_MOD,
+                      fd.as_raw_fd(),
+                      &mut evt)
+        };
+        if ret < 0 {
+            return errno_result();
+        };
+        Ok(())
+    }
+
+    /// Deletes the given `fd` from this context.
+    ///
+    /// If an `fd`'s token shows up in the list of hangup events, it should be removed using this
+    /// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`.
+    /// Failure to do so will cause the `wait` method to always return immediately, causing ~100%
+    /// CPU load.
+    pub fn delete(&self, fd: &AsRawFd) -> Result<()> {
+        // Safe because we give a valid epoll FD and FD to stop watching. Then we check the return
+        // value.
+        let ret = unsafe {
+            epoll_ctl(self.epoll_ctx.as_raw_fd(),
+                      EPOLL_CTL_DEL,
+                      fd.as_raw_fd(),
+                      null_mut())
+        };
+        if ret < 0 {
+            return errno_result();
+        };
+
+        // Used to detect busy loop waits caused by unhandled hangup events.
+        self.hangups.set(0);
+        self.max_hangups.set(self.max_hangups.get() - 1);
+        Ok(())
+    }
+
+    // This method determines if the the user of wait is misusing the `PollContext` by leaving FDs
+    // in this `PollContext` that have been shutdown or hungup on. Such an FD will cause `wait` to
+    // return instantly with a hungup event. If that FD is perpetually left in this context, a busy
+    // loop burning ~100% of one CPU will silently occur with no human visible malfunction.
+    //
+    // How do we know if the client of this context is ignoring hangups? A naive implementation
+    // would trigger if consecutive wait calls yield hangup events, but there are legitimate cases
+    // for this, such as two distinct sockets becoming hungup across two consecutive wait calls. A
+    // smarter implementation would only trigger if `delete` wasn't called between waits that
+    // yielded hangups. Sadly `delete` isn't the only way to remove an FD from this context. The
+    // other way is for the client to close the hungup FD, which automatically removes it from this
+    // context. Assuming that the client always uses close, this implementation would too eagerly
+    // trigger.
+    //
+    // The implementation used here keeps an upper bound of FDs in this context using a counter
+    // hooked into add/delete (which is imprecise because close can also remove FDs without us
+    // knowing). The number of consecutive (no add or delete in between) hangups yielded by wait
+    // calls is counted and compared to the upper bound. If the upper bound is exceeded by the
+    // consecutive hangups, the implementation triggers the check and logs.
+    //
+    // This implementation has false negatives because the upper bound can be completely too high,
+    // in the worst case caused by only using close instead of delete. However, this method has the
+    // advantage of always triggering eventually genuine busy loop cases, requires no dynamic
+    // allocations, is fast and constant time to compute, and has no false positives.
+    fn check_for_hungup_busy_loop(&self, new_hangups: usize) {
+        let old_hangups = self.hangups.get();
+        let max_hangups = self.max_hangups.get();
+        if old_hangups <= max_hangups && old_hangups + new_hangups > max_hangups {
+            warn!("busy poll wait loop with hungup FDs detected on thread {}",
+                  thread::current().name().unwrap_or(""));
+            // This panic is helpful for tests of this functionality.
+            #[cfg(test)]
+            panic!("hungup busy loop detected");
+        }
+        self.hangups.set(old_hangups + new_hangups);
+    }
+
+    /// Waits for any events to occur in FDs that were previously added to this context.
+    ///
+    /// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading
+    /// for readable events and not closing for hungup events), subsequent calls to `wait` will
+    /// return immediately. The consequence of not handling an event perpetually while calling
+    /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to
+    /// ~100% usage.
+    ///
+    /// # Panics
+    /// Panics if the returned `PollEvents` structure is not dropped before subsequent `wait` calls.
+    pub fn wait(&self) -> Result<PollEvents<T>> {
+        self.wait_timeout(Duration::new(i64::MAX as u64, 0))
+    }
+
+    /// Like `wait` except will only block for a maximum of the given `timeout`.
+    ///
+    /// This may return earlier than `timeout` with zero events if the duration indicated exceeds
+    /// system limits.
+    pub fn wait_timeout(&self, timeout: Duration) -> Result<PollEvents<T>> {
+        let timeout_millis = if timeout.as_secs() as i64 == i64::max_value() {
+            // We make the convenient assumption that 2^63 seconds is an effectively unbounded time
+            // frame. This is meant to mesh with `wait` calling us with no timeout.
+            -1
+        } else {
+            // In cases where we the number of milliseconds would overflow an i32, we substitute the
+            // maximum timeout which is ~24.8 days.
+            let millis = timeout
+                .as_secs()
+                .checked_mul(1_000)
+                .and_then(|ms| ms.checked_add(timeout.subsec_nanos() as u64 / 1_000_000))
+                .unwrap_or(i32::max_value() as u64);
+            min(i32::max_value() as u64, millis) as i32
+        };
+        let ret = {
+            let mut epoll_events = self.events.borrow_mut();
+            let max_events = epoll_events.len() as c_int;
+            // Safe because we give an epoll context and a properly sized epoll_events array
+            // pointer, which we trust the kernel to fill in properly.
+            unsafe {
+                epoll_wait(self.epoll_ctx.as_raw_fd(),
+                           &mut epoll_events[0],
+                           max_events,
+                           timeout_millis)
+            }
+        };
+        if ret < 0 {
+            return errno_result();
+        }
+        let epoll_events = self.events.borrow();
+        let events = PollEvents {
+            count: ret as usize,
+            events: epoll_events,
+            tokens: PhantomData,
+        };
+        let hangups = events.iter_hungup().count();
+        self.check_for_hungup_busy_loop(hangups);
+        Ok(events)
+    }
+}
+
+impl<T: PollToken> AsRawFd for PollContext<T> {
+    fn as_raw_fd(&self) -> RawFd {
+        self.epoll_ctx.as_raw_fd()
+    }
+}
+
+impl<T: PollToken> IntoRawFd for PollContext<T> {
+    fn into_raw_fd(self) -> RawFd {
+        self.epoll_ctx.into_raw_fd()
+    }
+}
+
+
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::time::Instant;
     use EventFd;
 
     #[test]
@@ -182,4 +594,80 @@ mod tests {
                    Ok([1].as_ref()));
         assert!(timeout_dur < initial_dur);
     }
+
+    #[test]
+    fn poll_context() {
+        let evt1 = EventFd::new().unwrap();
+        let evt2 = EventFd::new().unwrap();
+        evt1.write(1).unwrap();
+        evt2.write(1).unwrap();
+        let ctx: PollContext<u32> = PollContext::new().unwrap();
+        ctx.add(&evt1, 1).unwrap();
+        ctx.add(&evt2, 2).unwrap();
+
+        let mut evt_count = 0;
+        while evt_count < 2 {
+            for event in ctx.wait().unwrap().iter_readable() {
+                evt_count += 1;
+                match event.token() {
+                    1 => {
+                        evt1.read().unwrap();
+                        ctx.delete(&evt1).unwrap();
+                    }
+                    2 => {
+                        evt2.read().unwrap();
+                        ctx.delete(&evt2).unwrap();
+                    }
+                    _ => panic!("unexpected token"),
+                };
+            }
+        }
+        assert_eq!(evt_count, 2);
+    }
+
+    #[test]
+    fn poll_context_overflow() {
+        const EVT_COUNT: usize = POLL_CONTEXT_MAX_EVENTS * 2 + 1;
+        let ctx: PollContext<usize> = PollContext::new().unwrap();
+        let mut evts = Vec::with_capacity(EVT_COUNT);
+        for i in 0..EVT_COUNT {
+            let evt = EventFd::new().unwrap();
+            evt.write(1).unwrap();
+            ctx.add(&evt, i).unwrap();
+            evts.push(evt);
+        }
+        let mut evt_count = 0;
+        while evt_count < EVT_COUNT {
+            for event in ctx.wait().unwrap().iter_readable() {
+                evts[event.token()].read().unwrap();
+                evt_count += 1;
+            }
+        }
+    }
+
+    #[test]
+    #[should_panic]
+    fn poll_context_hungup() {
+        let (s1, s2) = UnixStream::pair().unwrap();
+        let ctx: PollContext<u32> = PollContext::new().unwrap();
+        ctx.add(&s1, 1).unwrap();
+
+        // Causes s1 to receive hangup events, which we purposefully ignore to trip the detection
+        // logic in `PollContext`.
+        drop(s2);
+
+        // Should easily panic within this many iterations.
+        for _ in 0..1000 {
+            ctx.wait().unwrap();
+        }
+    }
+
+    #[test]
+    fn poll_context_timeout() {
+        let ctx: PollContext<u32> = PollContext::new().unwrap();
+        let dur = Duration::from_millis(10);
+        let start_inst = Instant::now();
+        ctx.wait_timeout(dur).unwrap();
+        assert!(start_inst.elapsed() >= dur);
+    }
 }