author    Zach Reizner <zachr@google.com>       2017-06-30 15:46:14 -0700
committer chrome-bot <chrome-bot@chromium.org>  2017-09-08 17:35:58 -0700
commit    2bcf05b2afbcbe1287583a229dbb3e5b6c78aa8c (patch)
tree      f5fbad7c0b88a8f8727d92692ac996b62647c24c /src/hw/virtio/wl.rs
parent    22175fe3681d7102bb62fb89b0bb0cb317973bcc (diff)
crosvm: add virtio wayland device
This adds the virtio wayland device, which is activated by default. The wayland
device needs the XDG_RUNTIME_DIR environment variable to be set and a running
wayland compositor to connect to in that directory.

TEST=crosvm run <other args>
BUG=chromium:738638

Change-Id: Iaa417c6bb74739896042318451b4befcac0c1d0e
Reviewed-on: https://chromium-review.googlesource.com/559860
Commit-Ready: Zach Reizner <zachr@chromium.org>
Tested-by: Zach Reizner <zachr@chromium.org>
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Diffstat (limited to 'src/hw/virtio/wl.rs')
-rw-r--r--  src/hw/virtio/wl.rs | 1036
1 file changed, 1036 insertions(+), 0 deletions(-)
diff --git a/src/hw/virtio/wl.rs b/src/hw/virtio/wl.rs
new file mode 100644
index 0000000..d5a6ff3
--- /dev/null
+++ b/src/hw/virtio/wl.rs
@@ -0,0 +1,1036 @@
+// Copyright 2017 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! This module implements the virtio wayland device used by the guest to access the host's
+//! wayland server.
+//!
+//! The virtio wayland protocol is carried over two queues: `in` and `out`. The `in` queue is
+//! used for sending commands to the guest that are generated by the host, usually messages from
+//! the wayland server. The `out` queue is for commands from the guest, usually requests to
+//! allocate shared memory, open a wayland server connection, or send data over an existing
+//! connection.
+//!
+//! Each `WlVfd` represents one virtual file descriptor created by either the guest or the host.
+//! Virtual file descriptors contain actual file descriptors, either a shared memory file descriptor
+//! or a unix domain socket to the wayland server. In the shared memory case, there is also an
+//! associated slot that indicates which KVM memory slot the memory is installed into, as well as a
+//! page frame number that the guest can access the memory from.
+//!
+//! The types starting with `Ctrl` are structures representing the virtio wayland protocol "on the
+//! wire." They are decoded/encoded as some variant of `WlOp` for requests and `WlResp` for
+//! responses.
+//!
+//! There is one `WlState` instance that contains every known vfd and the current state of the
+//! `in` queue. The `in` queue requires extra state to buffer messages to the guest in case the `in`
+//! queue is already full. The `WlState` also has a control socket necessary to fulfill certain
+//! requests, such as those registering guest memory.
+//!
+//! The `Worker` is responsible for the poll loop over all possible events, encoding/decoding from
+//! the virtio queue, and routing messages in and out of `WlState`. Possible events include the kill
+//! event, available descriptors on the `in` or `out` queue, and incoming data on any vfd's socket.
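+//!
+//! As a concrete example of the wire format (taken directly from the `Ctrl` structs and the parse
+//! offsets below), a `VIRTIO_WL_CMD_VFD_NEW` message is laid out as follows, all fields
+//! little-endian:
+//!
+//! ```text
+//! offset  0: hdr.type_ (u32) = VIRTIO_WL_CMD_VFD_NEW
+//! offset  4: hdr.flags (u32)
+//! offset  8: id        (u32)
+//! offset 12: flags     (u32)
+//! offset 16: pfn       (u64)
+//! offset 24: size      (u32)
+//! ```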
+
+use std::cell::RefCell;
+use std::collections::btree_map::Entry;
+use std::collections::{BTreeSet as Set, BTreeMap as Map, VecDeque};
+use std::convert::From;
+use std::ffi::CStr;
+use std::fmt;
+use std::fs::File;
+use std::io;
+use std::mem::size_of;
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::os::unix::net::{UnixDatagram, UnixStream};
+use std::path::{PathBuf, Path};
+use std::rc::Rc;
+use std::result;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::thread::spawn;
+
+use data_model::*;
+use data_model::VolatileMemoryError;
+
+use sys_util::{Error, Result, EventFd, Poller, Pollable, Scm, SharedMemory, GuestAddress,
+               GuestMemory, GuestMemoryError};
+
+use vm_control::{VmControlError, VmRequest, VmResponse, MaybeOwnedFd};
+use super::{VirtioDevice, Queue, DescriptorChain, INTERRUPT_STATUS_USED_RING, TYPE_WL};
+
+const VIRTWL_SEND_MAX_ALLOCS: usize = 16;
+const VIRTIO_WL_CMD_VFD_NEW: u32 = 256;
+const VIRTIO_WL_CMD_VFD_CLOSE: u32 = 257;
+const VIRTIO_WL_CMD_VFD_SEND: u32 = 258;
+const VIRTIO_WL_CMD_VFD_RECV: u32 = 259;
+const VIRTIO_WL_CMD_VFD_NEW_CTX: u32 = 260;
+const VIRTIO_WL_RESP_OK: u32 = 4096;
+const VIRTIO_WL_RESP_VFD_NEW: u32 = 4097;
+const VIRTIO_WL_RESP_ERR: u32 = 4352;
+const VIRTIO_WL_RESP_OUT_OF_MEMORY: u32 = 4353;
+const VIRTIO_WL_RESP_INVALID_ID: u32 = 4354;
+const VIRTIO_WL_RESP_INVALID_TYPE: u32 = 4355;
+const VIRTIO_WL_VFD_WRITE: u32 = 0x1;
+const VIRTIO_WL_VFD_MAP: u32 = 0x2;
+const VIRTIO_WL_VFD_CONTROL: u32 = 0x4;
+
+const Q_IN: u32 = 0;
+const Q_OUT: u32 = 1;
+const KILL: u32 = 2;
+const VFD_BASE_TOKEN: u32 = 0x100;
+
+const QUEUE_SIZE: u16 = 16;
+const QUEUE_SIZES: &'static [u16] = &[QUEUE_SIZE, QUEUE_SIZE];
+
+const NEXT_VFD_ID_BASE: u32 = 0x40000000;
+const VFD_ID_HOST_MASK: u32 = NEXT_VFD_ID_BASE;
+const IN_BUFFER_LEN: usize = 4080;
+
+const PAGE_MASK: u64 = 0x0fff;
+
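+/// Rounds `v` up to the next multiple of the 4 KiB page size; for example,
+/// `round_to_page_size(1) == 4096` and `round_to_page_size(8192) == 8192`.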
+fn round_to_page_size(v: u64) -> u64 {
+    (v + PAGE_MASK) & !PAGE_MASK
+}
+
+fn parse_new(addr: GuestAddress, mem: &GuestMemory) -> WlResult<WlOp> {
+    const ID_OFFSET: usize = 8;
+    const FLAGS_OFFSET: usize = 12;
+    const SIZE_OFFSET: usize = 24;
+
+    let id: Le32 = mem.read_obj_from_addr(mem.checked_offset(addr, ID_OFFSET)
+                                              .ok_or(WlError::CheckedOffset)?)?;
+    let flags: Le32 =
+        mem.read_obj_from_addr(mem.checked_offset(addr, FLAGS_OFFSET)
+                                    .ok_or(WlError::CheckedOffset)?)?;
+    let size: Le32 = mem.read_obj_from_addr(mem.checked_offset(addr, SIZE_OFFSET)
+                                                .ok_or(WlError::CheckedOffset)?)?;
+    Ok(WlOp::NewAlloc {
+           id: id.into(),
+           flags: flags.into(),
+           size: size.into(),
+       })
+}
+
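+/// Parses a `VIRTIO_WL_CMD_VFD_SEND` message, which is laid out as the 8 byte header, `id` (u32),
+/// `vfd_count` (u32), an array of `vfd_count` vfd ids (u32 each), and finally the data payload
+/// occupying the remaining `len` bytes of the descriptor.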
+fn parse_send(addr: GuestAddress, len: u32, mem: &GuestMemory) -> WlResult<WlOp> {
+    const ID_OFFSET: usize = 8;
+    const VFD_COUNT_OFFSET: usize = 12;
+    const VFDS_OFFSET: usize = 16;
+
+    let id: Le32 = mem.read_obj_from_addr(mem.checked_offset(addr, ID_OFFSET)
+                                              .ok_or(WlError::CheckedOffset)?)?;
+    let vfd_count: Le32 =
+        mem.read_obj_from_addr(mem.checked_offset(addr, VFD_COUNT_OFFSET)
+                                    .ok_or(WlError::CheckedOffset)?)?;
+    let vfd_count: u32 = vfd_count.into();
+    let vfds_addr = mem.checked_offset(addr, VFDS_OFFSET)
+        .ok_or(WlError::CheckedOffset)?;
+    let vfds_len = vfd_count
+        .checked_mul(size_of::<Le32>() as u32)
+        .ok_or(WlError::CheckedOffset)?;
+    let data_addr = mem.checked_offset(vfds_addr, vfds_len as usize)
+        .ok_or(WlError::CheckedOffset)?;
+    // Use checked arithmetic so a bogus guest-supplied `vfd_count` can not underflow the data
+    // length.
+    let data_len = len.checked_sub(VFDS_OFFSET as u32)
+        .and_then(|v| v.checked_sub(vfds_len))
+        .ok_or(WlError::CheckedOffset)?;
+    Ok(WlOp::Send {
+           id: id.into(),
+           vfds_addr: vfds_addr,
+           vfd_count: vfd_count,
+           data_addr: data_addr,
+           data_len: data_len,
+       })
+}
+
+fn parse_id(addr: GuestAddress, mem: &GuestMemory) -> WlResult<u32> {
+    const ID_OFFSET: usize = 8;
+    let id: Le32 = mem.read_obj_from_addr(mem.checked_offset(addr, ID_OFFSET)
+                                              .ok_or(WlError::CheckedOffset)?)?;
+    Ok(id.into())
+}
+
+fn parse_desc(desc: &DescriptorChain, mem: &GuestMemory) -> WlResult<WlOp> {
+    let type_: Le32 = mem.read_obj_from_addr(desc.addr)?;
+    match type_.into() {
+        VIRTIO_WL_CMD_VFD_NEW => parse_new(desc.addr, mem),
+        VIRTIO_WL_CMD_VFD_CLOSE => Ok(WlOp::Close { id: parse_id(desc.addr, mem)? }),
+        VIRTIO_WL_CMD_VFD_SEND => parse_send(desc.addr, desc.len, mem),
+        VIRTIO_WL_CMD_VFD_NEW_CTX => Ok(WlOp::NewCtx { id: parse_id(desc.addr, mem)? }),
+        v => Ok(WlOp::Unsupported { op_type: v }),
+    }
+}
+
+fn encode_vfd_new(desc_mem: VolatileSlice,
+                  resp: bool,
+                  vfd_id: u32,
+                  flags: u32,
+                  pfn: u64,
+                  size: u32)
+                  -> WlResult<u32> {
+    let ctrl_vfd_new = CtrlVfdNew {
+        hdr: CtrlHeader {
+            type_: Le32::from(if resp {
+                                  VIRTIO_WL_RESP_VFD_NEW
+                              } else {
+                                  VIRTIO_WL_CMD_VFD_NEW
+                              }),
+            flags: Le32::from(0),
+        },
+        id: Le32::from(vfd_id),
+        flags: Le32::from(flags),
+        pfn: Le64::from(pfn),
+        size: Le32::from(size),
+    };
+
+    desc_mem.get_ref(0)?.store(ctrl_vfd_new);
+    Ok(size_of::<CtrlVfdNew>() as u32)
+}
+
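+/// Encodes a `VIRTIO_WL_CMD_VFD_RECV` message into `desc_mem` as the `CtrlVfdRecv` header followed
+/// by `vfd_ids` (u32 each) and then the raw `data` bytes, returning the total bytes written.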
+fn encode_vfd_recv(desc_mem: VolatileSlice,
+                   vfd_id: u32,
+                   data: &[u8],
+                   vfd_ids: &[u32])
+                   -> WlResult<u32> {
+    let ctrl_vfd_recv = CtrlVfdRecv {
+        hdr: CtrlHeader {
+            type_: Le32::from(VIRTIO_WL_CMD_VFD_RECV),
+            flags: Le32::from(0),
+        },
+        id: Le32::from(vfd_id),
+        vfd_count: Le32::from(vfd_ids.len() as u32),
+    };
+    desc_mem.get_ref(0)?.store(ctrl_vfd_recv);
+
+    let vfd_slice = desc_mem
+        .get_slice(size_of::<CtrlVfdRecv>(), vfd_ids.len() * size_of::<Le32>())?;
+    for (i, &recv_vfd_id) in vfd_ids.iter().enumerate() {
+        vfd_slice
+            .get_ref(size_of::<Le32>() * i)?
+            .store(recv_vfd_id);
+    }
+
+    let data_slice = desc_mem
+        .get_slice(size_of::<CtrlVfdRecv>() + vfd_ids.len() * size_of::<Le32>(),
+                   data.len())?;
+    data_slice.copy_from(data);
+
+    Ok((size_of::<CtrlVfdRecv>() + vfd_ids.len() * size_of::<Le32>() + data.len()) as u32)
+}
+
+fn encode_resp(desc_mem: VolatileSlice, resp: WlResp) -> WlResult<u32> {
+    match resp {
+        WlResp::VfdNew {
+            id,
+            flags,
+            pfn,
+            size,
+            resp,
+        } => encode_vfd_new(desc_mem, resp, id, flags, pfn, size),
+        WlResp::VfdRecv { id, data, vfds } => encode_vfd_recv(desc_mem, id, data, vfds),
+        r => {
+            desc_mem.get_ref(0)?.store(Le32::from(r.get_code()));
+            Ok(size_of::<Le32>() as u32)
+        }
+    }
+}
+
+#[derive(Debug)]
+enum WlError {
+    NewAlloc(Error),
+    AllocSetSize(Error),
+    AllocFromFile(Error),
+    SocketConnect(io::Error),
+    SocketNonBlock(io::Error),
+    VmControl(VmControlError),
+    VmBadResponse,
+    CheckedOffset,
+    GuestMemory(GuestMemoryError),
+    VolatileMemory(VolatileMemoryError),
+    SendVfd(Error),
+    RecvVfd(Error),
+}
+
+type WlResult<T> = result::Result<T, WlError>;
+
+impl From<GuestMemoryError> for WlError {
+    fn from(e: GuestMemoryError) -> WlError {
+        WlError::GuestMemory(e)
+    }
+}
+
+impl From<VolatileMemoryError> for WlError {
+    fn from(e: VolatileMemoryError) -> WlError {
+        WlError::VolatileMemory(e)
+    }
+}
+
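+/// A cheaply cloneable handle for making requests over the VM control socket, such as registering
+/// guest memory. A minimal sketch of the round trip, mirroring `WlVfd::allocate` below (`fd` and
+/// `size` here stand in for a shared memory fd and its length):
+///
+/// ```ignore
+/// let vm = VmRequester::new(vm_socket);
+/// match vm.request(VmRequest::RegisterMemory(MaybeOwnedFd::Borrowed(fd), size))? {
+///     VmResponse::RegisterMemory { pfn, slot } => { /* memory is now mapped into the guest */ }
+///     _ => return Err(WlError::VmBadResponse),
+/// }
+/// ```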
+#[derive(Clone)]
+struct VmRequester {
+    inner: Rc<RefCell<(Scm, UnixDatagram)>>,
+}
+
+impl VmRequester {
+    fn new(vm_socket: UnixDatagram) -> VmRequester {
+        VmRequester { inner: Rc::new(RefCell::new((Scm::new(1), vm_socket))) }
+    }
+
+    fn request(&self, request: VmRequest) -> WlResult<VmResponse> {
+        let mut inner = self.inner.borrow_mut();
+        let (ref mut scm, ref mut vm_socket) = *inner;
+        request
+            .send(scm, vm_socket)
+            .map_err(WlError::VmControl)?;
+        VmResponse::recv(scm, vm_socket).map_err(WlError::VmControl)
+    }
+}
+
+#[repr(C)]
+#[derive(Copy, Clone)]
+struct CtrlHeader {
+    type_: Le32,
+    flags: Le32,
+}
+
+#[repr(C)]
+#[derive(Copy, Clone)]
+struct CtrlVfdNew {
+    hdr: CtrlHeader,
+    id: Le32,
+    flags: Le32,
+    pfn: Le64,
+    size: Le32,
+}
+
+unsafe impl DataInit for CtrlVfdNew {}
+
+#[repr(C)]
+#[derive(Copy, Clone)]
+struct CtrlVfdRecv {
+    hdr: CtrlHeader,
+    id: Le32,
+    vfd_count: Le32,
+}
+
+unsafe impl DataInit for CtrlVfdRecv {}
+
+#[derive(Debug)]
+enum WlOp {
+    NewAlloc { id: u32, flags: u32, size: u32 },
+    Close { id: u32 },
+    Send {
+        id: u32,
+        vfds_addr: GuestAddress,
+        vfd_count: u32,
+        data_addr: GuestAddress,
+        data_len: u32,
+    },
+    NewCtx { id: u32 },
+    Unsupported { op_type: u32 },
+}
+
+#[derive(Debug)]
+#[allow(dead_code)]
+enum WlResp<'a> {
+    Ok,
+    VfdNew {
+        id: u32,
+        flags: u32,
+        pfn: u64,
+        size: u32,
+        // The VfdNew variant can be either a response or a command depending on this `resp`. This
+        // is important for the `get_code` method.
+        resp: bool,
+    },
+    VfdRecv {
+        id: u32,
+        data: &'a [u8],
+        vfds: &'a [u32],
+    },
+    Err,
+    OutOfMemory,
+    InvalidId,
+    InvalidType,
+}
+
+impl<'a> WlResp<'a> {
+    fn get_code(&self) -> u32 {
+        match self {
+            &WlResp::Ok => VIRTIO_WL_RESP_OK,
+            &WlResp::VfdNew { resp, .. } => {
+                if resp {
+                    VIRTIO_WL_RESP_VFD_NEW
+                } else {
+                    VIRTIO_WL_CMD_VFD_NEW
+                }
+            }
+            &WlResp::VfdRecv { .. } => VIRTIO_WL_CMD_VFD_RECV,
+            &WlResp::Err => VIRTIO_WL_RESP_ERR,
+            &WlResp::OutOfMemory => VIRTIO_WL_RESP_OUT_OF_MEMORY,
+            &WlResp::InvalidId => VIRTIO_WL_RESP_INVALID_ID,
+            &WlResp::InvalidType => VIRTIO_WL_RESP_INVALID_TYPE,
+        }
+    }
+}
+
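+/// A single virtual file descriptor. Depending on how it was created, it is backed by either a
+/// connected wayland server socket (`connect`) or shared memory registered with the VM
+/// (`allocate`/`from_file`), in which case `slot` records the KVM memory slot and guest page
+/// frame number.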
+#[derive(Default)]
+struct WlVfd {
+    socket: Option<UnixStream>,
+    guest_shared_memory: Option<SharedMemory>,
+    slot: Option<(u32 /* slot */, u64 /* pfn */, VmRequester)>,
+}
+
+impl fmt::Debug for WlVfd {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "WlVfd {{")?;
+        if let Some(ref s) = self.socket {
+            write!(f, " socket: {}", s.as_raw_fd())?;
+        }
+        if let Some(&(slot, pfn, _)) = self.slot.as_ref() {
+            write!(f, " slot: {} pfn: {}", slot, pfn)?;
+        }
+        write!(f, " }}")
+    }
+}
+
+impl WlVfd {
+    fn connect<P: AsRef<Path>>(path: P) -> WlResult<WlVfd> {
+        let socket = UnixStream::connect(path)
+            .map_err(WlError::SocketConnect)?;
+        socket
+            .set_nonblocking(true)
+            .map_err(WlError::SocketNonBlock)?;
+        Ok(WlVfd {
+               socket: Some(socket),
+               guest_shared_memory: None,
+               slot: None,
+           })
+    }
+
+    fn allocate(vm: VmRequester, size: u64) -> WlResult<WlVfd> {
+        let size_page_aligned = round_to_page_size(size);
+        let mut vfd_shm = SharedMemory::new(Some(CStr::from_bytes_with_nul(b"virtwl_alloc\0")
+                                                     .unwrap()))
+                .map_err(WlError::NewAlloc)?;
+        vfd_shm
+            .set_size(size_page_aligned)
+            .map_err(WlError::AllocSetSize)?;
+        let register_response =
+            vm.request(VmRequest::RegisterMemory(MaybeOwnedFd::Borrowed(vfd_shm.as_raw_fd()),
+                                                   vfd_shm.size() as usize))?;
+        match register_response {
+            VmResponse::RegisterMemory { pfn, slot } => {
+                Ok(WlVfd {
+                       socket: None,
+                       guest_shared_memory: Some(vfd_shm),
+                       slot: Some((slot, pfn, vm)),
+                   })
+            }
+            _ => Err(WlError::VmBadResponse),
+        }
+    }
+
+    fn from_file(vm: VmRequester, fd: File) -> WlResult<WlVfd> {
+        let vfd_shm = SharedMemory::from_raw_fd(fd)
+            .map_err(WlError::AllocFromFile)?;
+        let size = round_to_page_size(vfd_shm.size());
+        let register_response =
+            vm.request(VmRequest::RegisterMemory(MaybeOwnedFd::Borrowed(vfd_shm.as_raw_fd()),
+                                                   size as usize))?;
+        match register_response {
+            VmResponse::RegisterMemory { pfn, slot } => {
+                Ok(WlVfd {
+                       socket: None,
+                       guest_shared_memory: Some(vfd_shm),
+                       slot: Some((slot, pfn, vm)),
+                   })
+            }
+            _ => Err(WlError::VmBadResponse),
+        }
+    }
+
+    fn flags(&self) -> u32 {
+        let mut flags = 0;
+        if self.socket.is_some() {
+            flags |= VIRTIO_WL_VFD_CONTROL;
+        }
+        if self.slot.is_some() {
+            flags |= VIRTIO_WL_VFD_WRITE | VIRTIO_WL_VFD_MAP
+        }
+        flags
+    }
+
+    fn pfn(&self) -> Option<u64> {
+        self.slot.as_ref().map(|s| s.1)
+    }
+
+    fn size(&self) -> Option<u64> {
+        self.guest_shared_memory.as_ref().map(|m| m.size())
+    }
+
+    fn fd(&self) -> Option<RawFd> {
+        self.guest_shared_memory
+            .as_ref()
+            .map(|m| m.as_raw_fd())
+            .or(self.socket.as_ref().map(|s| s.as_raw_fd()))
+    }
+
+    fn send(&mut self, scm: &mut Scm, fds: &[RawFd], data: VolatileSlice) -> WlResult<WlResp> {
+        match self.socket {
+            Some(ref socket) => {
+                scm.send(socket, &[data], fds)
+                    .map_err(WlError::SendVfd)?;
+                Ok(WlResp::Ok)
+            }
+            None => Ok(WlResp::InvalidType),
+        }
+    }
+
+    fn recv(&mut self, scm: &mut Scm, in_file_queue: &mut Vec<File>) -> WlResult<Vec<u8>> {
+        // This awkward looking scope is to allow us to remove self.socket if we discover after
+        // borrowing it that the socket is disconnected.
+        {
+            let socket = match self.socket {
+                Some(ref s) => s,
+                None => return Ok(Vec::new()),
+            };
+            let mut buf = vec![0; IN_BUFFER_LEN];

+            let old_len = in_file_queue.len();
+            let len = scm.recv(socket, &mut [&mut buf[..]], in_file_queue)
+                .map_err(WlError::RecvVfd)?;
+            // If any data gets read, the return statement avoids removing the socket.
+            if len != 0 || in_file_queue.len() != old_len {
+                buf.truncate(len);
+                buf.shrink_to_fit();
+                return Ok(buf);
+            }
+        }
+        self.socket = None;
+        Ok(Vec::new())
+    }
+
+    fn close(&mut self) -> WlResult<()> {
+        self.socket = None;
+        if let Some((slot, _, vm)) = self.slot.take() {
+            vm.request(VmRequest::UnregisterMemory(slot))?;
+        }
+        Ok(())
+    }
+}
+
+impl Drop for WlVfd {
+    fn drop(&mut self) {
+        let _ = self.close();
+    }
+}
+
+#[derive(Debug)]
+enum WlRecv {
+    Vfd { id: u32 },
+    Data { buf: Vec<u8> },
+}
+
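+/// The collection of every live vfd plus the buffering state for the `in` queue:
+/// `in_file_queue` holds files received from the wayland server that have not yet been turned
+/// into vfds, and `in_queue` holds messages waiting for free `in` queue descriptors.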
+struct WlState {
+    wayland_path: PathBuf,
+    vm: VmRequester,
+    vfds: Map<u32, WlVfd>,
+    next_vfd_id: u32,
+    scm: Scm,
+    in_file_queue: Vec<File>,
+    in_queue: VecDeque<(u32 /* vfd_id */, WlRecv)>,
+    current_recv_vfd: Option<u32>,
+    recv_vfds: Vec<u32>,
+}
+
+impl WlState {
+    fn new(wayland_path: PathBuf, vm_socket: UnixDatagram) -> WlState {
+        WlState {
+            wayland_path: wayland_path,
+            vm: VmRequester::new(vm_socket),
+            scm: Scm::new(VIRTWL_SEND_MAX_ALLOCS),
+            vfds: Map::new(),
+            next_vfd_id: NEXT_VFD_ID_BASE,
+            in_file_queue: Vec::new(),
+            in_queue: VecDeque::new(),
+            current_recv_vfd: None,
+            recv_vfds: Vec::new(),
+        }
+    }
+
+    fn new_alloc(&mut self, id: u32, flags: u32, size: u32) -> WlResult<WlResp> {
+        if id & VFD_ID_HOST_MASK != 0 {
+            return Ok(WlResp::InvalidId);
+        }
+        if flags & !(VIRTIO_WL_VFD_WRITE | VIRTIO_WL_VFD_MAP) != 0 {
+            return Ok(WlResp::Err);
+        }
+
+        match self.vfds.entry(id) {
+            Entry::Vacant(entry) => {
+                let vfd = WlVfd::allocate(self.vm.clone(), size as u64)?;
+                let resp = WlResp::VfdNew {
+                    id: id,
+                    flags: flags,
+                    pfn: vfd.pfn().unwrap_or_default(),
+                    size: vfd.size().unwrap_or_default() as u32,
+                    resp: true,
+                };
+                entry.insert(vfd);
+                Ok(resp)
+            }
+            Entry::Occupied(_) => Ok(WlResp::InvalidId),
+        }
+    }
+
+    fn new_context(&mut self, id: u32) -> WlResult<WlResp> {
+        if id & VFD_ID_HOST_MASK != 0 {
+            return Ok(WlResp::InvalidId);
+        }
+        match self.vfds.entry(id) {
+            Entry::Vacant(entry) => {
+                entry.insert(WlVfd::connect(&self.wayland_path)?);
+                Ok(WlResp::VfdNew {
+                       id: id,
+                       flags: VIRTIO_WL_VFD_CONTROL,
+                       pfn: 0,
+                       size: 0,
+                       resp: true,
+                   })
+            }
+            Entry::Occupied(_) => Ok(WlResp::InvalidId),
+        }
+    }
+
+    fn close(&mut self, vfd_id: u32) -> WlResult<WlResp> {
+        let mut to_delete = Set::new();
+        for &(dest_vfd_id, ref q) in self.in_queue.iter() {
+            if dest_vfd_id == vfd_id {
+                if let &WlRecv::Vfd { id } = q {
+                    to_delete.insert(id);
+                }
+            }
+        }
+        for vfd_id in to_delete {
+            // Sorry sub-error, we can't have cascading errors leaving us in an inconsistent state.
+            let _ = self.close(vfd_id);
+        }
+        match self.vfds.remove(&vfd_id) {
+            Some(mut vfd) => {
+                self.in_queue.retain(|&(id, _)| id != vfd_id);
+                vfd.close()?;
+                Ok(WlResp::Ok)
+            }
+            None => Ok(WlResp::InvalidId),
+        }
+    }
+
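+    /// Resolves the guest-supplied vfd ids in `vfds` to host file descriptors and sends them,
+    /// along with `data`, over the socket of vfd `vfd_id` as ancillary data (`Scm`).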
+    fn send(&mut self, vfd_id: u32, vfds: VolatileSlice, data: VolatileSlice) -> WlResult<WlResp> {
+        let vfd_count = vfds.size() / size_of::<Le32>();
+        // Reject messages with more vfds than the protocol allows instead of indexing past the
+        // fixed-size arrays below.
+        if vfd_count > VIRTWL_SEND_MAX_ALLOCS {
+            return Ok(WlResp::Err);
+        }
+        let mut vfd_ids = [Le32::from(0); VIRTWL_SEND_MAX_ALLOCS];
+        vfds.copy_to(&mut vfd_ids[..]);
+        let mut fds = [0; VIRTWL_SEND_MAX_ALLOCS];
+        for (&id, fd) in vfd_ids[..vfd_count].iter().zip(fds.iter_mut()) {
+            match self.vfds.get(&id.into()) {
+                Some(vfd) => {
+                    match vfd.fd() {
+                        Some(vfd_fd) => *fd = vfd_fd,
+                        None => return Ok(WlResp::InvalidType),
+                    }
+                }
+                None => return Ok(WlResp::InvalidId),
+            }
+        }
+        match self.vfds.get_mut(&vfd_id) {
+            Some(vfd) => vfd.send(&mut self.scm, &fds[..vfd_count], data),
+            None => Ok(WlResp::InvalidId),
+        }
+    }
+
+    fn recv(&mut self, vfd_id: u32) -> WlResult<()> {
+        let buf = match self.vfds.get_mut(&vfd_id) {
+            Some(vfd) => vfd.recv(&mut self.scm, &mut self.in_file_queue)?,
+            None => return Ok(()),
+        };
+        for file in self.in_file_queue.drain(..) {
+            self.vfds
+                .insert(self.next_vfd_id, WlVfd::from_file(self.vm.clone(), file)?);
+            self.in_queue
+                .push_back((vfd_id, WlRecv::Vfd { id: self.next_vfd_id }));
+            self.next_vfd_id += 1;
+        }
+        self.in_queue
+            .push_back((vfd_id, WlRecv::Data { buf: buf }));
+
+        Ok(())
+    }
+
+    fn execute(&mut self, mem: &GuestMemory, op: WlOp) -> WlResult<WlResp> {
+        match op {
+            WlOp::NewAlloc { id, flags, size } => self.new_alloc(id, flags, size),
+            WlOp::Close { id } => self.close(id),
+            WlOp::Send {
+                id,
+                vfds_addr,
+                vfd_count,
+                data_addr,
+                data_len,
+            } => {
+                let vfd_mem = mem.get_slice(vfds_addr.0, (vfd_count as usize) * size_of::<Le32>())?;
+                let data_mem = mem.get_slice(data_addr.0, data_len as usize)?;
+                self.send(id, vfd_mem, data_mem)
+            }
+            WlOp::NewCtx { id } => self.new_context(id),
+            WlOp::Unsupported { .. } => Ok(WlResp::Err),
+        }
+    }
+
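+    /// Peeks at the next message to encode onto the `in` queue without consuming it. Consecutive
+    /// `WlRecv::Vfd` entries destined for the same vfd are accumulated in `recv_vfds` (see
+    /// `pop_recv`) so that the following `WlRecv::Data` entry is delivered as a single `VfdRecv`
+    /// carrying both the new vfd ids and the data.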
+    fn next_recv(&self) -> Option<WlResp> {
+        if let Some(q) = self.in_queue.front() {
+            match q {
+                &(vfd_id, WlRecv::Vfd { id }) => {
+                    if self.current_recv_vfd.is_none() || self.current_recv_vfd == Some(vfd_id) {
+                        match self.vfds.get(&id) {
+                            Some(vfd) => {
+                                Some(WlResp::VfdNew {
+                                         id: id,
+                                         flags: vfd.flags(),
+                                         pfn: vfd.pfn().unwrap_or_default(),
+                                         size: vfd.size().unwrap_or_default() as u32,
+                                         resp: false,
+                                     })
+                            }
+                            _ => {
+                                Some(WlResp::VfdNew {
+                                         id: id,
+                                         flags: 0,
+                                         pfn: 0,
+                                         size: 0,
+                                         resp: false,
+                                     })
+                            }
+                        }
+                    } else {
+                        Some(WlResp::VfdRecv {
+                                 id: self.current_recv_vfd.unwrap(),
+                                 data: &[],
+                                 vfds: &self.recv_vfds[..],
+                             })
+                    }
+                }
+                &(vfd_id, WlRecv::Data { ref buf }) => {
+                    if self.current_recv_vfd.is_none() || self.current_recv_vfd == Some(vfd_id) {
+                        Some(WlResp::VfdRecv {
+                                 id: vfd_id,
+                                 data: &buf[..],
+                                 vfds: &self.recv_vfds[..],
+                             })
+                    } else {
+                        Some(WlResp::VfdRecv {
+                                 id: self.current_recv_vfd.unwrap(),
+                                 data: &[],
+                                 vfds: &self.recv_vfds[..],
+                             })
+                    }
+                }
+            }
+        } else {
+            None
+        }
+    }
+
+    fn pop_recv(&mut self) {
+        if let Some(q) = self.in_queue.front() {
+            match q {
+                &(vfd_id, WlRecv::Vfd { id }) => {
+                    if self.current_recv_vfd.is_none() || self.current_recv_vfd == Some(vfd_id) {
+                        self.recv_vfds.push(id);
+                        self.current_recv_vfd = Some(vfd_id);
+                    } else {
+                        self.recv_vfds.clear();
+                        self.current_recv_vfd = None;
+                        return;
+                    }
+                }
+                &(vfd_id, WlRecv::Data { .. }) => {
+                    // Determine whether this data belongs to the current recv group before
+                    // resetting the grouping state; if it does not, flush the group without
+                    // popping the entry.
+                    let matches_current = self.current_recv_vfd.is_none() ||
+                                          self.current_recv_vfd == Some(vfd_id);
+                    self.recv_vfds.clear();
+                    self.current_recv_vfd = None;
+                    if !matches_current {
+                        return;
+                    }
+                }
+            }
+        }
+        self.in_queue.pop_front();
+    }
+
+    fn iter_sockets<'a, F>(&'a self, mut f: F)
+        where F: FnMut(u32, &'a UnixStream)
+    {
+        for (id, socket) in self.vfds
+                .iter()
+                .filter_map(|(&k, v)| v.socket.as_ref().map(|s| (k, s))) {
+            f(id, &socket);
+        }
+    }
+}
+
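+/// The device's event loop: polls the two queue eventfds, the kill eventfd, and every vfd socket,
+/// then routes descriptors and socket data through `WlState`.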
+struct Worker {
+    mem: GuestMemory,
+    interrupt_evt: EventFd,
+    interrupt_status: Arc<AtomicUsize>,
+    in_queue: Queue,
+    out_queue: Queue,
+    state: WlState,
+    in_desc_chains: VecDeque<(u16, GuestAddress, u32)>,
+}
+
+impl Worker {
+    fn new(mem: GuestMemory,
+           interrupt_evt: EventFd,
+           interrupt_status: Arc<AtomicUsize>,
+           in_queue: Queue,
+           out_queue: Queue,
+           wayland_path: PathBuf,
+           vm_socket: UnixDatagram)
+           -> Worker {
+        Worker {
+            mem: mem,
+            interrupt_evt: interrupt_evt,
+            interrupt_status: interrupt_status,
+            in_queue: in_queue,
+            out_queue: out_queue,
+            state: WlState::new(wayland_path, vm_socket),
+            in_desc_chains: VecDeque::with_capacity(QUEUE_SIZE as usize),
+        }
+    }
+
+    fn signal_used_queue(&self) {
+        self.interrupt_status
+            .fetch_or(INTERRUPT_STATUS_USED_RING as usize, Ordering::SeqCst);
+        let _ = self.interrupt_evt.write(1);
+    }
+
+    fn run(&mut self, mut queue_evts: Vec<EventFd>, kill_evt: EventFd) {
+        let in_queue_evt = queue_evts.remove(0);
+        let out_queue_evt = queue_evts.remove(0);
+        let mut token_vfd_id_map = Map::new();
+        let mut poller = Poller::new(3);
+        'poll: loop {
+            let tokens = {
+                // TODO(zachr): somehow keep pollables from allocating every loop
+                // The capacity is always the 3 static eventfds plus the number of vfd sockets. To
+                // estimate the number of vfd sockets, we use the previous poll's vfd id map size,
+                // which was equal to the number of vfd sockets.
+                let mut pollables = Vec::with_capacity(3 + token_vfd_id_map.len());
+                pollables.push((Q_IN, &in_queue_evt as &Pollable));
+                pollables.push((Q_OUT, &out_queue_evt as &Pollable));
+                pollables.push((KILL, &kill_evt as &Pollable));
+                token_vfd_id_map.clear();
+                // TODO(zachr): leave these out if there is no Q_IN to use
+                self.state
+                    .iter_sockets(|id, socket| {
+                                      let token = VFD_BASE_TOKEN + token_vfd_id_map.len() as u32;
+                                      token_vfd_id_map.insert(token, id);
+                                      pollables.push((token, socket));
+                                  });
+                poller.poll(&pollables[..]).expect("error: failed poll")
+            };
+
+            let mut signal_used = false;
+            for &token in tokens {
+                match token {
+                    Q_IN => {
+                        let _ = in_queue_evt.read();
+                        // Used to buffer descriptor indexes that are invalid for our uses.
+                        let mut rejects = [0u16; QUEUE_SIZE as usize];
+                        let mut rejects_len = 0;
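+                        // A usable in descriptor must be able to hold the recv header plus the
+                        // worst case of VIRTWL_SEND_MAX_ALLOCS vfd ids.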
+                        let min_in_desc_len = (size_of::<CtrlVfdRecv>() +
+                                               size_of::<Le32>() * VIRTWL_SEND_MAX_ALLOCS) as
+                                              u32;
+                        self.in_desc_chains.extend(self.in_queue.iter(&self.mem).filter_map(|d| {
+                            if d.len >= min_in_desc_len && d.is_write_only() {
+                                Some((d.index, d.addr, d.len))
+                            } else {
+                                // Can not use queue.add_used directly because it's being borrowed
+                                // for the iterator chain, so we buffer the descriptor index in
+                                // rejects.
+                                rejects[rejects_len] = d.index;
+                                rejects_len += 1;
+                                None
+                            }
+                        }));
+                        for &reject in &rejects[..rejects_len] {
+                            signal_used = true;
+                            self.in_queue.add_used(&self.mem, reject, 0);
+                        }
+                    }
+                    Q_OUT => {
+                        let _ = out_queue_evt.read();
+                        // Used to buffer filled in descriptors that will be added to the used queue
+                        // after iterating the available queue.
+                        let mut used_descs = [(0u16, 0u32); QUEUE_SIZE as usize];
+                        let mut used_descs_len = 0;
+                        let min_resp_desc_len = size_of::<CtrlHeader>() as u32;
+                        for desc in self.out_queue.iter(&self.mem) {
+                            // Expects that each descriptor chain is made of one readable "out"
+                            // descriptor followed by one writable "in" descriptor for the
+                            // response.
+                            let mut used_len = 0;
+                            if !desc.is_write_only() {
+                                if let Some(resp_desc) = desc.next_descriptor() {
+                                    if resp_desc.is_write_only() &&
+                                       resp_desc.len >= min_resp_desc_len {
+                                        let resp = match parse_desc(&desc, &self.mem) {
+                                            Ok(op) => {
+                                                match self.state.execute(&self.mem, op) {
+                                                    Ok(r) => r,
+                                                    _ => WlResp::Err,
+                                                }
+                                            }
+                                            _ => WlResp::Err,
+                                        };
+
+                                        // This memory location is valid because it came from a
+                                        // queue which always checks descriptor memory locations.
+                                        let resp_mem = self.mem
+                                            .get_slice(resp_desc.addr.0, resp_desc.len as usize)
+                                            .unwrap();
+                                        used_len = encode_resp(resp_mem, resp)
+                                            .unwrap_or_default();
+                                    }
+                                }
+                            }
+                            // Every chain, including unusable ones, gets returned to the used
+                            // queue; recording the index for all of them also avoids pushing
+                            // stale entries for chains that lacked a valid response descriptor.
+                            used_descs[used_descs_len] = (desc.index, used_len);
+                            used_descs_len += 1;
+                        }
+                        for &(index, len) in &used_descs[..used_descs_len] {
+                            signal_used = true;
+                            self.out_queue.add_used(&self.mem, index, len);
+                        }
+                    }
+                    KILL => {
+                        println!("crosvm Wl worker killed");
+                        break 'poll;
+                    }
+                    v => {
+                        if let Some(&id) = token_vfd_id_map.get(&v) {
+                            let res = self.state.recv(id);
+                            if let Err(e) = res {
+                                println!("recv vfd {} error: {:?}", id, e);
+                            }
+                        }
+                    }
+                }
+            }
+
+            // Because this loop should be retried after the in queue is usable or after one of the
+            // VFDs was read, we do it after the poll event responses.
+            while !self.in_desc_chains.is_empty() {
+                let mut should_pop = false;
+                if let Some(in_resp) = self.state.next_recv() {
+                    // self.in_desc_chains is not empty (checked by loop condition) so unwrap is
+                    // safe.
+                    let (index, addr, desc_len) = self.in_desc_chains.pop_front().unwrap();
+                    // This memory location is valid because it came from a queue which always
+                    // checks the descriptor memory locations.
+                    let desc_mem = self.mem.get_slice(addr.0, desc_len as usize).unwrap();
+                    let len = match encode_resp(desc_mem, in_resp) {
+                        Ok(len) => {
+                            should_pop = true;
+                            len
+                        }
+                        Err(e) => {
+                            println!("failed to encode response to descriptor chain: {:?}", e);
+                            0
+                        }
+                    };
+                    signal_used = true;
+                    self.in_queue.add_used(&self.mem, index, len);
+                } else {
+                    break;
+                }
+                if should_pop {
+                    self.state.pop_recv();
+                }
+            }
+
+            if signal_used {
+                self.signal_used_queue();
+            }
+        }
+    }
+}
+
+pub struct Wl {
+    kill_evt: Option<EventFd>,
+    wayland_path: PathBuf,
+    vm_socket: Option<UnixDatagram>,
+}
+
+impl Wl {
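+    /// Constructs a new virtio wayland device. A minimal usage sketch; the real construction
+    /// site lives outside this module, and the socket path shown is only an assumption (the
+    /// compositor's socket inside XDG_RUNTIME_DIR):
+    ///
+    /// ```ignore
+    /// // Hypothetical path; crosvm derives the real one from XDG_RUNTIME_DIR.
+    /// let wl = Wl::new("/run/user/1000/wayland-0", vm_socket)?;
+    /// ```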
+    pub fn new<P: AsRef<Path>>(wayland_path: P, vm_socket: UnixDatagram) -> Result<Wl> {
+        // The kill EventFd pair is created when the device is activated.
+        Ok(Wl {
+               kill_evt: None,
+               wayland_path: wayland_path.as_ref().to_owned(),
+               vm_socket: Some(vm_socket),
+           })
+    }
+}
+
+impl Drop for Wl {
+    fn drop(&mut self) {
+        if let Some(kill_evt) = self.kill_evt.take() {
+            // Ignore the result because there is nothing we can do about it.
+            let _ = kill_evt.write(1);
+        }
+    }
+}
+
+impl VirtioDevice for Wl {
+    fn keep_fds(&self) -> Vec<RawFd> {
+        let mut keep_fds = Vec::new();
+
+        if let Some(ref vm_socket) = self.vm_socket {
+            keep_fds.push(vm_socket.as_raw_fd());
+        }
+
+        keep_fds
+    }
+
+    fn device_type(&self) -> u32 {
+        TYPE_WL
+    }
+
+    fn queue_max_sizes(&self) -> &[u16] {
+        QUEUE_SIZES
+    }
+
+    fn activate(&mut self,
+                mem: GuestMemory,
+                interrupt_evt: EventFd,
+                status: Arc<AtomicUsize>,
+                mut queues: Vec<Queue>,
+                queue_evts: Vec<EventFd>) {
+        if queues.len() != QUEUE_SIZES.len() || queue_evts.len() != QUEUE_SIZES.len() {
+            return;
+        }
+
+        let (self_kill_evt, kill_evt) =
+            match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) {
+                Ok(v) => v,
+                Err(e) => {
+                    println!("wl: error creating kill EventFd pair: {:?}", e);
+                    return;
+                }
+            };
+        self.kill_evt = Some(self_kill_evt);
+
+        if let Some(vm_socket) = self.vm_socket.take() {
+            let wayland_path = self.wayland_path.clone();
+            spawn(move || {
+                Worker::new(mem,
+                            interrupt_evt,
+                            status,
+                            queues.remove(0),
+                            queues.remove(0),
+                            wayland_path,
+                            vm_socket)
+                        .run(queue_evts, kill_evt);
+            });
+        }
+    }
+}