summary refs log tree commit diff
path: root/devices/src/virtio/console.rs
diff options
context:
space:
mode:
Diffstat (limited to 'devices/src/virtio/console.rs')
-rw-r--r--devices/src/virtio/console.rs449
1 files changed, 449 insertions, 0 deletions
diff --git a/devices/src/virtio/console.rs b/devices/src/virtio/console.rs
new file mode 100644
index 0000000..38f5bf1
--- /dev/null
+++ b/devices/src/virtio/console.rs
@@ -0,0 +1,449 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+use std::io::{self, Read};
+use std::os::unix::io::RawFd;
+use std::sync::mpsc::{channel, Receiver, TryRecvError};
+use std::thread;
+
+use data_model::{DataInit, Le16, Le32};
+use sys_util::{error, EventFd, GuestMemory, PollContext, PollToken};
+
+use super::{
+    copy_config, Interrupt, Queue, Reader, VirtioDevice, Writer, TYPE_CONSOLE, VIRTIO_F_VERSION_1,
+};
+
+const QUEUE_SIZE: u16 = 256;
+
+// For now, just implement port 0 (receiveq and transmitq).
+// If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed.
+const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE];
+
+#[derive(Copy, Clone, Debug, Default)]
+#[repr(C)]
+struct virtio_console_config {
+    cols: Le16,
+    rows: Le16,
+    max_nr_ports: Le32,
+    emerg_wr: Le32,
+}
+
+// Safe because it only has data and has no implicit padding.
+unsafe impl DataInit for virtio_console_config {}
+
+struct Worker {
+    mem: GuestMemory,
+    interrupt: Interrupt,
+    input: Option<Box<dyn io::Read + Send>>,
+    output: Option<Box<dyn io::Write + Send>>,
+}
+
+fn write_output(output: &mut Box<dyn io::Write>, data: &[u8]) -> io::Result<()> {
+    output.write_all(&data)?;
+    output.flush()
+}
+
+impl Worker {
+    fn process_transmit_request(
+        mut reader: Reader,
+        output: &mut Box<dyn io::Write>,
+    ) -> io::Result<u32> {
+        let len = reader.available_bytes();
+        let mut data = vec![0u8; len];
+        reader.read_exact(&mut data)?;
+        write_output(output, &data)?;
+        Ok(0)
+    }
+
+    fn process_transmit_queue(
+        &mut self,
+        transmit_queue: &mut Queue,
+        output: &mut Box<dyn io::Write>,
+    ) {
+        let mut needs_interrupt = false;
+        while let Some(avail_desc) = transmit_queue.pop(&self.mem) {
+            let desc_index = avail_desc.index;
+
+            let reader = match Reader::new(&self.mem, avail_desc) {
+                Ok(r) => r,
+                Err(e) => {
+                    error!("console: failed to create reader: {}", e);
+                    transmit_queue.add_used(&self.mem, desc_index, 0);
+                    needs_interrupt = true;
+                    continue;
+                }
+            };
+
+            let len = match Self::process_transmit_request(reader, output) {
+                Ok(written) => written,
+                Err(e) => {
+                    error!("console: process_transmit_request failed: {}", e);
+                    0
+                }
+            };
+
+            transmit_queue.add_used(&self.mem, desc_index, len);
+            needs_interrupt = true;
+        }
+
+        if needs_interrupt {
+            self.interrupt.signal_used_queue(transmit_queue.vector);
+        }
+    }
+
+    // Start a thread that reads self.input and sends the input back via the returned channel.
+    //
+    // `in_avail_evt` will be triggered by the thread when new input is available.
+    fn spawn_input_thread(&mut self, in_avail_evt: &EventFd) -> Option<Receiver<u8>> {
+        let mut rx = match self.input.take() {
+            Some(input) => input,
+            None => return None,
+        };
+
+        let (send_channel, recv_channel) = channel();
+
+        let thread_in_avail_evt = match in_avail_evt.try_clone() {
+            Ok(evt) => evt,
+            Err(e) => {
+                error!("failed to clone in_avail_evt: {}", e);
+                return None;
+            }
+        };
+
+        // The input thread runs in detached mode and will exit when channel is disconnected because
+        // the console device has been dropped.
+        let res = thread::Builder::new()
+            .name(format!("console_input"))
+            .spawn(move || {
+                let mut rx_buf = [0u8; 1];
+                loop {
+                    match rx.read(&mut rx_buf) {
+                        Ok(0) => break, // Assume the stream of input has ended.
+                        Ok(_) => {
+                            if send_channel.send(rx_buf[0]).is_err() {
+                                // The receiver has disconnected.
+                                break;
+                            }
+                            thread_in_avail_evt.write(1).unwrap();
+                        }
+                        Err(e) => {
+                            // Being interrupted is not an error, but everything else is.
+                            if e.kind() != io::ErrorKind::Interrupted {
+                                error!(
+                                    "failed to read for bytes to queue into console device: {}",
+                                    e
+                                );
+                                break;
+                            }
+                        }
+                    }
+                }
+            });
+        if let Err(e) = res {
+            error!("failed to spawn input thread: {}", e);
+            return None;
+        }
+        Some(recv_channel)
+    }
+
+    // Check for input from `in_channel_opt` and transfer it to the receive queue, if any.
+    fn handle_input(
+        &mut self,
+        in_channel_opt: &mut Option<Receiver<u8>>,
+        receive_queue: &mut Queue,
+    ) {
+        let in_channel = match in_channel_opt.as_ref() {
+            Some(v) => v,
+            None => return,
+        };
+
+        while let Some(desc) = receive_queue.peek(&self.mem) {
+            let desc_index = desc.index;
+            let mut writer = match Writer::new(&self.mem, desc) {
+                Ok(w) => w,
+                Err(e) => {
+                    error!("console: failed to create Writer: {}", e);
+                    break;
+                }
+            };
+
+            let mut disconnected = false;
+            while writer.available_bytes() > 0 {
+                match in_channel.try_recv() {
+                    Ok(byte) => {
+                        writer.write_obj(byte).unwrap();
+                    }
+                    Err(TryRecvError::Empty) => break,
+                    Err(TryRecvError::Disconnected) => {
+                        disconnected = true;
+                        break;
+                    }
+                }
+            }
+
+            let bytes_written = writer.bytes_written() as u32;
+
+            if bytes_written > 0 {
+                receive_queue.pop_peeked(&self.mem);
+                receive_queue.add_used(&self.mem, desc_index, bytes_written);
+                self.interrupt.signal_used_queue(receive_queue.vector);
+            }
+
+            if disconnected {
+                // Set in_channel to None so that future handle_input calls exit early.
+                in_channel_opt.take();
+                return;
+            }
+
+            if bytes_written == 0 {
+                break;
+            }
+        }
+    }
+
+    fn run(&mut self, mut queues: Vec<Queue>, mut queue_evts: Vec<EventFd>, kill_evt: EventFd) {
+        #[derive(PollToken)]
+        enum Token {
+            ReceiveQueueAvailable,
+            TransmitQueueAvailable,
+            InputAvailable,
+            InterruptResample,
+            Kill,
+        }
+
+        // Device -> driver
+        let (mut receive_queue, receive_evt) = (queues.remove(0), queue_evts.remove(0));
+
+        // Driver -> device
+        let (mut transmit_queue, transmit_evt) = (queues.remove(0), queue_evts.remove(0));
+
+        let in_avail_evt = match EventFd::new() {
+            Ok(evt) => evt,
+            Err(e) => {
+                error!("failed creating EventFd: {}", e);
+                return;
+            }
+        };
+
+        // Spawn a separate thread to poll self.input.
+        // A thread is used because io::Read only provides a blocking interface, and there is no
+        // generic way to add an io::Read instance to a poll context (it may not be backed by a file
+        // descriptor).  Moving the blocking read call to a separate thread and sending data back to
+        // the main worker thread with an event for notification bridges this gap.
+        let mut in_channel = self.spawn_input_thread(&in_avail_evt);
+
+        let poll_ctx: PollContext<Token> = match PollContext::build_with(&[
+            (&transmit_evt, Token::TransmitQueueAvailable),
+            (&receive_evt, Token::ReceiveQueueAvailable),
+            (&in_avail_evt, Token::InputAvailable),
+            (self.interrupt.get_resample_evt(), Token::InterruptResample),
+            (&kill_evt, Token::Kill),
+        ]) {
+            Ok(pc) => pc,
+            Err(e) => {
+                error!("failed creating PollContext: {}", e);
+                return;
+            }
+        };
+
+        let mut output: Box<dyn io::Write> = match self.output.take() {
+            Some(o) => o,
+            None => Box::new(io::sink()),
+        };
+
+        'poll: loop {
+            let events = match poll_ctx.wait() {
+                Ok(v) => v,
+                Err(e) => {
+                    error!("failed polling for events: {}", e);
+                    break;
+                }
+            };
+
+            for event in events.iter_readable() {
+                match event.token() {
+                    Token::TransmitQueueAvailable => {
+                        if let Err(e) = transmit_evt.read() {
+                            error!("failed reading transmit queue EventFd: {}", e);
+                            break 'poll;
+                        }
+                        self.process_transmit_queue(&mut transmit_queue, &mut output);
+                    }
+                    Token::ReceiveQueueAvailable => {
+                        if let Err(e) = receive_evt.read() {
+                            error!("failed reading receive queue EventFd: {}", e);
+                            break 'poll;
+                        }
+                        self.handle_input(&mut in_channel, &mut receive_queue);
+                    }
+                    Token::InputAvailable => {
+                        if let Err(e) = in_avail_evt.read() {
+                            error!("failed reading in_avail_evt: {}", e);
+                            break 'poll;
+                        }
+                        self.handle_input(&mut in_channel, &mut receive_queue);
+                    }
+                    Token::InterruptResample => {
+                        self.interrupt.interrupt_resample();
+                    }
+                    Token::Kill => break 'poll,
+                }
+            }
+        }
+    }
+}
+
+/// Virtio console device.
+pub struct Console {
+    kill_evt: Option<EventFd>,
+    worker_thread: Option<thread::JoinHandle<Worker>>,
+    input: Option<Box<dyn io::Read + Send>>,
+    output: Option<Box<dyn io::Write + Send>>,
+    keep_fds: Vec<RawFd>,
+}
+
+impl Console {
+    fn new(
+        input: Option<Box<dyn io::Read + Send>>,
+        output: Option<Box<dyn io::Write + Send>>,
+        keep_fds: Vec<RawFd>,
+    ) -> Console {
+        Console {
+            kill_evt: None,
+            worker_thread: None,
+            input,
+            output,
+            keep_fds,
+        }
+    }
+
+    /// Constructs a console with input and output streams.
+    pub fn new_in_out(
+        input: Box<dyn io::Read + Send>,
+        out: Box<dyn io::Write + Send>,
+        keep_fds: Vec<RawFd>,
+    ) -> Console {
+        Self::new(Some(input), Some(out), keep_fds)
+    }
+
+    /// Constructs a console with an output stream but no input.
+    pub fn new_out(out: Box<dyn io::Write + Send>, keep_fds: Vec<RawFd>) -> Console {
+        Self::new(None, Some(out), keep_fds)
+    }
+
+    /// Constructs a console with no connected input or output.
+    pub fn new_sink() -> Console {
+        Self::new(None, None, Vec::new())
+    }
+}
+
+impl Drop for Console {
+    fn drop(&mut self) {
+        if let Some(kill_evt) = self.kill_evt.take() {
+            // Ignore the result because there is nothing we can do about it.
+            let _ = kill_evt.write(1);
+        }
+
+        if let Some(worker_thread) = self.worker_thread.take() {
+            let _ = worker_thread.join();
+        }
+    }
+}
+
+impl VirtioDevice for Console {
+    fn keep_fds(&self) -> Vec<RawFd> {
+        self.keep_fds.clone()
+    }
+
+    fn features(&self) -> u64 {
+        1 << VIRTIO_F_VERSION_1
+    }
+
+    fn device_type(&self) -> u32 {
+        TYPE_CONSOLE
+    }
+
+    fn queue_max_sizes(&self) -> &[u16] {
+        QUEUE_SIZES
+    }
+
+    fn read_config(&self, offset: u64, data: &mut [u8]) {
+        let config = virtio_console_config {
+            max_nr_ports: 1.into(),
+            ..Default::default()
+        };
+        copy_config(data, 0, config.as_slice(), offset);
+    }
+
+    fn activate(
+        &mut self,
+        mem: GuestMemory,
+        interrupt: Interrupt,
+        queues: Vec<Queue>,
+        queue_evts: Vec<EventFd>,
+    ) {
+        if queues.len() < 2 || queue_evts.len() < 2 {
+            return;
+        }
+
+        let (self_kill_evt, kill_evt) = match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) {
+            Ok(v) => v,
+            Err(e) => {
+                error!("failed creating kill EventFd pair: {}", e);
+                return;
+            }
+        };
+        self.kill_evt = Some(self_kill_evt);
+
+        let input = self.input.take();
+        let output = self.output.take();
+
+        let worker_result = thread::Builder::new()
+            .name("virtio_console".to_string())
+            .spawn(move || {
+                let mut worker = Worker {
+                    mem,
+                    interrupt,
+                    input,
+                    output,
+                };
+                worker.run(queues, queue_evts, kill_evt);
+                worker
+            });
+
+        match worker_result {
+            Err(e) => {
+                error!("failed to spawn virtio_console worker: {}", e);
+                return;
+            }
+            Ok(join_handle) => {
+                self.worker_thread = Some(join_handle);
+            }
+        }
+    }
+
+    fn reset(&mut self) -> bool {
+        if let Some(kill_evt) = self.kill_evt.take() {
+            if kill_evt.write(1).is_err() {
+                error!("{}: failed to notify the kill event", self.debug_label());
+                return false;
+            }
+        }
+
+        if let Some(worker_thread) = self.worker_thread.take() {
+            match worker_thread.join() {
+                Err(_) => {
+                    error!("{}: failed to get back resources", self.debug_label());
+                    return false;
+                }
+                Ok(worker) => {
+                    self.input = worker.input;
+                    self.output = worker.output;
+                    return true;
+                }
+            }
+        }
+        false
+    }
+}