summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- devices/src/virtio/block.rs 5
-rw-r--r-- src/linux.rs 201
-rw-r--r-- vm_control/src/lib.rs 57
3 files changed, 213 insertions, 50 deletions
diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs
index 80d5103..5a8972a 100644
--- a/devices/src/virtio/block.rs
+++ b/devices/src/virtio/block.rs
@@ -390,6 +390,11 @@ impl Worker {
             }
         };
 
+        if let Err(e) = self.control_socket.send(&DiskControlResult::Ready) {
+            error!("control socket failed to notify readiness: {}", e);
+            return;
+        }
+
         'poll: loop {
             let events = match poll_ctx.wait() {
                 Ok(v) => v,
diff --git a/src/linux.rs b/src/linux.rs
index ec2067c..ccd7d88 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 use std::cmp::max;
+use std::collections::{BTreeMap, VecDeque};
 use std::convert::TryFrom;
 use std::error::Error as StdError;
 use std::ffi::CStr;
@@ -32,12 +33,12 @@ use acpi_tables::sdt::SDT;
 use devices::virtio::EventDevice;
 use devices::virtio::{self, Console, VirtioDevice};
 use devices::{
-    self, Ac97Backend, Ac97Dev, HostBackendDeviceProvider, PciDevice, VfioContainer, VfioDevice,
-    VfioPciDevice, VirtioPciDevice, XhciController,
+    self, Ac97Backend, Ac97Dev, Bus, HostBackendDeviceProvider, PciDevice, VfioContainer,
+    VfioDevice, VfioPciDevice, VirtioPciDevice, XhciController,
 };
 use io_jail::{self, Minijail};
 use kvm::*;
-use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
+use msg_socket::{MsgError, MsgReceiver, MsgResult, MsgSender, MsgSocket};
 use net_util::{Error as NetError, MacAddress, Tap};
 use remain::sorted;
 use resources::{Alloc, MmioType, SystemAllocator};
@@ -45,7 +46,7 @@ use sync::{Condvar, Mutex};
 use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener};
 
 use sys_util::{
-    self, block_signal, clear_signal, drop_capabilities, error, flock, get_blocked_signals,
+    self, block_signal, clear_signal, debug, drop_capabilities, error, flock, get_blocked_signals,
     get_group_id, get_user_id, getegid, geteuid, info, register_rt_signal_handler,
     set_cpu_affinity, validate_raw_fd, warn, EventFd, FlockOperation, GuestAddress, GuestMemory,
     Killable, MemoryMappingArena, PollContext, PollToken, Protection, ScopedEvent, SignalFd,
@@ -57,7 +58,7 @@ use vm_control::{
     DiskControlResult, UsbControlSocket, VmControlResponseSocket, VmIrqRequest, VmIrqResponse,
     VmIrqResponseSocket, VmMemoryControlRequestSocket, VmMemoryControlResponseSocket,
     VmMemoryRequest, VmMemoryResponse, VmMsyncRequest, VmMsyncRequestSocket, VmMsyncResponse,
-    VmMsyncResponseSocket, VmRunMode,
+    VmMsyncResponseSocket, VmRequest, VmRunMode,
 };
 
 use crate::{Config, DiskOption, Executable, SharedDir, SharedDirKind, TouchDeviceOption};
@@ -1667,6 +1668,45 @@ fn file_to_i64<P: AsRef<Path>>(path: P) -> io::Result<i64> {
         .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "empty file"))
 }
 
