summary refs log tree commit diff
path: root/src/plugin
diff options
context:
space:
mode:
authorMatt Delco <delco@google.com>2019-04-17 15:14:11 -0700
committerchrome-bot <chrome-bot@chromium.org>2019-04-24 15:51:11 -0700
commite3fdadb8e18d567f7e2377613d875f12243b785a (patch)
treea3296ca5dc8e0c3c11dbaecec3957329c92b8f21 /src/plugin
parent2ec62db5f77cd52fe934e87d3be3c832251f4748 (diff)
downloadcrosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.tar
crosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.tar.gz
crosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.tar.bz2
crosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.tar.lz
crosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.tar.xz
crosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.tar.zst
crosvm-e3fdadb8e18d567f7e2377613d875f12243b785a.zip
crosvm: use pipe instead of socket for vcpu communication
Pipes have better performance than sockets, so switch the vcpu
communication over to pipes.  The vm communication channels will
continue to use sockets since that communication isn't performance
critical (and those messages sometimes exchange file descriptors, and
that functionality requires sockets).

TEST=local compile and confirmed that my diagnostic plugin is still
happy. The time it takes to run my benchmark plugin has decreased by
20%.  This combined with my prior commit results in a net wall-clock
time reduction of 32%.
BUG=None

Change-Id: I44c198d62a3bbe3b539ff6ac79707d02488876e3
Signed-off-by: Matt Delco <delco@google.com>
Reviewed-on: https://chromium-review.googlesource.com/1572873
Commit-Ready: Matt Delco <delco@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Zach Reizner <zachr@chromium.org>
Diffstat (limited to 'src/plugin')
-rw-r--r--src/plugin/mod.rs62
-rw-r--r--src/plugin/process.rs50
-rw-r--r--src/plugin/vcpu.rs24
3 files changed, 104 insertions, 32 deletions
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index 128b434..82c686f 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -8,7 +8,7 @@ mod vcpu;
 use std::fmt::{self, Display};
 use std::fs::File;
 use std::io;
-use std::os::unix::io::{FromRawFd, IntoRawFd};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
 use std::os::unix::net::UnixDatagram;
 use std::path::Path;
 use std::result;
@@ -18,8 +18,9 @@ use std::thread;
 use std::time::{Duration, Instant};
 
 use libc::{
-    c_ulong, ioctl, socketpair, AF_UNIX, EAGAIN, EBADF, EDEADLK, EEXIST, EINTR, EINVAL, ENOENT,
-    EOVERFLOW, EPERM, FIOCLEX, MS_NODEV, MS_NOEXEC, MS_NOSUID, SIGCHLD, SOCK_SEQPACKET,
+    c_int, c_ulong, fcntl, ioctl, socketpair, AF_UNIX, EAGAIN, EBADF, EDEADLK, EEXIST, EINTR,
+    EINVAL, ENOENT, EOVERFLOW, EPERM, FIOCLEX, F_SETPIPE_SZ, MS_NODEV, MS_NOEXEC, MS_NOSUID,
+    SIGCHLD, SOCK_SEQPACKET,
 };
 
 use protobuf::ProtobufError;
