diff options
-rw-r--r-- | crosvm_plugin/src/lib.rs | 32 | ||||
-rw-r--r-- | protos/src/plugin.proto | 4 | ||||
-rw-r--r-- | src/plugin/mod.rs | 62 | ||||
-rw-r--r-- | src/plugin/process.rs | 50 | ||||
-rw-r--r-- | src/plugin/vcpu.rs | 24 |
5 files changed, 127 insertions, 45 deletions
diff --git a/crosvm_plugin/src/lib.rs b/crosvm_plugin/src/lib.rs index d1bb665..a6fd4df 100644 --- a/crosvm_plugin/src/lib.rs +++ b/crosvm_plugin/src/lib.rs @@ -17,6 +17,7 @@ use std::env; use std::fs::File; +use std::io::{Read, Write}; use std::mem::{size_of, swap}; use std::os::raw::{c_int, c_void}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -358,14 +359,17 @@ impl crosvm { fn load_all_vcpus(&mut self) -> result::Result<(), c_int> { let mut r = MainRequest::new(); r.mut_get_vcpus(); - let (_, files) = self.main_transaction(&r, &[])?; - if files.is_empty() { + let (_, mut files) = self.main_transaction(&r, &[])?; + if files.is_empty() || files.len() % 2 != 0 { return Err(EPROTO); } - let vcpus = files - .into_iter() - .map(|f| crosvm_vcpu::new(fd_cast(f))) - .collect(); + + let mut vcpus = Vec::with_capacity(files.len() / 2); + while files.len() > 1 { + let write_pipe = files.remove(0); + let read_pipe = files.remove(0); + vcpus.push(crosvm_vcpu::new(fd_cast(read_pipe), fd_cast(write_pipe))); + } // Only called once by the `from_connection` constructor, which makes a new unique // `self.vcpus`. 
let self_vcpus = Arc::get_mut(&mut self.vcpus).unwrap(); @@ -919,7 +923,8 @@ pub struct crosvm_vcpu_event { } pub struct crosvm_vcpu { - socket: UnixDatagram, + read_pipe: File, + write_pipe: File, send_init: bool, request_buffer: Vec<u8>, response_buffer: Vec<u8>, @@ -927,9 +932,10 @@ pub struct crosvm_vcpu { } impl crosvm_vcpu { - fn new(socket: UnixDatagram) -> crosvm_vcpu { + fn new(read_pipe: File, write_pipe: File) -> crosvm_vcpu { crosvm_vcpu { - socket, + read_pipe, + write_pipe, send_init: true, request_buffer: Vec::new(), response_buffer: vec![0; MAX_DATAGRAM_SIZE], @@ -941,16 +947,16 @@ impl crosvm_vcpu { request .write_to_vec(&mut self.request_buffer) .map_err(proto_error_to_int)?; - self.socket - .send(self.request_buffer.as_slice()) + self.write_pipe + .write(self.request_buffer.as_slice()) .map_err(|e| -e.raw_os_error().unwrap_or(EINVAL))?; Ok(()) } fn vcpu_recv(&mut self) -> result::Result<VcpuResponse, c_int> { let msg_size = self - .socket - .recv(&mut self.response_buffer) + .read_pipe + .read(&mut self.response_buffer) .map_err(|e| -e.raw_os_error().unwrap_or(EINVAL))?; let response: VcpuResponse = diff --git a/protos/src/plugin.proto b/protos/src/plugin.proto index 5b6eec2..de76089 100644 --- a/protos/src/plugin.proto +++ b/protos/src/plugin.proto @@ -323,6 +323,9 @@ message VcpuRequest { repeated CpuidEntry entries = 1; } + message Shutdown { + } + // The type of the message is determined by which of these oneof fields is present in the // protobuf. 
oneof message { @@ -333,6 +336,7 @@ message VcpuRequest { GetMsrs get_msrs = 5; SetMsrs set_msrs = 6; SetCpuid set_cpuid = 7; + Shutdown shutdown = 8; } } diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs index 128b434..82c686f 100644 --- a/src/plugin/mod.rs +++ b/src/plugin/mod.rs @@ -8,7 +8,7 @@ mod vcpu; use std::fmt::{self, Display}; use std::fs::File; use std::io; -use std::os::unix::io::{FromRawFd, IntoRawFd}; +use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd}; use std::os::unix::net::UnixDatagram; use std::path::Path; use std::result; @@ -18,8 +18,9 @@ use std::thread; use std::time::{Duration, Instant}; use libc::{ - c_ulong, ioctl, socketpair, AF_UNIX, EAGAIN, EBADF, EDEADLK, EEXIST, EINTR, EINVAL, ENOENT, - EOVERFLOW, EPERM, FIOCLEX, MS_NODEV, MS_NOEXEC, MS_NOSUID, SIGCHLD, SOCK_SEQPACKET, + c_int, c_ulong, fcntl, ioctl, socketpair, AF_UNIX, EAGAIN, EBADF, EDEADLK, EEXIST, EINTR, + EINVAL, ENOENT, EOVERFLOW, EPERM, FIOCLEX, F_SETPIPE_SZ, MS_NODEV, MS_NOEXEC, MS_NOSUID, + SIGCHLD, SOCK_SEQPACKET, }; use protobuf::ProtobufError; @@ -29,7 +30,7 @@ use io_jail::{self, Minijail}; use kvm::{Datamatch, IoeventAddress, Kvm, Vcpu, VcpuExit, Vm}; use net_util::{Error as TapError, Tap, TapT}; use sys_util::{ - block_signal, clear_signal, drop_capabilities, error, getegid, geteuid, info, + block_signal, clear_signal, drop_capabilities, error, getegid, geteuid, info, pipe, register_signal_handler, validate_raw_fd, warn, Error as SysError, EventFd, GuestMemory, Killable, MmapError, PollContext, PollToken, Result as SysResult, SignalFd, SignalFdError, SIGRTMIN, @@ -46,7 +47,7 @@ const MAX_VCPU_DATAGRAM_SIZE: usize = 0x40000; #[sorted] pub enum Error { CloneEventFd(SysError), - CloneVcpuSocket(io::Error), + CloneVcpuPipe(io::Error), CreateEventFd(SysError), CreateIrqChip(SysError), CreateJail(io_jail::Error), @@ -114,7 +115,7 @@ impl Display for Error { #[sorted] match self { CloneEventFd(e) => write!(f, "failed to clone eventfd: {}", e), - CloneVcpuSocket(e) => 
write!(f, "failed to clone vcpu socket: {}", e), + CloneVcpuPipe(e) => write!(f, "failed to clone vcpu pipe: {}", e), CreateEventFd(e) => write!(f, "failed to create eventfd: {}", e), CreateIrqChip(e) => write!(f, "failed to create kvm irqchip: {}", e), CreateJail(e) => write!(f, "failed to create jail: {}", e), @@ -197,6 +198,55 @@ fn new_seqpacket_pair() -> SysResult<(UnixDatagram, UnixDatagram)> { } } +struct VcpuPipe { + crosvm_read: File, + plugin_write: File, + plugin_read: File, + crosvm_write: File, +} + +fn new_pipe_pair() -> SysResult<VcpuPipe> { + let to_crosvm = pipe(true)?; + let to_plugin = pipe(true)?; + // Increasing the pipe size can be a nice-to-have to make sure that + // messages get across atomically (and make sure that writes don't block), + // though it's not necessarily a hard requirement for things to work. + let flags = unsafe { + fcntl( + to_crosvm.0.as_raw_fd(), + F_SETPIPE_SZ, + MAX_VCPU_DATAGRAM_SIZE as c_int, + ) + }; + if flags < 0 || flags != MAX_VCPU_DATAGRAM_SIZE as i32 { + warn!( + "Failed to adjust size of crosvm pipe (result {}): {}", + flags, + SysError::last() + ); + } + let flags = unsafe { + fcntl( + to_plugin.0.as_raw_fd(), + F_SETPIPE_SZ, + MAX_VCPU_DATAGRAM_SIZE as c_int, + ) + }; + if flags < 0 || flags != MAX_VCPU_DATAGRAM_SIZE as i32 { + warn!( + "Failed to adjust size of plugin pipe (result {}): {}", + flags, + SysError::last() + ); + } + Ok(VcpuPipe { + crosvm_read: to_crosvm.0, + plugin_write: to_crosvm.1, + plugin_read: to_plugin.0, + crosvm_write: to_plugin.1, + }) +} + fn proto_to_sys_err(e: ProtobufError) -> SysError { match e { ProtobufError::IoError(e) => SysError::new(e.raw_os_error().unwrap_or(EINVAL)), diff --git a/src/plugin/process.rs b/src/plugin/process.rs index 3e9655c..50b4465 100644 --- a/src/plugin/process.rs +++ b/src/plugin/process.rs @@ -5,8 +5,8 @@ use std::collections::hash_map::{Entry, HashMap, VacantEntry}; use std::env::set_var; use std::fs::File; +use std::io::Write; use std::mem::transmute; 
-use std::net::Shutdown; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::UnixDatagram; use std::path::Path; @@ -109,7 +109,7 @@ pub enum ProcessStatus { /// Creates, owns, and handles messages from a plugin process. /// -/// A plugin process has control over a single VM and a fixed number of VCPUs via a set of unix +/// A plugin process has control over a single VM and a fixed number of VCPUs via a set of pipes & unix /// domain socket connections and a protocol defined in `protos::plugin`. The plugin process is run /// in an unprivileged manner as a child process spawned via a path to a arbitrary executable. pub struct Process { @@ -122,7 +122,7 @@ pub struct Process { // Resource to sent to plugin kill_evt: EventFd, - vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)>, + vcpu_pipes: Vec<VcpuPipe>, // Socket Transmission request_buffer: Vec<u8>, @@ -147,10 +147,9 @@ impl Process { let (request_socket, child_socket) = new_seqpacket_pair().map_err(Error::CreateMainSocket)?; - let mut vcpu_sockets: Vec<(UnixDatagram, UnixDatagram)> = - Vec::with_capacity(cpu_count as usize); + let mut vcpu_pipes: Vec<VcpuPipe> = Vec::with_capacity(cpu_count as usize); for _ in 0..cpu_count { - vcpu_sockets.push(new_seqpacket_pair().map_err(Error::CreateVcpuSocket)?); + vcpu_pipes.push(new_pipe_pair().map_err(Error::CreateVcpuSocket)?); } let mut per_vcpu_states: Vec<Arc<Mutex<PerVcpuState>>> = Vec::with_capacity(cpu_count as usize); @@ -184,7 +183,7 @@ impl Process { shared_vcpu_state: Default::default(), per_vcpu_states, kill_evt: EventFd::new().map_err(Error::CreateEventFd)?, - vcpu_sockets, + vcpu_pipes, request_buffer: vec![0; MAX_DATAGRAM_SIZE], response_buffer: Vec::new(), }) @@ -197,14 +196,19 @@ impl Process { /// `PluginVcpu` object, the underlying resources are shared by each `PluginVcpu` resulting from /// the same `cpu_id`. 
pub fn create_vcpu(&self, cpu_id: u32) -> Result<PluginVcpu> { - let vcpu_socket = self.vcpu_sockets[cpu_id as usize] - .0 + let vcpu_pipe_read = self.vcpu_pipes[cpu_id as usize] + .crosvm_read + .try_clone() + .map_err(Error::CloneVcpuPipe)?; + let vcpu_pipe_write = self.vcpu_pipes[cpu_id as usize] + .crosvm_write .try_clone() - .map_err(Error::CloneVcpuSocket)?; + .map_err(Error::CloneVcpuPipe)?; Ok(PluginVcpu::new( self.shared_vcpu_state.clone(), self.per_vcpu_states[cpu_id as usize].clone(), - vcpu_socket, + vcpu_pipe_read, + vcpu_pipe_write, )) } @@ -255,10 +259,21 @@ impl Process { /// Any subsequent attempt to use the VCPU connections will fail. pub fn signal_kill(&mut self) -> SysResult<()> { self.kill_evt.write(1)?; - // By shutting down our half of the VCPU sockets, any blocked calls in the VCPU threads will - // unblock, allowing them to exit cleanly. - for sock in &self.vcpu_sockets { - sock.0.shutdown(Shutdown::Both)?; + // Normally we'd get any blocked recv() calls in the VCPU threads + // to unblock by calling shutdown(). However, we're using pipes + // (for improved performance), and pipes don't have shutdown so + // instead we'll write a shutdown message to ourselves using the + // writable side of the pipe (normally used by the plugin). 
+ for pipe in self.vcpu_pipes.iter_mut() { + let mut shutdown_request = VcpuRequest::new(); + shutdown_request.set_shutdown(VcpuRequest_Shutdown::new()); + let mut buffer = Vec::new(); + shutdown_request + .write_to_vec(&mut buffer) + .map_err(proto_to_sys_err)?; + pipe.plugin_write + .write(&buffer[..]) + .map_err(io_to_sys_err)?; } Ok(()) } @@ -590,7 +605,10 @@ impl Process { Ok(()) } else if request.has_get_vcpus() { response.mut_get_vcpus(); - response_fds.extend(self.vcpu_sockets.iter().map(|s| s.1.as_raw_fd())); + for pipe in self.vcpu_pipes.iter() { + response_fds.push(pipe.plugin_write.as_raw_fd()); + response_fds.push(pipe.plugin_read.as_raw_fd()); + } Ok(()) } else if request.has_start() { response.mut_start(); diff --git a/src/plugin/vcpu.rs b/src/plugin/vcpu.rs index 17cd875..c9d811a 100644 --- a/src/plugin/vcpu.rs +++ b/src/plugin/vcpu.rs @@ -7,8 +7,8 @@ use std::cell::{Cell, RefCell}; use std::cmp::min; use std::cmp::{self, Ord, PartialEq, PartialOrd}; use std::collections::btree_set::BTreeSet; +use std::io::{Read, Write}; use std::mem; -use std::os::unix::net::UnixDatagram; use std::sync::{Arc, RwLock}; use libc::{EINVAL, ENOENT, ENOTTY, EPERM, EPIPE, EPROTO}; @@ -274,7 +274,8 @@ impl<'a> VcpuRunData<'a> { pub struct PluginVcpu { shared_vcpu_state: Arc<RwLock<SharedVcpuState>>, per_vcpu_state: Arc<Mutex<PerVcpuState>>, - connection: UnixDatagram, + read_pipe: File, + write_pipe: File, wait_reason: Cell<Option<VcpuResponse_Wait>>, request_buffer: RefCell<Vec<u8>>, response_buffer: RefCell<Vec<u8>>, @@ -285,12 +286,14 @@ impl PluginVcpu { pub fn new( shared_vcpu_state: Arc<RwLock<SharedVcpuState>>, per_vcpu_state: Arc<Mutex<PerVcpuState>>, - connection: UnixDatagram, + read_pipe: File, + write_pipe: File, ) -> PluginVcpu { PluginVcpu { shared_vcpu_state, per_vcpu_state, - connection, + read_pipe, + write_pipe, wait_reason: Default::default(), request_buffer: Default::default(), response_buffer: Default::default(), @@ -419,10 +422,8 @@ impl PluginVcpu { 
let mut request_buffer = self.request_buffer.borrow_mut(); request_buffer.resize(MAX_VCPU_DATAGRAM_SIZE, 0); - let msg_size = self - .connection - .recv(&mut request_buffer) - .map_err(io_to_sys_err)?; + let mut read_pipe = &self.read_pipe; + let msg_size = read_pipe.read(&mut request_buffer).map_err(io_to_sys_err)?; let mut request = protobuf::parse_from_bytes::<VcpuRequest>(&request_buffer[..msg_size]) @@ -526,6 +527,8 @@ impl PluginVcpu { cpuid_entry.edx = request_entry.edx; } vcpu.set_cpuid2(&cpuid) + } else if request.has_shutdown() { + return Err(SysError::new(EPIPE)); } else { Err(SysError::new(ENOTTY)) }; @@ -543,8 +546,9 @@ impl PluginVcpu { response .write_to_vec(&mut response_buffer) .map_err(proto_to_sys_err)?; - self.connection - .send(&response_buffer[..]) + let mut write_pipe = &self.write_pipe; + write_pipe + .write(&response_buffer[..]) .map_err(io_to_sys_err)?; } |