+/// Returns a boolean indicating whether the VM should be exited.
+fn do_vm_request(
+    request: VmRequest,
+    device_socket: Option<&UnixSeqpacket>,
+    control_socket: &VmControlResponseSocket,
+    run_mode_arc: &Arc<VcpuRunMode>,
+    vcpu_handles: &mut Vec<JoinHandle<()>>,
+    io_bus: &mut Bus,
+) -> MsgResult<bool> {
+    let mut run_mode_opt = None;
+    let response = request.execute(&mut run_mode_opt, device_socket);
+    control_socket.send(&response)?;
+    if let Some(run_mode) = run_mode_opt {
+        info!("control socket changed run mode to {}", run_mode);
+        match run_mode {
+            VmRunMode::Exiting => Ok(true),
+            VmRunMode::Running => {
+                if let VmRunMode::Suspending = *run_mode_arc.mtx.lock() {
+                    io_bus.notify_resume();
+                }
+                run_mode_arc.set_and_notify(VmRunMode::Running);
+                for handle in vcpu_handles {
+                    let _ = handle.kill(SIGRTMIN() + 0);
+                }
+                Ok(false)
+            }
+            other => {
+                run_mode_arc.set_and_notify(other);
+                for handle in vcpu_handles {
+                    let _ = handle.kill(SIGRTMIN() + 0);
+                }
+                Ok(false)
+            }
+        }
+    } else {
+        Ok(false)
+    }
+}
+
 pub fn run_config(cfg: Config) -> Result<()> {
     if cfg.sandbox {
         // Printing something to the syslog before entering minijail so that libc's syslogger has a
@@ -1818,7 +1858,7 @@ fn run_control(
 ) -> Result<()> {
     const LOWMEM_AVAILABLE: &str = "/sys/kernel/mm/chromeos-low_mem/available";
 
-    #[derive(PollToken)]
+    #[derive(Debug, PollToken)]
     enum Token {
         Exit,
         Suspend,
@@ -1828,6 +1868,7 @@ fn run_control(
         BalloonResult,
         VmControlServer,
         VmControl { index: usize },
+        DeviceReady { sock_fd: RawFd },
     }
 
     stdin()
@@ -1912,6 +1953,38 @@ fn run_control(
     }
     vcpu_thread_barrier.wait();
 
+    struct QueuedDeviceReq {
+        request: VmRequest,
+        control_sock_index: usize,
+    }
+
+    /// A device can either be waiting, or ready.
+    ///
+    /// If it's waiting, we queue up events to send to it once it's ready.  We can't just send the
+    /// requests straight away and let them sit on the socket until the device comes up, because we
+    /// want to synchronously wait for a response after sending.  If the device hasn't been activated
+    /// when we do this, we'd sit waiting for a response forever and never activate the device.
+    enum DeviceStatus<'a> {
+        Waiting(&'a UnixSeqpacket, VecDeque<QueuedDeviceReq>),
+        Ready,
+    }
+
+    let mut queued_device_reqs = BTreeMap::<RawFd, DeviceStatus>::new();
+
+    // Each socket will send a ready message when it's ready to receive requests.  Add the sockets
+    // to the event loop, so that when they become ready, we can process their queues.  After the
+    // initial queue is processed, the device becomes ready, and the socket is removed from the
+    // event loop.
+    for socket in disk_host_sockets.iter().map(AsRef::as_ref) {
+        let token = Token::DeviceReady {
+            sock_fd: socket.as_raw_fd(),
+        };
+        poll_ctx.add(socket, token).map_err(Error::PollContextAdd)?;
+
+        let status = DeviceStatus::Waiting(socket, VecDeque::new());
+        queued_device_reqs.insert(socket.as_raw_fd(), status);
+    }
+
     let mut ioapic_delayed = Vec::<usize>::default();
     'poll: loop {
         let events = {
@@ -2110,40 +2183,44 @@ fn run_control(
                         match socket {
                             TaggedControlSocket::Vm(socket) => match socket.recv() {
                                 Ok(request) => {
-                                    let mut run_mode_opt = None;
-                                    let response = request.execute(
-                                        &mut run_mode_opt,
+                                    let device_socket = request.socket(
                                         &balloon_host_socket,
-                                        disk_host_sockets,
+                                        &disk_host_sockets,
                                         &usb_control_socket,
                                     );
-                                    if let Err(e) = socket.send(&response) {
-                                        error!("failed to send VmResponse: {}", e);
-                                    }
-                                    if let Some(run_mode) = run_mode_opt {
-                                        info!("control socket changed run mode to {}", run_mode);
-                                        match run_mode {
-                                            VmRunMode::Exiting => {
-                                                break 'poll;
-                                            }
-                                            VmRunMode::Running => {
-                                                if let VmRunMode::Suspending =
-                                                    *run_mode_arc.mtx.lock()
-                                                {
-                                                    linux.io_bus.notify_resume();
-                                                }
-                                                run_mode_arc.set_and_notify(VmRunMode::Running);
-                                                for handle in &vcpu_handles {
-                                                    let _ = handle.kill(SIGRTMIN() + 0);
-                                                }
-                                            }
-                                            other => {
-                                                run_mode_arc.set_and_notify(other);
-                                                for handle in &vcpu_handles {
-                                                    let _ = handle.kill(SIGRTMIN() + 0);
+
+                                    match device_socket.map(|s| {
+                                        queued_device_reqs.get_mut(&s.as_raw_fd()).unwrap()
+                                    }) {
+                                        None | Some(DeviceStatus::Ready) => {
+                                            match do_vm_request(
+                                                request,
+                                                device_socket,
+                                                socket,
+                                                &run_mode_arc,
+                                                &mut vcpu_handles,
+                                                &mut linux.io_bus,
+                                            ) {
+                                                Ok(true) => break 'poll,
+                                                Ok(false) => {}
+                                                Err(e) => {
+                                                    error!("failed to handle VmRequest: {}", e);
+                                                    continue 'poll;
                                                 }
                                             }
                                         }
+
+                                        Some(DeviceStatus::Waiting(_, queue)) => {
+                                            debug!(
+                                                "Device {} not yet ready. Queueing request.",
+                                                device_socket.unwrap().as_raw_fd()
+                                            );
+
+                                            queue.push_back(QueuedDeviceReq {
+                                                request,
+                                                control_sock_index: index,
+                                            });
+                                        }
                                     }
                                 }
                                 Err(e) => {
@@ -2204,6 +2281,61 @@ fn run_control(
                         }
                     }
                 }
+                Token::DeviceReady { sock_fd } => {
+                    if let Some(DeviceStatus::Waiting(device_sock, mut queue)) =
+                        queued_device_reqs.remove(&sock_fd)
+                    {
+                        debug!(
+                            "Device {} is ready. Processing its queue of {} item(s).",
+                            sock_fd,
+                            queue.len()
+                        );
+
+                        // Deal with the message sent to indicate readiness.  Its contents don't matter.
+                        if let Err(e) = device_sock.recv(&mut []) {
+                            error!(
+                                "Failed to recv ready notification from device socket: {}",
+                                e
+                            );
+                            continue 'poll;
+                        }
+
+                        while let Some(QueuedDeviceReq {
+                            request,
+                            control_sock_index,
+                        }) = queue.pop_front()
+                        {
+                            let control_sock = match control_sockets.get(control_sock_index) {
+                                Some(TaggedControlSocket::Vm(s)) => s,
+                                _ => unreachable!(),
+                            };
+
+                            match do_vm_request(
+                                request,
+                                Some(device_sock),
+                                control_sock,
+                                &run_mode_arc,
+                                &mut vcpu_handles,
+                                &mut linux.io_bus,
+                            ) {
+                                Ok(true) => break 'poll,
+                                Ok(false) => {}
+                                Err(e) => {
+                                    error!("failed to handle VmRequest: {}", e);
+                                    continue 'poll;
+                                }
+                            }
+                        }
+
+                        if let Err(e) = poll_ctx.delete(device_sock) {
+                            error!("failed to delete poll token: {}", e);
+                        }
+                    } else {
+                        error!("Received ready notification for a device that isn't waiting");
+                    }
+
+                    queued_device_reqs.insert(sock_fd, DeviceStatus::Ready);
+                }
             }
         }
 
@@ -2229,6 +2361,7 @@ fn run_control(
                         _ => {}
                     }
                 }
+                Token::DeviceReady { .. } => {}
             }
         }
 
diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs
index c63fbfe..d4922ab 100644
--- a/vm_control/src/lib.rs
+++ b/vm_control/src/lib.rs
@@ -19,8 +19,9 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 use libc::{EINVAL, EIO, ENODEV};
 
 use kvm::{IrqRoute, IrqSource, Vm};
-use msg_socket::{MsgError, MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket};
+use msg_socket::{MsgError, MsgOnSocket, MsgResult, MsgSocket, UnixSeqpacketExt};
 use resources::{Alloc, GpuMemoryDesc, MmioType, SystemAllocator};
+use sys_util::net::UnixSeqpacket;
 use sys_util::{error, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result};
 
 /// A data structure that either owns or borrows a file descriptor.
@@ -189,6 +190,7 @@ impl Display for DiskControlCommand {
 
 #[derive(MsgOnSocket, Debug)]
 pub enum DiskControlResult {
+    Ready,
     Ok,
     Err(SysError),
 }
@@ -563,17 +565,34 @@ fn register_memory(
 }
 
 impl VmRequest {
+    pub fn socket<'s>(
+        &self,
+        balloon_host_socket: &'s BalloonControlRequestSocket,
+        disk_host_sockets: &'s [DiskControlRequestSocket],
+        usb_control_socket: &'s UsbControlSocket,
+    ) -> Option<&'s UnixSeqpacket> {
+        use VmRequest::*;
+        match *self {
+            Exit => None,
+            Suspend => None,
+            Resume => None,
+            BalloonCommand(_) => Some(balloon_host_socket.as_ref()),
+            DiskCommand { disk_index, .. } => disk_host_sockets.get(disk_index).map(AsRef::as_ref),
+            UsbCommand(_) => Some(usb_control_socket.as_ref()),
+        }
+    }
+
     /// Executes this request on the given Vm and other mutable state.
     ///
     /// This does not return a result, instead encapsulating the success or failure in a
     /// `VmResponse` with the intended purpose of sending the response back over the  socket that
     /// received this `VmRequest`.
+    ///
+    /// The `socket` parameter must be the value that was obtained by calling `VmRequest::socket`.
     pub fn execute(
         &self,
         run_mode: &mut Option<VmRunMode>,
-        balloon_host_socket: &BalloonControlRequestSocket,
-        disk_host_sockets: &[DiskControlRequestSocket],
-        usb_control_socket: &UsbControlSocket,
+        socket: Option<&UnixSeqpacket>,
     ) -> VmResponse {
         match *self {
             VmRequest::Exit => {
@@ -589,14 +608,18 @@ impl VmRequest {
                 VmResponse::Ok
             }
             VmRequest::BalloonCommand(BalloonControlCommand::Adjust { num_bytes }) => {
-                match balloon_host_socket.send(&BalloonControlCommand::Adjust { num_bytes }) {
+                match socket
+                    .unwrap()
+                    .send_msg_on_socket(&BalloonControlCommand::Adjust { num_bytes })
+                {
                     Ok(_) => VmResponse::Ok,
                     Err(_) => VmResponse::Err(SysError::last()),
                 }
             }
             VmRequest::BalloonCommand(BalloonControlCommand::Stats) => {
-                match balloon_host_socket.send(&BalloonControlCommand::Stats {}) {
-                    Ok(_) => match balloon_host_socket.recv() {
+                let socket = socket.unwrap();
+                match socket.send_msg_on_socket(&BalloonControlCommand::Stats {}) {
+                    Ok(_) => match socket.recv_msg_on_socket() {
                         Ok(BalloonControlResult::Stats {
                             stats,
                             balloon_actual,
@@ -612,19 +635,20 @@ impl VmRequest {
                     Err(_) => VmResponse::Err(SysError::last()),
                 }
             }
-            VmRequest::DiskCommand {
-                disk_index,
-                ref command,
-            } => {
+            VmRequest::DiskCommand { ref command, .. } => {
                 // Forward the request to the block device process via its control socket.
-                if let Some(sock) = disk_host_sockets.get(disk_index) {
-                    if let Err(e) = sock.send(command) {
+                if let Some(sock) = socket {
+                    if let Err(e) = sock.send_msg_on_socket(command) {
                         error!("disk socket send failed: {}", e);
                         VmResponse::Err(SysError::new(EINVAL))
                     } else {
-                        match sock.recv() {
+                        match sock.recv_msg_on_socket() {
                             Ok(DiskControlResult::Ok) => VmResponse::Ok,
                             Ok(DiskControlResult::Err(e)) => VmResponse::Err(e),
+                            Ok(resp) => {
+                                error!("unexpected disk socket result: {:?}", resp);
+                                VmResponse::Err(SysError::new(EINVAL))
+                            }
                             Err(e) => {
                                 error!("disk socket recv failed: {}", e);
                                 VmResponse::Err(SysError::new(EINVAL))
@@ -636,12 +660,13 @@ impl VmRequest {
                 }
             }
             VmRequest::UsbCommand(ref cmd) => {
-                let res = usb_control_socket.send(cmd);
+                let socket = socket.unwrap();
+                let res = socket.send_msg_on_socket(cmd);
                 if let Err(e) = res {
                     error!("fail to send command to usb control socket: {}", e);
                     return VmResponse::Err(SysError::new(EIO));
                 }
-                match usb_control_socket.recv() {
+                match socket.recv_msg_on_socket() {
                     Ok(response) => VmResponse::UsbResponse(response),
                     Err(e) => {
                         error!("fail to recv command from usb control socket: {}", e);