summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--crosvm_plugin/src/lib.rs32
-rw-r--r--protos/src/plugin.proto4
-rw-r--r--src/plugin/mod.rs62
-rw-r--r--src/plugin/process.rs50
-rw-r--r--src/plugin/vcpu.rs24
5 files changed, 127 insertions, 45 deletions
diff --git a/crosvm_plugin/src/lib.rs b/crosvm_plugin/src/lib.rs
index d1bb665..a6fd4df 100644
--- a/crosvm_plugin/src/lib.rs
+++ b/crosvm_plugin/src/lib.rs
@@ -17,6 +17,7 @@
 
 use std::env;
 use std::fs::File;
+use std::io::{Read, Write};
 use std::mem::{size_of, swap};
 use std::os::raw::{c_int, c_void};
 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@@ -358,14 +359,17 @@ impl crosvm {
     fn load_all_vcpus(&mut self) -> result::Result<(), c_int> {
         let mut r = MainRequest::new();
         r.mut_get_vcpus();
-        let (_, files) = self.main_transaction(&r, &[])?;
-        if files.is_empty() {
+        let (_, mut files) = self.main_transaction(&r, &[])?;
+        if files.is_empty() || files.len() % 2 != 0 {
             return Err(EPROTO);
         }
-        let vcpus = files
-            .into_iter()
-            .map(|f| crosvm_vcpu::new(fd_cast(f)))
-            .collect();
+
+        let mut vcpus = Vec::with_capacity(files.len() / 2);
+        while files.len() > 1 {
+            let write_pipe = files.remove(0);
+            let read_pipe = files.remove(0);
+            vcpus.push(crosvm_vcpu::new(fd_cast(read_pipe), fd_cast(write_pipe)));
+        }
         // Only called once by the `from_connection` constructor, which makes a new unique
         // `self.vcpus`.
         let self_vcpus = Arc::get_mut(&mut self.vcpus).unwrap();
@@ -919,7 +923,8 @@ pub struct crosvm_vcpu_event {
 }
 
 pub struct crosvm_vcpu {
-    socket: UnixDatagram,
+    read_pipe: File,
+    write_pipe: File,
     send_init: bool,
     request_buffer: Vec<u8>,
     response_buffer: Vec<u8>,
@@ -927,9 +932,10 @@ pub struct crosvm_vcpu {
 }
 
 impl crosvm_vcpu {
-    fn new(socket: UnixDatagram) -> crosvm_vcpu {
+    fn new(read_pipe: File, write_pipe: File) -> crosvm_vcpu {
         crosvm_vcpu {
-            socket,
+            read_pipe,
+            write_pipe,
             send_init: true,
             request_buffer: Vec::new(),
             response_buffer: vec![0; MAX_DATAGRAM_SIZE],
@@ -941,16 +947,16 @@ impl crosvm_vcpu {
         request
             .write_to_vec(&mut self.request_buffer)
             .map_err(proto_error_to_int)?;
-        self.socket
-            .send(self.request_buffer.as_slice())
+        self.write_pipe
+            .write(self.request_buffer.as_slice())
             .map_err(|e| -e.raw_os_error().unwrap_or(EINVAL))?;
         Ok(())
     }
 
     fn vcpu_recv(&mut self) -> result::Result<VcpuResponse, c_int> {
         let msg_size = self
-            .socket
-            .recv(&mut self.response_buffer)
+            .read_pipe
+            .read(&mut self.response_buffer)
             .map_err(|e| -e.raw_os_error().unwrap_or(EINVAL))?;
 
         let response: VcpuResponse =
diff --git a/protos/src/plugin.proto b/protos/src/plugin.proto
index 5b6eec2..de76089 100644
--- a/protos/src/plugin.proto
+++ b/protos/src/plugin.proto
@@ -323,6 +323,9 @@ message VcpuRequest {
         repeated CpuidEntry entries = 1;
     }
 
+    message Shutdown {
+    }
+
     // The type of the message is determined by which of these oneof fields is present in the
     // protobuf.
     oneof message {
@@ -333,6 +336,7 @@ message VcpuRequest {
         GetMsrs get_msrs = 5;
         SetMsrs set_msrs = 6;
         SetCpuid set_cpuid = 7;
+        Shutdown shutdown = 8;
     }
 }
 
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index 128b434..82c686f 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -8,7 +8,7 @@ mod vcpu;
 use std::fmt::{self, Display};
 use std::fs::File;
 use std::io;
-use std::os::unix::io::{FromRawFd, IntoRawFd};
+use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
 use std::os::unix::net::UnixDatagram;
 use std::path::Path;
 use std::result;
@@ -18,8 +18,9 @@ use std::thread;
 use std::time::{Duration, Instant};
 
 use libc::{
-    c_ulong, ioctl, socketpair, AF_UNIX, EAGAIN, EBADF, EDEADLK, EEXIST, EINTR, EINVAL, ENOENT,
-    EOVERFLOW, EPERM, FIOCLEX, MS_NODEV, MS_NOEXEC, MS_NOSUID, SIGCHLD, SOCK_SEQPACKET,
+    c_int, c_ulong, fcntl, ioctl, socketpair, AF_UNIX, EAGAIN, EBADF, EDEADLK, EEXIST, EINTR,
+    EINVAL, ENOENT, EOVERFLOW, EPERM, FIOCLEX, F_SETPIPE_SZ, MS_NODEV, MS_NOEXEC, MS_NOSUID,
+    SIGCHLD, SOCK_SEQPACKET,
 };
 
 use protobuf::ProtobufError;
@@ -29,7 +30,7 @@ use io_jail::{self, Minijail};
 use kvm::{Datamatch, IoeventAddress, Kvm, Vcpu, VcpuExit, Vm};
 use net_util::{Error as TapError, Tap, TapT};
 use sys_util::{
-    block_signal, clear_signal, drop_capabilities, error, getegid, geteuid, info,
+    block_signal, clear_signal, drop_capabilities, error, getegid, geteuid, info, pipe,
     register_signal_handler, validate_raw_fd, warn, Error as SysError, EventFd, GuestMemory,
     Killable, MmapError, PollContext, PollToken, Result as SysResult, SignalFd, SignalFdError,
     SIGRTMIN,
@@ -46,7 +47,7 @@ const MAX_VCPU_DATAGRAM_SIZE: usize = 0x40000;
 #[sorted]
 pub enum Error {
     CloneEventFd(SysError),
-    CloneVcpuSocket(io::Error),
+    CloneVcpuPipe(io::Error),
     CreateEventFd(SysError),
     CreateIrqChip(SysError),
     CreateJail(io_jail::Error),
@@ -114,7 +115,7 @@ impl Display for Error {
         #[sorted]
         match self {
             CloneEventFd(e) => write!(f, "failed to clone eventfd: {}", e),
-            CloneVcpuSocket(e) => write!(f, "failed to clone vcpu socket: {}", e),
+            CloneVcpuPipe(e) => write!(f, "failed to clone vcpu pipe: {}", e),
             CreateEventFd(e) => write!(f, "failed to create eventfd: {}", e),
             CreateIrqChip(e) => write!(f, "failed to create kvm irqchip: {}", e),
             CreateJail(e) => write!(f, "failed to create jail: {}", e),
@@ -197,6 +198,55 @@ fn new_seqpacket_pair() -> SysResult<(UnixDatagram, UnixDatagram)> {
     }
 }
 
+struct VcpuPipe {
+    crosvm_read: File,
+    plugin_write: File,
+    plugin_read: File,
+    crosvm_write: File,
+}
+
+fn new_pipe_pair() -> SysResult<VcpuPipe> {
+    let to_crosvm = pipe(true)?;
+    let to_plugin = pipe(true)?;
+    // Increasing the pipe size can be a nice-to-have to make sure that
+    // messages get across atomically (and made sure that writes don't block),
+    // though it's not necessary a hard requirement for things to work.
+    let flags = unsafe {
+        fcntl(
+            to_crosvm.0.as_raw_fd(),
+            F_SETPIPE_SZ,
+            MAX_VCPU_DATAGRAM_SIZE as c_int,
+        )
+    };
+    if flags < 0 || flags != MAX_VCPU_DATAGRAM_SIZE as i32 {
+        warn!(
+            "Failed to adjust size of crosvm pipe (result {}): {}",
+            flags,
+            SysError::last()
+        );
+    }
+    let flags = unsafe {
+        fcntl(
+            to_plugin.0.as_raw_fd(),
+            F_SETPIPE_SZ,
+            MAX_VCPU_DATAGRAM_SIZE as c_int,
+        )
+    };
+    if flags < 0 || flags != MAX_VCPU_DATAGRAM_SIZE as i32 {
+        warn!(
+            "Failed to adjust size of plugin pipe (result {}): {}",
+            flags,
+            SysError::last()
+        );
+    }
+    Ok(VcpuPipe {
+        crosvm_read: to_crosvm.0,
+        plugin_write: to_crosvm.1,
+        plugin_read: to_plugin.0,
+        crosvm_write: to_plugin.1,
+    })
+}
+
 fn proto_to_sys_err(e: ProtobufError) -> SysError {
     match e {
         ProtobufError::IoError(e) => SysError::new(e.raw_os_error().unwrap_or(EINVAL)),
diff --git a/src/plugin/process.rs b/src/plugin/process.rs
index 3e9655c..50b4465 100644
--- a/src/plugin/process.rs
+++ b/src/plugin/process.rs
@@ -5,8 +5,8 @@
 use std::collections::hash_map::{Entry, HashMap, VacantEntry};
 use std::env::set_var;
 use std::fs::File;
+use std::io::Write;
 use std::mem::transmute;
-use std::net::Shutdown;
 use std::os::unix::io::{AsRawFd, RawFd};
 use std::os::unix::net::UnixDatagram;
 use std::path::Path;
@@ -109,7 +109,7 @@ pub enum ProcessStatus {
 
 /// Creates, owns, and handles messages from a plugin process.
 ///
-/// A plugin process has control over a single VM and a fixed number of VCPUs via a set of unix
+/// A plugin process has control over a single VM and a fixed number of VCPUs via a set of pipes & unix
 /// domain socket connections and a protocol defined in `protos::plugin`. The plugin process is run
 /// in an unprivileged manner as a child process spawned via a path to a arbitrary executable.
 pub struct Process {
@@ -122,7 +122,7 @@ pub struct Process {
 
     // Resource to sent to plugin
     kill_evt: EventFd,
-    vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)>,
+    vcpu_pipes: Vec<VcpuPipe>,
 
     // Socket Transmission
     request_buffer: Vec<u8>,
@@ -147,10 +147,9 @@ impl Process {
         let (request_socket, child_socket) =
             new_seqpacket_pair().map_err(Error::CreateMainSocket)?;
 
-        let mut vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)> =
-            Vec::with_capacity(cpu_count as usize);
+        let mut vcpu_pipes: Vec<VcpuPipe> = Vec::with_capacity(cpu_count as usize);
         for _ in 0..cpu_count {
-            vcpu_sockets.push(new_seqpacket_pair().map_err(Error::CreateVcpuSocket)?);
+            vcpu_pipes.push(new_pipe_pair().map_err(Error::CreateVcpuSocket)?);
         }
         let mut per_vcpu_states: Vec<Arc<Mutex<PerVcpuState>>> =
             Vec::with_capacity(cpu_count as usize);
@@ -184,7 +183,7 @@ impl Process {
             shared_vcpu_state: Default::default(),
             per_vcpu_states,
             kill_evt: EventFd::new().map_err(Error::CreateEventFd)?,
-            vcpu_sockets,
+            vcpu_pipes,
             request_buffer: vec![0; MAX_DATAGRAM_SIZE],
             response_buffer: Vec::new(),
         })
@@ -197,14 +196,19 @@ impl Process {
     /// `PluginVcpu` object, the underlying resources are shared by each `PluginVcpu` resulting from
     /// the same `cpu_id`.
     pub fn create_vcpu(&self, cpu_id: u32) -> Result<PluginVcpu> {
-        let vcpu_socket = self.vcpu_sockets[cpu_id as usize]
-            .0
+        let vcpu_pipe_read = self.vcpu_pipes[cpu_id as usize]
+            .crosvm_read
+            .try_clone()
+            .map_err(Error::CloneVcpuPipe)?;
+        let vcpu_pipe_write = self.vcpu_pipes[cpu_id as usize]
+            .crosvm_write
             .try_clone()
-            .map_err(Error::CloneVcpuSocket)?;
+            .map_err(Error::CloneVcpuPipe)?;
         Ok(PluginVcpu::new(
             self.shared_vcpu_state.clone(),
             self.per_vcpu_states[cpu_id as usize].clone(),
-            vcpu_socket,
+            vcpu_pipe_read,
+            vcpu_pipe_write,
         ))
     }
 
@@ -255,10 +259,21 @@ impl Process {
     /// Any subsequent attempt to use the VCPU connections will fail.
     pub fn signal_kill(&mut self) -> SysResult<()> {
         self.kill_evt.write(1)?;
-        // By shutting down our half of the VCPU sockets, any blocked calls in the VCPU threads will
-        // unblock, allowing them to exit cleanly.
-        for sock in &self.vcpu_sockets {
-            sock.0.shutdown(Shutdown::Both)?;
+        // Normally we'd get any blocked recv() calls in the VCPU threads
+        // to unblock by calling shutdown().  However, we're using pipes
+        // (for improved performance), and pipes don't have shutdown so
+        // instead we'll write a shutdown message to ourselves using the
+        // the writable side of the pipe (normally used by the plugin).
+        for pipe in self.vcpu_pipes.iter_mut() {
+            let mut shutdown_request = VcpuRequest::new();
+            shutdown_request.set_shutdown(VcpuRequest_Shutdown::new());
+            let mut buffer = Vec::new();
+            shutdown_request
+                .write_to_vec(&mut buffer)
+                .map_err(proto_to_sys_err)?;
+            pipe.plugin_write
+                .write(&buffer[..])
+                .map_err(io_to_sys_err)?;
         }
         Ok(())
     }
@@ -590,7 +605,10 @@ impl Process {
             Ok(())
         } else if request.has_get_vcpus() {
             response.mut_get_vcpus();
-            response_fds.extend(self.vcpu_sockets.iter().map(|s| s.1.as_raw_fd()));
+            for pipe in self.vcpu_pipes.iter() {
+                response_fds.push(pipe.plugin_write.as_raw_fd());
+                response_fds.push(pipe.plugin_read.as_raw_fd());
+            }
             Ok(())
         } else if request.has_start() {
             response.mut_start();
diff --git a/src/plugin/vcpu.rs b/src/plugin/vcpu.rs
index 17cd875..c9d811a 100644
--- a/src/plugin/vcpu.rs
+++ b/src/plugin/vcpu.rs
@@ -7,8 +7,8 @@ use std::cell::{Cell, RefCell};
 use std::cmp::min;
 use std::cmp::{self, Ord, PartialEq, PartialOrd};
 use std::collections::btree_set::BTreeSet;
+use std::io::{Read, Write};
 use std::mem;
-use std::os::unix::net::UnixDatagram;
 use std::sync::{Arc, RwLock};
 
 use libc::{EINVAL, ENOENT, ENOTTY, EPERM, EPIPE, EPROTO};
@@ -274,7 +274,8 @@ impl<'a> VcpuRunData<'a> {
 pub struct PluginVcpu {
     shared_vcpu_state: Arc<RwLock<SharedVcpuState>>,
     per_vcpu_state: Arc<Mutex<PerVcpuState>>,
-    connection: UnixDatagram,
+    read_pipe: File,
+    write_pipe: File,
     wait_reason: Cell<Option<VcpuResponse_Wait>>,
     request_buffer: RefCell<Vec<u8>>,
     response_buffer: RefCell<Vec<u8>>,
@@ -285,12 +286,14 @@ impl PluginVcpu {
     pub fn new(
         shared_vcpu_state: Arc<RwLock<SharedVcpuState>>,
         per_vcpu_state: Arc<Mutex<PerVcpuState>>,
-        connection: UnixDatagram,
+        read_pipe: File,
+        write_pipe: File,
     ) -> PluginVcpu {
         PluginVcpu {
             shared_vcpu_state,
             per_vcpu_state,
-            connection,
+            read_pipe,
+            write_pipe,
             wait_reason: Default::default(),
             request_buffer: Default::default(),
             response_buffer: Default::default(),
@@ -419,10 +422,8 @@ impl PluginVcpu {
             let mut request_buffer = self.request_buffer.borrow_mut();
             request_buffer.resize(MAX_VCPU_DATAGRAM_SIZE, 0);
 
-            let msg_size = self
-                .connection
-                .recv(&mut request_buffer)
-                .map_err(io_to_sys_err)?;
+            let mut read_pipe = &self.read_pipe;
+            let msg_size = read_pipe.read(&mut request_buffer).map_err(io_to_sys_err)?;
 
             let mut request =
                 protobuf::parse_from_bytes::<VcpuRequest>(&request_buffer[..msg_size])
@@ -526,6 +527,8 @@ impl PluginVcpu {
                     cpuid_entry.edx = request_entry.edx;
                 }
                 vcpu.set_cpuid2(&cpuid)
+            } else if request.has_shutdown() {
+                return Err(SysError::new(EPIPE));
             } else {
                 Err(SysError::new(ENOTTY))
             };
@@ -543,8 +546,9 @@ impl PluginVcpu {
             response
                 .write_to_vec(&mut response_buffer)
                 .map_err(proto_to_sys_err)?;
-            self.connection
-                .send(&response_buffer[..])
+            let mut write_pipe = &self.write_pipe;
+            write_pipe
+                .write(&response_buffer[..])
                 .map_err(io_to_sys_err)?;
         }