summary refs log tree commit diff
diff options
context:
space:
mode:
authorFletcher Woodruff <fletcherw@chromium.org>2019-08-12 11:46:47 -0600
committerCommit Bot <commit-bot@chromium.org>2020-02-28 01:07:56 +0000
commit7eae7735ee3485605d2ec7ba0685588b7a38b37b (patch)
treea2055a77649a489624b7b5b4dfbf08607655b9cf
parent020fbf04c2ac112f34b87306b5fbb75e7a02a81a (diff)
downloadcrosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.tar
crosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.tar.gz
crosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.tar.bz2
crosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.tar.lz
crosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.tar.xz
crosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.tar.zst
crosvm-7eae7735ee3485605d2ec7ba0685588b7a38b37b.zip
ac97: switch to ShmStreamSource
Convert playback and capture for the AC97 device to use the zero-copy
ShmStreamSource instead of the old StreamSource.

In the process, rework start_playback and start_capture unit tests so
they rely less on sleep statements.

BUG=chromium:968724
BUG=chromium:1006035
TEST="sox -n -r 48000 -b 16 output.raw synth 5 sine 330 &&
     aplay -f dat output.raw" within a VM, check that sine wave is played
     accurately.

Change-Id: Ie9cddbc5285a9505872c9951a8a1da01de70eb88
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/platform/crosvm/+/1749950
Tested-by: kokoro <noreply+kokoro@google.com>
Commit-Queue: Fletcher Woodruff <fletcherw@chromium.org>
Reviewed-by: Dylan Reid <dgreid@chromium.org>
Reviewed-by: Chih-Yang Hsia <paulhsia@chromium.org>
-rw-r--r--devices/src/pci/ac97.rs8
-rw-r--r--devices/src/pci/ac97_bus_master.rs553
-rw-r--r--src/linux.rs4
-rw-r--r--sys_util/src/guest_memory.rs7
4 files changed, 354 insertions, 218 deletions
diff --git a/devices/src/pci/ac97.rs b/devices/src/pci/ac97.rs
index eb19b5f..792df24 100644
--- a/devices/src/pci/ac97.rs
+++ b/devices/src/pci/ac97.rs
@@ -4,7 +4,7 @@
 
 use std::os::unix::io::RawFd;
 
-use audio_streams::StreamSource;
+use audio_streams::shm_streams::ShmStreamSource;
 use resources::{Alloc, MmioType, SystemAllocator};
 use sys_util::{error, EventFd, GuestMemory};
 
