Diffstat (limited to 'devices/src/virtio/video/worker.rs')
-rw-r--r--  devices/src/virtio/video/worker.rs | 362
1 file changed, 362 insertions, 0 deletions
diff --git a/devices/src/virtio/video/worker.rs b/devices/src/virtio/video/worker.rs
new file mode 100644
index 0000000..195854f
--- /dev/null
+++ b/devices/src/virtio/video/worker.rs
@@ -0,0 +1,362 @@
+// Copyright 2020 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+//! Worker that runs in a virtio-video thread.
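+//!
+//! The worker reads commands from the command queue, hands them to a backend
+//! `Device` implementation, and writes responses back to the command queue.
+//! Device-generated events are reported to the guest via the event queue.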
+
+use std::collections::{BTreeMap, VecDeque};
+
+use sys_util::{error, EventFd, GuestMemory, PollContext};
+
+use crate::virtio::queue::{DescriptorChain, Queue};
+use crate::virtio::resource_bridge::ResourceRequestSocket;
+use crate::virtio::video::command::{QueueType, VideoCmd};
+use crate::virtio::video::device::{
+    AsyncCmdTag, Device, Token, VideoCmdResponseType, VideoEvtResponseType,
+};
+use crate::virtio::video::error::{VideoError, VideoResult};
+use crate::virtio::video::event::{self, EvtType, VideoEvt};
+use crate::virtio::video::protocol;
+use crate::virtio::video::response::{self, CmdResponse, Response};
+use crate::virtio::video::{Error, Result};
+use crate::virtio::{Interrupt, Reader, Writer};
+
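+/// Context of a worker thread that processes the virtio-video queues: the guest
+/// memory and interrupt used to talk to the virtqueues, the event FDs the thread
+/// polls on, and the resource bridge socket handed to the backend device.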
+pub struct Worker {
+    pub interrupt: Interrupt,
+    pub mem: GuestMemory,
+    pub cmd_evt: EventFd,
+    pub event_evt: EventFd,
+    pub kill_evt: EventFd,
+    pub resource_bridge: ResourceRequestSocket,
+}
+
+/// BTreeMap which stores descriptor chains in which asynchronous responses will be written.
+type DescPool<'a> = BTreeMap<AsyncCmdTag, DescriptorChain<'a>>;
+/// Pair of a descriptor chain and a response to be written.
+type WritableResp<'a> = (DescriptorChain<'a>, VideoResult<response::CmdResponse>);
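+// A descriptor chain is added to the `DescPool` when a command receives an
+// asynchronous response tag and is removed once the backend device reports the
+// completion or the pending request is cancelled.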
+
+/// Invalidates all pending asynchronous commands for stream `s_id` in the given `DescPool` and
+/// returns the updated `DescPool` together with a list of `WritableResp` to be sent to the guest.
+fn cancel_pending_requests<'a>(
+    s_id: u32,
+    desc_pool: DescPool<'a>,
+) -> (DescPool<'a>, Vec<WritableResp<'a>>) {
+    let mut new_desc_pool: DescPool<'a> = Default::default();
+    let mut resps = vec![];
+
+    for (key, value) in desc_pool.into_iter() {
+        match key {
+            AsyncCmdTag::Queue { stream_id, .. } if stream_id == s_id => {
+                resps.push((
+                    value,
+                    Ok(CmdResponse::ResourceQueue {
+                        timestamp: 0,
+                        flags: protocol::VIRTIO_VIDEO_BUFFER_FLAG_ERR,
+                        size: 0,
+                    }),
+                ));
+            }
+            AsyncCmdTag::Drain { stream_id } | AsyncCmdTag::Clear { stream_id, .. }
+                if stream_id == s_id =>
+            {
+                // TODO(b/1518105): Use more appropriate error code if a new protocol supports one.
+                resps.push((value, Err(VideoError::InvalidOperation)));
+            }
+            AsyncCmdTag::Queue { .. } | AsyncCmdTag::Drain { .. } | AsyncCmdTag::Clear { .. } => {
+                // Keep commands for other streams.
+                new_desc_pool.insert(key, value);
+            }
+        }
+    }
+
+    (new_desc_pool, resps)
+}
+
+impl Worker {
+    /// Writes responses into the command queue.
+    fn write_responses<'a>(
+        &self,
+        cmd_queue: &mut Queue,
+        responses: &mut VecDeque<WritableResp>,
+    ) -> Result<()> {
+        let mut needs_interrupt_commandq = false;
+        // Write responses into command virtqueue.
+        while let Some((desc, resp)) = responses.pop_front() {
+            let desc_index = desc.index;
+            let mut writer = Writer::new(&self.mem, desc).map_err(Error::InvalidDescriptorChain)?;
+            match resp {
+                Ok(r) => {
+                    if let Err(e) = r.write(&mut writer) {
+                        error!("failed to write an OK response for {:?}: {}", r, e);
+                    }
+                }
+                Err(err) => {
+                    if let Err(e) = err.write(&mut writer) {
+                        error!("failed to write an Error response for {:?}: {}", err, e);
+                    }
+                }
+            }
+
+            cmd_queue.add_used(&self.mem, desc_index, writer.bytes_written() as u32);
+            needs_interrupt_commandq = true;
+        }
+        if needs_interrupt_commandq {
+            self.interrupt.signal_used_queue(cmd_queue.vector);
+        }
+        Ok(())
+    }
+
+    /// Writes a `VideoEvt` into the event queue.
+    fn write_event(&self, event_queue: &mut Queue, event: &mut event::VideoEvt) -> Result<()> {
+        let desc = event_queue
+            .peek(&self.mem)
+            .ok_or_else(|| Error::DescriptorNotAvailable)?;
+        event_queue.pop_peeked(&self.mem);
+
+        let desc_index = desc.index;
+        let mut writer = Writer::new(&self.mem, desc).map_err(Error::InvalidDescriptorChain)?;
+        event
+            .write(&mut writer)
+            .map_err(|error| Error::WriteEventFailure {
+                event: event.clone(),
+                error,
+            })?;
+        event_queue.add_used(&self.mem, desc_index, writer.bytes_written() as u32);
+        self.interrupt.signal_used_queue(event_queue.vector);
+        Ok(())
+    }
+
+    /// Handles a `DescriptorChain` value sent via the command queue and returns an updated
+    /// `DescPool` and `VecDeque` of `WritableResp` to be sent to the guest.
+    fn handle_command_desc<'a, T: Device>(
+        &'a self,
+        device: &mut T,
+        poll_ctx: &PollContext<Token>,
+        mut desc_pool: DescPool<'a>,
+        desc: DescriptorChain<'a>,
+    ) -> Result<(DescPool<'a>, VecDeque<WritableResp<'a>>)> {
+        let mut resps: VecDeque<WritableResp> = Default::default();
+        let mut reader =
+            Reader::new(&self.mem, desc.clone()).map_err(Error::InvalidDescriptorChain)?;
+
+        let cmd = VideoCmd::from_reader(&mut reader).map_err(Error::ReadFailure)?;
+
+        // If a destruction command arrives, cancel the requests pending for its stream.
+        match cmd {
+            VideoCmd::ResourceDestroyAll { stream_id } | VideoCmd::StreamDestroy { stream_id } => {
+                let (next_desc_pool, rs) = cancel_pending_requests(stream_id, desc_pool);
+                desc_pool = next_desc_pool;
+                resps.extend(rs);
+            }
+            _ => (),
+        };
+
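+        // The destroy command itself is still forwarded to the backend device below.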
+        // Process the command by the device.
+        let resp = device.process_cmd(cmd, &poll_ctx, &self.resource_bridge);
+
+        match resp {
+            Ok(VideoCmdResponseType::Sync(r)) => {
+                resps.push_back((desc.clone(), Ok(r)));
+            }
+            Ok(VideoCmdResponseType::Async(tag)) => {
+                // The command expects an asynchronous response, so store `desc`
+                // until the backend device notifies us of the completion.
+                desc_pool.insert(tag, desc);
+            }
+            Err(e) => {
+                resps.push_back((desc.clone(), Err(e)));
+            }
+        }
+
+        Ok((desc_pool, resps))
+    }
+
+    /// Handles the command queue and returns an updated `DescPool`.
+    fn handle_command_queue<'a, T: Device>(
+        &'a self,
+        cmd_queue: &mut Queue,
+        device: &mut T,
+        poll_ctx: &PollContext<Token>,
+        mut desc_pool: DescPool<'a>,
+    ) -> Result<DescPool<'a>> {
+        let _ = self.cmd_evt.read();
+
+        while let Some(desc) = cmd_queue.pop(&self.mem) {
+            let (next_desc_pool, mut resps) =
+                self.handle_command_desc(device, poll_ctx, desc_pool, desc)?;
+            desc_pool = next_desc_pool;
+            self.write_responses(cmd_queue, &mut resps)?;
+        }
+        Ok(desc_pool)
+    }
+
+    /// Handles a `VideoEvtResponseType` value and returns an updated `DescPool` and `VecDeque` of
+    /// `WritableResp` to be sent to the guest.
+    fn handle_event_resp<'a, T: Device>(
+        &'a self,
+        event_queue: &mut Queue,
+        device: &mut T,
+        mut desc_pool: DescPool<'a>,
+        resp: VideoEvtResponseType,
+    ) -> Result<(DescPool<'a>, VecDeque<WritableResp>)> {
+        let mut responses: VecDeque<WritableResp> = Default::default();
+        match resp {
+            VideoEvtResponseType::AsyncCmd {
+                tag: AsyncCmdTag::Drain { stream_id },
+                resp,
+            } => {
+                let tag = AsyncCmdTag::Drain { stream_id };
+                let drain_desc = desc_pool
+                    .remove(&tag)
+                    .ok_or_else(|| Error::UnexpectedResponse(tag))?;
+
+                // When a `Drain` request completes, first return an empty output
+                // resource with the EOS flag set.
+                let resource_id = device
+                    .take_resource_id_to_notify_eos(stream_id)
+                    .ok_or_else(|| Error::NoEOSBuffer { stream_id })?;
+
+                let q_desc = desc_pool
+                    .remove(&AsyncCmdTag::Queue {
+                        stream_id,
+                        queue_type: QueueType::Output,
+                        resource_id,
+                    })
+                    .ok_or_else(|| Error::InvalidEOSResource {
+                        stream_id,
+                        resource_id,
+                    })?;
+
+                responses.push_back((
+                    q_desc,
+                    Ok(CmdResponse::ResourceQueue {
+                        timestamp: 0,
+                        flags: protocol::VIRTIO_VIDEO_BUFFER_FLAG_EOS,
+                        size: 0,
+                    }),
+                ));
+
+                // Then, respond to the `Drain` request.
+                responses.push_back((drain_desc, resp));
+            }
+            VideoEvtResponseType::AsyncCmd {
+                tag:
+                    AsyncCmdTag::Clear {
+                        queue_type,
+                        stream_id,
+                    },
+                resp,
+            } => {
+                let tag = AsyncCmdTag::Clear {
+                    queue_type,
+                    stream_id,
+                };
+                let desc = desc_pool
+                    .remove(&tag)
+                    .ok_or_else(|| Error::UnexpectedResponse(tag))?;
+
+                // When a `Clear` request completes, invalidate all pending requests for the stream.
+                let (next_desc_pool, resps) = cancel_pending_requests(stream_id, desc_pool);
+                desc_pool = next_desc_pool;
+                responses.extend(resps);
+
+                // Then, respond to the `Clear` request.
+                responses.push_back((desc, resp));
+            }
+            VideoEvtResponseType::AsyncCmd { tag, resp } => {
+                let desc = desc_pool
+                    .remove(&tag)
+                    .ok_or_else(|| Error::UnexpectedResponse(tag))?;
+                responses.push_back((desc, resp));
+            }
+            VideoEvtResponseType::Event(mut evt) => {
+                self.write_event(event_queue, &mut evt)?;
+            }
+        };
+        Ok((desc_pool, responses))
+    }
+
+    /// Handles an event notified via an event FD and returns an updated `DescPool`.
+    fn handle_event_fd<'a, T: Device>(
+        &'a self,
+        cmd_queue: &mut Queue,
+        event_queue: &mut Queue,
+        device: &mut T,
+        desc_pool: DescPool<'a>,
+        stream_id: u32,
+    ) -> Result<DescPool<'a>> {
+        let resp = device.process_event_fd(stream_id);
+        match resp {
+            Some(r) => match self.handle_event_resp(event_queue, device, desc_pool, r) {
+                Ok((updated_desc_pool, mut resps)) => {
+                    self.write_responses(cmd_queue, &mut resps)?;
+                    Ok(updated_desc_pool)
+                }
+                Err(e) => {
+                    // Notify the guest of the failure via an error event; the
+                    // result of write_event is ignored since we are already
+                    // returning a fatal error.
+                    let _ = self.write_event(
+                        event_queue,
+                        &mut VideoEvt {
+                            typ: EvtType::Error,
+                            stream_id,
+                        },
+                    );
+                    Err(e)
+                }
+            },
+            None => Ok(desc_pool),
+        }
+    }
+
+    pub fn run<T: Device>(
+        &mut self,
+        mut cmd_queue: Queue,
+        mut event_queue: Queue,
+        mut device: T,
+    ) -> Result<()> {
+        let poll_ctx: PollContext<Token> = PollContext::build_with(&[
+            (&self.cmd_evt, Token::CmdQueue),
+            (&self.event_evt, Token::EventQueue),
+            (&self.kill_evt, Token::Kill),
+            (self.interrupt.get_resample_evt(), Token::InterruptResample),
+        ])
+        .map_err(Error::PollContextCreationFailed)?;
+
+        // Stores descriptors in which responses for asynchronous commands will be written.
+        let mut desc_pool: DescPool<'_> = Default::default();
+
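+        // Poll loop: handle command-queue notifications, per-stream device events,
+        // interrupt resample requests, and the kill signal.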
+        loop {
+            let poll_events = poll_ctx.wait().map_err(Error::PollError)?;
+
+            for poll_event in poll_events.iter_readable() {
+                match poll_event.token() {
+                    Token::CmdQueue => {
+                        desc_pool = self.handle_command_queue(
+                            &mut cmd_queue,
+                            &mut device,
+                            &poll_ctx,
+                            desc_pool,
+                        )?;
+                    }
+                    Token::EventQueue => {
+                        let _ = self.event_evt.read();
+                    }
+                    Token::EventFd { id } => {
+                        desc_pool = self.handle_event_fd(
+                            &mut cmd_queue,
+                            &mut event_queue,
+                            &mut device,
+                            desc_pool,
+                            id,
+                        )?;
+                    }
+                    Token::InterruptResample => {
+                        self.interrupt.interrupt_resample();
+                    }
+                    Token::Kill => return Ok(()),
+                }
+            }
+        }
+    }
+}