diff options
-rw-r--r-- | devices/src/virtio/block.rs | 5 | ||||
-rw-r--r-- | src/linux.rs | 201 | ||||
-rw-r--r-- | vm_control/src/lib.rs | 57 |
3 files changed, 213 insertions, 50 deletions
diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs index 80d5103..5a8972a 100644 --- a/devices/src/virtio/block.rs +++ b/devices/src/virtio/block.rs @@ -390,6 +390,11 @@ impl Worker { } }; + if let Err(e) = self.control_socket.send(&DiskControlResult::Ready) { + error!("control socket failed to notify readiness: {}", e); + return; + } + 'poll: loop { let events = match poll_ctx.wait() { Ok(v) => v, diff --git a/src/linux.rs b/src/linux.rs index ec2067c..ccd7d88 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -3,6 +3,7 @@ // found in the LICENSE file. use std::cmp::max; +use std::collections::{BTreeMap, VecDeque}; use std::convert::TryFrom; use std::error::Error as StdError; use std::ffi::CStr; @@ -32,12 +33,12 @@ use acpi_tables::sdt::SDT; use devices::virtio::EventDevice; use devices::virtio::{self, Console, VirtioDevice}; use devices::{ - self, Ac97Backend, Ac97Dev, HostBackendDeviceProvider, PciDevice, VfioContainer, VfioDevice, - VfioPciDevice, VirtioPciDevice, XhciController, + self, Ac97Backend, Ac97Dev, Bus, HostBackendDeviceProvider, PciDevice, VfioContainer, + VfioDevice, VfioPciDevice, VirtioPciDevice, XhciController, }; use io_jail::{self, Minijail}; use kvm::*; -use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket}; +use msg_socket::{MsgError, MsgReceiver, MsgResult, MsgSender, MsgSocket}; use net_util::{Error as NetError, MacAddress, Tap}; use remain::sorted; use resources::{Alloc, MmioType, SystemAllocator}; @@ -45,7 +46,7 @@ use sync::{Condvar, Mutex}; use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener}; use sys_util::{ - self, block_signal, clear_signal, drop_capabilities, error, flock, get_blocked_signals, + self, block_signal, clear_signal, debug, drop_capabilities, error, flock, get_blocked_signals, get_group_id, get_user_id, getegid, geteuid, info, register_rt_signal_handler, set_cpu_affinity, validate_raw_fd, warn, EventFd, FlockOperation, GuestAddress, GuestMemory, Killable, 
MemoryMappingArena, PollContext, PollToken, Protection, ScopedEvent, SignalFd, @@ -57,7 +58,7 @@ use vm_control::{ DiskControlResult, UsbControlSocket, VmControlResponseSocket, VmIrqRequest, VmIrqResponse, VmIrqResponseSocket, VmMemoryControlRequestSocket, VmMemoryControlResponseSocket, VmMemoryRequest, VmMemoryResponse, VmMsyncRequest, VmMsyncRequestSocket, VmMsyncResponse, - VmMsyncResponseSocket, VmRunMode, + VmMsyncResponseSocket, VmRequest, VmRunMode, }; use crate::{Config, DiskOption, Executable, SharedDir, SharedDirKind, TouchDeviceOption}; @@ -1667,6 +1668,45 @@ fn file_to_i64<P: AsRef<Path>>(path: P) -> io::Result<i64> { .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "empty file")) } +/// Returns a boolean indicating whether the VM should be exited. +fn do_vm_request( + request: VmRequest, + device_socket: Option<&UnixSeqpacket>, + control_socket: &VmControlResponseSocket, + run_mode_arc: &Arc<VcpuRunMode>, + vcpu_handles: &mut Vec<JoinHandle<()>>, + io_bus: &mut Bus, +) -> MsgResult<bool> { + let mut run_mode_opt = None; + let response = request.execute(&mut run_mode_opt, device_socket); + control_socket.send(&response)?; + if let Some(run_mode) = run_mode_opt { + info!("control socket changed run mode to {}", run_mode); + match run_mode { + VmRunMode::Exiting => Ok(true), + VmRunMode::Running => { + if let VmRunMode::Suspending = *run_mode_arc.mtx.lock() { + io_bus.notify_resume(); + } + run_mode_arc.set_and_notify(VmRunMode::Running); + for handle in vcpu_handles { + let _ = handle.kill(SIGRTMIN() + 0); + } + Ok(false) + } + other => { + run_mode_arc.set_and_notify(other); + for handle in vcpu_handles { + let _ = handle.kill(SIGRTMIN() + 0); + } + Ok(false) + } + } + } else { + Ok(false) + } +} + pub fn run_config(cfg: Config) -> Result<()> { if cfg.sandbox { // Printing something to the syslog before entering minijail so that libc's syslogger has a @@ -1818,7 +1858,7 @@ fn run_control( ) -> Result<()> { const LOWMEM_AVAILABLE: &str = 
"/sys/kernel/mm/chromeos-low_mem/available"; - #[derive(PollToken)] + #[derive(Debug, PollToken)] enum Token { Exit, Suspend, @@ -1828,6 +1868,7 @@ fn run_control( BalloonResult, VmControlServer, VmControl { index: usize }, + DeviceReady { sock_fd: RawFd }, } stdin() @@ -1912,6 +1953,38 @@ fn run_control( } vcpu_thread_barrier.wait(); + struct QueuedDeviceReq { + request: VmRequest, + control_sock_index: usize, + } + + /// A device can either be waiting, or ready. + /// + /// If it's waiting, we queue up events to send to it once it's ready. We can't just send the + /// requests straight away and let them sit on the socket until the device comes up, because we + /// want to synchronously wait for a response after sending. If the device hasn't been activated + /// when we do this, we'd sit waiting for a response forever and never activate the device. + enum DeviceStatus<'a> { + Waiting(&'a UnixSeqpacket, VecDeque<QueuedDeviceReq>), + Ready, + } + + let mut queued_device_reqs = BTreeMap::<RawFd, DeviceStatus>::new(); + + // Each socket will send a ready message when it's ready to receive requests. Add the sockets + // to the event loop, so that when they become ready, we can process their queues. After the + // initial queue is processed, the device becomes ready, and the socket is removed from the + // event loop. 
+ for socket in disk_host_sockets.iter().map(AsRef::as_ref) { + let token = Token::DeviceReady { + sock_fd: socket.as_raw_fd(), + }; + poll_ctx.add(socket, token).map_err(Error::PollContextAdd)?; + + let status = DeviceStatus::Waiting(socket, VecDeque::new()); + queued_device_reqs.insert(socket.as_raw_fd(), status); + } + let mut ioapic_delayed = Vec::<usize>::default(); 'poll: loop { let events = { @@ -2110,40 +2183,44 @@ fn run_control( match socket { TaggedControlSocket::Vm(socket) => match socket.recv() { Ok(request) => { - let mut run_mode_opt = None; - let response = request.execute( - &mut run_mode_opt, + let device_socket = request.socket( &balloon_host_socket, - disk_host_sockets, + &disk_host_sockets, &usb_control_socket, ); - if let Err(e) = socket.send(&response) { - error!("failed to send VmResponse: {}", e); - } - if let Some(run_mode) = run_mode_opt { - info!("control socket changed run mode to {}", run_mode); - match run_mode { - VmRunMode::Exiting => { - break 'poll; - } - VmRunMode::Running => { - if let VmRunMode::Suspending = - *run_mode_arc.mtx.lock() - { - linux.io_bus.notify_resume(); - } - run_mode_arc.set_and_notify(VmRunMode::Running); - for handle in &vcpu_handles { - let _ = handle.kill(SIGRTMIN() + 0); - } - } - other => { - run_mode_arc.set_and_notify(other); - for handle in &vcpu_handles { - let _ = handle.kill(SIGRTMIN() + 0); + + match device_socket.map(|s| { + queued_device_reqs.get_mut(&s.as_raw_fd()).unwrap() + }) { + None | Some(DeviceStatus::Ready) => { + match do_vm_request( + request, + device_socket, + socket, + &run_mode_arc, + &mut vcpu_handles, + &mut linux.io_bus, + ) { + Ok(true) => break 'poll, + Ok(false) => {} + Err(e) => { + error!("failed to handle VmRequest: {}", e); + continue 'poll; } } } + + Some(DeviceStatus::Waiting(_, queue)) => { + debug!( + "Device {} not yet ready. 
Queueing request.", + device_socket.unwrap().as_raw_fd() + ); + + queue.push_back(QueuedDeviceReq { + request, + control_sock_index: index, + }); + } } } Err(e) => { @@ -2204,6 +2281,61 @@ fn run_control( } } } + Token::DeviceReady { sock_fd } => { + if let Some(DeviceStatus::Waiting(device_sock, mut queue)) = + queued_device_reqs.remove(&sock_fd) + { + debug!( + "Device {} is ready. Processing its queue of {} item(s).", + sock_fd, + queue.len() + ); + + // Deal with the message sent to indicate readiness. Its contents don't matter. + if let Err(e) = device_sock.recv(&mut []) { + error!( + "Failed to recv ready notification from device socket: {}", + e + ); + continue 'poll; + } + + while let Some(QueuedDeviceReq { + request, + control_sock_index, + }) = queue.pop_front() + { + let control_sock = match control_sockets.get(control_sock_index) { + Some(TaggedControlSocket::Vm(s)) => s, + _ => unreachable!(), + }; + + match do_vm_request( + request, + Some(device_sock), + control_sock, + &run_mode_arc, + &mut vcpu_handles, + &mut linux.io_bus, + ) { + Ok(true) => break 'poll, + Ok(false) => {} + Err(e) => { + error!("failed to handle VmRequest: {}", e); + continue 'poll; + } + } + } + + if let Err(e) = poll_ctx.delete(device_sock) { + error!("failed to delete poll token: {}", e); + } + } else { + error!("Received ready notification for a device that isn't waiting"); + } + + queued_device_reqs.insert(sock_fd, DeviceStatus::Ready); + } } } @@ -2229,6 +2361,7 @@ fn run_control( _ => {} } } + Token::DeviceReady { .. 
} => {} } } diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs index c63fbfe..d4922ab 100644 --- a/vm_control/src/lib.rs +++ b/vm_control/src/lib.rs @@ -19,8 +19,9 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use libc::{EINVAL, EIO, ENODEV}; use kvm::{IrqRoute, IrqSource, Vm}; -use msg_socket::{MsgError, MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket}; +use msg_socket::{MsgError, MsgOnSocket, MsgResult, MsgSocket, UnixSeqpacketExt}; use resources::{Alloc, GpuMemoryDesc, MmioType, SystemAllocator}; +use sys_util::net::UnixSeqpacket; use sys_util::{error, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result}; /// A data structure that either owns or borrows a file descriptor. @@ -189,6 +190,7 @@ impl Display for DiskControlCommand { #[derive(MsgOnSocket, Debug)] pub enum DiskControlResult { + Ready, Ok, Err(SysError), } @@ -563,17 +565,34 @@ fn register_memory( } impl VmRequest { + pub fn socket<'s>( + &self, + balloon_host_socket: &'s BalloonControlRequestSocket, + disk_host_sockets: &'s [DiskControlRequestSocket], + usb_control_socket: &'s UsbControlSocket, + ) -> Option<&'s UnixSeqpacket> { + use VmRequest::*; + match *self { + Exit => None, + Suspend => None, + Resume => None, + BalloonCommand(_) => Some(balloon_host_socket.as_ref()), + DiskCommand { disk_index, .. } => disk_host_sockets.get(disk_index).map(AsRef::as_ref), + UsbCommand(_) => Some(usb_control_socket.as_ref()), + } + } + /// Executes this request on the given Vm and other mutable state. /// /// This does not return a result, instead encapsulating the success or failure in a /// `VmResponse` with the intended purpose of sending the response back over the socket that /// received this `VmRequest`. + /// + /// The `socket` parameter must be the value that was obtained by calling `VmRequest::socket`. 
pub fn execute( &self, run_mode: &mut Option<VmRunMode>, - balloon_host_socket: &BalloonControlRequestSocket, - disk_host_sockets: &[DiskControlRequestSocket], - usb_control_socket: &UsbControlSocket, + socket: Option<&UnixSeqpacket>, ) -> VmResponse { match *self { VmRequest::Exit => { @@ -589,14 +608,18 @@ impl VmRequest { VmResponse::Ok } VmRequest::BalloonCommand(BalloonControlCommand::Adjust { num_bytes }) => { - match balloon_host_socket.send(&BalloonControlCommand::Adjust { num_bytes }) { + match socket + .unwrap() + .send_msg_on_socket(&BalloonControlCommand::Adjust { num_bytes }) + { Ok(_) => VmResponse::Ok, Err(_) => VmResponse::Err(SysError::last()), } } VmRequest::BalloonCommand(BalloonControlCommand::Stats) => { - match balloon_host_socket.send(&BalloonControlCommand::Stats {}) { - Ok(_) => match balloon_host_socket.recv() { + let socket = socket.unwrap(); + match socket.send_msg_on_socket(&BalloonControlCommand::Stats {}) { + Ok(_) => match socket.recv_msg_on_socket() { Ok(BalloonControlResult::Stats { stats, balloon_actual, @@ -612,19 +635,20 @@ impl VmRequest { Err(_) => VmResponse::Err(SysError::last()), } } - VmRequest::DiskCommand { - disk_index, - ref command, - } => { + VmRequest::DiskCommand { ref command, .. } => { // Forward the request to the block device process via its control socket. 
- if let Some(sock) = disk_host_sockets.get(disk_index) { - if let Err(e) = sock.send(command) { + if let Some(sock) = socket { + if let Err(e) = sock.send_msg_on_socket(command) { error!("disk socket send failed: {}", e); VmResponse::Err(SysError::new(EINVAL)) } else { - match sock.recv() { + match sock.recv_msg_on_socket() { Ok(DiskControlResult::Ok) => VmResponse::Ok, Ok(DiskControlResult::Err(e)) => VmResponse::Err(e), + Ok(resp) => { + error!("unexpected disk socket result: {:?}", resp); + VmResponse::Err(SysError::new(EINVAL)) + } Err(e) => { error!("disk socket recv failed: {}", e); VmResponse::Err(SysError::new(EINVAL)) @@ -636,12 +660,13 @@ impl VmRequest { } } VmRequest::UsbCommand(ref cmd) => { - let res = usb_control_socket.send(cmd); + let socket = socket.unwrap(); + let res = socket.send_msg_on_socket(cmd); if let Err(e) = res { error!("fail to send command to usb control socket: {}", e); return VmResponse::Err(SysError::new(EIO)); } - match usb_control_socket.recv() { + match socket.recv_msg_on_socket() { Ok(response) => VmResponse::UsbResponse(response), Err(e) => { error!("fail to recv command from usb control socket: {}", e); |