@@ -39,7 +39,7 @@ pub struct Ac97Dev {
 impl Ac97Dev {
     /// Creates an 'Ac97Dev' that uses the given `GuestMemory` and starts with all registers at
     /// default values.
-    pub fn new(mem: GuestMemory, audio_server: Box<dyn StreamSource>) -> Self {
+    pub fn new(mem: GuestMemory, audio_server: Box<dyn ShmStreamSource>) -> Self {
         let config_regs = PciConfiguration::new(
             0x8086,
             PCI_DEVICE_ID_INTEL_82801AA_5,
@@ -236,13 +236,13 @@ impl PciDevice for Ac97Dev {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use audio_streams::DummyStreamSource;
+    use audio_streams::shm_streams::MockShmStreamSource;
     use sys_util::GuestAddress;
 
     #[test]
     fn create() {
         let mem = GuestMemory::new(&[(GuestAddress(0u64), 4 * 1024 * 1024)]).unwrap();
-        let mut ac97_dev = Ac97Dev::new(mem, Box::new(DummyStreamSource::new()));
+        let mut ac97_dev = Ac97Dev::new(mem, Box::new(MockShmStreamSource::new()));
         let mut allocator = SystemAllocator::builder()
             .add_io_addresses(0x1000_0000, 0x1000_0000)
             .add_low_mmio_addresses(0x2000_0000, 0x1000_0000)
diff --git a/devices/src/pci/ac97_bus_master.rs b/devices/src/pci/ac97_bus_master.rs
index d3d2f85..c13913a 100644
--- a/devices/src/pci/ac97_bus_master.rs
+++ b/devices/src/pci/ac97_bus_master.rs
@@ -3,21 +3,21 @@
 // found in the LICENSE file.
 
 use std;
+use std::collections::VecDeque;
+use std::convert::AsRef;
 use std::error::Error;
 use std::fmt::{self, Display};
-use std::io::Write;
-use std::os::unix::io::RawFd;
+use std::os::unix::io::{AsRawFd, RawFd};
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::thread;
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
 use audio_streams::{
-    capture::{CaptureBuffer, CaptureBufferStream},
-    PlaybackBuffer, PlaybackBufferStream, SampleFormat, StreamControl, StreamSource,
+    shm_streams::{ShmStream, ShmStreamSource},
+    DummyStreamControl, SampleFormat, StreamControl, StreamDirection, StreamEffect,
 };
-use data_model::{VolatileMemory, VolatileSlice};
-use sync::Mutex;
+use sync::{Condvar, Mutex};
 use sys_util::{
     self, error, set_rt_prio_limit, set_rt_round_robin, warn, EventFd, GuestAddress, GuestMemory,
 };
@@ -26,6 +26,7 @@ use crate::pci::ac97_mixer::Ac97Mixer;
 use crate::pci::ac97_regs::*;
 
 const DEVICE_SAMPLE_RATE: usize = 48000;
+const DEVICE_CHANNEL_COUNT: usize = 2;
 
 // Bus Master registers. Keeps the state of the bus master register values. Used to share the state
 // between the main and audio threads.
@@ -76,8 +77,6 @@ impl Ac97BusMasterRegs {
 enum GuestMemoryError {
     // Failure getting the address of the audio buffer.
     ReadingGuestBufferAddress(sys_util::GuestMemoryError),
-    // Failure reading samples from guest memory.
-    ReadingGuestSamples(data_model::VolatileMemoryError),
 }
 
 impl std::error::Error for GuestMemoryError {}
@@ -90,7 +89,6 @@ impl Display for GuestMemoryError {
             ReadingGuestBufferAddress(e) => {
                 write!(f, "Failed to get the address of the audio buffer: {}.", e)
             }
-            ReadingGuestSamples(e) => write!(f, "Failed to read samples from guest memory: {}.", e),
         }
     }
 }
@@ -106,12 +104,16 @@ type GuestMemoryResult<T> = std::result::Result<T, GuestMemoryError>;
 // Internal error type used for reporting errors from the audio thread.
 #[derive(Debug)]
 enum AudioError {
+    // Failed to create a new stream.
+    CreateStream(Box<dyn Error>),
+    // Guest did not provide a buffer when needed.
+    NoBufferAvailable,
     // Failure to read guest memory.
     ReadingGuestError(GuestMemoryError),
-    // Failure to get an buffer from the stream.
-    StreamError(Box<dyn Error>),
-    // Failure writing to the audio output.
-    WritingOutput(std::io::Error),
+    // Failure to respond to the ServerRequest.
+    RespondRequest(Box<dyn Error>),
+    // Failure to wait for a request from the stream.
+    WaitForAction(Box<dyn Error>),
 }
 
 impl std::error::Error for AudioError {}
@@ -121,9 +123,11 @@ impl Display for AudioError {
         use self::AudioError::*;
 
         match self {
+            CreateStream(e) => write!(f, "Failed to create audio stream: {}.", e),
+            NoBufferAvailable => write!(f, "No buffer was available from the Guest"),
             ReadingGuestError(e) => write!(f, "Failed to read guest memory: {}.", e),
-            StreamError(e) => write!(f, "Failed to get a buffer from the stream: {}", e),
-            WritingOutput(e) => write!(f, "Failed to write audio output: {}.", e),
+            RespondRequest(e) => write!(f, "Failed to respond to the ServerRequest: {}", e),
+            WaitForAction(e) => write!(f, "Failed to wait for a message from the stream: {}", e),
         }
     }
 }
@@ -134,6 +138,7 @@ type AudioResult<T> = std::result::Result<T, AudioError>;
 struct AudioThreadInfo {
     thread: Option<thread::JoinHandle<()>>,
     thread_run: Arc<AtomicBool>,
+    thread_semaphore: Arc<Condvar>,
     stream_control: Option<Box<dyn StreamControl>>,
 }
 
@@ -142,6 +147,7 @@ impl AudioThreadInfo {
         Self {
             thread: None,
             thread_run: Arc::new(AtomicBool::new(false)),
+            thread_semaphore: Arc::new(Condvar::new()),
             stream_control: None,
         }
     }
@@ -160,7 +166,7 @@ pub struct Ac97BusMaster {
     pi_info: AudioThreadInfo,
 
     // Audio server used to create playback or capture streams.
-    audio_server: Box<dyn StreamSource>,
+    audio_server: Box<dyn ShmStreamSource>,
 
     // Thread for hadlind IRQ resample events from the guest.
     irq_resample_thread: Option<thread::JoinHandle<()>>,
@@ -169,7 +175,7 @@ pub struct Ac97BusMaster {
 impl Ac97BusMaster {
     /// Creates an Ac97BusMaster` object that plays audio from `mem` to streams provided by
     /// `audio_server`.
-    pub fn new(mem: GuestMemory, audio_server: Box<dyn StreamSource>) -> Self {
+    pub fn new(mem: GuestMemory, audio_server: Box<dyn ShmStreamSource>) -> Self {
         Ac97BusMaster {
             mem,
             regs: Arc::new(Mutex::new(Ac97BusMasterRegs::new())),
@@ -186,7 +192,9 @@ impl Ac97BusMaster {
 
     /// Returns any file descriptors that need to be kept open when entering a jail.
     pub fn keep_fds(&self) -> Option<Vec<RawFd>> {
-        self.audio_server.keep_fds()
+        let mut fds = self.audio_server.keep_fds();
+        fds.push(self.mem.as_raw_fd());
+        Some(fds)
     }
 
     /// Provides the events needed to raise interrupts in the guest.
@@ -386,7 +394,18 @@ impl Ac97BusMaster {
             && func_regs.sr & SR_DCH == SR_DCH
             && func_regs.civ != func_regs.lvi
         {
+            if func_regs.sr & SR_CELV != 0 {
+                // CELV means we'd already processed the buffer at CIV.
+                // Move CIV to the next buffer now that LVI has moved.
+                func_regs.move_to_next_buffer();
+            }
             func_regs.sr &= !(SR_DCH | SR_CELV);
+
+            match func {
+                Ac97Function::Input => self.pi_info.thread_semaphore.notify_one(),
+                Ac97Function::Output => self.po_info.thread_semaphore.notify_one(),
+                Ac97Function::Microphone => (),
+            }
         }
     }
 
@@ -459,74 +478,79 @@ impl Ac97BusMaster {
         self.regs.lock().glob_cnt = new_glob_cnt;
     }
 
-    fn start_audio(&mut self, func: Ac97Function, mixer: &Ac97Mixer) -> Result<(), Box<dyn Error>> {
+    fn start_audio(&mut self, func: Ac97Function, mixer: &Ac97Mixer) -> AudioResult<()> {
         const AUDIO_THREAD_RTPRIO: u16 = 10; // Matches other cros audio clients.
 
-        let thread_info = match func {
+        let (direction, thread_info) = match func {
             Ac97Function::Microphone => return Ok(()),
-            Ac97Function::Input => &mut self.pi_info,
-            Ac97Function::Output => &mut self.po_info,
+            Ac97Function::Input => (StreamDirection::Capture, &mut self.pi_info),
+            Ac97Function::Output => (StreamDirection::Playback, &mut self.po_info),
         };
 
-        let num_channels = 2;
         let buffer_samples = current_buffer_size(self.regs.lock().func_regs(func), &self.mem)?;
-        let buffer_frames = buffer_samples / num_channels;
+        let buffer_frames = buffer_samples / DEVICE_CHANNEL_COUNT;
         thread_info.thread_run.store(true, Ordering::Relaxed);
         let thread_run = thread_info.thread_run.clone();
+        let thread_semaphore = thread_info.thread_semaphore.clone();
         let thread_mem = self.mem.clone();
         let thread_regs = self.regs.clone();
 
-        match func {
-            Ac97Function::Input => {
-                let (stream_control, input_stream) = self.audio_server.new_capture_stream(
-                    num_channels,
-                    SampleFormat::S16LE,
-                    DEVICE_SAMPLE_RATE,
-                    buffer_frames,
-                )?;
-                self.pi_info.stream_control = Some(stream_control);
-                self.update_mixer_settings(mixer);
-
-                self.pi_info.thread = Some(thread::spawn(move || {
-                    if set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO)).is_err()
-                        || set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)).is_err()
-                    {
-                        warn!("Failed to set audio thread to real time.");
-                    }
-                    if let Err(e) =
-                        audio_in_thread(thread_regs, thread_mem, &thread_run, input_stream)
-                    {
-                        error!("Capture error: {}", e);
-                    }
-                    thread_run.store(false, Ordering::Relaxed);
-                }));
-            }
-            Ac97Function::Output => {
-                let (stream_control, output_stream) = self.audio_server.new_playback_stream(
-                    num_channels,
-                    SampleFormat::S16LE,
-                    DEVICE_SAMPLE_RATE,
-                    buffer_frames,
-                )?;
-                self.po_info.stream_control = Some(stream_control);
-                self.update_mixer_settings(mixer);
-
-                self.po_info.thread = Some(thread::spawn(move || {
-                    if set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO)).is_err()
-                        || set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)).is_err()
-                    {
-                        warn!("Failed to set audio thread to real time.");
-                    }
-                    if let Err(e) =
-                        audio_out_thread(thread_regs, thread_mem, &thread_run, output_stream)
-                    {
-                        error!("Playback error: {}", e);
-                    }
-                    thread_run.store(false, Ordering::Relaxed);
-                }));
+        let mut pending_buffers = VecDeque::with_capacity(2);
+        let starting_offsets = match direction {
+            StreamDirection::Capture => {
+                let mut offsets = [0, 0];
+                let mut locked_regs = self.regs.lock();
+                for i in 0..2 {
+                    let buffer = next_guest_buffer(&mut locked_regs, &self.mem, func, 0)?
+                        .ok_or(AudioError::NoBufferAvailable)?;
+                    offsets[i] = buffer.offset as u64;
+                    pending_buffers.push_back(Some(buffer));
+                }
+                offsets
             }
-            Ac97Function::Microphone => (),
+            StreamDirection::Playback => [0, 0],
         };
+        let stream = self
+            .audio_server
+            .new_stream(
+                direction,
+                DEVICE_CHANNEL_COUNT,
+                SampleFormat::S16LE,
+                DEVICE_SAMPLE_RATE,
+                buffer_frames,
+                StreamEffect::NoEffect,
+                self.mem.as_ref(),
+                starting_offsets,
+            )
+            .map_err(AudioError::CreateStream)?;
+
+        thread_info.stream_control = Some(Box::new(DummyStreamControl::new()));
+        thread_info.thread = Some(thread::spawn(move || {
+            if let Err(e) = set_rt_prio_limit(u64::from(AUDIO_THREAD_RTPRIO))
+                .and_then(|_| set_rt_round_robin(i32::from(AUDIO_THREAD_RTPRIO)))
+            {
+                warn!("Failed to set audio thread to real time: {}", e);
+            }
+
+            let message_interval =
+                Duration::from_secs_f64(buffer_frames as f64 / DEVICE_SAMPLE_RATE as f64);
+
+            if let Err(e) = audio_thread(
+                func,
+                thread_regs,
+                thread_mem,
+                &thread_run,
+                thread_semaphore,
+                message_interval,
+                stream,
+                pending_buffers,
+            ) {
+                error!("{:?} error: {}", func, e);
+            }
+            thread_run.store(false, Ordering::Relaxed);
+        }));
+        self.update_mixer_settings(mixer);
+
         Ok(())
     }
 
@@ -537,6 +561,7 @@ impl Ac97BusMaster {
             Ac97Function::Output => &mut self.po_info,
         };
         thread_info.thread_run.store(false, Ordering::Relaxed);
+        thread_info.thread_semaphore.notify_one();
         if let Some(thread) = thread_info.thread.take() {
             if let Err(e) = thread.join() {
                 error!("Failed to join {:?} thread: {:?}.", func, e);
@@ -565,62 +590,88 @@ impl Ac97BusMaster {
     }
 }
 
-// Gets the next buffer from the guest. This will return `None` if the DMA controlled stopped bit is
-// set, such as after an underrun where CIV hits LVI.
-fn next_guest_buffer<'a>(
-    func_regs: &mut Ac97FunctionRegs,
-    mem: &'a GuestMemory,
-) -> GuestMemoryResult<Option<VolatileSlice<'a>>> {
-    let sample_size = 2;
+#[derive(Debug)]
+struct GuestBuffer {
+    index: u8,
+    offset: usize,
+    frames: usize,
+}
 
-    if func_regs.sr & SR_DCH != 0 {
-        return Ok(None);
-    }
-    let next_buffer = func_regs.civ;
-    let descriptor_addr = func_regs.bdbar + u32::from(next_buffer) * DESCRIPTOR_LENGTH as u32;
+fn get_buffer_offset(
+    func_regs: &Ac97FunctionRegs,
+    mem: &GuestMemory,
+    index: u8,
+) -> GuestMemoryResult<usize> {
+    let descriptor_addr = func_regs.bdbar + u32::from(index) * DESCRIPTOR_LENGTH as u32;
     let buffer_addr_reg: u32 = mem
         .read_obj_from_addr(GuestAddress(u64::from(descriptor_addr)))
         .map_err(GuestMemoryError::ReadingGuestBufferAddress)?;
-    let buffer_addr = buffer_addr_reg & !0x03u32; // The address must be aligned to four bytes.
+    let buffer_addr = GuestAddress((buffer_addr_reg & !0x03u32) as u64); // The address must be aligned to four bytes.
+
+    mem.offset_from_base(buffer_addr)
+        .map_err(GuestMemoryError::ReadingGuestBufferAddress)
+}
+
+fn get_buffer_samples(
+    func_regs: &Ac97FunctionRegs,
+    mem: &GuestMemory,
+    index: u8,
+) -> GuestMemoryResult<usize> {
+    let descriptor_addr = func_regs.bdbar + u32::from(index) * DESCRIPTOR_LENGTH as u32;
     let control_reg: u32 = mem
         .read_obj_from_addr(GuestAddress(u64::from(descriptor_addr) + 4))
         .map_err(GuestMemoryError::ReadingGuestBufferAddress)?;
-    let buffer_samples: usize = control_reg as usize & 0x0000_ffff;
+    let buffer_samples = control_reg as usize & 0x0000_ffff;
+    Ok(buffer_samples)
+}
 
-    func_regs.picb = buffer_samples as u16;
+// Gets the start address and length of the buffer at `civ + offset` from the
+// guest.
+// This will return `None` if `civ + offset` is past LVI; if the DMA controlled
+// stopped bit is set, such as after an underrun where CIV hits LVI; or if
+// `civ + offset == LVI and the CELV flag is set.
+fn next_guest_buffer<'a>(
+    regs: &Ac97BusMasterRegs,
+    mem: &GuestMemory,
+    func: Ac97Function,
+    offset: usize,
+) -> AudioResult<Option<GuestBuffer>> {
+    let func_regs = regs.func_regs(func);
+    let offset = (offset % 32) as u8;
+    let index = (func_regs.civ + offset) % 32;
+
+    // Check that value is between `low` and `high` modulo some `n`.
+    fn check_between(low: u8, high: u8, value: u8) -> bool {
+        // If low <= high, value must be in the interval between them:
+        // 0     l     h     n
+        // ......+++++++......
+        (low <= high && (low <= value && value <= high)) ||
+        // If low > high, value must not be in the interval between them:
+        // 0       h      l  n
+        // +++++++++......++++
+        (low > high && (low <= value || value <= high))
+    };
 
-    let samples_remaining = func_regs.picb as usize;
-    if samples_remaining == 0 {
+    // Check if
+    //  * we're halted
+    //  * `index` is not between CIV and LVI (mod 32)
+    //  * `index is LVI and we've already processed LVI (SR_CELV is set)
+    //  if any of these are true `index` isn't valid.
+    if func_regs.sr & SR_DCH != 0
+        || !check_between(func_regs.civ, func_regs.lvi, index)
+        || func_regs.sr & SR_CELV != 0
+    {
         return Ok(None);
     }
-    let read_pos = u64::from(buffer_addr);
-    Ok(Some(
-        mem.get_slice(read_pos, samples_remaining as u64 * sample_size)
-            .map_err(GuestMemoryError::ReadingGuestSamples)?,
-    ))
-}
 
-// Reads the next buffer from guest memory and writes it to `out_buffer`.
-fn play_buffer(
-    regs: &mut Ac97BusMasterRegs,
-    mem: &GuestMemory,
-    out_buffer: &mut PlaybackBuffer,
-) -> AudioResult<()> {
-    // If the current buffer had any samples in it, mark it as done.
-    if regs.func_regs_mut(Ac97Function::Output).picb > 0 {
-        buffer_completed(regs, mem, Ac97Function::Output)?
-    }
-    let func_regs = regs.func_regs_mut(Ac97Function::Output);
-    let buffer_len = func_regs.picb * 2;
-    if let Some(buffer) = next_guest_buffer(func_regs, mem)? {
-        out_buffer.copy_cb(buffer.size() as usize, |out| buffer.copy_to(out));
-    } else {
-        let zeros = vec![0u8; buffer_len as usize];
-        out_buffer
-            .write(&zeros)
-            .map_err(AudioError::WritingOutput)?;
-    }
-    Ok(())
+    let offset = get_buffer_offset(func_regs, mem, index)?;
+    let frames = get_buffer_samples(func_regs, mem, index)? / DEVICE_CHANNEL_COUNT;
+
+    Ok(Some(GuestBuffer {
+        index,
+        offset,
+        frames,
+    }))
 }
 
 // Marks the current buffer completed and moves to the next buffer for the given
@@ -629,7 +680,7 @@ fn buffer_completed(
     regs: &mut Ac97BusMasterRegs,
     mem: &GuestMemory,
     func: Ac97Function,
-) -> GuestMemoryResult<()> {
+) -> AudioResult<()> {
     // check if the completed descriptor wanted an interrupt on completion.
     let civ = regs.func_regs(func).civ;
     let descriptor_addr = regs.func_regs(func).bdbar + u32::from(civ) * DESCRIPTOR_LENGTH as u32;
@@ -661,51 +712,115 @@ fn buffer_completed(
     Ok(())
 }
 
-// Runs, playing back audio from the guest to `output_stream` until stopped or an error occurs.
-fn audio_out_thread(
+// Runs and updates the offset within the stream shm where samples can be
+// found/placed for shm playback/capture streams, respectively
+fn audio_thread(
+    func: Ac97Function,
     regs: Arc<Mutex<Ac97BusMasterRegs>>,
     mem: GuestMemory,
     thread_run: &AtomicBool,
-    mut output_stream: Box<dyn PlaybackBufferStream>,
+    lvi_semaphore: Arc<Condvar>,
+    message_interval: Duration,
+    mut stream: Box<dyn ShmStream>,
+    // A queue of the pending buffers at the server.
+    mut pending_buffers: VecDeque<Option<GuestBuffer>>,
 ) -> AudioResult<()> {
-    while thread_run.load(Ordering::Relaxed) {
-        output_stream
-            .next_playback_buffer()
-            .map_err(AudioError::StreamError)
-            .and_then(|mut pb_buf| play_buffer(&mut regs.lock(), &mem, &mut pb_buf))?;
+    if func == Ac97Function::Microphone {
+        return Ok(());
     }
-    Ok(())
-}
 
-// Reads samples from `in_buffer` and writes it to the next buffer from guest memory.
-fn capture_buffer(
-    regs: &mut Ac97BusMasterRegs,
-    mem: &GuestMemory,
-    in_buffer: &mut CaptureBuffer,
-) -> AudioResult<()> {
-    // If the current buffer had any samples in it, mark it as done.
-    if regs.func_regs_mut(Ac97Function::Input).picb > 0 {
-        buffer_completed(regs, mem, Ac97Function::Input)?
-    }
-    let func_regs = regs.func_regs_mut(Ac97Function::Input);
-    if let Some(buffer) = next_guest_buffer(func_regs, mem)? {
-        in_buffer.copy_cb(buffer.size() as usize, |inb| buffer.copy_from(inb))
+    // Set up picb.
+    {
+        let mut locked_regs = regs.lock();
+        locked_regs.func_regs_mut(func).picb =
+            current_buffer_size(locked_regs.func_regs(func), &mem)? as u16;
     }
-    Ok(())
-}
 
-// Runs, capturing audio from `input_stream` to the guest until stopped or an error occurs.
-fn audio_in_thread(
-    regs: Arc<Mutex<Ac97BusMasterRegs>>,
-    mem: GuestMemory,
-    thread_run: &AtomicBool,
-    mut input_stream: Box<dyn CaptureBufferStream>,
-) -> AudioResult<()> {
-    while thread_run.load(Ordering::Relaxed) {
-        input_stream
-            .next_capture_buffer()
-            .map_err(AudioError::StreamError)
-            .and_then(|mut cp_buf| capture_buffer(&mut regs.lock(), &mem, &mut cp_buf))?;
+    'audio_loop: while thread_run.load(Ordering::Relaxed) {
+        {
+            let mut locked_regs = regs.lock();
+            while locked_regs.func_regs(func).sr & SR_DCH != 0 {
+                locked_regs = lvi_semaphore.wait(locked_regs);
+                if !thread_run.load(Ordering::Relaxed) {
+                    break 'audio_loop;
+                }
+            }
+        }
+
+        let timeout = Duration::from_secs(1);
+        let action = stream
+            .wait_for_next_action_with_timeout(timeout)
+            .map_err(AudioError::WaitForAction)?;
+
+        let request = match action {
+            None => {
+                warn!("No audio message received within timeout of {:?}", timeout);
+                continue;
+            }
+            Some(request) => request,
+        };
+        let start = Instant::now();
+
+        let next_buffer = {
+            let mut locked_regs = regs.lock();
+            if pending_buffers.len() == 2 {
+                // When we have two pending buffers and receive a request for
+                // another, we know that oldest buffer has been completed.
+                // However, if that old buffer was an empty buffer we sent
+                // because the guest driver had no available buffers, we don't
+                // want to mark a buffer complete.
+                if let Some(Some(_)) = pending_buffers.pop_front() {
+                    buffer_completed(&mut locked_regs, &mem, func)?;
+                }
+            }
+
+            // We count the number of pending, real buffers at the server, and
+            // then use that as our offset from CIV.
+            let offset = pending_buffers.iter().filter(|e| e.is_some()).count();
+
+            // Get a buffer to respond to our request. If there's no buffer
+            // available, we'll wait one buffer interval and check again.
+            loop {
+                if let Some(buffer) = next_guest_buffer(&mut locked_regs, &mem, func, offset)? {
+                    break Some(buffer);
+                }
+                let elapsed = start.elapsed();
+                if elapsed > message_interval {
+                    break None;
+                }
+                locked_regs = lvi_semaphore
+                    .wait_timeout(locked_regs, message_interval - elapsed)
+                    .0;
+            }
+        };
+
+        match next_buffer {
+            Some(ref buffer) => {
+                let requested_frames = request.requested_frames();
+                if requested_frames != buffer.frames {
+                    // We should be able to handle when the number of frames in
+                    // the buffer doesn't match the number of frames requested,
+                    // but we don't yet.
+                    warn!(
+                        "Stream requested {} frames but buffer had {} frames: {:?}",
+                        requested_frames, buffer.frames, buffer
+                    );
+                }
+
+                request
+                    .set_buffer_offset_and_frames(
+                        buffer.offset,
+                        std::cmp::min(requested_frames, buffer.frames),
+                    )
+                    .map_err(AudioError::RespondRequest)?;
+            }
+            None => {
+                request
+                    .ignore_request()
+                    .map_err(AudioError::RespondRequest)?;
+            }
+        }
+        pending_buffers.push_back(next_buffer);
     }
     Ok(())
 }
@@ -753,27 +868,20 @@ fn current_buffer_size(
     mem: &GuestMemory,
 ) -> GuestMemoryResult<usize> {
     let civ = func_regs.civ;
-    let descriptor_addr = func_regs.bdbar + u32::from(civ) * DESCRIPTOR_LENGTH as u32;
-    let control_reg: u32 = mem
-        .read_obj_from_addr(GuestAddress(u64::from(descriptor_addr) + 4))
-        .map_err(GuestMemoryError::ReadingGuestBufferAddress)?;
-    let buffer_len: usize = control_reg as usize & 0x0000_ffff;
-    Ok(buffer_len)
+    get_buffer_samples(func_regs, mem, civ)
 }
 
 #[cfg(test)]
 mod test {
     use super::*;
 
-    use std::time;
-
-    use audio_streams::DummyStreamSource;
+    use audio_streams::shm_streams::MockShmStreamSource;
 
     #[test]
     fn bm_bdbar() {
         let mut bm = Ac97BusMaster::new(
             GuestMemory::new(&[]).expect("Creating guest memory failed."),
-            Box::new(DummyStreamSource::new()),
+            Box::new(MockShmStreamSource::new()),
         );
 
         let bdbars = [0x00u64, 0x10, 0x20];
@@ -797,7 +905,7 @@ mod test {
     fn bm_status_reg() {
         let mut bm = Ac97BusMaster::new(
             GuestMemory::new(&[]).expect("Creating guest memory failed."),
-            Box::new(DummyStreamSource::new()),
+            Box::new(MockShmStreamSource::new()),
         );
 
         let sr_addrs = [0x06u64, 0x16, 0x26];
@@ -813,7 +921,7 @@ mod test {
     fn bm_global_control() {
         let mut bm = Ac97BusMaster::new(
             GuestMemory::new(&[]).expect("Creating guest memory failed."),
-            Box::new(DummyStreamSource::new()),
+            Box::new(MockShmStreamSource::new()),
         );
 
         assert_eq!(bm.readl(GLOB_CNT_2C), 0x0000_0000);
@@ -839,6 +947,7 @@ mod test {
 
     #[test]
     fn start_playback() {
+        const TIMEOUT: Duration = Duration::from_millis(500);
         const LVI_MASK: u8 = 0x1f; // Five bits for 32 total entries.
         const IOC_MASK: u32 = 0x8000_0000; // Interrupt on completion.
         let num_buffers = LVI_MASK as usize + 1;
@@ -848,7 +957,8 @@ mod test {
         const GUEST_ADDR_BASE: u32 = 0x100_0000;
         let mem = GuestMemory::new(&[(GuestAddress(GUEST_ADDR_BASE as u64), 1024 * 1024 * 1024)])
             .expect("Creating guest memory failed.");
-        let mut bm = Ac97BusMaster::new(mem.clone(), Box::new(DummyStreamSource::new()));
+        let stream_source = MockShmStreamSource::new();
+        let mut bm = Ac97BusMaster::new(mem.clone(), Box::new(stream_source.clone()));
         let mixer = Ac97Mixer::new();
 
         // Release cold reset.
@@ -871,33 +981,26 @@ mod test {
         }
 
         bm.writeb(PO_LVI_15, LVI_MASK, &mixer);
+        assert_eq!(bm.readb(PO_CIV_14), 0);
 
         // Start.
         bm.writeb(PO_CR_1B, CR_IOCE | CR_RPBM, &mixer);
+        assert_eq!(bm.readw(PO_PICB_18), 0);
+
+        let mut stream = stream_source.get_last_stream();
+        // Trigger callback and see that CIV has not changed, since only 1
+        // buffer has been sent.
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
 
-        std::thread::sleep(time::Duration::from_millis(50));
-        let picb = bm.readw(PO_PICB_18);
         let mut civ = bm.readb(PO_CIV_14);
         assert_eq!(civ, 0);
-        let pos = (FRAGMENT_SIZE - (picb as usize * 2)) / 4;
 
-        // Check that frames are consumed at least at a reasonable rate.
-        // This wont be exact as during unit tests the thread scheduling is highly variable, so the
-        // test only checks that some samples are consumed.
-        assert!(pos > 1000);
-
-        assert!(bm.readw(PO_SR_16) & SR_DCH == 0); // DMA is running.
-
-        // civ should move eventually.
-        for _i in 0..30 {
-            if civ != 0 {
-                break;
-            }
-            std::thread::sleep(time::Duration::from_millis(20));
-            civ = bm.readb(PO_CIV_14);
-        }
-
-        assert_ne!(0, civ);
+        // After two more callbacks, CIV should now be 1 since we know that the
+        // first buffer must have been played.
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        civ = bm.readb(PO_CIV_14);
+        assert_eq!(civ, 1);
 
         // Buffer complete should be set as the IOC bit was set in the descriptor.
         assert!(bm.readw(PO_SR_16) & SR_BCIS != 0);
@@ -905,17 +1008,30 @@ mod test {
         bm.writew(PO_SR_16, SR_BCIS);
         assert!(bm.readw(PO_SR_16) & SR_BCIS == 0);
 
-        // Set last valid to the next and wait until it is hit.
-        bm.writeb(PO_LVI_15, civ + 1, &mixer);
-        std::thread::sleep(time::Duration::from_millis(500));
+        std::thread::sleep(Duration::from_millis(50));
+        let picb = bm.readw(PO_PICB_18);
+        let pos = (FRAGMENT_SIZE - (picb as usize * 2)) / 4;
+
+        // Check that frames are consumed at least at a reasonable rate.
+        // This can't be exact as during unit tests the thread scheduling is highly variable, so the
+        // test only checks that some samples are consumed.
+        assert!(pos > 0);
+        assert!(bm.readw(PO_SR_16) & SR_DCH == 0); // DMA is running.
+
+        // Set last valid to next buffer to be sent and trigger callback so we hit it.
+        bm.writeb(PO_LVI_15, civ + 2, &mixer);
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
         assert!(bm.readw(PO_SR_16) & SR_LVBCI != 0); // Hit last buffer
         assert!(bm.readw(PO_SR_16) & SR_DCH == SR_DCH); // DMA stopped because of lack of buffers.
-        assert_eq!(bm.readw(PO_SR_16) & SR_CELV, SR_CELV);
+        assert!(bm.readw(PO_SR_16) & SR_CELV == SR_CELV); // Processed the last buffer
         assert_eq!(bm.readb(PO_LVI_15), bm.readb(PO_CIV_14));
         assert!(
             bm.readl(GLOB_STA_30) & GS_POINT != 0,
             "POINT bit should be set."
         );
+
         // Clear the LVB bit
         bm.writeb(PO_SR_16, SR_LVBCI as u8, &mixer);
         assert!(bm.readw(PO_SR_16) & SR_LVBCI == 0);
@@ -924,9 +1040,11 @@ mod test {
         assert!(bm.readw(PO_SR_16) & SR_DCH == 0); // DMA restarts.
         assert_eq!(bm.readw(PO_SR_16) & SR_CELV, 0);
 
-        let (restart_civ, restart_picb) = (bm.readb(PO_CIV_14), bm.readw(PO_PICB_18));
-        std::thread::sleep(time::Duration::from_millis(20));
-        assert!(bm.readw(PO_PICB_18) != restart_picb || bm.readb(PO_CIV_14) != restart_civ);
+        let restart_civ = bm.readb(PO_CIV_14);
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(bm.readb(PO_CIV_14) != restart_civ);
 
         // Stop.
         bm.writeb(PO_CR_1B, 0, &mixer);
@@ -940,6 +1058,7 @@ mod test {
 
     #[test]
     fn start_capture() {
+        const TIMEOUT: Duration = Duration::from_millis(500);
         const LVI_MASK: u8 = 0x1f; // Five bits for 32 total entries.
         const IOC_MASK: u32 = 0x8000_0000; // Interrupt on completion.
         let num_buffers = LVI_MASK as usize + 1;
@@ -949,7 +1068,8 @@ mod test {
         const GUEST_ADDR_BASE: u32 = 0x100_0000;
         let mem = GuestMemory::new(&[(GuestAddress(GUEST_ADDR_BASE as u64), 1024 * 1024 * 1024)])
             .expect("Creating guest memory failed.");
-        let mut bm = Ac97BusMaster::new(mem.clone(), Box::new(DummyStreamSource::new()));
+        let stream_source = MockShmStreamSource::new();
+        let mut bm = Ac97BusMaster::new(mem.clone(), Box::new(stream_source.clone()));
         let mixer = Ac97Mixer::new();
 
         // Release cold reset.
@@ -972,25 +1092,32 @@ mod test {
         bm.writeb(PI_CR_0B, CR_IOCE | CR_RPBM, &mixer);
         assert_eq!(bm.readw(PI_PICB_08), 0);
 
-        std::thread::sleep(time::Duration::from_millis(50));
+        let mut stream = stream_source.get_last_stream();
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+
+        // CIV is 1 here since we preemptively sent two buffer indices to the
+        // server before creating the stream. When we triggered the callback
+        // above, that means the first of those buffers was filled, so CIV
+        // increments to 1.
+        let civ = bm.readb(PI_CIV_04);
+        assert_eq!(civ, 1);
+        std::thread::sleep(Duration::from_millis(20));
         let picb = bm.readw(PI_PICB_08);
-        assert!(picb > 1000);
+        assert!(picb > 0);
         assert!(bm.readw(PI_SR_06) & SR_DCH == 0); // DMA is running.
 
-        // civ should move eventually.
-        for _i in 0..10 {
-            let civ = bm.readb(PI_CIV_04);
-            if civ != 0 {
-                break;
-            }
-            std::thread::sleep(time::Duration::from_millis(20));
-        }
-        assert_ne!(bm.readb(PI_CIV_04), 0);
+        // Trigger 2 callbacks so that we'll move to buffer 3 since at that
+        // point we can be certain that buffers 1 and 2 have been captured to.
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert_eq!(bm.readb(PI_CIV_04), 3);
 
         let civ = bm.readb(PI_CIV_04);
-        // Sets LVI to CIV + 1 to trigger last buffer hit
-        bm.writeb(PI_LVI_05, civ + 1, &mixer);
-        std::thread::sleep(time::Duration::from_millis(5000));
+        // Sets LVI to CIV + 2 to trigger last buffer hit
+        bm.writeb(PI_LVI_05, civ + 2, &mixer);
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
         assert_ne!(bm.readw(PI_SR_06) & SR_LVBCI, 0); // Hit last buffer
         assert_eq!(bm.readw(PI_SR_06) & SR_DCH, SR_DCH); // DMA stopped because of lack of buffers.
         assert_eq!(bm.readw(PI_SR_06) & SR_CELV, SR_CELV);
@@ -1009,7 +1136,9 @@ mod test {
         assert_eq!(bm.readw(PI_SR_06) & SR_CELV, 0);
 
         let restart_civ = bm.readb(PI_CIV_04);
-        std::thread::sleep(time::Duration::from_millis(200));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
+        assert!(stream.trigger_callback_with_timeout(TIMEOUT));
         assert_ne!(bm.readb(PI_CIV_04), restart_civ);
 
         // Stop.
diff --git a/src/linux.rs b/src/linux.rs
index f5d2d1c..be2df22 100644
--- a/src/linux.rs
+++ b/src/linux.rs
@@ -27,7 +27,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
 use libc::{self, c_int, gid_t, uid_t};
 
-use audio_streams::DummyStreamSource;
+use audio_streams::shm_streams::NullShmStreamSource;
 #[cfg(feature = "gpu")]
 use devices::virtio::EventDevice;
 use devices::virtio::{self, VirtioDevice};
@@ -1171,7 +1171,7 @@ fn create_devices(
     }
 
     if cfg.null_audio {
-        let server = Box::new(DummyStreamSource::new());
+        let server = Box::new(NullShmStreamSource::new());
         let null_audio = devices::Ac97Dev::new(mem.clone(), server);
 
         pci_devices.push((
diff --git a/sys_util/src/guest_memory.rs b/sys_util/src/guest_memory.rs
index 6399f8c..2390b92 100644
--- a/sys_util/src/guest_memory.rs
+++ b/sys_util/src/guest_memory.rs
@@ -4,6 +4,7 @@
 
 //! Track memory regions that are mapped to the guest VM.
 
+use std::convert::AsRef;
 use std::convert::TryFrom;
 use std::fmt::{self, Display};
 use std::os::unix::io::{AsRawFd, RawFd};
@@ -107,6 +108,12 @@ impl AsRawFd for GuestMemory {
     }
 }
 
+impl AsRef<SharedMemory> for GuestMemory {
+    fn as_ref(&self) -> &SharedMemory {
+        &self.memfd
+    }
+}
+
 impl GuestMemory {
     /// Creates backing memfd for GuestMemory regions
     fn create_memfd(ranges: &[(GuestAddress, u64)]) -> Result<SharedMemory> {