Introduction

Welcome to the Rust ASIO guide! If you're looking to use the Rust programming language for the athernet project, you've come to the right place. This guide will take you from the basics of ASIO in Rust to creating your own asynchronous audio interface, and beyond.

Preliminaries

This guide assumes you have adequate knowledge of Rust. If you're new to Rust, you should first read the holy Rust Book. You should also be familiar with the Rust Standard Library.

It is advised that you have a basic understanding of asynchronous programming; otherwise, you may fall into the nightmares of an OS-thread-based concurrency model. If you're new to asynchronous programming, you should first read the Async Book.

Code Examples

All code in this book is written for and tested on the Windows operating system using Rust 1.72.0, which was released on August 24, 2023. Earlier versions may not include all the features used in this guide. Later versions, however, should work just fine.

For brevity, the code examples do not include use statements, except for the first time a new item from the standard library or other crate is used. As a convenience, the following prelude can be used to import everything necessary to compile any of the code examples in this guide:

#![allow(unused_imports)]
use anyhow::{Error, Result};
use cpal::{
    traits::{DeviceTrait, HostTrait, StreamTrait},
    Device, FromSample, HostId, Sample, SampleFormat, SampleRate, SizedSample,
    SupportedStreamConfig,
};
use futures::{FutureExt, Sink, SinkExt, Stream, StreamExt};
use hound::{WavSpec, WavWriter};
use rodio::{Decoder, OutputStream, Source};
use std::{
    fs::File,
    io::BufWriter,
    iter::ExactSizeIterator,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};
use tokio::{
    sync::{
        mpsc::{self, UnboundedReceiver, UnboundedSender},
        oneshot::{self, Receiver, Sender},
    },
    task, time,
};

Supplemental material, including complete versions of all code examples, is available at https://github.com/mousany/rust-asio.

You may use all example code offered with this guide for any purpose. Note that if you use it in your own projects, you may need to credit or mention the authors of this guide.

Audio IO in Rust

In this section, we will cover how to record and play audio in Rust. We will also cover how to interact with raw audio buffers in memory, which can be helpful when implementing the athernet project.

Let's get started!

ASIO on Windows

ASIO is an audio driver protocol by Steinberg. While it is available on multiple operating systems, it is most commonly used on Windows to work around limitations of WASAPI, such as access to large numbers of channels and lower-latency audio processing.

The CPAL crate provides an API that abstracts over multiple audio backends including ASIO. It allows for using the ASIO SDK as the audio host on Windows instead of WASAPI.

Locating the ASIO SDK

The location of the ASIO SDK is exposed to CPAL through the CPAL_ASIO_DIR environment variable.

The build script will try to find the ASIO SDK by following these steps in order:

  1. Check if CPAL_ASIO_DIR is set and, if so, use that path to locate the SDK.
  2. Check if the ASIO SDK is already installed in the temporary directory; if so, use it and set CPAL_ASIO_DIR to the output of std::env::temp_dir().join("asio_sdk").
  3. If the ASIO SDK is not already installed, download it from https://www.steinberg.net/asiosdk and install it in the temporary directory. CPAL_ASIO_DIR will be set to the output of std::env::temp_dir().join("asio_sdk").

In an ideal situation you don't need to worry about this step.
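
If the automatic download fails, or you already keep a copy of the SDK elsewhere, you can set the variable yourself before building. For example, in PowerShell (the path below is only a placeholder):

$env:CPAL_ASIO_DIR="C:\path\to\asio_sdk"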

Preparing the build environment

  1. bindgen, the library used to generate bindings to the C++ SDK, requires clang. Download and install LLVM from https://releases.llvm.org under the "Pre-Built Binaries" section. The version as of writing is 17.0.1.

  2. Set the LIBCLANG_PATH environment variable to the LLVM bin directory. If you installed LLVM to the default directory, this should work in PowerShell:

    $env:LIBCLANG_PATH="C:\Program Files\LLVM\bin"
    
  3. If you don't have any ASIO devices or drivers available, you can download and install ASIO4ALL.

  4. The build script assumes that Microsoft Visual Studio is installed. The script will try to find vcvarsall.bat and execute it with the right machine architecture, regardless of the Microsoft Visual Studio version. In the unlikely event that this process fails, you can locate vcvarsall.bat manually and execute it with your machine architecture as an argument; the script will detect this and skip the step.

    An example of running the command manually on a 64-bit machine:

    "C:\Program Files (x86)\Microsoft Visual Studio\2022\Community\VC\Auxiliary\Build\vcvarsall.bat" amd64
    
  5. Select the ASIO host at the start of your program with the following code:

    let host;
    #[cfg(target_os = "windows")]
    {
        host = cpal::host_from_id(cpal::HostId::Asio).expect("failed to initialise ASIO host");
    }

    If you run into compilation errors produced by asio-sys or bindgen, make sure that CPAL_ASIO_DIR is set correctly and try cargo clean.

  6. Make sure to enable the asio feature when building CPAL:

    cargo build --features "asio"
    

    or if you are using CPAL as a dependency in a downstream project, enable the feature like this:

    cpal = { version = "*", features = ["asio"] }
    
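Once everything is in place, a short smoke test can confirm that CPAL can initialise the ASIO host and enumerate your devices. This is only a sketch; the output depends on your machine:

use cpal::traits::{DeviceTrait, HostTrait};

fn main() {
    // Fails at runtime if the ASIO host is unavailable or misconfigured.
    let host = cpal::host_from_id(cpal::HostId::Asio).expect("failed to initialise ASIO host");
    for device in host.devices().expect("failed to enumerate devices") {
        println!("{}", device.name().unwrap_or_else(|_| "unknown".to_string()));
    }
}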

Recording Sound with CPAL

Here are some concepts cpal exposes:

  • A Host provides access to the available audio devices on the system. Some platforms have more than one host available, but every platform supported by CPAL has at least one default_host that is guaranteed to be available.
  • A Device is an audio device that may have any number of input and output streams.
  • A Stream is an open flow of audio data. Input streams allow you to receive audio data, output streams allow you to play audio data. You must choose which Device will run your stream before you can create one. Often, a default device can be retrieved via the Host.
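
These concepts are easy to explore. The following sketch lists every host compiled into CPAL on the current platform and then grabs the guaranteed default one:

// Some platforms (like Windows) expose several hosts; all expose a default.
for host_id in cpal::available_hosts() {
    println!("available host: {:?}", host_id);
}
let host = cpal::default_host();
println!("default host: {:?}", host.id());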

Creating a Stream

To create a stream, you must first create a Host and a Device:

use cpal::{
  traits::{DeviceTrait, HostTrait},
  HostId
};

let host = cpal::host_from_id(HostId::Asio).expect("failed to initialise ASIO host");
let device = host.default_input_device().expect("failed to find input device");

Since we only need one channel of audio, we replace the device's default config with one that has a single channel:

use cpal::{SampleRate, SupportedStreamConfig};

let default_config = device.default_input_config().unwrap();
let config = SupportedStreamConfig::new(
    1,                 // mono
    SampleRate(48000), // sample rate
    default_config.buffer_size().clone(),
    default_config.sample_format(),
);
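
Not every device accepts an arbitrary combination of channel count and sample rate. If you want to be defensive, you can inspect the device's supported input configurations before committing to one; a minimal sketch:

// Print every supported range; pick a config that falls inside one of them.
for range in device.supported_input_configs().expect("failed to query configs") {
    println!(
        "channels: {}, sample rate: {}..{} Hz, format: {:?}",
        range.channels(),
        range.min_sample_rate().0,
        range.max_sample_rate().0,
        range.sample_format(),
    );
}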

Now you can create a stream from the device and the config:

use cpal::SampleFormat;

let stream = match config.sample_format() {
    SampleFormat::I8 => device.build_input_stream(
        &config.into(),
        move |data: &[i8], _: &_| {
            // react to stream events and read or write stream data here.
        },
        move |err| {
            // react to errors here.
        },
        None,
    ),

    ...
}
.unwrap();

While the stream is running, the selected audio device will periodically call the data callback that was passed to the function.

Creating and running a stream will not block the thread. On modern platforms, the given callback is called by a dedicated, high-priority thread responsible for delivering audio data to the system’s audio device in a timely manner.

Starting and Stopping a Stream

Not all platforms automatically start a stream when it is created. To start a stream, call play() on it:

use cpal::traits::StreamTrait;

stream.play().expect("failed to play stream");

Some devices support pausing the audio stream. This can be done by calling pause() on the stream:

stream.pause().expect("failed to pause stream");

Writing a WAV File

This example shows how to write a WAV file from a stream. It uses the hound crate to write the WAV file; the wav_spec_from_config helper it calls is sketched after the example.

use cpal::{Sample, FromSample};
use hound::{WavSpec, WavWriter};
use std::{
    fs::File,
    io::BufWriter,
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};

fn write_input_data<T, U>(data: &[T], writer: &Arc<Mutex<WavWriter<BufWriter<File>>>>)
where
    T: Sample,
    U: Sample + hound::Sample + FromSample<T>,
{
    let mut writer = writer.lock().unwrap();
    for &sample in data {
        writer.write_sample(sample.to_sample::<U>()).ok();
    }
}

let spec = wav_spec_from_config(&config);
let writer = Arc::new(Mutex::new(WavWriter::create("output.wav", spec).unwrap()));
let err_fn = |err| eprintln!("an error occurred on stream: {}", err);

let stream = match config.sample_format() {
    SampleFormat::I8 => device.build_input_stream(
        &config.into(),
        move |data: &[i8], _: &_| {
            write_input_data::<i8, f32>(data, &writer);
        },
        err_fn,
        None,
    ),
    ...
}
.unwrap();

stream.play().expect("failed to play stream");
thread::sleep(Duration::from_secs(5));
stream.pause().expect("failed to pause stream");
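
The wav_spec_from_config helper above is not part of cpal or hound; it simply builds a WavSpec from the stream config. Since this example converts every sample to f32 before writing, a minimal sketch can hardcode a 32-bit float format:

fn wav_spec_from_config(config: &SupportedStreamConfig) -> WavSpec {
    WavSpec {
        channels: config.channels(),
        sample_rate: config.sample_rate().0,
        bits_per_sample: 32, // samples are written as f32
        sample_format: hound::SampleFormat::Float,
    }
}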

Playing Sound with Rodio

Playing sound can be done similarly to recording sound with CPAL. Fortunately, the developers of CPAL have also created a library called Rodio, a simple audio playback library. It is built on top of CPAL and provides a simpler API for playing sound. Let's see how we can use it to play the sound we recorded in the previous section.

Creating a Sink

The Rodio library also provides a type named Sink which represents an audio track. It can be created from the Device and SupportedStreamConfig types from CPAL.

use rodio::{OutputStream, Sink};

let (_stream, handle) = OutputStream::try_from_device_config(&device, config).unwrap();
let sink = Sink::try_new(&handle).unwrap();

Playing Sound from a WAV file

The Sink type provides a method named append which can be used to append a Source to the audio track. The Source is a trait which is implemented by many types, including rodio::Decoder, which allows us to play audio from a file.

Playing a sound in a Sink will not block the thread. Instead, playback happens in the background on a dedicated thread. However, you can use the Sink::sleep_until_end method to block the thread until the sound has finished playing.

use rodio::Decoder;
use std::fs::File;

let source = Decoder::new(File::open("output.wav").unwrap()).unwrap();
sink.append(source);
sink.sleep_until_end();

Pausing and Resuming a Sink

The Sink type also provides methods for pausing and resuming the audio track. Let's see how we can use them.

sink.pause();
sink.play();

It is also possible to clear the audio track by using the Sink::clear method.

sink.clear();

Audio Track in Memory

It is unnecessary to write the audio data to a file. We can keep the audio data in memory and play it directly. To do this, we need to create our own AudioTrack struct implementing the Source trait.

The Source Trait

The Source trait is defined in the rodio crate as follows:

pub trait Source: Iterator
where
    Self::Item: rodio::Sample, 
{
    fn current_frame_len(&self) -> Option<usize>;
    fn channels(&self) -> u16;
    fn sample_rate(&self) -> u32;
    fn total_duration(&self) -> Option<Duration>;
}

To put it simply, the Source trait is an iterator that iterates over the audio samples, with additional information about the audio data. The current_frame_len method returns the number of samples in the current frame. The channels method returns the number of channels. The sample_rate method returns the sample rate. The total_duration method returns the total duration of the audio data.

Wrapping an Iterator

Hence, we can create our own AudioTrack struct by wrapping an existing Iterator and adding some extra fields.

use std::iter::ExactSizeIterator;

pub struct AudioTrack<I: ExactSizeIterator>
where
    I::Item: rodio::Sample,
{
    inner: I,
    config: SupportedStreamConfig,
}

impl<I: ExactSizeIterator> AudioTrack<I>
where
    I::Item: rodio::Sample,
{
    pub fn new(iter: I, config: SupportedStreamConfig) -> Self {
        Self {
            inner: iter,
            config,
        }
    }
}

impl<I: ExactSizeIterator> Iterator for AudioTrack<I>
where
    I::Item: rodio::Sample,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}

The AudioTrack struct has two fields. The inner field is an iterator that iterates over the audio samples. The config field is the configuration of the audio stream. The Iterator trait is implemented for the AudioTrack struct as required by the Source trait.

Next, we need to implement the Source trait for the AudioTrack struct.

impl<I: ExactSizeIterator> Source for AudioTrack<I>
where
    I::Item: rodio::Sample,
{
    fn current_frame_len(&self) -> Option<usize> {
        Some(self.inner.len())
    }

    fn channels(&self) -> u16 {
        self.config.channels()
    }

    fn sample_rate(&self) -> u32 {
        self.config.sample_rate().0
    }

    fn total_duration(&self) -> Option<std::time::Duration> {
        None
    }
}

Recording and Playing

Finally, we can use the AudioTrack struct to record and play audio data without writing to a file.


fn write_input_data<T, U>(data: &[T], writer: &Arc<Mutex<Vec<U>>>)
where
    T: Sample,
    U: Sample + hound::Sample + FromSample<T>,
{
    writer
        .lock()
        .unwrap()
        .extend(data.iter().map(|sample| U::from_sample(*sample)));
}

let writer = Arc::new(Mutex::new(vec![]));
let reader = writer.clone();

... // Create the input stream and record the audio data

let reader = reader.lock().unwrap();
let track = AudioTrack::new(reader.clone().into_iter(), config);

... // Create the output sink

sink.append(track);
sink.sleep_until_end();

Bridging to Async Rust

In this section, we will cover how to bridge between the synchronous and asynchronous worlds in Rust. With this knowledge, you will be free from the nightmares of an OS-thread-based concurrency model, achieving performant and bug-free concurrency, which is essential for the athernet project.

Buckle up!

The Stream and Sink Traits

The Stream and Sink traits are part of the core of the futures crate. They are used to represent asynchronous streams and sinks, respectively. A stream is a source of values that are produced asynchronously, and a sink is a destination for values that are consumed asynchronously. The Stream and Sink traits are the asynchronous equivalents of the Iterator and Write traits, respectively.

The Stream trait is defined as follows:

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

The poll_next method is similar to the next method of the Iterator trait, except that it returns a Poll<Option<Self::Item>> instead of an Option<Self::Item>. The Poll type is the same type that is returned by the poll method of the Future trait, which has been discussed in the Async Book.
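
In practice you rarely call poll_next by hand. The StreamExt extension trait wraps it in a next method that returns a future, so a stream can be consumed with a simple loop (inside an async context):

use futures::StreamExt;

let mut stream = futures::stream::iter(vec![1, 2, 3]);
while let Some(item) = stream.next().await {
    println!("{}", item); // prints 1, 2 and 3 in turn
}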

The Sink trait is defined as follows:

pub trait Sink<Item> {
    type Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}

The poll_ready method is used to check if the sink is ready to receive a value. The start_send method is used to send a value to the sink. The poll_flush method is used to flush the sink, and the poll_close method is used to close the sink.

The Stream and Sink traits are implemented for many types. For example, the channels in the futures crate implement Stream on the receiving half and Sink on the sending half, and a TcpStream can be adapted into both with a codec such as tokio_util::codec::Framed. Bridging to the asynchronous world is often as simple as using the Stream and Sink traits, and they can also precisely describe the layers of the athernet.
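
Likewise, the SinkExt extension trait provides a send method that drives the poll_ready, start_send, and poll_flush protocol for you. A small sketch using a futures channel (inside an async context):

use futures::SinkExt;

// send() waits for poll_ready, calls start_send, then drives poll_flush.
let (mut tx, mut rx) = futures::channel::mpsc::channel::<i32>(8);
tx.send(42).await.unwrap();
assert_eq!(rx.next().await, Some(42));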

Recording as Stream

Since the driver uses callbacks to handle events, it is possible to use a Mutex and a Waker to integrate it with the futures crate. However, the simplest way is to delegate the work to an unbounded channel. The channel will be used to send the recorded data to the main thread, where it will be processed.

use anyhow::Result;
use cpal::{Device, SizedSample};

fn build_input_stream<T>(
    device: &Device,
    config: SupportedStreamConfig,
    sender: UnboundedSender<Vec<f32>>,
) -> Result<cpal::Stream>
where
    T: SizedSample,
    f32: FromSample<T>,
{
    let stream = device.build_input_stream(
        &config.config(),
        move |data: &[T], _: &_| {
            let data = data
                .iter()
                .map(|&sample| f32::from_sample(sample))
                .collect::<Vec<f32>>();
            sender.send(data).unwrap();
        },
        |err| eprintln!("an error occurred on stream: {}", err),
        None,
    )?;
    Ok(stream)
}

Since the unbounded channel is asynchronous, it also handles the wake-up when any data is received, making the manual control of the Waker unnecessary.

use futures::Stream;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

pub struct AudioInputStream {
    stream: cpal::Stream,
    receiver: UnboundedReceiver<Vec<f32>>,
}

impl AudioInputStream {
    pub fn try_from_device_config(device: &Device, config: SupportedStreamConfig) -> Result<Self> {
        let (sender, receiver) = mpsc::unbounded_channel();
        let stream = match config.sample_format() {
            SampleFormat::I8 => build_input_stream::<i8>(device, config, sender)?,
           ...
            _ => return Err(anyhow::anyhow!("unsupported sample format")),
        };
        Ok(Self { stream, receiver })
    }
}

impl Stream for AudioInputStream {
    type Item = Vec<f32>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> futures::task::Poll<Option<Self::Item>> {
        self.stream.play().unwrap();
        self.receiver.poll_recv(cx)
    }
}

Playing as Sink

Unlike recording, playing audio as a sink is a bit more complicated. This is because the Sink struct provided by Rodio only provides a sleep_until_end method to indicate when the audio has finished playing. This is not very useful in our case, as it would block the thread until playback finishes. Instead, we want to play audio in the background while still being able to know when it has finished.

The solution is to play the audio on a dedicated thread and notify the main thread via a oneshot channel when it has finished.

use tokio::{
    sync::oneshot::{self, Receiver, Sender},
    task,
};

pub struct AudioOutputStream<I>
where
    I: ExactSizeIterator + Send + 'static,
    I::Item: rodio::Sample + Send,
    f32: FromSample<I::Item>,
{
    _stream: OutputStream,
    sender: UnboundedSender<(AudioTrack<I>, Sender<()>)>,
    task: Option<Receiver<()>>,
}

impl<I> AudioOutputStream<I>
where
    I: ExactSizeIterator + Send + 'static,
    I::Item: rodio::Sample + Send,
    f32: FromSample<I::Item>,
{
    pub fn try_from_device_config(device: &Device, config: SupportedStreamConfig) -> Result<Self> {
        let (_stream, handle) = OutputStream::try_from_device_config(device, config)?;
        let sink = rodio::Sink::try_new(&handle)?;

        let (sender, mut receiver) = mpsc::unbounded_channel::<(AudioTrack<I>, Sender<()>)>();
        task::spawn_blocking(move || {
            while let Some((track, sender)) = receiver.blocking_recv() {
                sink.append(track);
                sink.sleep_until_end();
                let _ = sender.send(());
            }
        });

        Ok(Self {
            _stream,
            sender,
            task: None,
        })
    }
}

Since the oneshot channel is also asynchronous, its Receiver can be polled directly, delegating wake-up control to the channel.

use futures::FutureExt;

impl<I> AudioOutputStream<I>
where
    I: ExactSizeIterator + Send + 'static,
    I::Item: rodio::Sample + Send,
    f32: FromSample<I::Item>,
{
    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), Error>> {
        match self.as_mut().task {
            Some(ref mut inner) => {
                if inner.poll_unpin(cx).is_ready() {
                    self.as_mut().task = None;
                    std::task::Poll::Ready(Ok(()))
                } else {
                    std::task::Poll::Pending
                }
            }
            None => std::task::Poll::Ready(Ok(())),
        }
    }
}

And finally, the Sink implementation is very similar to the Stream implementation from the previous section. To keep things simple, buffering is not implemented here, but it could be done with the buffer method from SinkExt, as sketched after the implementation below.

use anyhow::Error;
use futures::Sink;

impl<I> Sink<AudioTrack<I>> for AudioOutputStream<I>
where
    I: ExactSizeIterator + Send + 'static,
    I::Item: rodio::Sample + Send,
    f32: FromSample<I::Item>,
{
    type Error = Error;

    fn poll_close(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
        self.poll(cx)
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
        self.poll(cx)
    }

    fn poll_ready(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<std::result::Result<(), Self::Error>> {
        self.poll(cx)
    }

    fn start_send(
        mut self: std::pin::Pin<&mut Self>,
        item: AudioTrack<I>,
    ) -> std::result::Result<(), Self::Error> {
        let (sender, receiver) = oneshot::channel();
        self.sender
            .send((item, sender))
            .map_err(|_| Error::msg("failed to send audio track"))?;
        self.as_mut().task = Some(receiver);
        Ok(())
    }
}
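
As mentioned above, buffering was left out for simplicity. If you need it, the buffer combinator from SinkExt queues items in front of any sink; a sketch, assuming an output_stream and track like those in the final example:

use futures::SinkExt;

// Queue up to 4 tracks before the sink applies backpressure.
let mut buffered = output_stream.buffer(4);
buffered.send(track).await.unwrap();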

Putting It All Together

Now that we have all the pieces, let's put them together. Here is the record-then-playback example from the last section:

use futures::{SinkExt, StreamExt};
use tokio::time;

... // Create device and config

let mut input_stream =
    AudioInputStream::try_from_device_config(&device, config.clone()).unwrap();
let mut inputs = vec![];
time::timeout(Duration::from_secs(5), async {
    while let Some(sample) = input_stream.next().await {
        inputs.extend(sample);
    }
})
.await
.ok();

let track = AudioTrack::new(inputs.into_iter(), config.clone());
let mut output_stream = AudioOutputStream::try_from_device_config(&device, config).unwrap();
output_stream.send(track).await.unwrap();

One obvious change is that we are using async and await instead of sleep and sleep_until_end. Under the hood, sleep and sleep_until_end may still happen, but they are scheduled by the tokio runtime instead of manually by us, as in the previous section.

You may find this extremely helpful when writing the athernet project, as explicit synchronization is rarely needed, which spares you the confusing synchronization bugs that come with it.

In addition, this approach allows us to make use of existing asynchronous infrastructure, such as tokio_util and futures_util, which provide framing, buffering, and other utilities.
