summary refs log tree commit diff
diff options
context:
space:
mode:
authorZach Reizner <zachr@google.com>2019-02-13 17:33:32 -0800
committerchrome-bot <chrome-bot@chromium.org>2019-02-28 03:24:24 -0800
commita60744b42ee2589e9318029cf3fd7d87fd73f29d (patch)
tree4819c7b24caab92956d95474e638eb067a5ce926
parentb7196e2a1c1eb7123e7eace5418b7eb4a3e24dbe (diff)
downloadcrosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.tar
crosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.tar.gz
crosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.tar.bz2
crosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.tar.lz
crosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.tar.xz
crosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.tar.zst
crosvm-a60744b42ee2589e9318029cf3fd7d87fd73f29d.zip
crosvm: use seqpacket rather than datagram sockets
The advantage of seqpacket is that they are connection oriented. A
listener can be created that accepts new connections, useful for the
path based VM control sockets. Previously, the only bidirectional
sockets in crosvm were either stream based or made using socketpair.

This change also whitelists sendmsg and recvmsg for the common device
policy.

TEST=cargo test
BUG=chromium:848187

Change-Id: I83fd46f54bce105a7730632cd013b5e7047db22b
Reviewed-on: https://chromium-review.googlesource.com/1470917
Commit-Ready: Zach Reizner <zachr@chromium.org>
Tested-by: kokoro <noreply+kokoro@google.com>
Tested-by: Zach Reizner <zachr@chromium.org>
Reviewed-by: Daniel Verkamp <dverkamp@chromium.org>
-rw-r--r--devices/src/proxy.rs8
-rw-r--r--devices/src/virtio/balloon.rs11
-rw-r--r--devices/src/virtio/block.rs11
-rw-r--r--devices/src/virtio/wl.rs13
-rw-r--r--msg_socket/src/lib.rs77
-rw-r--r--msg_socket/src/msg_on_socket.rs10
-rw-r--r--seccomp/arm/common_device.policy2
-rw-r--r--seccomp/x86_64/common_device.policy2
-rw-r--r--src/linux.rs132
-rw-r--r--src/main.rs18
-rw-r--r--vm_control/src/lib.rs7
11 files changed, 160 insertions, 131 deletions
diff --git a/devices/src/proxy.rs b/devices/src/proxy.rs
index 4bc3405..a91d2ed 100644
--- a/devices/src/proxy.rs
+++ b/devices/src/proxy.rs
@@ -7,14 +7,14 @@
 use libc::pid_t;
 
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 use std::process;
 use std::time::Duration;
 use std::{self, fmt, io};
 
+use io_jail::{self, Minijail};
 use msg_socket::{MsgOnSocket, MsgReceiver, MsgSender, MsgSocket};
+use sys_util::net::UnixSeqpacket;
 
-use io_jail::{self, Minijail};
 use BusDevice;
 
 /// Errors for proxy devices.
@@ -64,7 +64,7 @@ enum CommandResult {
     ReadConfigResult(u32),
 }
 
