// Copyright 2020 The Chromium OS Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. use std::io::{self, Read}; use std::os::unix::io::RawFd; use std::sync::mpsc::{channel, Receiver, TryRecvError}; use std::thread; use data_model::{DataInit, Le16, Le32}; use sys_util::{error, EventFd, GuestMemory, PollContext, PollToken}; use super::{ copy_config, Interrupt, Queue, Reader, VirtioDevice, Writer, TYPE_CONSOLE, VIRTIO_F_VERSION_1, }; use crate::SerialDevice; const QUEUE_SIZE: u16 = 256; // For now, just implement port 0 (receiveq and transmitq). // If VIRTIO_CONSOLE_F_MULTIPORT is implemented, more queues will be needed. const QUEUE_SIZES: &[u16] = &[QUEUE_SIZE, QUEUE_SIZE]; #[derive(Copy, Clone, Debug, Default)] #[repr(C)] struct virtio_console_config { cols: Le16, rows: Le16, max_nr_ports: Le32, emerg_wr: Le32, } // Safe because it only has data and has no implicit padding. unsafe impl DataInit for virtio_console_config {} struct Worker { mem: GuestMemory, interrupt: Interrupt, input: Option>, output: Option>, } fn write_output(output: &mut Box, data: &[u8]) -> io::Result<()> { output.write_all(&data)?; output.flush() } impl Worker { fn process_transmit_request( mut reader: Reader, output: &mut Box, ) -> io::Result { let len = reader.available_bytes(); let mut data = vec![0u8; len]; reader.read_exact(&mut data)?; write_output(output, &data)?; Ok(0) } fn process_transmit_queue( &mut self, transmit_queue: &mut Queue, output: &mut Box, ) { let mut needs_interrupt = false; while let Some(avail_desc) = transmit_queue.pop(&self.mem) { let desc_index = avail_desc.index; let reader = match Reader::new(&self.mem, avail_desc) { Ok(r) => r, Err(e) => { error!("console: failed to create reader: {}", e); transmit_queue.add_used(&self.mem, desc_index, 0); needs_interrupt = true; continue; } }; let len = match Self::process_transmit_request(reader, output) { Ok(written) => written, Err(e) => { error!("console: process_transmit_request failed: {}", e); 0 } }; transmit_queue.add_used(&self.mem, desc_index, len); needs_interrupt = true; } if needs_interrupt { self.interrupt.signal_used_queue(transmit_queue.vector); } } // Start a thread that reads self.input and sends the input back via the returned channel. // // `in_avail_evt` will be triggered by the thread when new input is available. fn spawn_input_thread(&mut self, in_avail_evt: &EventFd) -> Option> { let mut rx = match self.input.take() { Some(input) => input, None => return None, }; let (send_channel, recv_channel) = channel(); let thread_in_avail_evt = match in_avail_evt.try_clone() { Ok(evt) => evt, Err(e) => { error!("failed to clone in_avail_evt: {}", e); return None; } }; // The input thread runs in detached mode and will exit when channel is disconnected because // the console device has been dropped. let res = thread::Builder::new() .name(format!("console_input")) .spawn(move || { let mut rx_buf = [0u8; 1]; loop { match rx.read(&mut rx_buf) { Ok(0) => break, // Assume the stream of input has ended. Ok(_) => { if send_channel.send(rx_buf[0]).is_err() { // The receiver has disconnected. break; } thread_in_avail_evt.write(1).unwrap(); } Err(e) => { // Being interrupted is not an error, but everything else is. if e.kind() != io::ErrorKind::Interrupted { error!( "failed to read for bytes to queue into console device: {}", e ); break; } } } } }); if let Err(e) = res { error!("failed to spawn input thread: {}", e); return None; } Some(recv_channel) } // Check for input from `in_channel_opt` and transfer it to the receive queue, if any. fn handle_input( &mut self, in_channel_opt: &mut Option>, receive_queue: &mut Queue, ) { let in_channel = match in_channel_opt.as_ref() { Some(v) => v, None => return, }; while let Some(desc) = receive_queue.peek(&self.mem) { let desc_index = desc.index; let mut writer = match Writer::new(&self.mem, desc) { Ok(w) => w, Err(e) => { error!("console: failed to create Writer: {}", e); break; } }; let mut disconnected = false; while writer.available_bytes() > 0 { match in_channel.try_recv() { Ok(byte) => { writer.write_obj(byte).unwrap(); } Err(TryRecvError::Empty) => break, Err(TryRecvError::Disconnected) => { disconnected = true; break; } } } let bytes_written = writer.bytes_written() as u32; if bytes_written > 0 { receive_queue.pop_peeked(&self.mem); receive_queue.add_used(&self.mem, desc_index, bytes_written); self.interrupt.signal_used_queue(receive_queue.vector); } if disconnected { // Set in_channel to None so that future handle_input calls exit early. in_channel_opt.take(); return; } if bytes_written == 0 { break; } } } fn run(&mut self, mut queues: Vec, mut queue_evts: Vec, kill_evt: EventFd) { #[derive(PollToken)] enum Token { ReceiveQueueAvailable, TransmitQueueAvailable, InputAvailable, InterruptResample, Kill, } // Device -> driver let (mut receive_queue, receive_evt) = (queues.remove(0), queue_evts.remove(0)); // Driver -> device let (mut transmit_queue, transmit_evt) = (queues.remove(0), queue_evts.remove(0)); let in_avail_evt = match EventFd::new() { Ok(evt) => evt, Err(e) => { error!("failed creating EventFd: {}", e); return; } }; // Spawn a separate thread to poll self.input. // A thread is used because io::Read only provides a blocking interface, and there is no // generic way to add an io::Read instance to a poll context (it may not be backed by a file // descriptor). Moving the blocking read call to a separate thread and sending data back to // the main worker thread with an event for notification bridges this gap. let mut in_channel = self.spawn_input_thread(&in_avail_evt); let poll_ctx: PollContext = match PollContext::build_with(&[ (&transmit_evt, Token::TransmitQueueAvailable), (&receive_evt, Token::ReceiveQueueAvailable), (&in_avail_evt, Token::InputAvailable), (self.interrupt.get_resample_evt(), Token::InterruptResample), (&kill_evt, Token::Kill), ]) { Ok(pc) => pc, Err(e) => { error!("failed creating PollContext: {}", e); return; } }; let mut output: Box = match self.output.take() { Some(o) => o, None => Box::new(io::sink()), }; 'poll: loop { let events = match poll_ctx.wait() { Ok(v) => v, Err(e) => { error!("failed polling for events: {}", e); break; } }; for event in events.iter_readable() { match event.token() { Token::TransmitQueueAvailable => { if let Err(e) = transmit_evt.read() { error!("failed reading transmit queue EventFd: {}", e); break 'poll; } self.process_transmit_queue(&mut transmit_queue, &mut output); } Token::ReceiveQueueAvailable => { if let Err(e) = receive_evt.read() { error!("failed reading receive queue EventFd: {}", e); break 'poll; } self.handle_input(&mut in_channel, &mut receive_queue); } Token::InputAvailable => { if let Err(e) = in_avail_evt.read() { error!("failed reading in_avail_evt: {}", e); break 'poll; } self.handle_input(&mut in_channel, &mut receive_queue); } Token::InterruptResample => { self.interrupt.interrupt_resample(); } Token::Kill => break 'poll, } } } } } /// Virtio console device. pub struct Console { kill_evt: Option, worker_thread: Option>, input: Option>, output: Option>, keep_fds: Vec, } impl SerialDevice for Console { fn new( _evt_fd: EventFd, input: Option>, output: Option>, keep_fds: Vec, ) -> Console { Console { kill_evt: None, worker_thread: None, input, output, keep_fds, } } } impl Drop for Console { fn drop(&mut self) { if let Some(kill_evt) = self.kill_evt.take() { // Ignore the result because there is nothing we can do about it. let _ = kill_evt.write(1); } if let Some(worker_thread) = self.worker_thread.take() { let _ = worker_thread.join(); } } } impl VirtioDevice for Console { fn keep_fds(&self) -> Vec { self.keep_fds.clone() } fn features(&self) -> u64 { 1 << VIRTIO_F_VERSION_1 } fn device_type(&self) -> u32 { TYPE_CONSOLE } fn queue_max_sizes(&self) -> &[u16] { QUEUE_SIZES } fn read_config(&self, offset: u64, data: &mut [u8]) { let config = virtio_console_config { max_nr_ports: 1.into(), ..Default::default() }; copy_config(data, 0, config.as_slice(), offset); } fn activate( &mut self, mem: GuestMemory, interrupt: Interrupt, queues: Vec, queue_evts: Vec, ) { if queues.len() < 2 || queue_evts.len() < 2 { return; } let (self_kill_evt, kill_evt) = match EventFd::new().and_then(|e| Ok((e.try_clone()?, e))) { Ok(v) => v, Err(e) => { error!("failed creating kill EventFd pair: {}", e); return; } }; self.kill_evt = Some(self_kill_evt); let input = self.input.take(); let output = self.output.take(); let worker_result = thread::Builder::new() .name("virtio_console".to_string()) .spawn(move || { let mut worker = Worker { mem, interrupt, input, output, }; worker.run(queues, queue_evts, kill_evt); worker }); match worker_result { Err(e) => { error!("failed to spawn virtio_console worker: {}", e); return; } Ok(join_handle) => { self.worker_thread = Some(join_handle); } } } fn reset(&mut self) -> bool { if let Some(kill_evt) = self.kill_evt.take() { if kill_evt.write(1).is_err() { error!("{}: failed to notify the kill event", self.debug_label()); return false; } } if let Some(worker_thread) = self.worker_thread.take() { match worker_thread.join() { Err(_) => { error!("{}: failed to get back resources", self.debug_label()); return false; } Ok(worker) => { self.input = worker.input; self.output = worker.output; return true; } } } false } }