diff options
-rw-r--r-- | devices/src/proxy.rs | 8 | ||||
-rw-r--r-- | devices/src/virtio/balloon.rs | 11 | ||||
-rw-r--r-- | devices/src/virtio/block.rs | 11 | ||||
-rw-r--r-- | devices/src/virtio/wl.rs | 13 | ||||
-rw-r--r-- | msg_socket/src/lib.rs | 77 | ||||
-rw-r--r-- | msg_socket/src/msg_on_socket.rs | 10 | ||||
-rw-r--r-- | seccomp/arm/common_device.policy | 2 | ||||
-rw-r--r-- | seccomp/x86_64/common_device.policy | 2 | ||||
-rw-r--r-- | src/linux.rs | 132 | ||||
-rw-r--r-- | src/main.rs | 18 | ||||
-rw-r--r-- | vm_control/src/lib.rs | 7 |
11 files changed, 160 insertions, 131 deletions
diff --git a/devices/src/proxy.rs b/devices/src/proxy.rs index 4bc3405..a91d2ed 100644 --- a/devices/src/proxy.rs +++ b/devices/src/proxy.rs @@ -7,14 +7,14 @@ use libc::pid_t; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::UnixDatagram; use std::process; use std::time::Duration; use std::{self, fmt, io}; +use io_jail::{self, Minijail}; use msg_socket::{MsgOnSocket, MsgReceiver, MsgSender, MsgSocket}; +use sys_util::net::UnixSeqpacket; -use io_jail::{self, Minijail}; use BusDevice; /// Errors for proxy devices. @@ -64,7 +64,7 @@ enum CommandResult { ReadConfigResult(u32), } -fn child_proc(sock: UnixDatagram, device: &mut BusDevice) { +fn child_proc(sock: UnixSeqpacket, device: &mut BusDevice) { let mut running = true; let sock = MsgSocket::<CommandResult, Command>::new(sock); @@ -138,7 +138,7 @@ impl ProxyDevice { mut keep_fds: Vec<RawFd>, ) -> Result<ProxyDevice> { let debug_label = device.debug_label(); - let (child_sock, parent_sock) = UnixDatagram::pair().map_err(Error::Io)?; + let (child_sock, parent_sock) = UnixSeqpacket::pair().map_err(Error::Io)?; keep_fds.push(child_sock.as_raw_fd()); // Forking here is safe as long as the program is still single threaded. diff --git a/devices/src/virtio/balloon.rs b/devices/src/virtio/balloon.rs index 1dfb72c..e9f36f6 100644 --- a/devices/src/virtio/balloon.rs +++ b/devices/src/virtio/balloon.rs @@ -8,13 +8,14 @@ use std::fmt::{self, Display}; use std::io::Write; use std::mem; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::UnixDatagram; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; use byteorder::{ByteOrder, LittleEndian, ReadBytesExt, WriteBytesExt}; -use sys_util::{self, EventFd, GuestAddress, GuestMemory, PollContext, PollToken}; +use sys_util::{ + self, net::UnixSeqpacket, EventFd, GuestAddress, GuestMemory, PollContext, PollToken, +}; use super::{ DescriptorChain, Queue, VirtioDevice, INTERRUPT_STATUS_CONFIG_CHANGED, @@ -67,7 +68,7 @@ struct Worker { interrupt_evt: EventFd, interrupt_resample_evt: EventFd, config: Arc<BalloonConfig>, - command_socket: UnixDatagram, + command_socket: UnixSeqpacket, } fn valid_inflate_desc(desc: &DescriptorChain) -> bool { @@ -230,7 +231,7 @@ impl Worker { /// Virtio device for memory balloon inflation/deflation. pub struct Balloon { - command_socket: Option<UnixDatagram>, + command_socket: Option<UnixSeqpacket>, config: Arc<BalloonConfig>, features: u64, kill_evt: Option<EventFd>, @@ -238,7 +239,7 @@ pub struct Balloon { impl Balloon { /// Create a new virtio balloon device. - pub fn new(command_socket: UnixDatagram) -> Result<Balloon> { + pub fn new(command_socket: UnixSeqpacket) -> Result<Balloon> { Ok(Balloon { command_socket: Some(command_socket), config: Arc::new(BalloonConfig { diff --git a/devices/src/virtio/block.rs b/devices/src/virtio/block.rs index 9dcf828..3c5d2fd 100644 --- a/devices/src/virtio/block.rs +++ b/devices/src/virtio/block.rs @@ -7,7 +7,6 @@ use std::fmt::{self, Display}; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::mem::{size_of, size_of_val}; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::UnixDatagram; use std::result; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -19,8 +18,8 @@ use sync::Mutex; use sys_util::Error as SysError; use sys_util::Result as SysResult; use sys_util::{ - EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError, PollContext, - PollToken, PunchHole, TimerFd, WriteZeroes, + net::UnixSeqpacket, EventFd, FileSetLen, FileSync, GuestAddress, GuestMemory, GuestMemoryError, + PollContext, PollToken, PunchHole, TimerFd, WriteZeroes, }; use data_model::{DataInit, Le16, Le32, Le64}; @@ -695,7 +694,7 @@ impl<T: DiskFile> Worker<T> { self.interrupt_evt.write(1).unwrap(); } - fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixDatagram) { + fn run(&mut self, queue_evt: EventFd, kill_evt: EventFd, control_socket: UnixSeqpacket) { #[derive(PollToken)] enum Token { FlushTimer, @@ -819,7 +818,7 @@ pub struct Block<T: DiskFile> { disk_size: Arc<Mutex<u64>>, avail_features: u64, read_only: bool, - control_socket: Option<UnixDatagram>, + control_socket: Option<UnixSeqpacket>, } fn build_config_space(disk_size: u64) -> virtio_blk_config { @@ -844,7 +843,7 @@ impl<T: DiskFile> Block<T> { pub fn new( mut disk_image: T, read_only: bool, - control_socket: Option<UnixDatagram>, + control_socket: Option<UnixSeqpacket>, ) -> SysResult<Block<T>> { let disk_size = disk_image.seek(SeekFrom::End(0))? as u64; if disk_size % SECTOR_SIZE != 0 { diff --git a/devices/src/virtio/wl.rs b/devices/src/virtio/wl.rs index 4d43825..0220d42 100644 --- a/devices/src/virtio/wl.rs +++ b/devices/src/virtio/wl.rs @@ -41,7 +41,7 @@ use std::mem::{size_of, size_of_val}; #[cfg(feature = "wl-dmabuf")] use std::os::raw::{c_uint, c_ulonglong}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use std::os::unix::net::{UnixDatagram, UnixStream}; +use std::os::unix::net::UnixStream; use std::path::{Path, PathBuf}; use std::rc::Rc; use std::result; @@ -59,6 +59,7 @@ use data_model::*; use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket}; #[cfg(feature = "wl-dmabuf")] use resources::GpuMemoryDesc; +use sys_util::net::UnixSeqpacket; use sys_util::{ pipe, round_up_to_page_size, Error, EventFd, FileFlags, GuestAddress, GuestMemory, GuestMemoryError, PollContext, PollToken, Result, ScmSocket, SharedMemory, @@ -490,7 +491,7 @@ struct VmRequester { } impl VmRequester { - fn new(vm_socket: UnixDatagram) -> VmRequester { + fn new(vm_socket: UnixSeqpacket) -> VmRequester { VmRequester { inner: Rc::new(RefCell::new(MsgSocket::<VmRequest, VmResponse>::new( vm_socket, @@ -1004,7 +1005,7 @@ struct WlState { impl WlState { fn new( wayland_path: PathBuf, - vm_socket: UnixDatagram, + vm_socket: UnixSeqpacket, use_transition_flags: bool, resource_bridge: Option<ResourceRequestSocket>, ) -> WlState { @@ -1488,7 +1489,7 @@ impl Worker { in_queue: Queue, out_queue: Queue, wayland_path: PathBuf, - vm_socket: UnixDatagram, + vm_socket: UnixSeqpacket, use_transition_flags: bool, resource_bridge: Option<ResourceRequestSocket>, ) -> Worker { @@ -1678,7 +1679,7 @@ impl Worker { pub struct Wl { kill_evt: Option<EventFd>, wayland_path: PathBuf, - vm_socket: Option<UnixDatagram>, + vm_socket: Option<UnixSeqpacket>, resource_bridge: Option<ResourceRequestSocket>, use_transition_flags: bool, } @@ -1686,7 +1687,7 @@ pub struct Wl { impl Wl { pub fn new<P: AsRef<Path>>( wayland_path: P, - vm_socket: UnixDatagram, + vm_socket: UnixSeqpacket, resource_bridge: Option<ResourceRequestSocket>, ) -> Result<Wl> { Ok(Wl { diff --git a/msg_socket/src/lib.rs b/msg_socket/src/lib.rs index cc2cdc9..fd1f2bc 100644 --- a/msg_socket/src/lib.rs +++ b/msg_socket/src/lib.rs @@ -13,9 +13,9 @@ mod msg_on_socket; use std::io::Result; use std::marker::PhantomData; +use std::ops::Deref; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::UnixDatagram; -use sys_util::{Error as SysError, ScmSocket, UnlinkUnixDatagram}; +use sys_util::{net::UnixSeqpacket, Error as SysError, ScmSocket}; pub use msg_on_socket::*; pub use msg_on_socket_derive::*; @@ -24,7 +24,7 @@ pub use msg_on_socket_derive::*; /// direction. pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>( ) -> Result<(MsgSocket<Request, Response>, MsgSocket<Response, Request>)> { - let (sock1, sock2) = UnixDatagram::pair()?; + let (sock1, sock2) = UnixSeqpacket::pair()?; let requester = MsgSocket { sock: sock1, _i: PhantomData, @@ -40,14 +40,14 @@ pub fn pair<Request: MsgOnSocket, Response: MsgOnSocket>( /// Bidirection sock that support both send and recv. pub struct MsgSocket<I: MsgOnSocket, O: MsgOnSocket> { - sock: UnixDatagram, + sock: UnixSeqpacket, _i: PhantomData<I>, _o: PhantomData<O>, } impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> { // Create a new MsgSocket. - pub fn new(s: UnixDatagram) -> MsgSocket<I, O> { + pub fn new(s: UnixSeqpacket) -> MsgSocket<I, O> { MsgSocket { sock: s, _i: PhantomData, @@ -56,33 +56,22 @@ impl<I: MsgOnSocket, O: MsgOnSocket> MsgSocket<I, O> { } } -/// Bidirection sock that support both send and recv. -pub struct UnlinkMsgSocket<I: MsgOnSocket, O: MsgOnSocket> { - sock: UnlinkUnixDatagram, - _i: PhantomData<I>, - _o: PhantomData<O>, -} - -impl<I: MsgOnSocket, O: MsgOnSocket> UnlinkMsgSocket<I, O> { - // Create a new MsgSocket. - pub fn new(s: UnlinkUnixDatagram) -> UnlinkMsgSocket<I, O> { - UnlinkMsgSocket { - sock: s, - _i: PhantomData, - _o: PhantomData, - } +impl<I: MsgOnSocket, O: MsgOnSocket> Deref for MsgSocket<I, O> { + type Target = UnixSeqpacket; + fn deref(&self) -> &Self::Target { + &self.sock } } /// One direction socket that only supports sending. pub struct Sender<M: MsgOnSocket> { - sock: UnixDatagram, + sock: UnixSeqpacket, _m: PhantomData<M>, } impl<M: MsgOnSocket> Sender<M> { /// Create a new sender sock. - pub fn new(s: UnixDatagram) -> Sender<M> { + pub fn new(s: UnixSeqpacket) -> Sender<M> { Sender { sock: s, _m: PhantomData, @@ -92,13 +81,13 @@ impl<M: MsgOnSocket> Sender<M> { /// One direction socket that only supports receiving. pub struct Receiver<M: MsgOnSocket> { - sock: UnixDatagram, + sock: UnixSeqpacket, _m: PhantomData<M>, } impl<M: MsgOnSocket> Receiver<M> { /// Create a new receiver sock. - pub fn new(s: UnixDatagram) -> Receiver<M> { + pub fn new(s: UnixSeqpacket) -> Receiver<M> { Receiver { sock: s, _m: PhantomData, @@ -106,8 +95,8 @@ impl<M: MsgOnSocket> Receiver<M> { } } -impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for MsgSocket<I, O> { - fn as_ref(&self) -> &UnixDatagram { +impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixSeqpacket> for MsgSocket<I, O> { + fn as_ref(&self) -> &UnixSeqpacket { &self.sock } } @@ -118,20 +107,8 @@ impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for MsgSocket<I, O> { } } -impl<I: MsgOnSocket, O: MsgOnSocket> AsRef<UnixDatagram> for UnlinkMsgSocket<I, O> { - fn as_ref(&self) -> &UnixDatagram { - self.sock.as_ref() - } -} - -impl<I: MsgOnSocket, O: MsgOnSocket> AsRawFd for UnlinkMsgSocket<I, O> { - fn as_raw_fd(&self) -> RawFd { - self.as_ref().as_raw_fd() - } -} - -impl<M: MsgOnSocket> AsRef<UnixDatagram> for Sender<M> { - fn as_ref(&self) -> &UnixDatagram { +impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Sender<M> { + fn as_ref(&self) -> &UnixSeqpacket { &self.sock } } @@ -142,8 +119,8 @@ impl<M: MsgOnSocket> AsRawFd for Sender<M> { } } -impl<M: MsgOnSocket> AsRef<UnixDatagram> for Receiver<M> { - fn as_ref(&self) -> &UnixDatagram { +impl<M: MsgOnSocket> AsRef<UnixSeqpacket> for Receiver<M> { + fn as_ref(&self) -> &UnixSeqpacket { &self.sock } } @@ -155,7 +132,7 @@ impl<M: MsgOnSocket> AsRawFd for Receiver<M> { } /// Types that could send a message. -pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> { +pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixSeqpacket> { fn send(&self, msg: &M) -> MsgResult<()> { let msg_size = M::msg_size(); let fd_size = M::max_fd_count(); @@ -163,7 +140,7 @@ pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> { let mut fd_buffer: Vec<RawFd> = vec![0; fd_size]; let fd_size = msg.write_to_buffer(&mut msg_buffer, &mut fd_buffer)?; - let sock: &UnixDatagram = self.as_ref(); + let sock: &UnixSeqpacket = self.as_ref(); if fd_size == 0 { handle_eintr!(sock.send(&msg_buffer)) .map_err(|e| MsgError::Send(SysError::new(e.raw_os_error().unwrap_or(0))))?; @@ -176,14 +153,14 @@ pub trait MsgSender<M: MsgOnSocket>: AsRef<UnixDatagram> { } /// Types that could receive a message. -pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> { +pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixSeqpacket> { fn recv(&self) -> MsgResult<M> { let msg_size = M::msg_size(); let fd_size = M::max_fd_count(); let mut msg_buffer: Vec<u8> = vec![0; msg_size]; let mut fd_buffer: Vec<RawFd> = vec![0; fd_size]; - let sock: &UnixDatagram = self.as_ref(); + let sock: &UnixSeqpacket = self.as_ref(); let (recv_msg_size, recv_fd_size) = { if fd_size == 0 { @@ -197,7 +174,10 @@ pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> { } }; if msg_size != recv_msg_size { - return Err(MsgError::BadRecvSize(msg_size)); + return Err(MsgError::BadRecvSize { + expected: msg_size, + actual: recv_msg_size, + }); } // Safe because fd buffer is read from socket. let (v, read_fd_size) = unsafe { @@ -213,8 +193,5 @@ pub trait MsgReceiver<M: MsgOnSocket>: AsRef<UnixDatagram> { impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for MsgSocket<I, O> {} impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for MsgSocket<I, O> {} -impl<I: MsgOnSocket, O: MsgOnSocket> MsgSender<I> for UnlinkMsgSocket<I, O> {} -impl<I: MsgOnSocket, O: MsgOnSocket> MsgReceiver<O> for UnlinkMsgSocket<I, O> {} - impl<M: MsgOnSocket> MsgSender<M> for Sender<M> {} impl<M: MsgOnSocket> MsgReceiver<M> for Receiver<M> {} diff --git a/msg_socket/src/msg_on_socket.rs b/msg_socket/src/msg_on_socket.rs index 493c57c..8b01850 100644 --- a/msg_socket/src/msg_on_socket.rs +++ b/msg_socket/src/msg_on_socket.rs @@ -23,8 +23,8 @@ pub enum MsgError { /// The type of a received request or response is unknown. InvalidType, /// There was not the expected amount of data when receiving a message. The inner - /// value is how much data is needed. - BadRecvSize(usize), + /// value is how much data is expected and how much data was actually received. + BadRecvSize { expected: usize, actual: usize }, /// There was no associated file descriptor received for a request that expected it. ExpectFd, /// There was some associated file descriptor received but not used when deserialize. @@ -47,7 +47,11 @@ impl Display for MsgError { Send(e) => write!(f, "failed to send request or response: {}", e), Recv(e) => write!(f, "failed to receive request or response: {}", e), InvalidType => write!(f, "invalid type"), - BadRecvSize(n) => write!(f, "wrong amount of data received; expected {} bytes", n), + BadRecvSize { expected, actual } => write!( + f, + "wrong amount of data received; expected {} bytes; got {} bytes", + expected, actual + ), ExpectFd => write!(f, "missing associated file descriptor for request"), NotExpectFd => write!(f, "unexpected file descriptor is unused"), WrongFdBufferSize => write!(f, "fd buffer size too small"), diff --git a/seccomp/arm/common_device.policy b/seccomp/arm/common_device.policy index 9ccf48b..d2b5a6b 100644 --- a/seccomp/arm/common_device.policy +++ b/seccomp/arm/common_device.policy @@ -32,11 +32,13 @@ prctl: arg0 == PR_SET_NAME read: 1 recv: 1 recvfrom: 1 +recvmsg: 1 restart_syscall: 1 rt_sigaction: 1 rt_sigprocmask: 1 rt_sigreturn: 1 sched_getaffinity: 1 +sendmsg: 1 set_robust_list: 1 sigaltstack: 1 write: 1 diff --git a/seccomp/x86_64/common_device.policy b/seccomp/x86_64/common_device.policy index 7fa6e52..2379b95 100644 --- a/seccomp/x86_64/common_device.policy +++ b/seccomp/x86_64/common_device.policy @@ -31,11 +31,13 @@ ppoll: 1 prctl: arg0 == PR_SET_NAME read: 1 recvfrom: 1 +recvmsg: 1 restart_syscall: 1 rt_sigaction: 1 rt_sigprocmask: 1 rt_sigreturn: 1 sched_getaffinity: 1 +sendmsg: 1 set_robust_list: 1 sigaltstack: 1 write: 1 diff --git a/src/linux.rs b/src/linux.rs index 20ef297..2fdda09 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -11,7 +11,7 @@ use std::fs::{File, OpenOptions}; use std::io::{self, stdin, Read}; use std::mem; use std::os::unix::io::{FromRawFd, RawFd}; -use std::os::unix::net::{UnixDatagram, UnixStream}; +use std::os::unix::net::UnixStream; use std::path::{Path, PathBuf}; use std::str; use std::sync::{Arc, Barrier}; @@ -27,13 +27,17 @@ use devices::{self, PciDevice, VirtioPciDevice}; use io_jail::{self, Minijail}; use kvm::*; use libcras::CrasClient; -use msg_socket::{MsgReceiver, MsgSender, MsgSocket, UnlinkMsgSocket}; +use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket}; use net_util::{Error as NetError, Tap}; use qcow::{self, ImageType, QcowFile}; use rand_ish::SimpleRng; use sync::{Condvar, Mutex}; -use sys_util; -use sys_util::*; +use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener}; +use sys_util::{ + self, block_signal, clear_signal, flock, get_blocked_signals, get_group_id, get_user_id, + getegid, geteuid, register_signal_handler, validate_raw_fd, EventFd, FlockOperation, + GuestMemory, Killable, PollContext, PollToken, SignalFd, Terminal, TimerFd, SIGRTMIN, +}; use vhost; use vm_control::{VmRequest, VmResponse, VmRunMode}; @@ -223,9 +227,9 @@ fn create_virtio_devs( cfg: Config, mem: &GuestMemory, _exit_evt: &EventFd, - wayland_device_socket: UnixDatagram, - balloon_device_socket: UnixDatagram, - disk_device_sockets: &mut Vec<UnixDatagram>, + wayland_device_socket: UnixSeqpacket, + balloon_device_socket: UnixSeqpacket, + disk_device_sockets: &mut Vec<UnixSeqpacket>, ) -> std::result::Result<Vec<(Box<PciDevice + 'static>, Option<Minijail>)>, Box<error::Error>> { let default_pivot_root: &str = option_env!("DEFAULT_PIVOT_ROOT").unwrap_or("/var/empty"); @@ -998,22 +1002,20 @@ pub fn run_config(cfg: Config) -> Result<()> { wayland_dmabuf: cfg.wayland_dmabuf, }; - let mut control_sockets = Vec::new(); - if let Some(ref path_string) = cfg.socket_path { - let path = Path::new(path_string); - let dgram = UnixDatagram::bind(path).map_err(Error::CreateSocket)?; - control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new( - UnlinkUnixDatagram(dgram), - )); + let control_server_socket = match &cfg.socket_path { + Some(path) => Some(UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(path).map_err(Error::CreateSocket)?, + )), + None => None, }; + + let mut control_sockets = Vec::new(); let (wayland_host_socket, wayland_device_socket) = - UnixDatagram::pair().map_err(Error::CreateSocket)?; - control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new( - UnlinkUnixDatagram(wayland_host_socket), - )); + UnixSeqpacket::pair().map_err(Error::CreateSocket)?; + control_sockets.push(MsgSocket::<VmResponse, VmRequest>::new(wayland_host_socket)); // Balloon gets a special socket so balloon requests can be forwarded from the main process. let (balloon_host_socket, balloon_device_socket) = - UnixDatagram::pair().map_err(Error::CreateSocket)?; + UnixSeqpacket::pair().map_err(Error::CreateSocket)?; // Create one control socket per disk. let mut disk_device_sockets = Vec::new(); @@ -1021,7 +1023,7 @@ pub fn run_config(cfg: Config) -> Result<()> { let disk_count = cfg.disks.len(); for _ in 0..disk_count { let (disk_host_socket, disk_device_socket) = - UnixDatagram::pair().map_err(Error::CreateSocket)?; + UnixSeqpacket::pair().map_err(Error::CreateSocket)?; disk_device_sockets.push(disk_device_socket); let disk_host_socket = MsgSocket::<VmRequest, VmResponse>::new(disk_host_socket); disk_host_sockets.push(disk_host_socket); @@ -1040,6 +1042,7 @@ pub fn run_config(cfg: Config) -> Result<()> { .map_err(Error::BuildingVm)?; run_control( linux, + control_server_socket, control_sockets, balloon_host_socket, &disk_host_sockets, @@ -1049,8 +1052,9 @@ pub fn run_config(cfg: Config) -> Result<()> { fn run_control( mut linux: RunnableLinuxVm, - control_sockets: Vec<UnlinkMsgSocket<VmResponse, VmRequest>>, - balloon_host_socket: UnixDatagram, + control_server_socket: Option<UnlinkUnixSeqpacketListener>, + mut control_sockets: Vec<MsgSocket<VmResponse, VmRequest>>, + balloon_host_socket: UnixSeqpacket, disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>], sigchld_fd: SignalFd, ) -> Result<()> { @@ -1082,6 +1086,7 @@ fn run_control( CheckAvailableMemory, LowMemory, LowmemTimer, + VmControlServer, VmControl { index: usize }, } @@ -1101,6 +1106,12 @@ fn run_control( poll_ctx .add(&sigchld_fd, Token::ChildSignal) .map_err(Error::PollContextAdd)?; + + if let Some(socket_server) = &control_server_socket { + poll_ctx + .add(socket_server, Token::VmControlServer) + .map_err(Error::PollContextAdd)?; + } for (index, socket) in control_sockets.iter().enumerate() { poll_ctx .add(socket.as_ref(), Token::VmControl { index }) @@ -1167,6 +1178,8 @@ fn run_control( } } }; + + let mut vm_control_indices_to_remove = Vec::new(); for event in events.iter_readable() { match event.token() { Token::Exit => { @@ -1286,6 +1299,24 @@ fn run_control( .map_err(Error::PollContextAdd)?; } } + Token::VmControlServer => { + if let Some(socket_server) = &control_server_socket { + match socket_server.accept() { + Ok(socket) => { + poll_ctx + .add( + &socket, + Token::VmControl { + index: control_sockets.len(), + }, + ) + .map_err(Error::PollContextAdd)?; + control_sockets.push(MsgSocket::new(socket)); + } + Err(e) => error!("failed to accept socket: {}", e), + } + } + } Token::VmControl { index } => { if let Some(socket) = control_sockets.get(index) { match socket.recv() { @@ -1316,34 +1347,55 @@ fn run_control( } } } - Err(e) => error!("failed to recv VmRequest: {}", e), + Err(e) => { + if let MsgError::BadRecvSize { actual: 0, .. } = e { + vm_control_indices_to_remove.push(index); + } else { + error!("failed to recv VmRequest: {}", e); + } + } } } } } } + for event in events.iter_hungup() { - // It's possible more data is readable and buffered while the socket is hungup, so - // don't delete the socket from the poll context until we're sure all the data is - // read. - if !event.readable() { - match event.token() { - Token::Exit => {} - Token::Stdin => { - let _ = poll_ctx.delete(&stdin_handle); - } - Token::ChildSignal => {} - Token::CheckAvailableMemory => {} - Token::LowMemory => {} - Token::LowmemTimer => {} - Token::VmControl { index } => { - if let Some(socket) = control_sockets.get(index) { - let _ = poll_ctx.delete(socket.as_ref()); - } + match event.token() { + Token::Exit => {} + Token::Stdin => { + let _ = poll_ctx.delete(&stdin_handle); + } + Token::ChildSignal => {} + Token::CheckAvailableMemory => {} + Token::LowMemory => {} + Token::LowmemTimer => {} + Token::VmControlServer => {} + Token::VmControl { index } => { + // It's possible more data is readable and buffered while the socket is hungup, + // so don't delete the socket from the poll context until we're sure all the + // data is read. + match control_sockets.get(index).map(|s| s.get_readable_bytes()) { + Some(Ok(0)) | Some(Err(_)) => vm_control_indices_to_remove.push(index), + Some(Ok(x)) => info!("control index {} has {} bytes readable", index, x), + _ => {} } } } } + + // Sort in reverse so the highest indexes are removed first. This removal algorithm + // preserved correct indexes as each element is removed. + vm_control_indices_to_remove.sort_unstable_by(|a, b| b.cmp(a)); + vm_control_indices_to_remove.dedup(); + for index in vm_control_indices_to_remove { + control_sockets.swap_remove(index); + if let Some(socket) = control_sockets.get(index) { + poll_ctx + .add(socket, Token::VmControl { index }) + .map_err(Error::PollContextAdd)?; + } + } } // VCPU threads MUST see the VmRunMode flag, otherwise they may re-enter the VM. diff --git a/src/main.rs b/src/main.rs index 4965deb..32380b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -46,14 +46,13 @@ pub mod plugin; use std::fs::OpenOptions; use std::net; use std::os::unix::io::RawFd; -use std::os::unix::net::UnixDatagram; use std::path::PathBuf; use std::string::String; use std::thread::sleep; use std::time::Duration; use qcow::QcowFile; -use sys_util::{getpid, kill_process_group, reap_child, syslog}; +use sys_util::{getpid, kill_process_group, net::UnixSeqpacket, reap_child, syslog}; use argument::{print_help, set_arguments, Argument}; use msg_socket::{MsgSender, Sender}; @@ -723,10 +722,7 @@ fn vms_request( let mut return_result = Ok(()); for socket_path in args { - match UnixDatagram::unbound().and_then(|s| { - s.connect(&socket_path)?; - Ok(s) - }) { + match UnixSeqpacket::connect(&socket_path) { Ok(s) => { let sender = Sender::<VmRequest>::new(s); if let Err(e) = sender.send(request) { @@ -788,10 +784,7 @@ fn balloon_vms(mut args: std::env::Args) -> std::result::Result<(), ()> { let mut return_result = Ok(()); for socket_path in args { - match UnixDatagram::unbound().and_then(|s| { - s.connect(&socket_path)?; - Ok(s) - }) { + match UnixSeqpacket::connect(&socket_path) { Ok(s) => { let sender = Sender::<VmRequest>::new(s); if let Err(e) = sender.send(&VmRequest::BalloonAdjust(num_bytes)) { @@ -881,10 +874,7 @@ fn disk_cmd(mut args: std::env::Args) -> std::result::Result<(), ()> { let mut return_result = Ok(()); for socket_path in args { - match UnixDatagram::unbound().and_then(|s| { - s.connect(&socket_path)?; - Ok(s) - }) { + match UnixSeqpacket::connect(&socket_path) { Ok(s) => { let sender = Sender::<VmRequest>::new(s); if let Err(e) = sender.send(&request) { diff --git a/vm_control/src/lib.rs b/vm_control/src/lib.rs index 514a964..f0fc209 100644 --- a/vm_control/src/lib.rs +++ b/vm_control/src/lib.rs @@ -22,7 +22,6 @@ use std::fmt::{self, Display}; use std::fs::File; use std::io::{Seek, SeekFrom}; use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; -use std::os::unix::net::UnixDatagram; use libc::{EINVAL, ENODEV}; @@ -30,7 +29,9 @@ use byteorder::{LittleEndian, WriteBytesExt}; use kvm::{Datamatch, IoeventAddress, Vm}; use msg_socket::{MsgOnSocket, MsgReceiver, MsgResult, MsgSender, MsgSocket}; use resources::{GpuMemoryDesc, SystemAllocator}; -use sys_util::{Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result}; +use sys_util::{ + net::UnixSeqpacket, Error as SysError, EventFd, GuestAddress, MemoryMapping, MmapError, Result, +}; /// A file descriptor either borrowed or owned by this. pub enum MaybeOwnedFd { @@ -170,7 +171,7 @@ impl VmRequest { vm: &mut Vm, sys_allocator: &mut SystemAllocator, run_mode: &mut Option<VmRunMode>, - balloon_host_socket: &UnixDatagram, + balloon_host_socket: &UnixSeqpacket, disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>], ) -> VmResponse { match *self { |