diff options
Diffstat (limited to 'src/linux.rs')
-rw-r--r-- | src/linux.rs | 132 |
1 files changed, 92 insertions, 40 deletions
diff --git a/src/linux.rs b/src/linux.rs index 20ef297..2fdda09 100644 --- a/src/linux.rs +++ b/src/linux.rs @@ -11,7 +11,7 @@ use std::fs::{File, OpenOptions}; use std::io::{self, stdin, Read}; use std::mem; use std::os::unix::io::{FromRawFd, RawFd}; -use std::os::unix::net::{UnixDatagram, UnixStream}; +use std::os::unix::net::UnixStream; use std::path::{Path, PathBuf}; use std::str; use std::sync::{Arc, Barrier}; @@ -27,13 +27,17 @@ use devices::{self, PciDevice, VirtioPciDevice}; use io_jail::{self, Minijail}; use kvm::*; use libcras::CrasClient; -use msg_socket::{MsgReceiver, MsgSender, MsgSocket, UnlinkMsgSocket}; +use msg_socket::{MsgError, MsgReceiver, MsgSender, MsgSocket}; use net_util::{Error as NetError, Tap}; use qcow::{self, ImageType, QcowFile}; use rand_ish::SimpleRng; use sync::{Condvar, Mutex}; -use sys_util; -use sys_util::*; +use sys_util::net::{UnixSeqpacket, UnixSeqpacketListener, UnlinkUnixSeqpacketListener}; +use sys_util::{ + self, block_signal, clear_signal, flock, get_blocked_signals, get_group_id, get_user_id, + getegid, geteuid, register_signal_handler, validate_raw_fd, EventFd, FlockOperation, + GuestMemory, Killable, PollContext, PollToken, SignalFd, Terminal, TimerFd, SIGRTMIN, +}; use vhost; use vm_control::{VmRequest, VmResponse, VmRunMode}; @@ -223,9 +227,9 @@ fn create_virtio_devs( cfg: Config, mem: &GuestMemory, _exit_evt: &EventFd, - wayland_device_socket: UnixDatagram, - balloon_device_socket: UnixDatagram, - disk_device_sockets: &mut Vec<UnixDatagram>, + wayland_device_socket: UnixSeqpacket, + balloon_device_socket: UnixSeqpacket, + disk_device_sockets: &mut Vec<UnixSeqpacket>, ) -> std::result::Result<Vec<(Box<PciDevice + 'static>, Option<Minijail>)>, Box<error::Error>> { let default_pivot_root: &str = option_env!("DEFAULT_PIVOT_ROOT").unwrap_or("/var/empty"); @@ -998,22 +1002,20 @@ pub fn run_config(cfg: Config) -> Result<()> { wayland_dmabuf: cfg.wayland_dmabuf, }; - let mut control_sockets = Vec::new(); - if let Some(ref 
path_string) = cfg.socket_path { - let path = Path::new(path_string); - let dgram = UnixDatagram::bind(path).map_err(Error::CreateSocket)?; - control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new( - UnlinkUnixDatagram(dgram), - )); + let control_server_socket = match &cfg.socket_path { + Some(path) => Some(UnlinkUnixSeqpacketListener( + UnixSeqpacketListener::bind(path).map_err(Error::CreateSocket)?, + )), + None => None, }; + + let mut control_sockets = Vec::new(); let (wayland_host_socket, wayland_device_socket) = - UnixDatagram::pair().map_err(Error::CreateSocket)?; - control_sockets.push(UnlinkMsgSocket::<VmResponse, VmRequest>::new( - UnlinkUnixDatagram(wayland_host_socket), - )); + UnixSeqpacket::pair().map_err(Error::CreateSocket)?; + control_sockets.push(MsgSocket::<VmResponse, VmRequest>::new(wayland_host_socket)); // Balloon gets a special socket so balloon requests can be forwarded from the main process. let (balloon_host_socket, balloon_device_socket) = - UnixDatagram::pair().map_err(Error::CreateSocket)?; + UnixSeqpacket::pair().map_err(Error::CreateSocket)?; // Create one control socket per disk. 
let mut disk_device_sockets = Vec::new(); @@ -1021,7 +1023,7 @@ pub fn run_config(cfg: Config) -> Result<()> { let disk_count = cfg.disks.len(); for _ in 0..disk_count { let (disk_host_socket, disk_device_socket) = - UnixDatagram::pair().map_err(Error::CreateSocket)?; + UnixSeqpacket::pair().map_err(Error::CreateSocket)?; disk_device_sockets.push(disk_device_socket); let disk_host_socket = MsgSocket::<VmRequest, VmResponse>::new(disk_host_socket); disk_host_sockets.push(disk_host_socket); @@ -1040,6 +1042,7 @@ pub fn run_config(cfg: Config) -> Result<()> { .map_err(Error::BuildingVm)?; run_control( linux, + control_server_socket, control_sockets, balloon_host_socket, &disk_host_sockets, @@ -1049,8 +1052,9 @@ pub fn run_config(cfg: Config) -> Result<()> { fn run_control( mut linux: RunnableLinuxVm, - control_sockets: Vec<UnlinkMsgSocket<VmResponse, VmRequest>>, - balloon_host_socket: UnixDatagram, + control_server_socket: Option<UnlinkUnixSeqpacketListener>, + mut control_sockets: Vec<MsgSocket<VmResponse, VmRequest>>, + balloon_host_socket: UnixSeqpacket, disk_host_sockets: &[MsgSocket<VmRequest, VmResponse>], sigchld_fd: SignalFd, ) -> Result<()> { @@ -1082,6 +1086,7 @@ fn run_control( CheckAvailableMemory, LowMemory, LowmemTimer, + VmControlServer, VmControl { index: usize }, } @@ -1101,6 +1106,12 @@ fn run_control( poll_ctx .add(&sigchld_fd, Token::ChildSignal) .map_err(Error::PollContextAdd)?; + + if let Some(socket_server) = &control_server_socket { + poll_ctx + .add(socket_server, Token::VmControlServer) + .map_err(Error::PollContextAdd)?; + } for (index, socket) in control_sockets.iter().enumerate() { poll_ctx .add(socket.as_ref(), Token::VmControl { index }) @@ -1167,6 +1178,8 @@ fn run_control( } } }; + + let mut vm_control_indices_to_remove = Vec::new(); for event in events.iter_readable() { match event.token() { Token::Exit => { @@ -1286,6 +1299,24 @@ fn run_control( .map_err(Error::PollContextAdd)?; } } + Token::VmControlServer => { + if let 
Some(socket_server) = &control_server_socket { + match socket_server.accept() { + Ok(socket) => { + poll_ctx + .add( + &socket, + Token::VmControl { + index: control_sockets.len(), + }, + ) + .map_err(Error::PollContextAdd)?; + control_sockets.push(MsgSocket::new(socket)); + } + Err(e) => error!("failed to accept socket: {}", e), + } + } + } Token::VmControl { index } => { if let Some(socket) = control_sockets.get(index) { match socket.recv() { @@ -1316,34 +1347,55 @@ fn run_control( } } } - Err(e) => error!("failed to recv VmRequest: {}", e), + Err(e) => { + if let MsgError::BadRecvSize { actual: 0, .. } = e { + vm_control_indices_to_remove.push(index); + } else { + error!("failed to recv VmRequest: {}", e); + } + } } } } } } + for event in events.iter_hungup() { - // It's possible more data is readable and buffered while the socket is hungup, so - // don't delete the socket from the poll context until we're sure all the data is - // read. - if !event.readable() { - match event.token() { - Token::Exit => {} - Token::Stdin => { - let _ = poll_ctx.delete(&stdin_handle); - } - Token::ChildSignal => {} - Token::CheckAvailableMemory => {} - Token::LowMemory => {} - Token::LowmemTimer => {} - Token::VmControl { index } => { - if let Some(socket) = control_sockets.get(index) { - let _ = poll_ctx.delete(socket.as_ref()); - } + match event.token() { + Token::Exit => {} + Token::Stdin => { + let _ = poll_ctx.delete(&stdin_handle); + } + Token::ChildSignal => {} + Token::CheckAvailableMemory => {} + Token::LowMemory => {} + Token::LowmemTimer => {} + Token::VmControlServer => {} + Token::VmControl { index } => { + // It's possible more data is readable and buffered while the socket is hungup, + // so don't delete the socket from the poll context until we're sure all the + // data is read. 
+ match control_sockets.get(index).map(|s| s.get_readable_bytes()) { + Some(Ok(0)) | Some(Err(_)) => vm_control_indices_to_remove.push(index), + Some(Ok(x)) => info!("control index {} has {} bytes readable", index, x), + _ => {} } } } } + + // Sort in reverse so the highest indexes are removed first. This removal algorithm + // preserves correct indexes as each element is removed. + vm_control_indices_to_remove.sort_unstable_by(|a, b| b.cmp(a)); + vm_control_indices_to_remove.dedup(); + for index in vm_control_indices_to_remove { + control_sockets.swap_remove(index); + if let Some(socket) = control_sockets.get(index) { + poll_ctx + .add(socket, Token::VmControl { index }) + .map_err(Error::PollContextAdd)?; + } + } } // VCPU threads MUST see the VmRunMode flag, otherwise they may re-enter the VM. |