summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--devices/src/proxy.rs8
-rw-r--r--devices/src/virtio/balloon.rs11
-rw-r--r--devices/src/virtio/block.rs11
-rw-r--r--devices/src/virtio/wl.rs13
-rw-r--r--msg_socket/src/lib.rs77
-rw-r--r--msg_socket/src/msg_on_socket.rs10
-rw-r--r--seccomp/arm/common_device.policy2
-rw-r--r--seccomp/x86_64/common_device.policy2
-rw-r--r--src/linux.rs132
-rw-r--r--src/main.rs18
-rw-r--r--vm_control/src/lib.rs7
11 files changed, 160 insertions, 131 deletions
diff --git a/devices/src/proxy.rs b/devices/src/proxy.rs
index 4bc3405..a91d2ed 100644
--- a/devices/src/proxy.rs
+++ b/devices/src/proxy.rs
@@ -7,14 +7,14 @@
 use libc::pid_t;
 
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 use std::process;
 use std::time::Duration;
 use std::{self, fmt, io};
 
+use io_jail::{self, Minijail};
 use msg_socket::{MsgOnSocket, MsgReceiver, MsgSender, MsgSocket};
+use sys_util::net::UnixSeqpacket;
 
-use io_jail::{self, Minijail};
 use BusDevice;
 
 /// Errors for proxy devices.
@@ -64,7 +64,7 @@ enum CommandResult {
     ReadConfigResult(u32),
 }
 
