patches and low-level development discussion
From: Alyssa Ross <hi@alyssa.is>
To: devel@spectrum-os.org
Subject: [PATCH crosvm 2/2] crosvm: fix deadlock on early VmRequest
Date: Sun, 14 Jun 2020 11:43:44 +0000	[thread overview]
Message-ID: <20200614114344.22642-3-hi@alyssa.is> (raw)
In-Reply-To: <20200614114344.22642-1-hi@alyssa.is>

If a DiskCommand was received on the crosvm control socket before the
virtio-block device had been activated, the Token::VmRequest case in
the main event loop would forward the request to the block device
socket, and then wait synchronously for a response.  That response
could never arrive, because the device hadn't been activated, and the
device could never be activated, because the blocked event loop could
no longer respond to the event that triggers activation.  crosvm
would therefore hang forever.

This patch fixes the deadlock by keeping track of whether each device
that sends a response in this way has been activated yet.  If a
device has already been activated, messages are sent and responses
received as normal.  If it has not, messages are instead put into a
per-device queue.  Once the device is activated, the queued messages
are processed all at once, the device is marked as ready, and the
queue is dropped.  Subsequent messages are processed immediately as
they come in, with no further queueing.

A device indicates that it is ready by sending a message on its
socket.  The main crosvm event loop polls the socket so that it is
notified when the device becomes ready.  This poll event triggers
only once: after it has been received, the socket is removed from the
poll context.

Currently, the only device type that responds to external control
messages AND needs to be activated by an event is the block device.
The balloon device does not respond to messages, and the xhci
controller device is activated up front.  The code is nevertheless
structured so that it should be very easy to drop another kind of
device into the queueing system, should that be required.
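For illustration, the queue-until-ready mechanism described above can
be sketched as a small standalone Rust program.  The names here
(QueuedReq, DeviceStatus, handle_request, device_ready) are
hypothetical stand-ins, not crosvm's actual API; this is a sketch of
the pattern, not code from the patch:

```rust
use std::collections::{BTreeMap, VecDeque};

// A request deferred until its device comes up (hypothetical type).
struct QueuedReq(u32);

// A device is either still waiting, holding a queue of pending
// requests, or ready to handle requests synchronously.
enum DeviceStatus {
    Waiting(VecDeque<QueuedReq>),
    Ready,
}

// Process a request immediately if the device is ready (or unknown),
// otherwise defer it onto the device's queue.
fn handle_request(
    statuses: &mut BTreeMap<i32, DeviceStatus>,
    fd: i32,
    req: QueuedReq,
    log: &mut Vec<u32>,
) {
    match statuses.get_mut(&fd) {
        None | Some(DeviceStatus::Ready) => log.push(req.0), // send + recv now
        Some(DeviceStatus::Waiting(queue)) => queue.push_back(req), // defer
    }
}

// On the one-shot readiness notification: drain the queue in FIFO
// order, then mark the device ready so later requests bypass it.
fn device_ready(statuses: &mut BTreeMap<i32, DeviceStatus>, fd: i32, log: &mut Vec<u32>) {
    if let Some(DeviceStatus::Waiting(mut queue)) = statuses.remove(&fd) {
        while let Some(req) = queue.pop_front() {
            log.push(req.0);
        }
    }
    statuses.insert(fd, DeviceStatus::Ready);
}

fn main() {
    let mut statuses = BTreeMap::new();
    statuses.insert(3, DeviceStatus::Waiting(VecDeque::new()));
    let mut log = Vec::new();

    handle_request(&mut statuses, 3, QueuedReq(1), &mut log); // queued
    handle_request(&mut statuses, 3, QueuedReq(2), &mut log); // queued
    assert!(log.is_empty());

    device_ready(&mut statuses, 3, &mut log); // queue drained in order
    handle_request(&mut statuses, 3, QueuedReq(3), &mut log); // immediate
    assert_eq!(log, vec![1, 2, 3]);
}
```

Because the queue is dropped once drained, the steady state after
activation has no extra bookkeeping beyond the Ready marker.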
---
 devices/src/virtio/block.rs |   5 +
 src/linux.rs                | 201 ++++++++++++++++++++++++++++++------
 vm_control/src/lib.rs       |  57 +++++++---
 3 files changed, 213 insertions(+), 50 deletions(-)

diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs
index 80d51030..5a8972a7 100644
--- a/devices/src/virtio/block.rs
+++ b/devices/src/virtio/block.rs
@@ -390,6 +390,11 @@ impl Worker {
             }
         };
 
+        if let Err(e) = self.control_socket.send(&DiskControlResult::Ready) {
+            error!("control socket failed to notify readiness: {}", e);
+            return;
+        }
+
         'poll: loop {
             let events = match poll_ctx.wait() {
                 Ok(v) => v,
diff --git a/src/linux.rs b/src/linux.rs
index ec2067cf..ccd7d88c 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -3,6 +3,7 @@
 // found in the LICENSE file.
 
 use std::cmp::max;
+use std::collections::{BTreeMap, VecDeque};
 use std::convert::TryFrom;
 use std::error::Error as StdError;
 use std::ffi::CStr;
@@ -32,12 +33,12 @@ use acpi_tables::sdt::SDT;
 use devices::virtio::EventDevice;
 use devices::virtio::{self, Console, VirtioDevice};
 use devices::{
-    self, Ac97Backend, Ac97Dev, HostBackendDeviceProvider, PciDevice, VfioContainer, VfioDevice,
-    VfioPciDevice, VirtioPciDevice, XhciController,
+    self, Ac97Backend, Ac97Dev, Bus, HostBackendDeviceProvider, PciDevice, VfioContainer,
+    VfioDevice, VfioPciDevice, VirtioPciDevice, XhciController,
 };
 use io_jail::{self, Minijail};
 use kvm::*;
-use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
+use msg_socket::{MsgError, MsgReceiver, MsgResult, MsgSender, MsgSocket};
 use net_util::{Error as NetError, MacAddress, Tap};
 use remain::sorted;
 use resources::{Alloc, MmioType, SystemAllocator};
@@ -45,7 +46,7 @@ use sync::{Condvar, Mutex};
 use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener};
 
 use sys_util::{
-    self, block_signal, clear_signal, drop_capabilities, error, flock, get_blocked_signals,
+    self, block_signal, clear_signal, debug, drop_capabilities, error, flock, get_blocked_signals,
     get_group_id, get_user_id, getegid, geteuid, info, register_rt_signal_handler,
     set_cpu_affinity, validate_raw_fd, warn, EventFd, FlockOperation, GuestAddress, GuestMemory,
     Killable, MemoryMappingArena, PollContext, PollToken, Protection, ScopedEvent, SignalFd,
@@ -57,7 +58,7 @@ use vm_control::{
     DiskControlResult, UsbControlSocket, VmControlResponseSocket, VmIrqRequest, VmIrqResponse,
     VmIrqResponseSocket, VmMemoryControlRequestSocket, VmMemoryControlResponseSocket,
     VmMemoryRequest, VmMemoryResponse, VmMsyncRequest, VmMsyncRequestSocket, VmMsyncResponse,
-    VmMsyncResponseSocket, VmRunMode,
+    VmMsyncResponseSocket, VmRequest, VmRunMode,
 };
 
 use crate::{Config, DiskOption, Executable, SharedDir, SharedDirKind, TouchDeviceOption};
@@ -1667,6 +1668,45 @@ fn file_to_i64<P: AsRef<Path>>(path: P) -> io::Result<i64> {
         .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "empty file"))
 }
 
+/// Returns a boolean indicating whether the VM should be exited.
+fn do_vm_request(
+    request: VmRequest,
+    device_socket: Option<&UnixSeqpacket>,
+    control_socket: &VmControlResponseSocket,
+    run_mode_arc: &Arc<VcpuRunMode>,
+    vcpu_handles: &mut Vec<JoinHandle<()>>,
+    io_bus: &mut Bus,
+) -> MsgResult<bool> {
+    let mut run_mode_opt = None;
+    let response = request.execute(&mut run_mode_opt, device_socket);
+    control_socket.send(&response)?;
+    if let Some(run_mode) = run_mode_opt {
+        info!("control socket changed run mode to {}", run_mode);
+        match run_mode {
+            VmRunMode::Exiting => Ok(true),
+            VmRunMode::Running => {
+                if let VmRunMode::Suspending = *run_mode_arc.mtx.lock() {
+                    io_bus.notify_resume();
+                }
+                run_mode_arc.set_and_notify(VmRunMode::Running);
+                for handle in vcpu_handles {
+                    let _ = handle.kill(SIGRTMIN() + 0);
+                }
+                Ok(false)
+            }
+            other => {
+                run_mode_arc.set_and_notify(other);
+                for handle in vcpu_handles {
+                    let _ = handle.kill(SIGRTMIN() + 0);
+                }
+                Ok(false)
+            }
+        }
+    } else {
+        Ok(false)
+    }
+}
+
 pub fn run_config(cfg: Config) -> Result<()> {
     if cfg.sandbox {
         // Printing something to the syslog before entering minijail so that libc's syslogger has a
@@ -1818,7 +1858,7 @@ fn run_control(
 ) -> Result<()> {
     const LOWMEM_AVAILABLE: &str = "/sys/kernel/mm/chromeos-low_mem/available";
 
-    #[derive(PollToken)]
+    #[derive(Debug, PollToken)]
     enum Token {
         Exit,
         Suspend,
@@ -1828,6 +1868,7 @@ fn run_control(
         BalloonResult,
         VmControlServer,
         VmControl { index: usize },
+        DeviceReady { sock_fd: RawFd },
     }
 
     stdin()
@@ -1912,6 +1953,38 @@ fn run_control(
     }
     vcpu_thread_barrier.wait();
 
+    struct QueuedDeviceReq {
+        request: VmRequest,
+        control_sock_index: usize,
+    }
+
+    /// A device can either be waiting, or ready.
+    ///
+    /// If it's waiting, we queue up events to send to it once it's ready.  We can't just send the
+    /// requests straight away and let them sit on the socket until the device comes up, because we
+    /// want to syncronously wait for a response after sending.  If the device hasn't been activated
+    /// when we do this, we'd sit waiting for a response forever and never activate the device.
+    enum DeviceStatus<'a> {
+        Waiting(&'a UnixSeqpacket, VecDeque<QueuedDeviceReq>),
+        Ready,
+    }
+
+    let mut queued_device_reqs = BTreeMap::<RawFd, DeviceStatus>::new();
+
+    // Each socket will send a ready message when it's ready to receive requests.  Add the sockets
+    // to the event loop, so that when they become ready, we can process their queues.  After the
+    // initial queue is processed, the device becomes ready, and the socket is removed from the
+    // event loop.
+    for socket in disk_host_sockets.iter().map(AsRef::as_ref) {
+        let token = Token::DeviceReady {
+            sock_fd: socket.as_raw_fd(),
+        };
+        poll_ctx.add(socket, token).map_err(Error::PollContextAdd)?;
+
+        let status = DeviceStatus::Waiting(socket, VecDeque::new());
+        queued_device_reqs.insert(socket.as_raw_fd(), status);
+    }
+
     let mut ioapic_delayed = Vec::<usize>::default();
     'poll: loop {
         let events = {
@@ -2110,40 +2183,44 @@ fn run_control(
                         match socket {
                             TaggedControlSocket::Vm(socket) => match socket.recv() {
                                 Ok(request) => {
-                                    let mut run_mode_opt = None;
-                                    let response = request.execute(
-                                        &mut run_mode_opt,
+                                    let device_socket = request.socket(
                                         &balloon_host_socket,
-                                        disk_host_sockets,
+                                        &disk_host_sockets,
                                         &usb_control_socket,
                                     );
-                                    if let Err(e) = socket.send(&response) {
-                                        error!("failed to send VmResponse: {}", e);
-                                    }
-                                    if let Some(run_mode) = run_mode_opt {
-                                        info!("control socket changed run mode to {}", run_mode);
-                                        match run_mode {
-                                            VmRunMode::Exiting => {
-                                                break 'poll;
-                                            }
-                                            VmRunMode::Running => {
-                                                if let VmRunMode::Suspending =
-                                                    *run_mode_arc.mtx.lock()
-                                                {
-                                                    linux.io_bus.notify_resume();
-                                                }
-                                                run_mode_arc.set_and_notify(VmRunMode::Running);
-                                                for handle in &vcpu_handles {
-                                                    let _ = handle.kill(SIGRTMIN() + 0);
-                                                }
-                                            }
-                                            other => {
-                                                run_mode_arc.set_and_notify(other);
-                                                for handle in &vcpu_handles {
-                                                    let _ = handle.kill(SIGRTMIN() + 0);
+
+                                    match device_socket.map(|s| {
+                                        queued_device_reqs.get_mut(&s.as_raw_fd()).unwrap()
+                                    }) {
+                                        None | Some(DeviceStatus::Ready) => {
+                                            match do_vm_request(
+                                                request,
+                                                device_socket,
+                                                socket,
+                                                &run_mode_arc,
+                                                &mut vcpu_handles,
+                                                &mut linux.io_bus,
+                                            ) {
+                                                Ok(true) => break 'poll,
+                                                Ok(false) => {}
+                                                Err(e) => {
+                                                    error!("failed to handle VmRequest: {}", e);
+                                                    continue 'poll;
                                                 }
                                             }
                                         }
+
+                                        Some(DeviceStatus::Waiting(_, queue)) => {
+                                            debug!(
+                                                "Device {} not yet ready. Queueing request.",
+                                                device_socket.unwrap().as_raw_fd()
+                                            );
+
+                                            queue.push_back(QueuedDeviceReq {
+                                                request,
+                                                control_sock_index: index,
+                                            });
+                                        }
                                     }
                                 }
                                 Err(e) => {
@@ -2204,6 +2281,61 @@ fn run_control(
                         }
                     }
                 }
+                Token::DeviceReady { sock_fd } => {
+                    if let Some(DeviceStatus::Waiting(device_sock, mut queue)) =
+                        queued_device_reqs.remove(&sock_fd)
+                    {
+                        debug!(
+                            "Device {} is ready. Processing its queue of {} item(s).",
+                            sock_fd,
+                            queue.len()
+                        );
+
+                        // Deal with the message sent to indicate readiness.  Its contents don't matter.
+                        if let Err(e) = device_sock.recv(&mut []) {
+                            error!(
+                                "Failed to recv ready notification from device socket: {}",
+                                e
+                            );
+                            continue 'poll;
+                        }
+
+                        while let Some(QueuedDeviceReq {
+                            request,
+                            control_sock_index,
+                        }) = queue.pop_front()
+                        {
+                            let control_sock = match control_sockets.get(control_sock_index) {
+                                Some(TaggedControlSocket::Vm(s)) => s,
+                                _ => unreachable!(),
+                            };
+
+                            match do_vm_request(
+                                request,
+                                Some(device_sock),
+                                control_sock,
+                                &run_mode_arc,
+                                &mut vcpu_handles,
+                                &mut linux.io_bus,
+                            ) {
+                                Ok(true) => break 'poll,
+                                Ok(false) => {}
+                                Err(e) => {
+                                    error!("failed to handle VmRequest: {}", e);
+                                    continue 'poll;
+                                }
+                            }
+                        }
+
+                        if let Err(e) = poll_ctx.delete(device_sock) {
+                            error!("failed to delete poll token: {}", e);
+                        }
+                    } else {
+                        error!("Received ready notification for a device that isn't waiting");
+                    }
+
+                    queued_device_reqs.insert(sock_fd, DeviceStatus::Ready);
+                }
             }
         }
 
@@ -2229,6 +2361,7 @@ fn run_control(
                         _ => {}
                     }
                 }
+                Token::DeviceReady { .. } => {}
             }
         }
 
diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs
index c63fbfe6..d4922ab9 100644
--- a/vm_control/src/lib.rs
+++ b/vm_control/src/lib.rs
@@ -19,8 +19,9 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
 use libc::{EINVAL, EIO, ENODEV};
 
 use kvm::{IrqRoute, IrqSource, Vm};
-use msg_socket::{MsgError, MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket};
+use msg_socket::{MsgError, MsgOnSocket, MsgResult, MsgSocket, UnixSeqpacketExt};
 use resources::{Alloc, GpuMemoryDesc, MmioType, SystemAllocator};
+use sys_util::net::UnixSeqpacket;
 use sys_util::{error, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result};
 
 /// A data structure that either owns or borrows a file descriptor.
@@ -189,6 +190,7 @@ impl Display for DiskControlCommand {
 
 #[derive(MsgOnSocket, Debug)]
 pub enum DiskControlResult {
+    Ready,
     Ok,
     Err(SysError),
 }
@@ -563,17 +565,34 @@ fn register_memory(
 }
 
 impl VmRequest {
+    pub fn socket<'s>(
+        &self,
+        balloon_host_socket: &'s BalloonControlRequestSocket,
+        disk_host_sockets: &'s [DiskControlRequestSocket],
+        usb_control_socket: &'s UsbControlSocket,
+    ) -> Option<&'s UnixSeqpacket> {
+        use VmRequest::*;
+        match *self {
+            Exit => None,
+            Suspend => None,
+            Resume => None,
+            BalloonCommand(_) => Some(balloon_host_socket.as_ref()),
+            DiskCommand { disk_index, .. } => disk_host_sockets.get(disk_index).map(AsRef::as_ref),
+            UsbCommand(_) => Some(usb_control_socket.as_ref()),
+        }
+    }
+
     /// Executes this request on the given Vm and other mutable state.
     ///
     /// This does not return a result, instead encapsulating the success or failure in a
     /// `VmResponse` with the intended purpose of sending the response back over the  socket that
     /// received this `VmRequest`.
+    ///
+    /// The `socket` parameter must be the value that was obtained by calling `VmRequest::socket`.
     pub fn execute(
         &self,
         run_mode: &mut Option<VmRunMode>,
-        balloon_host_socket: &BalloonControlRequestSocket,
-        disk_host_sockets: &[DiskControlRequestSocket],
-        usb_control_socket: &UsbControlSocket,
+        socket: Option<&UnixSeqpacket>,
     ) -> VmResponse {
         match *self {
             VmRequest::Exit => {
@@ -589,14 +608,18 @@ impl VmRequest {
                 VmResponse::Ok
             }
             VmRequest::BalloonCommand(BalloonControlCommand::Adjust { num_bytes }) => {
-                match balloon_host_socket.send(&BalloonControlCommand::Adjust { num_bytes }) {
+                match socket
+                    .unwrap()
+                    .send_msg_on_socket(&BalloonControlCommand::Adjust { num_bytes })
+                {
                     Ok(_) => VmResponse::Ok,
                     Err(_) => VmResponse::Err(SysError::last()),
                 }
             }
             VmRequest::BalloonCommand(BalloonControlCommand::Stats) => {
-                match balloon_host_socket.send(&BalloonControlCommand::Stats {}) {
-                    Ok(_) => match balloon_host_socket.recv() {
+                let socket = socket.unwrap();
+                match socket.send_msg_on_socket(&BalloonControlCommand::Stats {}) {
+                    Ok(_) => match socket.recv_msg_on_socket() {
                         Ok(BalloonControlResult::Stats {
                             stats,
                             balloon_actual,
@@ -612,19 +635,20 @@ impl VmRequest {
                     Err(_) => VmResponse::Err(SysError::last()),
                 }
             }
-            VmRequest::DiskCommand {
-                disk_index,
-                ref command,
-            } => {
+            VmRequest::DiskCommand { ref command, .. } => {
                 // Forward the request to the block device process via its control socket.
-                if let Some(sock) = disk_host_sockets.get(disk_index) {
-                    if let Err(e) = sock.send(command) {
+                if let Some(sock) = socket {
+                    if let Err(e) = sock.send_msg_on_socket(command) {
                         error!("disk socket send failed: {}", e);
                         VmResponse::Err(SysError::new(EINVAL))
                     } else {
-                        match sock.recv() {
+                        match sock.recv_msg_on_socket() {
                             Ok(DiskControlResult::Ok) => VmResponse::Ok,
                             Ok(DiskControlResult::Err(e)) => VmResponse::Err(e),
+                            Ok(resp) => {
+                                error!("unexpected disk socket result: {:?}", resp);
+                                VmResponse::Err(SysError::new(EINVAL))
+                            }
                             Err(e) => {
                                 error!("disk socket recv failed: {}", e);
                                 VmResponse::Err(SysError::new(EINVAL))
@@ -636,12 +660,13 @@ impl VmRequest {
                 }
             }
             VmRequest::UsbCommand(ref cmd) => {
-                let res = usb_control_socket.send(cmd);
+                let socket = socket.unwrap();
+                let res = socket.send_msg_on_socket(cmd);
                 if let Err(e) = res {
                     error!("fail to send command to usb control socket: {}", e);
                     return VmResponse::Err(SysError::new(EIO));
                 }
-                match usb_control_socket.recv() {
+                match socket.recv_msg_on_socket() {
                     Ok(response) => VmResponse::UsbResponse(response),
                     Err(e) => {
                         error!("fail to recv command from usb control socket: {}", e);
-- 
2.26.2


Thread overview: 11+ messages
2020-06-14 11:43 [PATCH crosvm 0/2] Fix " Alyssa Ross
2020-06-14 11:43 ` [PATCH crosvm 1/2] msg_socket: introduce UnixSeqpacketExt Alyssa Ross
2020-06-16  0:17   ` Cole Helbling
2020-06-16  9:32     ` Alyssa Ross
2020-06-22 22:06       ` Cole Helbling
2020-06-23  2:32         ` Alyssa Ross
2020-06-25  1:54       ` impaqt
2020-07-09 13:24         ` Alyssa Ross
2020-06-14 11:43 ` Alyssa Ross [this message]
2020-06-16  1:08   ` [PATCH crosvm 2/2] crosvm: fix deadlock on early VmRequest Cole Helbling
2020-06-16  9:39     ` Alyssa Ross