-fn child_proc(sock: UnixDatagram, device: &mut BusDevice) {
+fn child_proc(sock: UnixSeqpacket, device: &mut BusDevice) {
     let mut running = true;
     let sock = MsgSocket::<CommandResult, Command>::new(sock);
 
@@ -138,7 +138,7 @@ impl ProxyDevice {
         mut keep_fds: Vec<RawFd>,
     ) -> Result<ProxyDevice> {
         let debug_label = device.debug_label();
-        let (child_sock, parent_sock) = UnixDatagram::pair().map_err(Error::Io)?;
+        let (child_sock, parent_sock) = UnixSeqpacket::pair().map_err(Error::Io)?;
 
         keep_fds.push(child_sock.as_raw_fd());
         // Forking here is safe as long as the program is still single threaded.
diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs
index 1dfb72c..e9f36f6 100644
--- a/devices/src/virtio/balloon.rs
+++ b/devices/src/virtio/balloon.rs
@@ -8,13 +8,14 @@ use std::fmt::{self, Display};
 use std::io::Write;
 use std::mem;
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::thread;
 
 use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
-use sys_util::{self, EventFd, GuestAddress, GuestMemory, PollContext, PollToken};
+use sys_util::{
+    self, net::UnixSeqpacket, EventFd, GuestAddress, GuestMemory, PollContext, PollToken,
+};
 
 use super::{
     DescriptorChain, Queue, VirtioDevice, INTERRUPT_STATUS_CONFIG_CHANGED,
@@ -67,7 +68,7 @@ struct Worker {
     interrupt_evt: EventFd,
     interrupt_resample_evt: EventFd,
     config: Arc<BalloonConfig>,
-    command_socket: UnixDatagram,
+    command_socket: UnixSeqpacket,
 }
 
 fn valid_inflate_desc(desc: &DescriptorChain) -> bool {
@@ -230,7 +231,7 @@ impl Worker {
 
 /// Virtio device for memory balloon inflation/deflation.
 pub struct Balloon {
-    command_socket: Option<UnixDatagram>,
+    command_socket: Option<UnixSeqpacket>,
     config: Arc<BalloonConfig>,
     features: u64,
     kill_evt: Option<EventFd>,
@@ -238,7 +239,7 @@ pub struct Balloon {
 
 impl Balloon {
     /// Create a new virtio balloon device.
-    pub fn new(command_socket: UnixDatagram) -> Result<Balloon> {
+    pub fn new(command_socket: UnixSeqpacket) -> Result<Balloon> {
         Ok(Balloon {
             command_socket: Some(command_socket),
             config: Arc::new(BalloonConfig {
diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs
index 9dcf828..3c5d2fd 100644
--- a/devices/src/virtio/block.rs
+++ b/devices/src/virtio/block.rs
@@ -7,7 +7,6 @@ use std::fmt::{self, Display};
 use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::mem::{size_of, size_of_val};
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 use std::result;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -19,8 +18,8 @@ use sync::Mutex;
 use sys_util::Error as SysError;
 use sys_util::Result as SysResult;
 use sys_util::{
-    EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError, PollContext,
-    PollToken, PunchHole, TimerFd, WriteZeroes,
+    net::UnixSeqpacket, EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError,
+    PollContext, PollToken, PunchHole, TimerFd, WriteZeroes,
 };
 
 use data_model::{DataInit, Le16, Le32, Le64};
@@ -695,7 +694,7 @@ impl<T: DiskFile> Worker<T> {
         self.interrupt_evt.write(1).unwrap();
     }
 
-    fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixDatagram) {
+    fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixSeqpacket) {
         #[derive(PollToken)]
         enum Token {
             FlushTimer,
@@ -819,7 +818,7 @@ pub struct Block<T: DiskFile> {
     disk_size: Arc<Mutex<u64>>,
     avail_features: u64,
     read_only: bool,
-    control_socket: Option<UnixDatagram>,
+    control_socket: Option<UnixSeqpacket>,
 }
 
 fn build_config_space(disk_size: u64) -> virtio_blk_config {
@@ -844,7 +843,7 @@ impl<T: DiskFile> Block<T> {
     pub fn new(
         mut disk_image: T,
         read_only: bool,
-        control_socket: Option<UnixDatagram>,
+        control_socket: Option<UnixSeqpacket>,
     ) -> SysResult<Block<T>> {
         let disk_size = disk_image.seek(SeekFrom::End(0))? as u64;
         if disk_size % SECTOR_SIZE != 0 {
diff --git a/devices/src/virtio/wl.rs b/devices/src/virtio/wl.rs
index 4d43825..0220d42 100644
--- a/devices/src/virtio/wl.rs
+++ b/devices/src/virtio/wl.rs
@@ -41,7 +41,7 @@ use std::mem::{size_of, size_of_val};
 #[cfg(feature = "wl-dmabuf")]
 use std::os::raw::{c_uint, c_ulonglong};
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
 use std::path::{Path, PathBuf};
 use std::rc::Rc;
 use std::result;
@@ -59,6 +59,7 @@ use data_model::*;
 use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
 #[cfg(feature = "wl-dmabuf")]
 use resources::GpuMemoryDesc;
+use sys_util::net::UnixSeqpacket;
 use sys_util::{
     pipe, round_up_to_page_size, Error, EventFd, FileFlags, GuestAddress, GuestMemory,
     GuestMemoryError, PollContext, PollToken, Result, ScmSocket, SharedMemory,
@@ -490,7 +491,7 @@ struct VmRequester {
 }
 
 impl VmRequester {
-    fn new(vm_socket: UnixDatagram) -> VmRequester {
+    fn new(vm_socket: UnixSeqpacket) -> VmRequester {
         VmRequester {
             inner: Rc::new(RefCell::new(MsgSocket::<VmRequest, VmResponse>::new(
                 vm_socket,
@@ -1004,7 +1005,7 @@ struct WlState {
 impl WlState {
     fn new(
         wayland_path: PathBuf,
-        vm_socket: UnixDatagram,
+        vm_socket: UnixSeqpacket,
         use_transition_flags: bool,
         resource_bridge: Option<ResourceRequestSocket>,
     ) -> WlState {
@@ -1488,7 +1489,7 @@ impl Worker {
         in_queue: Queue,
         out_queue: Queue,
         wayland_path: PathBuf,
-        vm_socket: UnixDatagram,
+        vm_socket: UnixSeqpacket,
         use_transition_flags: bool,
         resource_bridge: Option<ResourceRequestSocket>,
     ) -> Worker {
@@ -1678,7 +1679,7 @@ impl Worker {
 pub struct Wl {
     kill_evt: Option<EventFd>,
     wayland_path: PathBuf,
-    vm_socket: Option<UnixDatagram>,
+    vm_socket: Option<UnixSeqpacket>,
     resource_bridge: Option<ResourceRequestSocket>,
     use_transition_flags: bool,
 }
@@ -1686,7 +1687,7 @@ pub struct Wl {
 impl Wl {
     pub fn new<P: AsRef<Path>>(
         wayland_path: P,
-        vm_socket: UnixDatagram,
+        vm_socket: UnixSeqpacket,
         resource_bridge: Option<ResourceRequestSocket>,
     ) -> Result<Wl> {
         Ok(Wl {
diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs
index cc2cdc9..fd1f2bc 100644
--- a/msg_socket/src/lib.rs
+++ b/msg_socket/src/lib.rs
@@ -13,9 +13,9 @@ mod msg_on_socket;
 
 use std::io::Result;
 use std::marker::PhantomData;
+use std::ops::Deref;
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
-use sys_util::{Error as SysError, ScmSocket, UnlinkUnixDatagram};
+use sys_util::{net::UnixSeqpacket, Error as SysError, ScmSocket};
 
 pub use msg_on_socket::*;
 pub use msg_on_socket_derive::*;
@@ -24,7 +24,7 @@ pub use msg_on_socket_derive::*;
 /// direction.
 pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>(
 ) -> Result<(MsgSocket<Request, Response>, MsgSocket<Response, Request>)> {
-    let (sock1, sock2) = UnixDatagram::pair()?;
+    let (sock1, sock2) = UnixSeqpacket::pair()?;
     let requester = MsgSocket {
         sock: sock1,
         _i: PhantomData,
@@ -40,14 +40,14 @@ pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>(
 
 /// Bidirection sock that support both send and recv.
 pub struct MsgSocket<I: MsgOnSocket, O: MsgOnSocket> {
-    sock: UnixDatagram,
+    sock: UnixSeqpacket,
     _i: PhantomData<I>,
     _o: PhantomData<O>,
 }
 
 impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> {
     // Create a new MsgSocket.
-    pub fn new(s: UnixDatagram) -> MsgSocket<I, O> {
+    pub fn new(s: UnixSeqpacket) -> MsgSocket<I, O> {
         MsgSocket {
             sock: s,
             _i: PhantomData,
@@ -56,33 +56,22 @@ impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> {
     }
 }
 
-/// Bidirection sock that support both send and recv.
-pub struct UnlinkMsgSocket<I: MsgOnSocket, O: MsgOnSocket> {
-    sock: UnlinkUnixDatagram,
-    _i: PhantomData<I>,
-    _o: PhantomData<O>,
-}
-
-impl<I: MsgOnSocket, O: MsgOnSocket> UnlinkMsgSocket<I, O> {
-    // Create a new MsgSocket.
-    pub fn new(s: UnlinkUnixDatagram) -> UnlinkMsgSocket<I, O> {
-        UnlinkMsgSocket {
-            sock: s,
-            _i: PhantomData,
-            _o: PhantomData,
-        }
+impl<I: MsgOnSocket, O: MsgOnSocket> Deref for MsgSocket<I, O> {
+    type Target = UnixSeqpacket;
+    fn deref(&self) -> &Self::Target {
+        &self.sock
     }
 }
 
 /// One direction socket that only supports sending.
 pub struct Sender<M: MsgOnSocket> {
-    sock: UnixDatagram,
+    sock: UnixSeqpacket,
     _m: PhantomData<M>,
 }
 
 impl<M: MsgOnSocket> Sender<M> {
     /// Create a new sender sock.
-    pub fn new(s: UnixDatagram) -> Sender<M> {
+    pub fn new(s: UnixSeqpacket) -> Sender<M> {
         Sender {
             sock: s,
             _m: PhantomData,
@@ -92,13 +81,13 @@ impl<M: MsgOnSocket> Sender<M> {
 
 /// One direction socket that only supports receiving.
 pub struct Receiver<M: MsgOnSocket> {
-    sock: UnixDatagram,
+    sock: UnixSeqpacket,
     _m: PhantomData<M>,
 }
 
 impl<M: MsgOnSocket> Receiver<M> {
     /// Create a new receiver sock.
-    pub fn new(s: UnixDatagram) -> Receiver<M> {
+    pub fn new(s: UnixSeqpacket) -> Receiver<M> {
         Receiver {
             sock: s,
             _m: PhantomData,
@@ -106,8 +95,8 @@ impl<M: MsgOnSocket> Receiver<M> {
     }
 }
 
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for MsgSocket<I, O> {
-    fn as_ref(&self) -> &UnixDatagram {
+impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixSeqpacket> for MsgSocket<I, O> {
+    fn as_ref(&self) -> &UnixSeqpacket {
         &self.sock
     }
 }
@@ -118,20 +107,8 @@ impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for MsgSocket<I, O> {
     }
 }
 
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for UnlinkMsgSocket<I, O> {
-    fn as_ref(&self) -> &UnixDatagram {
-        self.sock.as_ref()
-    }
-}
-
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for UnlinkMsgSocket<I, O> {
-    fn as_raw_fd(&self) -> RawFd {
-        self.as_ref().as_raw_fd()
-    }
-}
-
-impl<M: MsgOnSocket> AsRef<UnixDatagram> for Sender<M> {
-    fn as_ref(&self) -> &UnixDatagram {
+impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Sender<M> {
+    fn as_ref(&self) -> &UnixSeqpacket {
         &self.sock
     }
 }
@@ -142,8 +119,8 @@ impl<M: MsgOnSocket> AsRawFd for Sender<M> {
     }
 }
 
-impl<M: MsgOnSocket> AsRef<UnixDatagram> for Receiver<M> {
-    fn as_ref(&self) -> &UnixDatagram {
+impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Receiver<M> {
+    fn as_ref(&self) -> &UnixSeqpacket {
         &self.sock
     }
 }
@@ -155,7 +132,7 @@ impl<M: MsgOnSocket> AsRawFd for Receiver<M> {
 }
 
 /// Types that could send a message.
-pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
+pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixSeqpacket> {
     fn send(&self, msg: &M) -> MsgResult<()> {
         let msg_size = M::msg_size();
         let fd_size = M::max_fd_count();
@@ -163,7 +140,7 @@ pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
         let mut fd_buffer: Vec<RawFd> = vec![0; fd_size];
 
         let fd_size = msg.write_to_buffer(&mut msg_buffer, &mut fd_buffer)?;
-        let sock: &UnixDatagram = self.as_ref();
+        let sock: &UnixSeqpacket = self.as_ref();
         if fd_size == 0 {
             handle_eintr!(sock.send(&msg_buffer))
                 .map_err(|e| MsgError::Send(SysError::new(e.raw_os_error().unwrap_or(0))))?;
@@ -176,14 +153,14 @@ pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
 }
 
 /// Types that could receive a message.
-pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
+pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixSeqpacket> {
     fn recv(&self) -> MsgResult<M> {
         let msg_size = M::msg_size();
         let fd_size = M::max_fd_count();
         let mut msg_buffer: Vec<u8> = vec![0; msg_size];
         let mut fd_buffer: Vec<RawFd> = vec![0; fd_size];
 
-        let sock: &UnixDatagram = self.as_ref();
+        let sock: &UnixSeqpacket = self.as_ref();
 
         let (recv_msg_size, recv_fd_size) = {
             if fd_size == 0 {
@@ -197,7 +174,10 @@ pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
             }
         };
         if msg_size != recv_msg_size {
-            return Err(MsgError::BadRecvSize(msg_size));
+            return Err(MsgError::BadRecvSize {
+                expected: msg_size,
+                actual: recv_msg_size,
+            });
         }
         // Safe because fd buffer is read from socket.
         let (v, read_fd_size) = unsafe {
@@ -213,8 +193,5 @@ pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
 impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for MsgSocket<I, O> {}
 impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for MsgSocket<I, O> {}
 
-impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for UnlinkMsgSocket<I, O> {}
-impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for UnlinkMsgSocket<I, O> {}
-
 impl<M: MsgOnSocket> MsgSender<M> for Sender<M> {}
 impl<M: MsgOnSocket> MsgReceiver<M> for Receiver<M> {}
diff --git a/msg_socket/src/msg_on_socket.rs b/msg_socket/src/msg_on_socket.rs
index 493c57c..8b01850 100644
--- a/msg_socket/src/msg_on_socket.rs
+++ b/msg_socket/src/msg_on_socket.rs
@@ -23,8 +23,8 @@ pub enum MsgError {
     /// The type of a received request or response is unknown.
     InvalidType,
     /// There was not the expected amount of data when receiving a message. The inner
-    /// value is how much data is needed.
-    BadRecvSize(usize),
+    /// value is how much data is expected and how much data was actually received.
+    BadRecvSize { expected: usize, actual: usize },
     /// There was no associated file descriptor received for a request that expected it.
     ExpectFd,
     /// There was some associated file descriptor received but not used when deserialize.
@@ -47,7 +47,11 @@ impl Display for MsgError {
             Send(e) => write!(f, "failed to send request or response: {}", e),
             Recv(e) => write!(f, "failed to receive request or response: {}", e),
             InvalidType => write!(f, "invalid type"),
-            BadRecvSize(n) => write!(f, "wrong amount of data received; expected {} bytes", n),
+            BadRecvSize { expected, actual } => write!(
+                f,
+                "wrong amount of data received; expected {} bytes; got {} bytes",
+                expected, actual
+            ),
             ExpectFd => write!(f, "missing associated file descriptor for request"),
             NotExpectFd => write!(f, "unexpected file descriptor is unused"),
             WrongFdBufferSize => write!(f, "fd buffer size too small"),
diff --git a/seccomp/arm/common_device.policy b/seccomp/arm/common_device.policy
index 9ccf48b..d2b5a6b 100644
--- a/seccomp/arm/common_device.policy
+++ b/seccomp/arm/common_device.policy
@@ -32,11 +32,13 @@ prctl: arg0 == PR_SET_NAME
 read: 1
 recv: 1
 recvfrom: 1
+recvmsg: 1
 restart_syscall: 1
 rt_sigaction: 1
 rt_sigprocmask: 1
 rt_sigreturn: 1
 sched_getaffinity: 1
+sendmsg: 1
 set_robust_list: 1
 sigaltstack: 1
 write: 1
diff --git a/seccomp/x86_64/common_device.policy b/seccomp/x86_64/common_device.policy
index 7fa6e52..2379b95 100644
--- a/seccomp/x86_64/common_device.policy
+++ b/seccomp/x86_64/common_device.policy
@@ -31,11 +31,13 @@ ppoll: 1
 prctl: arg0 == PR_SET_NAME
 read: 1
 recvfrom: 1
+recvmsg: 1
 restart_syscall: 1
 rt_sigaction: 1
 rt_sigprocmask: 1
 rt_sigreturn: 1
 sched_getaffinity: 1
+sendmsg: 1
 set_robust_list: 1
 sigaltstack: 1
 write: 1
diff --git a/src/linux.rs b/src/linux.rs
index 20ef297..2fdda09 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -11,7 +11,7 @@ use std::fs::{File, OpenOptions};
 use std::io::{self, stdin, Read};
 use std::mem;
 use std::os::unix::io::{FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
 use std::path::{Path, PathBuf};
 use std::str;
 use std::sync::{Arc, Barrier};
@@ -27,13 +27,17 @@ use devices::{self, PciDevice, VirtioPciDevice};
 use io_jail::{self, Minijail};
 use kvm::*;
 use libcras::CrasClient;
-use msg_socket::{MsgReceiver, MsgSender, MsgSocket, UnlinkMsgSocket};
+use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
 use net_util::{Error as NetError, Tap};
 use qcow::{self, ImageType, QcowFile};
 use rand_ish::SimpleRng;
 use sync::{Condvar, Mutex};
-use sys_util;
-use sys_util::*;
+use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener};
+use sys_util::{
+    self, block_signal, clear_signal, flock, get_blocked_signals, get_group_id, get_user_id,
+    getegid, geteuid, register_signal_handler, validate_raw_fd, EventFd, FlockOperation,
+    GuestMemory, Killable, PollContext, PollToken, SignalFd, Terminal, TimerFd, SIGRTMIN,
+};
 use vhost;
 use vm_control::{VmRequest, VmResponse, VmRunMode};
 
@@ -223,9 +227,9 @@ fn create_virtio_devs(
     cfg: Config,
     mem: &GuestMemory,
     _exit_evt: &EventFd,
-    wayland_device_socket: UnixDatagram,
-    balloon_device_socket: UnixDatagram,
-    disk_device_sockets: &mut Vec<UnixDatagram>,
+    wayland_device_socket: UnixSeqpacket,
+    balloon_device_socket: UnixSeqpacket,
+    disk_device_sockets: &mut Vec<UnixSeqpacket>,
 ) -> std::result::Result<Vec<(Box<PciDevice + 'static>, Option<Minijail>)>, Box<error::Error>> {
     let default_pivot_root: &str = option_env!("DEFAULT_PIVOT_ROOT").unwrap_or("/var/empty");
 
@@ -998,22 +1002,20 @@ pub fn run_config(cfg: Config) -> Result<()> {
         wayland_dmabuf: cfg.wayland_dmabuf,
     };
 
-    let mut control_sockets = Vec::new();
-    if let Some(ref path_string) = cfg.socket_path {
-        let path = Path::new(path_string);
-        let dgram = UnixDatagram::bind(path).map_err(Error::CreateSocket)?;
-        control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
-            UnlinkUnixDatagram(dgram),
-        ));
+    let control_server_socket = match &cfg.socket_path {
+        Some(path) => Some(UnlinkUnixSeqpacketListener(
+            UnixSeqpacketListener::bind(path).map_err(Error::CreateSocket)?,
+        )),
+        None => None,
     };
+
+    let mut control_sockets = Vec::new();
     let (wayland_host_socket, wayland_device_socket) =
-        UnixDatagram::pair().map_err(Error::CreateSocket)?;
-    control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
-        UnlinkUnixDatagram(wayland_host_socket),
-    ));
+        UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
+    control_sockets.push(MsgSocket::<VmResponse, VmRequest>::new(wayland_host_socket));
     // Balloon gets a special socket so balloon requests can be forwarded from the main process.
     let (balloon_host_socket, balloon_device_socket) =
-        UnixDatagram::pair().map_err(Error::CreateSocket)?;
+        UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
 
     // Create one control socket per disk.
     let mut disk_device_sockets = Vec::new();
@@ -1021,7 +1023,7 @@ pub fn run_config(cfg: Config) -> Result<()> {
     let disk_count = cfg.disks.len();
     for _ in 0..disk_count {
         let (disk_host_socket, disk_device_socket) =
-            UnixDatagram::pair().map_err(Error::CreateSocket)?;
+            UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
         disk_device_sockets.push(disk_device_socket);
         let disk_host_socket = MsgSocket::<VmRequest, VmResponse>::new(disk_host_socket);
         disk_host_sockets.push(disk_host_socket);
@@ -1040,6 +1042,7 @@ pub fn run_config(cfg: Config) -> Result<()> {
     .map_err(Error::BuildingVm)?;
     run_control(
         linux,
+        control_server_socket,
         control_sockets,
         balloon_host_socket,
         &disk_host_sockets,
@@ -1049,8 +1052,9 @@ pub fn run_config(cfg: Config) -> Result<()> {
 
 fn run_control(
     mut linux: RunnableLinuxVm,
-    control_sockets: Vec<UnlinkMsgSocket<VmResponse, VmRequest>>,
-    balloon_host_socket: UnixDatagram,
+    control_server_socket: Option<UnlinkUnixSeqpacketListener>,
+    mut control_sockets: Vec<MsgSocket<VmResponse, VmRequest>>,
+    balloon_host_socket: UnixSeqpacket,
     disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
     sigchld_fd: SignalFd,
 ) -> Result<()> {
@@ -1082,6 +1086,7 @@ fn run_control(
         CheckAvailableMemory,
         LowMemory,
         LowmemTimer,
+        VmControlServer,
         VmControl { index: usize },
     }
 
@@ -1101,6 +1106,12 @@ fn run_control(
     poll_ctx
         .add(&sigchld_fd, Token::ChildSignal)
         .map_err(Error::PollContextAdd)?;
+
+    if let Some(socket_server) = &control_server_socket {
+        poll_ctx
+            .add(socket_server, Token::VmControlServer)
+            .map_err(Error::PollContextAdd)?;
+    }
     for (index, socket) in control_sockets.iter().enumerate() {
         poll_ctx
             .add(socket.as_ref(), Token::VmControl { index })
@@ -1167,6 +1178,8 @@ fn run_control(
                 }
             }
         };
+
+        let mut vm_control_indices_to_remove = Vec::new();
         for event in events.iter_readable() {
             match event.token() {
                 Token::Exit => {
@@ -1286,6 +1299,24 @@ fn run_control(
                             .map_err(Error::PollContextAdd)?;
                     }
                 }
+                Token::VmControlServer => {
+                    if let Some(socket_server) = &control_server_socket {
+                        match socket_server.accept() {
+                            Ok(socket) => {
+                                poll_ctx
+                                    .add(
+                                        &socket,
+                                        Token::VmControl {
+                                            index: control_sockets.len(),
+                                        },
+                                    )
+                                    .map_err(Error::PollContextAdd)?;
+                                control_sockets.push(MsgSocket::new(socket));
+                            }
+                            Err(e) => error!("failed to accept socket: {}", e),
+                        }
+                    }
+                }
                 Token::VmControl { index } => {
                     if let Some(socket) = control_sockets.get(index) {
                         match socket.recv() {
@@ -1316,34 +1347,55 @@ fn run_control(
                                     }
                                 }
                             }
-                            Err(e) => error!("failed to recv VmRequest: {}", e),
+                            Err(e) => {
+                                if let MsgError::BadRecvSize { actual: 0, .. } = e {
+                                    vm_control_indices_to_remove.push(index);
+                                } else {
+                                    error!("failed to recv VmRequest: {}", e);
+                                }
+                            }
                         }
                     }
                 }
             }
         }
+
         for event in events.iter_hungup() {
-            // It's possible more data is readable and buffered while the socket is hungup, so
-            // don't delete the socket from the poll context until we're sure all the data is
-            // read.
-            if !event.readable() {
-                match event.token() {
-                    Token::Exit => {}
-                    Token::Stdin => {
-                        let _ = poll_ctx.delete(&stdin_handle);
-                    }
-                    Token::ChildSignal => {}
-                    Token::CheckAvailableMemory => {}
-                    Token::LowMemory => {}
-                    Token::LowmemTimer => {}
-                    Token::VmControl { index } => {
-                        if let Some(socket) = control_sockets.get(index) {
-                            let _ = poll_ctx.delete(socket.as_ref());
-                        }
+            match event.token() {
+                Token::Exit => {}
+                Token::Stdin => {
+                    let _ = poll_ctx.delete(&stdin_handle);
+                }
+                Token::ChildSignal => {}
+                Token::CheckAvailableMemory => {}
+                Token::LowMemory => {}
+                Token::LowmemTimer => {}
+                Token::VmControlServer => {}
+                Token::VmControl { index } => {
+                    // It's possible more data is readable and buffered while the socket is hungup,
+                    // so don't delete the socket from the poll context until we're sure all the
+                    // data is read.
+                    match control_sockets.get(index).map(|s| s.get_readable_bytes()) {
+                        Some(Ok(0)) | Some(Err(_)) => vm_control_indices_to_remove.push(index),
+                        Some(Ok(x)) => info!("control index {} has {} bytes readable", index, x),
+                        _ => {}
                     }
                 }
             }
         }
+
+        // Sort in reverse so the highest indexes are removed first. This removal algorithm
+        // preserved correct indexes as each element is removed.
+        vm_control_indices_to_remove.sort_unstable_by(|a, b| b.cmp(a));
+        vm_control_indices_to_remove.dedup();
+        for index in vm_control_indices_to_remove {
+            control_sockets.swap_remove(index);
+            if let Some(socket) = control_sockets.get(index) {
+                poll_ctx
+                    .add(socket, Token::VmControl { index })
+                    .map_err(Error::PollContextAdd)?;
+            }
+        }
     }
 
     // VCPU threads MUST see the VmRunMode flag, otherwise they may re-enter the VM.
diff --git a/src/main.rs b/src/main.rs
index 4965deb..32380b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -46,14 +46,13 @@ pub mod plugin;
 use std::fs::OpenOptions;
 use std::net;
 use std::os::unix::io::RawFd;
-use std::os::unix::net::UnixDatagram;
 use std::path::PathBuf;
 use std::string::String;
 use std::thread::sleep;
 use std::time::Duration;
 
 use qcow::QcowFile;
-use sys_util::{getpid, kill_process_group, reap_child, syslog};
+use sys_util::{getpid, kill_process_group, net::UnixSeqpacket, reap_child, syslog};
 
 use argument::{print_help, set_arguments, Argument};
 use msg_socket::{MsgSender, Sender};
@@ -723,10 +722,7 @@ fn vms_request(
 
     let mut return_result = Ok(());
     for socket_path in args {
-        match UnixDatagram::unbound().and_then(|s| {
-            s.connect(&socket_path)?;
-            Ok(s)
-        }) {
+        match UnixSeqpacket::connect(&socket_path) {
             Ok(s) => {
                 let sender = Sender::<VmRequest>::new(s);
                 if let Err(e) = sender.send(request) {
@@ -788,10 +784,7 @@ fn balloon_vms(mut args: std::env::Args) -> std::result::Result<(), ()> {
 
     let mut return_result = Ok(());
     for socket_path in args {
-        match UnixDatagram::unbound().and_then(|s| {
-            s.connect(&socket_path)?;
-            Ok(s)
-        }) {
+        match UnixSeqpacket::connect(&socket_path) {
             Ok(s) => {
                 let sender = Sender::<VmRequest>::new(s);
                 if let Err(e) = sender.send(&VmRequest::BalloonAdjust(num_bytes)) {
@@ -881,10 +874,7 @@ fn disk_cmd(mut args: std::env::Args) -> std::result::Result<(), ()> {
 
     let mut return_result = Ok(());
     for socket_path in args {
-        match UnixDatagram::unbound().and_then(|s| {
-            s.connect(&socket_path)?;
-            Ok(s)
-        }) {
+        match UnixSeqpacket::connect(&socket_path) {
             Ok(s) => {
                 let sender = Sender::<VmRequest>::new(s);
                 if let Err(e) = sender.send(&request) {
diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs
index 514a964..f0fc209 100644
--- a/vm_control/src/lib.rs
+++ b/vm_control/src/lib.rs
@@ -22,7 +22,6 @@ use std::fmt::{self, Display};
 use std::fs::File;
 use std::io::{Seek, SeekFrom};
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 
 use libc::{EINVAL, ENODEV};
 
@@ -30,7 +29,9 @@ use byteorder::{LittleEndian, WriteBytesExt};
 use kvm::{Datamatch, IoeventAddress, Vm};
 use msg_socket::{MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket};
 use resources::{GpuMemoryDesc, SystemAllocator};
-use sys_util::{Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result};
+use sys_util::{
+    net::UnixSeqpacket, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result,
+};
 
 /// A file descriptor either borrowed or owned by this.
 pub enum MaybeOwnedFd {
@@ -170,7 +171,7 @@ impl VmRequest {
         vm: &mut Vm,
         sys_allocator: &mut SystemAllocator,
         run_mode: &mut Option<VmRunMode>,
-        balloon_host_socket: &UnixDatagram,
+        balloon_host_socket: &UnixSeqpacket,
         disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
     ) -> VmResponse {
         match *self {