@@ -29,7 +30,7 @@ use io_jail::{self, Minijail};
 use kvm::{Datamatch, IoeventAddress, Kvm, Vcpu, VcpuExit, Vm};
 use net_util::{Error as TapError, Tap, TapT};
 use sys_util::{
-    block_signal, clear_signal, drop_capabilities, error, getegid, geteuid, info,
+    block_signal, clear_signal, drop_capabilities, error, getegid, geteuid, info, pipe,
     register_signal_handler, validate_raw_fd, warn, Error as SysError, EventFd, GuestMemory,
     Killable, MmapError, PollContext, PollToken, Result as SysResult, SignalFd, SignalFdError,
     SIGRTMIN,
@@ -46,7 +47,7 @@ const MAX_VCPU_DATAGRAM_SIZE: usize = 0x40000;
 #[sorted]
 pub enum Error {
     CloneEventFd(SysError),
-    CloneVcpuSocket(io::Error),
+    CloneVcpuPipe(io::Error),
     CreateEventFd(SysError),
     CreateIrqChip(SysError),
     CreateJail(io_jail::Error),
@@ -114,7 +115,7 @@ impl Display for Error {
         #[sorted]
         match self {
             CloneEventFd(e) => write!(f, "failed to clone eventfd: {}", e),
-            CloneVcpuSocket(e) => write!(f, "failed to clone vcpu socket: {}", e),
+            CloneVcpuPipe(e) => write!(f, "failed to clone vcpu pipe: {}", e),
             CreateEventFd(e) => write!(f, "failed to create eventfd: {}", e),
             CreateIrqChip(e) => write!(f, "failed to create kvm irqchip: {}", e),
             CreateJail(e) => write!(f, "failed to create jail: {}", e),
@@ -197,6 +198,55 @@ fn new_seqpacket_pair() -> SysResult<(UnixDatagram, UnixDatagram)> {
     }
 }
 
+struct VcpuPipe {
+    crosvm_read: File,
+    plugin_write: File,
+    plugin_read: File,
+    crosvm_write: File,
+}
+
+fn new_pipe_pair() -> SysResult<VcpuPipe> {
+    let to_crosvm = pipe(true)?;
+    let to_plugin = pipe(true)?;
+    // Increasing the pipe size is a nice-to-have to help make sure that
+    // messages get across atomically (and that writes don't block),
+    // though it's not necessarily a hard requirement for things to work.
+    let flags = unsafe {
+        fcntl(
+            to_crosvm.0.as_raw_fd(),
+            F_SETPIPE_SZ,
+            MAX_VCPU_DATAGRAM_SIZE as c_int,
+        )
+    };
+    if flags < 0 || flags != MAX_VCPU_DATAGRAM_SIZE as i32 {
+        warn!(
+            "Failed to adjust size of crosvm pipe (result {}): {}",
+            flags,
+            SysError::last()
+        );
+    }
+    let flags = unsafe {
+        fcntl(
+            to_plugin.0.as_raw_fd(),
+            F_SETPIPE_SZ,
+            MAX_VCPU_DATAGRAM_SIZE as c_int,
+        )
+    };
+    if flags < 0 || flags != MAX_VCPU_DATAGRAM_SIZE as i32 {
+        warn!(
+            "Failed to adjust size of plugin pipe (result {}): {}",
+            flags,
+            SysError::last()
+        );
+    }
+    Ok(VcpuPipe {
+        crosvm_read: to_crosvm.0,
+        plugin_write: to_crosvm.1,
+        plugin_read: to_plugin.0,
+        crosvm_write: to_plugin.1,
+    })
+}
+
 fn proto_to_sys_err(e: ProtobufError) -> SysError {
     match e {
         ProtobufError::IoError(e) => SysError::new(e.raw_os_error().unwrap_or(EINVAL)),
diff --git a/src/plugin/process.rs b/src/plugin/process.rs
index 3e9655c..50b4465 100644
--- a/src/plugin/process.rs
+++ b/src/plugin/process.rs
@@ -5,8 +5,8 @@
 use std::collections::hash_map::{Entry, HashMap, VacantEntry};
 use std::env::set_var;
 use std::fs::File;
+use std::io::Write;
 use std::mem::transmute;
-use std::net::Shutdown;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::os::unix::net::UnixDatagram;
 use std::path::Path;
@@ -109,7 +109,7 @@ pub enum ProcessStatus {
 
 /// Creates, owns, and handles messages from a plugin process.
 ///
-/// A plugin process has control over a single VM and a fixed number of VCPUs via a set of unix
+/// A plugin process has control over a single VM and a fixed number of VCPUs via a set of pipes & unix
 /// domain socket connections and a protocol defined in `protos::plugin`. The plugin process is run
 /// in an unprivileged manner as a child process spawned via a path to a arbitrary executable.
 pub struct Process {
@@ -122,7 +122,7 @@ pub struct Process {
 
     // Resource to sent to plugin
     kill_evt: EventFd,
-    vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)>,
+    vcpu_pipes: Vec<VcpuPipe>,
 
     // Socket Transmission
     request_buffer: Vec<u8>,
@@ -147,10 +147,9 @@ impl Process {
         let (request_socket, child_socket) =
             new_seqpacket_pair().map_err(Error::CreateMainSocket)?;
 
-        let mut vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)> =
-            Vec::with_capacity(cpu_count as usize);
+        let mut vcpu_pipes: Vec<VcpuPipe> = Vec::with_capacity(cpu_count as usize);
         for _ in 0..cpu_count {
-            vcpu_sockets.push(new_seqpacket_pair().map_err(Error::CreateVcpuSocket)?);
+            vcpu_pipes.push(new_pipe_pair().map_err(Error::CreateVcpuSocket)?);
         }
         let mut per_vcpu_states: Vec<Arc<Mutex<PerVcpuState>>> =
             Vec::with_capacity(cpu_count as usize);
@@ -184,7 +183,7 @@ impl Process {
             shared_vcpu_state: Default::default(),
             per_vcpu_states,
             kill_evt: EventFd::new().map_err(Error::CreateEventFd)?,
-            vcpu_sockets,
+            vcpu_pipes,
             request_buffer: vec![0; MAX_DATAGRAM_SIZE],
             response_buffer: Vec::new(),
         })
@@ -197,14 +196,19 @@ impl Process {
     /// `PluginVcpu` object, the underlying resources are shared by each `PluginVcpu` resulting from
     /// the same `cpu_id`.
     pub fn create_vcpu(&self, cpu_id: u32) -> Result<PluginVcpu> {
-        let vcpu_socket = self.vcpu_sockets[cpu_id as usize]
-            .0
+        let vcpu_pipe_read = self.vcpu_pipes[cpu_id as usize]
+            .crosvm_read
+            .try_clone()
+            .map_err(Error::CloneVcpuPipe)?;
+        let vcpu_pipe_write = self.vcpu_pipes[cpu_id as usize]
+            .crosvm_write
             .try_clone()
-            .map_err(Error::CloneVcpuSocket)?;
+            .map_err(Error::CloneVcpuPipe)?;
         Ok(PluginVcpu::new(
             self.shared_vcpu_state.clone(),
             self.per_vcpu_states[cpu_id as usize].clone(),
-            vcpu_socket,
+            vcpu_pipe_read,
+            vcpu_pipe_write,
         ))
     }
 
@@ -255,10 +259,21 @@ impl Process {
     /// Any subsequent attempt to use the VCPU connections will fail.
     pub fn signal_kill(&mut self) -> SysResult<()> {
         self.kill_evt.write(1)?;
-        // By shutting down our half of the VCPU sockets, any blocked calls in the VCPU threads will
-        // unblock, allowing them to exit cleanly.
-        for sock in &self.vcpu_sockets {
-            sock.0.shutdown(Shutdown::Both)?;
+        // Normally we'd get any blocked recv() calls in the VCPU threads
+        // to unblock by calling shutdown().  However, we're using pipes
+        // (for improved performance), and pipes don't have shutdown so
+        // instead we'll write a shutdown message to ourselves using
+        // the writable side of the pipe (normally used by the plugin).
+        for pipe in self.vcpu_pipes.iter_mut() {
+            let mut shutdown_request = VcpuRequest::new();
+            shutdown_request.set_shutdown(VcpuRequest_Shutdown::new());
+            let mut buffer = Vec::new();
+            shutdown_request
+                .write_to_vec(&mut buffer)
+                .map_err(proto_to_sys_err)?;
+            pipe.plugin_write
+                .write(&buffer[..])
+                .map_err(io_to_sys_err)?;
         }
         Ok(())
     }
@@ -590,7 +605,10 @@ impl Process {
             Ok(())
         } else if request.has_get_vcpus() {
             response.mut_get_vcpus();
-            response_fds.extend(self.vcpu_sockets.iter().map(|s| s.1.as_raw_fd()));
+            for pipe in self.vcpu_pipes.iter() {
+                response_fds.push(pipe.plugin_write.as_raw_fd());
+                response_fds.push(pipe.plugin_read.as_raw_fd());
+            }
             Ok(())
         } else if request.has_start() {
             response.mut_start();
diff --git a/src/plugin/vcpu.rs b/src/plugin/vcpu.rs
index 17cd875..c9d811a 100644
--- a/src/plugin/vcpu.rs
+++ b/src/plugin/vcpu.rs
@@ -7,8 +7,8 @@ use std::cell::{Cell, RefCell};
 use std::cmp::min;
 use std::cmp::{self, Ord, PartialEq, PartialOrd};
 use std::collections::btree_set::BTreeSet;
+use std::io::{Read, Write};
 use std::mem;
-use std::os::unix::net::UnixDatagram;
 use std::sync::{Arc, RwLock};
 
 use libc::{EINVAL, ENOENT, ENOTTY, EPERM, EPIPE, EPROTO};
@@ -274,7 +274,8 @@ impl<'a> VcpuRunData<'a> {
 pub struct PluginVcpu {
     shared_vcpu_state: Arc<RwLock<SharedVcpuState>>,
     per_vcpu_state: Arc<Mutex<PerVcpuState>>,
-    connection: UnixDatagram,
+    read_pipe: File,
+    write_pipe: File,
     wait_reason: Cell<Option<VcpuResponse_Wait>>,
     request_buffer: RefCell<Vec<u8>>,
     response_buffer: RefCell<Vec<u8>>,
@@ -285,12 +286,14 @@ impl PluginVcpu {
     pub fn new(
         shared_vcpu_state: Arc<RwLock<SharedVcpuState>>,
         per_vcpu_state: Arc<Mutex<PerVcpuState>>,
-        connection: UnixDatagram,
+        read_pipe: File,
+        write_pipe: File,
     ) -> PluginVcpu {
         PluginVcpu {
             shared_vcpu_state,
             per_vcpu_state,
-            connection,
+            read_pipe,
+            write_pipe,
             wait_reason: Default::default(),
             request_buffer: Default::default(),
             response_buffer: Default::default(),
@@ -419,10 +422,8 @@ impl PluginVcpu {
             let mut request_buffer = self.request_buffer.borrow_mut();
             request_buffer.resize(MAX_VCPU_DATAGRAM_SIZE, 0);
 
-            let msg_size = self
-                .connection
-                .recv(&mut request_buffer)
-                .map_err(io_to_sys_err)?;
+            let mut read_pipe = &self.read_pipe;
+            let msg_size = read_pipe.read(&mut request_buffer).map_err(io_to_sys_err)?;
 
             let mut request =
                 protobuf::parse_from_bytes::<VcpuRequest>(&request_buffer[..msg_size])
@@ -526,6 +527,8 @@ impl PluginVcpu {
                     cpuid_entry.edx = request_entry.edx;
                 }
                 vcpu.set_cpuid2(&cpuid)
+            } else if request.has_shutdown() {
+                return Err(SysError::new(EPIPE));
             } else {
                 Err(SysError::new(ENOTTY))
             };
@@ -543,8 +546,9 @@ impl PluginVcpu {
             response
                 .write_to_vec(&mut response_buffer)
                 .map_err(proto_to_sys_err)?;
-            self.connection
-                .send(&response_buffer[..])
+            let mut write_pipe = &self.write_pipe;
+            write_pipe
+                .write(&response_buffer[..])
                 .map_err(io_to_sys_err)?;
         }