diff options
Diffstat (limited to 'devices/src/virtio/console.rs')
-rw-r--r-- | devices/src/virtio/console.rs | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/devices/src/virtio/console.rs b/devices/src/virtio/console.rs new file mode 100644 index 0000000..38f5bf1 --- /dev/null +++ b/devices/src/virtio/console.rs @@ -0,0 +1,449 @@ +// Copyright 2020 The Chromium OS Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +use std::io::{self, Read}; +use std::os::unix::io::RawFd; +use std::sync::mpsc::{channel, Receiver, TryRecvError}; +use std::thread; + +use data_model::{DataInit, Le16, Le32}; +use sys_util::{error, EventFd, GuestMemory, PollContext, PollToken}; + +use super::{ + copy_config, Interrupt, Queue, Reader, VirtioDevice, Writer, TYPE_CONSOLE, VIRTIO_F_VERSION_1, +}; + +const QUEUE_SIZE: u16 = 256; + +// For now, just implement port 0 (receiveq and transmitq). +// If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed. +const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE]; + +#[derive(Copy, Clone, Debug, Default)] +#[repr(C)] +struct virtio_console_config { + cols: Le16, + rows: Le16, + max_nr_ports: Le32, + emerg_wr: Le32, +} + +// Safe because it only has data and has no implicit padding. +unsafe impl DataInit for virtio_console_config {} + +struct Worker { + mem: GuestMemory, + interrupt: Interrupt, + input: Option<Box<dyn io::Read + Send>>, + output: Option<Box<dyn io::Write + Send>>, +} + +fn write_output(output: &mut Box<dyn io::Write>, data: &[u8]) -> io::Result<()> { + output.write_all(&data)?; + output.flush() +} + +impl Worker { + fn process_transmit_request( + mut reader: Reader, + output: &mut Box<dyn io::Write>, + ) -> io::Result<u32> { + let len = reader.available_bytes(); + let mut data = vec![0u8; len]; + reader.read_exact(&mut data)?; + write_output(output, &data)?; + Ok(0) + } + + fn process_transmit_queue( + &mut self, + transmit_queue: &mut Queue, + output: &mut Box<dyn io::Write>, + ) { + let mut needs_interrupt = false; + while let Some(avail_desc) = transmit_queue.pop(&self.mem) { + let desc_index = avail_desc.index; + + let reader = match Reader::new(&self.mem, avail_desc) { + Ok(r) => r, + Err(e) => { + error!("console: failed to create reader: {}", e); + transmit_queue.add_used(&self.mem, desc_index, 0); + needs_interrupt = true; + continue; + } + }; + + let len = match Self::process_transmit_request(reader, output) { + Ok(written) => written, + Err(e) => { + error!("console: process_transmit_request failed: {}", e); + 0 + } + }; + + transmit_queue.add_used(&self.mem, desc_index, len); + needs_interrupt = true; + } + + if needs_interrupt { + self.interrupt.signal_used_queue(transmit_queue.vector); + } + } + + // Start a thread that reads self.input and sends the input back via the returned channel. + // + // `in_avail_evt` will be triggered by the thread when new input is available. + fn spawn_input_thread(&mut self, in_avail_evt: &EventFd) -> Option<Receiver<u8>> { + let mut rx = match self.input.take() { + Some(input) => input, + None => return None, + }; + + let (send_channel, recv_channel) = channel(); + + let thread_in_avail_evt = match in_avail_evt.try_clone() { + Ok(evt) => evt, + Err(e) => { + error!("failed to clone in_avail_evt: {}", e); + return None; + } + }; + + // The input thread runs in detached mode and will exit when channel is disconnected because + // the console device has been dropped. + let res = thread::Builder::new() + .name(format!("console_input")) + .spawn(move || { + let mut rx_buf = [0u8; 1]; + loop { + match rx.read(&mut rx_buf) { + Ok(0) => break, // Assume the stream of input has ended. + Ok(_) => { + if send_channel.send(rx_buf[0]).is_err() { + // The receiver has disconnected. + break; + } + thread_in_avail_evt.write(1).unwrap(); + } + Err(e) => { + // Being interrupted is not an error, but everything else is. + if e.kind() != io::ErrorKind::Interrupted { + error!( + "failed to read for bytes to queue into console device: {}", + e + ); + break; + } + } + } + } + }); + if let Err(e) = res { + error!("failed to spawn input thread: {}", e); + return None; + } + Some(recv_channel) + } + + // Check for input from `in_channel_opt` and transfer it to the receive queue, if any. + fn handle_input( + &mut self, + in_channel_opt: &mut Option<Receiver<u8>>, + receive_queue: &mut Queue, + ) { + let in_channel = match in_channel_opt.as_ref() { + Some(v) => v, + None => return, + }; + + while let Some(desc) = receive_queue.peek(&self.mem) { + let desc_index = desc.index; + let mut writer = match Writer::new(&self.mem, desc) { + Ok(w) => w, + Err(e) => { + error!("console: failed to create Writer: {}", e); + break; + } + }; + + let mut disconnected = false; + while writer.available_bytes() > 0 { + match in_channel.try_recv() { + Ok(byte) => { + writer.write_obj(byte).unwrap(); + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + disconnected = true; + break; + } + } + } + + let bytes_written = writer.bytes_written() as u32; + + if bytes_written > 0 { + receive_queue.pop_peeked(&self.mem); + receive_queue.add_used(&self.mem, desc_index, bytes_written); + self.interrupt.signal_used_queue(receive_queue.vector); + } + + if disconnected { + // Set in_channel to None so that future handle_input calls exit early. + in_channel_opt.take(); + return; + } + + if bytes_written == 0 { + break; + } + } + } + + fn run(&mut self, mut queues: Vec<Queue>, mut queue_evts: Vec<EventFd>, kill_evt: EventFd) { + #[derive(PollToken)] + enum Token { + ReceiveQueueAvailable, + TransmitQueueAvailable, + InputAvailable, + InterruptResample, + Kill, + } + + // Device -> driver + let (mut receive_queue, receive_evt) = (queues.remove(0), queue_evts.remove(0)); + + // Driver -> device + let (mut transmit_queue, transmit_evt) = (queues.remove(0), queue_evts.remove(0)); + + let in_avail_evt = match EventFd::new() { + Ok(evt) => evt, + Err(e) => { + error!("failed creating EventFd: {}", e); + return; + } + }; + + // Spawn a separate thread to poll self.input. + // A thread is used because io::Read only provides a blocking interface, and there is no + // generic way to add an io::Read instance to a poll context (it may not be backed by a file + // descriptor). Moving the blocking read call to a separate thread and sending data back to + // the main worker thread with an event for notification bridges this gap. + let mut in_channel = self.spawn_input_thread(&in_avail_evt); + + let poll_ctx: PollContext<Token> = match PollContext::build_with(&[ + (&transmit_evt, Token::TransmitQueueAvailable), + (&receive_evt, Token::ReceiveQueueAvailable), + (&in_avail_evt, Token::InputAvailable), + (self.interrupt.get_resample_evt(), Token::InterruptResample), + (&kill_evt, Token::Kill), + ]) { + Ok(pc) => pc, + Err(e) => { + error!("failed creating PollContext: {}", e); + return; + } + }; + + let mut output: Box<dyn io::Write> = match self.output.take() { + Some(o) => o, + None => Box::new(io::sink()), + }; + + 'poll: loop { + let events = match poll_ctx.wait() { + Ok(v) => v, + Err(e) => { + error!("failed polling for events: {}", e); + break; + } + }; + + for event in events.iter_readable() { + match event.token() { + Token::TransmitQueueAvailable => { + if let Err(e) = transmit_evt.read() { + error!("failed reading transmit queue EventFd: {}", e); + break 'poll; + } + self.process_transmit_queue(&mut transmit_queue, &mut output); + } + Token::ReceiveQueueAvailable => { + if let Err(e) = receive_evt.read() { + error!("failed reading receive queue EventFd: {}", e); + break 'poll; + } + self.handle_input(&mut in_channel, &mut receive_queue); + } + Token::InputAvailable => { + if let Err(e) = in_avail_evt.read() { + error!("failed reading in_avail_evt: {}", e); + break 'poll; + } + self.handle_input(&mut in_channel, &mut receive_queue); + } + Token::InterruptResample => { + self.interrupt.interrupt_resample(); + } + Token::Kill => break 'poll, + } + } + } + } +} + +/// Virtio console device. +pub struct Console { + kill_evt: Option<EventFd>, + worker_thread: Option<thread::JoinHandle<Worker>>, + input: Option<Box<dyn io::Read + Send>>, + output: Option<Box<dyn io::Write + Send>>, + keep_fds: Vec<RawFd>, +} + +impl Console { + fn new( + input: Option<Box<dyn io::Read + Send>>, + output: Option<Box<dyn io::Write + Send>>, + keep_fds: Vec<RawFd>, + ) -> Console { + Console { + kill_evt: None, + worker_thread: None, + input, + output, + keep_fds, + } + } + + /// Constructs a console with input and output streams. + pub fn new_in_out( + input: Box<dyn io::Read + Send>, + out: Box<dyn io::Write + Send>, + keep_fds: Vec<RawFd>, + ) -> Console { + Self::new(Some(input), Some(out), keep_fds) + } + + /// Constructs a console with an output stream but no input. + pub fn new_out(out: Box<dyn io::Write + Send>, keep_fds: Vec<RawFd>) -> Console { + Self::new(None, Some(out), keep_fds) + } + + /// Constructs a console with no connected input or output. + pub fn new_sink() -> Console { + Self::new(None, None, Vec::new()) + } +} + +impl Drop for Console { + fn drop(&mut self) { + if let Some(kill_evt) = self.kill_evt.take() { + // Ignore the result because there is nothing we can do about it. + let _ = kill_evt.write(1); + } + + if let Some(worker_thread) = self.worker_thread.take() { + let _ = worker_thread.join(); + } + } +} + +impl VirtioDevice for Console { + fn keep_fds(&self) -> Vec<RawFd> { + self.keep_fds.clone() + } + + fn features(&self) -> u64 { + 1 << VIRTIO_F_VERSION_1 + } + + fn device_type(&self) -> u32 { + TYPE_CONSOLE + } + + fn queue_max_sizes(&self) -> &[u16] { + QUEUE_SIZES + } + + fn read_config(&self, offset: u64, data: &mut [u8]) { + let config = virtio_console_config { + max_nr_ports: 1.into(), + ..Default::default() + }; + copy_config(data, 0, config.as_slice(), offset); + } + + fn activate( + &mut self, + mem: GuestMemory, + interrupt: Interrupt, + queues: Vec<Queue>, + queue_evts: Vec<EventFd>, + ) { + if queues.len() < 2 || queue_evts.len() < 2 { + return; + } + + let (self_kill_evt, kill_evt) = match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) { + Ok(v) => v, + Err(e) => { + error!("failed creating kill EventFd pair: {}", e); + return; + } + }; + self.kill_evt = Some(self_kill_evt); + + let input = self.input.take(); + let output = self.output.take(); + + let worker_result = thread::Builder::new() + .name("virtio_console".to_string()) + .spawn(move || { + let mut worker = Worker { + mem, + interrupt, + input, + output, + }; + worker.run(queues, queue_evts, kill_evt); + worker + }); + + match worker_result { + Err(e) => { + error!("failed to spawn virtio_console worker: {}", e); + return; + } + Ok(join_handle) => { + self.worker_thread = Some(join_handle); + } + } + } + + fn reset(&mut self) -> bool { + if let Some(kill_evt) = self.kill_evt.take() { + if kill_evt.write(1).is_err() { + error!("{}: failed to notify the kill event", self.debug_label()); + return false; + } + } + + if let Some(worker_thread) = self.worker_thread.take() { + match worker_thread.join() { + Err(_) => { + error!("{}: failed to get back resources", self.debug_label()); + return false; + } + Ok(worker) => { + self.input = worker.input; + self.output = worker.output; + return true; + } + } + } + false + } +} |