-fn child_proc(sock: UnixDatagram, device: &mut BusDevice) {
+fn child_proc(sock: UnixSeqpacket, device: &mut BusDevice) {
     let mut running = true;
     let sock = MsgSocket::<CommandResult, Command>::new(sock);
 
@@ -138,7 +138,7 @@ impl ProxyDevice {
         mut keep_fds: Vec<RawFd>,
     ) -> Result<ProxyDevice> {
         let debug_label = device.debug_label();
-        let (child_sock, parent_sock) = UnixDatagram::pair().map_err(Error::Io)?;
+        let (child_sock, parent_sock) = UnixSeqpacket::pair().map_err(Error::Io)?;
 
         keep_fds.push(child_sock.as_raw_fd());
         // Forking here is safe as long as the program is still single threaded.
diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs
index 1dfb72c..e9f36f6 100644
--- a/devices/src/virtio/balloon.rs
+++ b/devices/src/virtio/balloon.rs
@@ -8,13 +8,14 @@ use std::fmt::{self, Display};
 use std::io::Write;
 use std::mem;
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::thread;
 
 use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt};
-use sys_util::{self, EventFd, GuestAddress, GuestMemory, PollContext, PollToken};
+use sys_util::{
+    self, net::UnixSeqpacket, EventFd, GuestAddress, GuestMemory, PollContext, PollToken,
+};
 
 use super::{
     DescriptorChain, Queue, VirtioDevice, INTERRUPT_STATUS_CONFIG_CHANGED,
@@ -67,7 +68,7 @@ struct Worker {
     interrupt_evt: EventFd,
     interrupt_resample_evt: EventFd,
     config: Arc<BalloonConfig>,
-    command_socket: UnixDatagram,
+    command_socket: UnixSeqpacket,
 }
 
 fn valid_inflate_desc(desc: &DescriptorChain) -> bool {
@@ -230,7 +231,7 @@ impl Worker {
 
 /// Virtio device for memory balloon inflation/deflation.
 pub struct Balloon {
-    command_socket: Option<UnixDatagram>,
+    command_socket: Option<UnixSeqpacket>,
     config: Arc<BalloonConfig>,
     features: u64,
     kill_evt: Option<EventFd>,
@@ -238,7 +239,7 @@ pub struct Balloon {
 
 impl Balloon {
     /// Create a new virtio balloon device.
-    pub fn new(command_socket: UnixDatagram) -> Result<Balloon> {
+    pub fn new(command_socket: UnixSeqpacket) -> Result<Balloon> {
         Ok(Balloon {
             command_socket: Some(command_socket),
             config: Arc::new(BalloonConfig {
diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs
index 9dcf828..3c5d2fd 100644
--- a/devices/src/virtio/block.rs
+++ b/devices/src/virtio/block.rs
@@ -7,7 +7,6 @@ use std::fmt::{self, Display};
 use std::io::{self, Read, Seek, SeekFrom, Write};
 use std::mem::{size_of, size_of_val};
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 use std::result;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -19,8 +18,8 @@ use sync::Mutex;
 use sys_util::Error as SysError;
 use sys_util::Result as SysResult;
 use sys_util::{
-    EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError, PollContext,
-    PollToken, PunchHole, TimerFd, WriteZeroes,
+    net::UnixSeqpacket, EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError,
+    PollContext, PollToken, PunchHole, TimerFd, WriteZeroes,
 };
 
 use data_model::{DataInit, Le16, Le32, Le64};
@@ -695,7 +694,7 @@ impl<T: DiskFile> Worker<T> {
         self.interrupt_evt.write(1).unwrap();
     }
 
-    fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixDatagram) {
+    fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixSeqpacket) {
         #[derive(PollToken)]
         enum Token {
             FlushTimer,
@@ -819,7 +818,7 @@ pub struct Block<T: DiskFile> {
     disk_size: Arc<Mutex<u64>>,
     avail_features: u64,
     read_only: bool,
-    control_socket: Option<UnixDatagram>,
+    control_socket: Option<UnixSeqpacket>,
 }
 
 fn build_config_space(disk_size: u64) -> virtio_blk_config {
@@ -844,7 +843,7 @@ impl<T: DiskFile> Block<T> {
     pub fn new(
         mut disk_image: T,
         read_only: bool,
-        control_socket: Option<UnixDatagram>,
+        control_socket: Option<UnixSeqpacket>,
     ) -> SysResult<Block<T>> {
         let disk_size = disk_image.seek(SeekFrom::End(0))? as u64;
         if disk_size % SECTOR_SIZE != 0 {
diff --git a/devices/src/virtio/wl.rs b/devices/src/virtio/wl.rs
index 4d43825..0220d42 100644
--- a/devices/src/virtio/wl.rs
+++ b/devices/src/virtio/wl.rs
@@ -41,7 +41,7 @@ use std::mem::{size_of, size_of_val};
 #[cfg(feature = "wl-dmabuf")]
 use std::os::raw::{c_uint, c_ulonglong};
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
 use std::path::{Path, PathBuf};
 use std::rc::Rc;
 use std::result;
@@ -59,6 +59,7 @@ use data_model::*;
 use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
 #[cfg(feature = "wl-dmabuf")]
 use resources::GpuMemoryDesc;
+use sys_util::net::UnixSeqpacket;
 use sys_util::{
     pipe, round_up_to_page_size, Error, EventFd, FileFlags, GuestAddress, GuestMemory,
     GuestMemoryError, PollContext, PollToken, Result, ScmSocket, SharedMemory,
@@ -490,7 +491,7 @@ struct VmRequester {
 }
 
 impl VmRequester {
-    fn new(vm_socket: UnixDatagram) -> VmRequester {
+    fn new(vm_socket: UnixSeqpacket) -> VmRequester {
         VmRequester {
             inner: Rc::new(RefCell::new(MsgSocket::<VmRequest, VmResponse>::new(
                 vm_socket,
@@ -1004,7 +1005,7 @@ struct WlState {
 impl WlState {
     fn new(
         wayland_path: PathBuf,
-        vm_socket: UnixDatagram,
+        vm_socket: UnixSeqpacket,
         use_transition_flags: bool,
         resource_bridge: Option<ResourceRequestSocket>,
     ) -> WlState {
@@ -1488,7 +1489,7 @@ impl Worker {
         in_queue: Queue,
         out_queue: Queue,
         wayland_path: PathBuf,
-        vm_socket: UnixDatagram,
+        vm_socket: UnixSeqpacket,
         use_transition_flags: bool,
         resource_bridge: Option<ResourceRequestSocket>,
     ) -> Worker {
@@ -1678,7 +1679,7 @@ impl Worker {
 pub struct Wl {
     kill_evt: Option<EventFd>,
     wayland_path: PathBuf,
-    vm_socket: Option<UnixDatagram>,
+    vm_socket: Option<UnixSeqpacket>,
     resource_bridge: Option<ResourceRequestSocket>,
     use_transition_flags: bool,
 }
@@ -1686,7 +1687,7 @@ pub struct Wl {
 impl Wl {
     pub fn new<P: AsRef<Path>>(
         wayland_path: P,
-        vm_socket: UnixDatagram,
+        vm_socket: UnixSeqpacket,
         resource_bridge: Option<ResourceRequestSocket>,
     ) -> Result<Wl> {
         Ok(Wl {
diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs
index cc2cdc9..fd1f2bc 100644
--- a/msg_socket/src/lib.rs
+++ b/msg_socket/src/lib.rs
@@ -13,9 +13,9 @@ mod msg_on_socket;
 
 use std::io::Result;
 use std::marker::PhantomData;
+use std::ops::Deref;
 use std::os::unix::io::{AsRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
-use sys_util::{Error as SysError, ScmSocket, UnlinkUnixDatagram};
+use sys_util::{net::UnixSeqpacket, Error as SysError, ScmSocket};
 
 pub use msg_on_socket::*;
 pub use msg_on_socket_derive::*;
@@ -24,7 +24,7 @@ pub use msg_on_socket_derive::*;
 /// direction.
 pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>(
 ) -> Result<(MsgSocket<Request, Response>, MsgSocket<Response, Request>)> {
-    let (sock1, sock2) = UnixDatagram::pair()?;
+    let (sock1, sock2) = UnixSeqpacket::pair()?;
     let requester = MsgSocket {
         sock: sock1,
         _i: PhantomData,
@@ -40,14 +40,14 @@ pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>(
 
 /// Bidirection sock that support both send and recv.
 pub struct MsgSocket<I: MsgOnSocket, O: MsgOnSocket> {
-    sock: UnixDatagram,
+    sock: UnixSeqpacket,
     _i: PhantomData<I>,
     _o: PhantomData<O>,
 }
 
 impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> {
     // Create a new MsgSocket.
-    pub fn new(s: UnixDatagram) -> MsgSocket<I, O> {
+    pub fn new(s: UnixSeqpacket) -> MsgSocket<I, O> {
         MsgSocket {
             sock: s,
             _i: PhantomData,
@@ -56,33 +56,22 @@ impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> {
     }
 }
 
-/// Bidirection sock that support both send and recv.
-pub struct UnlinkMsgSocket<I: MsgOnSocket, O: MsgOnSocket> {
-    sock: UnlinkUnixDatagram,
-    _i: PhantomData<I>,
-    _o: PhantomData<O>,
-}
-
-impl<I: MsgOnSocket, O: MsgOnSocket> UnlinkMsgSocket<I, O> {
-    // Create a new MsgSocket.
-    pub fn new(s: UnlinkUnixDatagram) -> UnlinkMsgSocket<I, O> {
-        UnlinkMsgSocket {
-            sock: s,
-            _i: PhantomData,
-            _o: PhantomData,
-        }
+impl<I: MsgOnSocket, O: MsgOnSocket> Deref for MsgSocket<I, O> {
+    type Target = UnixSeqpacket;
+    fn deref(&self) -> &Self::Target {
+        &self.sock
     }
 }
 
 /// One direction socket that only supports sending.
 pub struct Sender<M: MsgOnSocket> {
-    sock: UnixDatagram,
+    sock: UnixSeqpacket,
     _m: PhantomData<M>,
 }
 
 impl<M: MsgOnSocket> Sender<M> {
     /// Create a new sender sock.
-    pub fn new(s: UnixDatagram) -> Sender<M> {
+    pub fn new(s: UnixSeqpacket) -> Sender<M> {
         Sender {
             sock: s,
             _m: PhantomData,
@@ -92,13 +81,13 @@ impl<M: MsgOnSocket> Sender<M> {
 
 /// One direction socket that only supports receiving.
 pub struct Receiver<M: MsgOnSocket> {
-    sock: UnixDatagram,
+    sock: UnixSeqpacket,
     _m: PhantomData<M>,
 }
 
 impl<M: MsgOnSocket> Receiver<M> {
     /// Create a new receiver sock.
-    pub fn new(s: UnixDatagram) -> Receiver<M> {
+    pub fn new(s: UnixSeqpacket) -> Receiver<M> {
         Receiver {
             sock: s,
             _m: PhantomData,
@@ -106,8 +95,8 @@ impl<M: MsgOnSocket> Receiver<M> {
     }
 }
 
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for MsgSocket<I, O> {
-    fn as_ref(&self) -> &UnixDatagram {
+impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixSeqpacket> for MsgSocket<I, O> {
+    fn as_ref(&self) -> &UnixSeqpacket {
         &self.sock
     }
 }
@@ -118,20 +107,8 @@ impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for MsgSocket<I, O> {
     }
 }
 
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for UnlinkMsgSocket<I, O> {
-    fn as_ref(&self) -> &UnixDatagram {
-        self.sock.as_ref()
-    }
-}
-
-impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for UnlinkMsgSocket<I, O> {
-    fn as_raw_fd(&self) -> RawFd {
-        self.as_ref().as_raw_fd()
-    }
-}
-
-impl<M: MsgOnSocket> AsRef<UnixDatagram> for Sender<M> {
-    fn as_ref(&self) -> &UnixDatagram {
+impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Sender<M> {
+    fn as_ref(&self) -> &UnixSeqpacket {
         &self.sock
     }
 }
@@ -142,8 +119,8 @@ impl<M: MsgOnSocket> AsRawFd for Sender<M> {
     }
 }
 
-impl<M: MsgOnSocket> AsRef<UnixDatagram> for Receiver<M> {
-    fn as_ref(&self) -> &UnixDatagram {
+impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Receiver<M> {
+    fn as_ref(&self) -> &UnixSeqpacket {
         &self.sock
     }
 }
@@ -155,7 +132,7 @@ impl<M: MsgOnSocket> AsRawFd for Receiver<M> {
 }
 
 /// Types that could send a message.
-pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
+pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixSeqpacket> {
     fn send(&self, msg: &M) -> MsgResult<()> {
         let msg_size = M::msg_size();
         let fd_size = M::max_fd_count();
@@ -163,7 +140,7 @@ pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
         let mut fd_buffer: Vec<RawFd> = vec![0; fd_size];
 
         let fd_size = msg.write_to_buffer(&mut msg_buffer, &mut fd_buffer)?;
-        let sock: &UnixDatagram = self.as_ref();
+        let sock: &UnixSeqpacket = self.as_ref();
         if fd_size == 0 {
             handle_eintr!(sock.send(&msg_buffer))
                 .map_err(|e| MsgError::Send(SysError::new(e.raw_os_error().unwrap_or(0))))?;
@@ -176,14 +153,14 @@ pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> {
 }
 
 /// Types that could receive a message.
-pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
+pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixSeqpacket> {
     fn recv(&self) -> MsgResult<M> {
         let msg_size = M::msg_size();
         let fd_size = M::max_fd_count();
         let mut msg_buffer: Vec<u8> = vec![0; msg_size];
         let mut fd_buffer: Vec<RawFd> = vec![0; fd_size];
 
-        let sock: &UnixDatagram = self.as_ref();
+        let sock: &UnixSeqpacket = self.as_ref();
 
         let (recv_msg_size, recv_fd_size) = {
             if fd_size == 0 {
@@ -197,7 +174,10 @@ pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
             }
         };
         if msg_size != recv_msg_size {
-            return Err(MsgError::BadRecvSize(msg_size));
+            return Err(MsgError::BadRecvSize {
+                expected: msg_size,
+                actual: recv_msg_size,
+            });
         }
         // Safe because fd buffer is read from socket.
         let (v, read_fd_size) = unsafe {
@@ -213,8 +193,5 @@ pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> {
 impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for MsgSocket<I, O> {}
 impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for MsgSocket<I, O> {}
 
-impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for UnlinkMsgSocket<I, O> {}
-impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for UnlinkMsgSocket<I, O> {}
-
 impl<M: MsgOnSocket> MsgSender<M> for Sender<M> {}
 impl<M: MsgOnSocket> MsgReceiver<M> for Receiver<M> {}
diff --git a/msg_socket/src/msg_on_socket.rs b/msg_socket/src/msg_on_socket.rs
index 493c57c..8b01850 100644
--- a/msg_socket/src/msg_on_socket.rs
+++ b/msg_socket/src/msg_on_socket.rs
@@ -23,8 +23,8 @@ pub enum MsgError {
     /// The type of a received request or response is unknown.
     InvalidType,
     /// There was not the expected amount of data when receiving a message. The inner
-    /// value is how much data is needed.
-    BadRecvSize(usize),
+    /// value is how much data is expected and how much data was actually received.
+    BadRecvSize { expected: usize, actual: usize },
     /// There was no associated file descriptor received for a request that expected it.
     ExpectFd,
     /// There was some associated file descriptor received but not used when deserialize.
@@ -47,7 +47,11 @@ impl Display for MsgError {
             Send(e) => write!(f, "failed to send request or response: {}", e),
             Recv(e) => write!(f, "failed to receive request or response: {}", e),
             InvalidType => write!(f, "invalid type"),
-            BadRecvSize(n) => write!(f, "wrong amount of data received; expected {} bytes", n),
+            BadRecvSize { expected, actual } => write!(
+                f,
+                "wrong amount of data received; expected {} bytes; got {} bytes",
+                expected, actual
+            ),
             ExpectFd => write!(f, "missing associated file descriptor for request"),
             NotExpectFd => write!(f, "unexpected file descriptor is unused"),
             WrongFdBufferSize => write!(f, "fd buffer size too small"),
diff --git a/seccomp/arm/common_device.policy b/seccomp/arm/common_device.policy
index 9ccf48b..d2b5a6b 100644
--- a/seccomp/arm/common_device.policy
+++ b/seccomp/arm/common_device.policy
@@ -32,11 +32,13 @@ prctl: arg0 == PR_SET_NAME
 read: 1
 recv: 1
 recvfrom: 1
+recvmsg: 1
 restart_syscall: 1
 rt_sigaction: 1
 rt_sigprocmask: 1
 rt_sigreturn: 1
 sched_getaffinity: 1
+sendmsg: 1
 set_robust_list: 1
 sigaltstack: 1
 write: 1
diff --git a/seccomp/x86_64/common_device.policy b/seccomp/x86_64/common_device.policy
index 7fa6e52..2379b95 100644
--- a/seccomp/x86_64/common_device.policy
+++ b/seccomp/x86_64/common_device.policy
@@ -31,11 +31,13 @@ ppoll: 1
 prctl: arg0 == PR_SET_NAME
 read: 1
 recvfrom: 1
+recvmsg: 1
 restart_syscall: 1
 rt_sigaction: 1
 rt_sigprocmask: 1
 rt_sigreturn: 1
 sched_getaffinity: 1
+sendmsg: 1
 set_robust_list: 1
 sigaltstack: 1
 write: 1
diff --git a/src/linux.rs b/src/linux.rs
index 20ef297..2fdda09 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -11,7 +11,7 @@ use std::fs::{File, OpenOptions};
 use std::io::{self, stdin, Read};
 use std::mem;
 use std::os::unix::io::{FromRawFd, RawFd};
-use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::os::unix::net::UnixStream;
 use std::path::{Path, PathBuf};
 use std::str;
 use std::sync::{Arc, Barrier};
@@ -27,13 +27,17 @@ use devices::{self, PciDevice, VirtioPciDevice};
 use io_jail::{self, Minijail};
 use kvm::*;
 use libcras::CrasClient;
-use msg_socket::{MsgReceiver, MsgSender, MsgSocket, UnlinkMsgSocket};
+use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket};
 use net_util::{Error as NetError, Tap};
 use qcow::{self, ImageType, QcowFile};
 use rand_ish::SimpleRng;
 use sync::{Condvar, Mutex};
-use sys_util;
-use sys_util::*;
+use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener};
+use sys_util::{
+    self, block_signal, clear_signal, flock, get_blocked_signals, get_group_id, get_user_id,
+    getegid, geteuid, register_signal_handler, validate_raw_fd, EventFd, FlockOperation,
+    GuestMemory, Killable, PollContext, PollToken, SignalFd, Terminal, TimerFd, SIGRTMIN,
+};
 use vhost;
 use vm_control::{VmRequest, VmResponse, VmRunMode};
 
@@ -223,9 +227,9 @@ fn create_virtio_devs(
     cfg: Config,
     mem: &GuestMemory,
     _exit_evt: &EventFd,
-    wayland_device_socket: UnixDatagram,
-    balloon_device_socket: UnixDatagram,
-    disk_device_sockets: &mut Vec<UnixDatagram>,
+    wayland_device_socket: UnixSeqpacket,
+    balloon_device_socket: UnixSeqpacket,
+    disk_device_sockets: &mut Vec<UnixSeqpacket>,
 ) -> std::result::Result<Vec<(Box<PciDevice + 'static>, Option<Minijail>)>, Box<error::Error>> {
     let default_pivot_root: &str = option_env!("DEFAULT_PIVOT_ROOT").unwrap_or("/var/empty");
 
@@ -998,22 +1002,20 @@ pub fn run_config(cfg: Config) -> Result<()> {
         wayland_dmabuf: cfg.wayland_dmabuf,
     };
 
-    let mut control_sockets = Vec::new();
-    if let Some(ref path_string) = cfg.socket_path {
-        let path = Path::new(path_string);
-        let dgram = UnixDatagram::bind(path).map_err(Error::CreateSocket)?;
-        control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
-            UnlinkUnixDatagram(dgram),
-        ));
+    let control_server_socket = match &cfg.socket_path {
+        Some(path) => Some(UnlinkUnixSeqpacketListener(
+            UnixSeqpacketListener::bind(path).map_err(Error::CreateSocket)?,
+        )),
+        None => None,
     };
+
+    let mut control_sockets = Vec::new();
     let (wayland_host_socket, wayland_device_socket) =
-        UnixDatagram::pair().map_err(Error::CreateSocket)?;
-    control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new(
-        UnlinkUnixDatagram(wayland_host_socket),
-    ));
+        UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
+    control_sockets.push(MsgSocket::<VmResponse, VmRequest>::new(wayland_host_socket));
     // Balloon gets a special socket so balloon requests can be forwarded from the main process.
     let (balloon_host_socket, balloon_device_socket) =
-        UnixDatagram::pair().map_err(Error::CreateSocket)?;
+        UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
 
     // Create one control socket per disk.
     let mut disk_device_sockets = Vec::new();
@@ -1021,7 +1023,7 @@ pub fn run_config(cfg: Config) -> Result<()> {
     let disk_count = cfg.disks.len();
     for _ in 0..disk_count {
         let (disk_host_socket, disk_device_socket) =
-            UnixDatagram::pair().map_err(Error::CreateSocket)?;
+            UnixSeqpacket::pair().map_err(Error::CreateSocket)?;
         disk_device_sockets.push(disk_device_socket);
         let disk_host_socket = MsgSocket::<VmRequest, VmResponse>::new(disk_host_socket);
         disk_host_sockets.push(disk_host_socket);
@@ -1040,6 +1042,7 @@ pub fn run_config(cfg: Config) -> Result<()> {
     .map_err(Error::BuildingVm)?;
     run_control(
         linux,
+        control_server_socket,
         control_sockets,
         balloon_host_socket,
         &disk_host_sockets,
@@ -1049,8 +1052,9 @@ pub fn run_config(cfg: Config) -> Result<()> {
 
 fn run_control(
     mut linux: RunnableLinuxVm,
-    control_sockets: Vec<UnlinkMsgSocket<VmResponse, VmRequest>>,
-    balloon_host_socket: UnixDatagram,
+    control_server_socket: Option<UnlinkUnixSeqpacketListener>,
+    mut control_sockets: Vec<MsgSocket<VmResponse, VmRequest>>,
+    balloon_host_socket: UnixSeqpacket,
     disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
     sigchld_fd: SignalFd,
 ) -> Result<()> {
@@ -1082,6 +1086,7 @@ fn run_control(
         CheckAvailableMemory,
         LowMemory,
         LowmemTimer,
+        VmControlServer,
         VmControl { index: usize },
     }
 
@@ -1101,6 +1106,12 @@ fn run_control(
     poll_ctx
         .add(&sigchld_fd, Token::ChildSignal)
         .map_err(Error::PollContextAdd)?;
+
+    if let Some(socket_server) = &control_server_socket {
+        poll_ctx
+            .add(socket_server, Token::VmControlServer)
+            .map_err(Error::PollContextAdd)?;
+    }
     for (index, socket) in control_sockets.iter().enumerate() {
         poll_ctx
             .add(socket.as_ref(), Token::VmControl { index })
@@ -1167,6 +1178,8 @@ fn run_control(
                 }
             }
         };
+
+        let mut vm_control_indices_to_remove = Vec::new();
         for event in events.iter_readable() {
             match event.token() {
                 Token::Exit => {
@@ -1286,6 +1299,24 @@ fn run_control(
                             .map_err(Error::PollContextAdd)?;
                     }
                 }
+                Token::VmControlServer => {
+                    if let Some(socket_server) = &control_server_socket {
+                        match socket_server.accept() {
+                            Ok(socket) => {
+                                poll_ctx
+                                    .add(
+                                        &socket,
+                                        Token::VmControl {
+                                            index: control_sockets.len(),
+                                        },
+                                    )
+                                    .map_err(Error::PollContextAdd)?;
+                                control_sockets.push(MsgSocket::new(socket));
+                            }
+                            Err(e) => error!("failed to accept socket: {}", e),
+                        }
+                    }
+                }
                 Token::VmControl { index } => {
                     if let Some(socket) = control_sockets.get(index) {
                         match socket.recv() {
@@ -1316,34 +1347,55 @@ fn run_control(
                                     }
                                 }
                             }
-                            Err(e) => error!("failed to recv VmRequest: {}", e),
+                            Err(e) => {
+                                if let MsgError::BadRecvSize { actual: 0, .. } = e {
+                                    vm_control_indices_to_remove.push(index);
+                                } else {
+                                    error!("failed to recv VmRequest: {}", e);
+                                }
+                            }
                         }
                     }
                 }
             }
         }
+
         for event in events.iter_hungup() {
-            // It's possible more data is readable and buffered while the socket is hungup, so
-            // don't delete the socket from the poll context until we're sure all the data is
-            // read.
-            if !event.readable() {
-                match event.token() {
-                    Token::Exit => {}
-                    Token::Stdin => {
-                        let _ = poll_ctx.delete(&stdin_handle);
-                    }
-                    Token::ChildSignal => {}
-                    Token::CheckAvailableMemory => {}
-                    Token::LowMemory => {}
-                    Token::LowmemTimer => {}
-                    Token::VmControl { index } => {
-                        if let Some(socket) = control_sockets.get(index) {
-                            let _ = poll_ctx.delete(socket.as_ref());
-                        }
+            match event.token() {
+                Token::Exit => {}
+                Token::Stdin => {
+                    let _ = poll_ctx.delete(&stdin_handle);
+                }
+                Token::ChildSignal => {}
+                Token::CheckAvailableMemory => {}
+                Token::LowMemory => {}
+                Token::LowmemTimer => {}
+                Token::VmControlServer => {}
+                Token::VmControl { index } => {
+                    // It's possible more data is readable and buffered while the socket is hungup,
+                    // so don't delete the socket from the poll context until we're sure all the
+                    // data is read.
+                    match control_sockets.get(index).map(|s| s.get_readable_bytes()) {
+                        Some(Ok(0)) | Some(Err(_)) => vm_control_indices_to_remove.push(index),
+                        Some(Ok(x)) => info!("control index {} has {} bytes readable", index, x),
+                        _ => {}
                     }
                 }
             }
         }
+
+        // Sort in reverse so the highest indexes are removed first. This removal algorithm
+        // preserved correct indexes as each element is removed.
+        vm_control_indices_to_remove.sort_unstable_by(|a, b| b.cmp(a));
+        vm_control_indices_to_remove.dedup();
+        for index in vm_control_indices_to_remove {
+            control_sockets.swap_remove(index);
+            if let Some(socket) = control_sockets.get(index) {
+                poll_ctx
+                    .add(socket, Token::VmControl { index })
+                    .map_err(Error::PollContextAdd)?;
+            }
+        }
     }
 
     // VCPU threads MUST see the VmRunMode flag, otherwise they may re-enter the VM.
diff --git a/src/main.rs b/src/main.rs
index 4965deb..32380b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -46,14 +46,13 @@ pub mod plugin;
 use std::fs::OpenOptions;
 use std::net;
 use std::os::unix::io::RawFd;
-use std::os::unix::net::UnixDatagram;
 use std::path::PathBuf;
 use std::string::String;
 use std::thread::sleep;
 use std::time::Duration;
 
 use qcow::QcowFile;
-use sys_util::{getpid, kill_process_group, reap_child, syslog};
+use sys_util::{getpid, kill_process_group, net::UnixSeqpacket, reap_child, syslog};
 
 use argument::{print_help, set_arguments, Argument};
 use msg_socket::{MsgSender, Sender};
@@ -723,10 +722,7 @@ fn vms_request(
 
     let mut return_result = Ok(());
     for socket_path in args {
-        match UnixDatagram::unbound().and_then(|s| {
-            s.connect(&socket_path)?;
-            Ok(s)
-        }) {
+        match UnixSeqpacket::connect(&socket_path) {
             Ok(s) => {
                 let sender = Sender::<VmRequest>::new(s);
                 if let Err(e) = sender.send(request) {
@@ -788,10 +784,7 @@ fn balloon_vms(mut args: std::env::Args) -> std::result::Result<(), ()> {
 
     let mut return_result = Ok(());
     for socket_path in args {
-        match UnixDatagram::unbound().and_then(|s| {
-            s.connect(&socket_path)?;
-            Ok(s)
-        }) {
+        match UnixSeqpacket::connect(&socket_path) {
             Ok(s) => {
                 let sender = Sender::<VmRequest>::new(s);
                 if let Err(e) = sender.send(&VmRequest::BalloonAdjust(num_bytes)) {
@@ -881,10 +874,7 @@ fn disk_cmd(mut args: std::env::Args) -> std::result::Result<(), ()> {
 
     let mut return_result = Ok(());
     for socket_path in args {
-        match UnixDatagram::unbound().and_then(|s| {
-            s.connect(&socket_path)?;
-            Ok(s)
-        }) {
+        match UnixSeqpacket::connect(&socket_path) {
             Ok(s) => {
                 let sender = Sender::<VmRequest>::new(s);
                 if let Err(e) = sender.send(&request) {
diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs
index 514a964..f0fc209 100644
--- a/vm_control/src/lib.rs
+++ b/vm_control/src/lib.rs
@@ -22,7 +22,6 @@ use std::fmt::{self, Display};
 use std::fs::File;
 use std::io::{Seek, SeekFrom};
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
-use std::os::unix::net::UnixDatagram;
 
 use libc::{EINVAL, ENODEV};
 
@@ -30,7 +29,9 @@ use byteorder::{LittleEndian, WriteBytesExt};
 use kvm::{Datamatch, IoeventAddress, Vm};
 use msg_socket::{MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket};
 use resources::{GpuMemoryDesc, SystemAllocator};
-use sys_util::{Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result};
+use sys_util::{
+    net::UnixSeqpacket, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result,
+};
 
 /// A file descriptor either borrowed or owned by this.
 pub enum MaybeOwnedFd {
@@ -170,7 +171,7 @@ impl VmRequest {
         vm: &mut Vm,
         sys_allocator: &mut SystemAllocator,
         run_mode: &mut Option<VmRunMode>,
-        balloon_host_socket: &UnixDatagram,
+        balloon_host_socket: &UnixSeqpacket,
         disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>],
     ) -> VmResponse {
         match *self {