Implementing different connection patterns with QUIC

Implementing a new protocol on top of QUIC can be a little daunting at first. Although the API is not that extensive, it's more complicated than e.g. TCP, where you can only send and receive bytes and eventually reach an end of stream. There isn't "one right way" to use the QUIC API; it depends on what interaction pattern your protocol intends to use. This document is an attempt at categorizing these interaction patterns. Perhaps you'll find exactly what you want to do here. If not, perhaps the examples give you an idea of how you can utilize the QUIC API for your use case. One thing to point out is that we're looking at interaction patterns after establishing a connection, i.e. everything that happens after we've connected or accepted an incoming connection and have a Connection instance.

Overview of the QUIC API

Unlike TCP, in QUIC you can open multiple streams. Either side of a connection can decide to "open" a stream at any time:

impl Connection {
    async fn open_uni(&self) -> Result<SendStream>;
    async fn accept_uni(&self) -> Result<RecvStream>;
    
    async fn open_bi(&self) -> Result<(SendStream, RecvStream)>;
    async fn accept_bi(&self) -> Result<(SendStream, RecvStream)>;
}

Similar to how bytes written on one side of a TCP connection will eventually be read on the other side, when a protocol opens a stream on one end, the other side of the protocol can accept that stream. Streams can be either uni-directional (open_uni/accept_uni) or bi-directional (open_bi/accept_bi).

  • With uni-directional streams, only the opening side sends bytes to the accepting side. The receiving side can already start consuming bytes before the opening/sending side finishes writing all data. So it supports streaming, as the name suggests.
  • With bi-directional streams, both sides can send bytes to each other at the same time. The API supports full duplex streaming.

Speaking of "finishing writing data", there are some additional ways to communicate information via streams besides sending and receiving bytes!

  • The SendStream side can .finish() the stream. This will send something like an "end of stream notification" to the other side, after all pending bytes have been sent on the stream. This "notification" can be received on the other end in various ways:
    • RecvStream::read will return Ok(None), if all pending data was read and the stream was finished. Other methods like read_chunk work similarly.
    • RecvStream::read_to_end will resolve once the finishing notification comes in, returning all pending data. If the sending side never calls .finish(), this will never resolve.
    • RecvStream::received_reset will resolve with Ok(None).
  • The SendStream side can also .reset() the stream. This has the same effect as .finish()ing the stream, except for two differences: Resetting happens immediately and discards any pending bytes that haven't been sent yet, and you can provide an application-specific "error code" to signal the reason for the reset to the other side. This "notification" is received in these ways on the other end:
    • RecvStream::read and other methods like read_exact, read_chunks and read_to_end will return a ReadError::Reset(code) with the error code given on the send side.
    • RecvStream::received_reset will resolve to the error code Ok(Some(code)).
  • The other way around, the RecvStream side can also notify the sending side that it's not interested in reading any more data by calling RecvStream::stop with an application-specific code. This notification is received on the sending side:
    • SendStream::write and similar methods like write_all, write_chunks etc. will error out with a WriteError::Stopped(code).
    • SendStream::stopped resolves with Ok(Some(code)).

These additional "notification" mechanisms are a common source of confusion: Naively, we might expect a networking API to be able to send and receive bytes, and maybe listen for a "stop". However, it turns out that with the QUIC API, we can notify the other side about newly opened streams, and finish, reset, or even stop them. Additionally, there are two different types of stream-opening (uni-directional and bi-directional), and a bi-directional stream has 3 different ways each side can close some aspect of it: Each side can either .finish() or .reset() its sending half, or .stop() its receiving half.
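
To make these mechanisms concrete, here's a minimal sketch of a receiver losing interest mid-stream and the sender reacting to it (using the quinn-style API from above; the error code 42 is an arbitrary, application-specific choice):

// Receiving side: we decide we don't want the rest of the data.
fn lose_interest(recv: &mut RecvStream) -> Result<()> {
    // sends a "stop" notification with application-specific code 42
    recv.stop(42u32.into())?;
    Ok(())
}

// Sending side: writes start failing once the notification arrives.
async fn send_data(mut send: SendStream, data: &[u8]) -> Result<()> {
    match send.write_all(data).await {
        Ok(()) => send.finish()?, // all data queued: clean end of stream
        Err(WriteError::Stopped(code)) => {
            println!("receiver stopped the stream with code {code}");
        }
        Err(err) => return Err(err.into()),
    }
    Ok(())
}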

Finally, there's one more important "notification" we have to cover: Closing the connection.

Either end of the connection can decide to close the connection at any point by calling Connection::close with an application-specific error code (and even a bunch of bytes indicating a "reason", possibly some human-readable ASCII, but without a guarantee that it will be delivered).

Once this notification is received on the other end, all stream writes return WriteError::ConnectionLost(ApplicationClose { .. }) and all reads return ReadError::ConnectionLost(ApplicationClose { .. }). It can also be received by waiting for Connection::closed to resolve.

Importantly, this notification interrupts all flows of data:

  • On the side that triggers it, it will drop all data that hasn't been sent yet.
  • On the side that receives it, it will immediately drop all data queued to be sent, and that side will stop receiving new data.

What this means is that it's important to carefully close the connection at the right time, either at a point in the protocol where we know that we won't be sending or receiving any more data, or when we're sure we want to interrupt all data flows.

On the other hand, we want to make sure that we end protocols by sending this notification on at least one end of the connection, as just "leaving the connection hanging" on one endpoint causes the other endpoint to needlessly wait for more information, eventually timing out.
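
To make the danger concrete, here's a sketch of the failure mode and its fix (quinn-style API as above):

// Risky: write_all only queues the bytes locally. Closing right away
// may discard data that hasn't actually been transmitted yet.
send.write_all(data).await?;
send.finish()?;
conn.close(0u32.into(), b"bye"); // may cut off the bytes above!

// Safer: finish the stream, then let the *receiving* side close the
// connection once it has read everything, and wait for that here:
send.write_all(data).await?;
send.finish()?;
conn.closed().await;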


Let's look at some interaction pattern examples so we get a feeling for how all of these pieces fit together:

Request and Response

The most common type of protocol interaction. In this case, the connecting endpoint first sends a request. The accepting endpoint will read the full request before starting to send a response. Once the connecting endpoint has read the full response, it will close the connection. The accepting endpoint waits for this close notification before shutting down.

async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<Vec<u8>> {
    let (mut send, mut recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    
    let response = recv.read_to_end(MAX_RESPONSE_SIZE).await?;
    
    conn.close(0u32.into(), b"I have everything, thanks!");
    
    Ok(response)
}

async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let (mut send, mut recv) = conn.accept_bi().await?;
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    
    let response = compute_response(request);
    send.write_all(&response).await?;
    send.finish()?;
    
    conn.closed().await;
    
    Ok(())
}

Full duplex Request & Response streaming

It's possible to start sending a response before the request has finished coming in. This makes it possible to handle arbitrarily big requests in O(1) memory. In this toy example, we read u64s from the client and send back each of them doubled.

async fn connecting_endpoint(conn: Connection, mut request: impl Stream<Item = u64> + Unpin) -> Result<()> {
    let (mut send, mut recv) = conn.open_bi().await?;
    
    // concurrently read the responses
    let read_task = tokio::spawn(async move {
        let mut buf = [0u8; size_of::<u64>()];
        // read_exact will return `Err` once the other side
        // finishes its stream
        while recv.read_exact(&mut buf).await.is_ok() {
            let number = u64::from_be_bytes(buf);
            println!("Read response: {number}");
        }
    });
    
    while let Some(number) = request.next().await {
        send.write_all(&number.to_be_bytes()).await?;
    }
    send.finish()?;
    
    // we close the connection after having read all data
    read_task.await?;
    conn.close(0u32.into(), b"done");
    
    Ok(())
}

async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let (mut send, mut recv) = conn.accept_bi().await?;
    
    let mut buf = [0u8; size_of::<u64>()];
    while recv.read_exact(&mut buf).await.is_ok() {
        let number = u64::from_be_bytes(buf);
        send.write_all(&number.wrapping_mul(2).to_be_bytes()).await?;
    }
    send.finish()?;
    
    // the other side will tell us when it's done reading our data
    conn.closed().await;
    
    Ok(())
}

Multiple Requests & Responses

This is one of the main use cases QUIC was designed for: multiplexing multiple requests and responses on the same connection. HTTP/3 is an example of a protocol using QUIC's capabilities for this. A single HTTP/3 connection to a server can handle multiple HTTP requests concurrently without the requests blocking each other. This is the main innovation in HTTP/3: unlike with HTTP/2 over TCP, a single lost packet only stalls the stream it belongs to instead of every request on the connection.

In HTTP/3, each HTTP request runs on its own bi-directional stream: the request is sent in one direction while the response is received in the other. Both stream directions are cancellable as a unit, which makes it possible for the user agent to cancel some HTTP requests without cancelling any others on the same HTTP/3 connection, as the sketch below shows.
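
For example, cancelling a single in-flight request comes down to a reset/stop on that request's stream pair (a sketch; the error code 0 is an arbitrary, application-specific choice):

// Cancel one request without affecting any others on the connection.
fn cancel_request(send: &mut SendStream, recv: &mut RecvStream) -> Result<()> {
    send.reset(0u32.into())?; // stop uploading the request
    recv.stop(0u32.into())?; // tell the server to stop responding
    Ok(())
}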

Using the QUIC API for this purpose will feel very natural:

// The connecting endpoint can call this multiple times
// for one connection.
// When it doesn't want to do more requests and has all
// responses, it can close the connection.
async fn request(conn: &Connection, request: &[u8]) -> Result<Vec<u8>> {
    let (mut send, mut recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    
    let response = recv.read_to_end(MAX_RESPONSE_SIZE).await?;
    
    Ok(response)
}

// The accepting endpoint will call this to handle all
// incoming requests on a single connection.
async fn handle_requests(conn: Connection) -> Result<()> {
    while let Ok((send, recv)) = conn.accept_bi().await {
        tokio::spawn(handle_request(send, recv));
    }
    // conn.accept_bi() will error out once the connection was
    // closed, so we don't need to use conn.closed().
    Ok(())
}

async fn handle_request(mut send: SendStream, mut recv: RecvStream) -> Result<()> {
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    
    let response = compute_response(request);
    send.write_all(&response).await?;
    send.finish()?;
    
    Ok(())
}

Please note that, in this case, the client doesn't immediately close the connection after a single request (duh!). Instead, it might want to keep the connection open for some idle time, or until it knows the application won't need to make another request, and only then close the connection. All that said, it's still true that the connecting side closes the connection.
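
One way to structure this is to feed requests through a channel and close the connection once it has been idle for a while or the application is done. A sketch (the 30-second idle time and the channel plumbing are made up for this example):

use std::time::Duration;
use tokio::{sync::mpsc, time::timeout};

async fn request_loop(conn: Connection, mut jobs: mpsc::Receiver<Vec<u8>>) -> Result<()> {
    const IDLE_TIME: Duration = Duration::from_secs(30);
    // `Ok(Some(..))` means we got another job before the idle timer fired
    while let Ok(Some(job)) = timeout(IDLE_TIME, jobs.recv()).await {
        let response = request(&conn, &job).await?;
        println!("received {} response bytes", response.len());
    }
    // either the idle time elapsed or the application dropped the sender
    conn.close(0u32.into(), b"idle");
    Ok(())
}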

Multiple ordered Notifications

Sending and receiving multiple notifications that can be handled one-by-one can be done by adding framing to the bytes on a uni-directional stream.

async fn connecting_endpoint(conn: Connection, mut notifications: impl Stream<Item = Bytes> + Unpin) -> Result<()> {
    let send = conn.open_uni().await?;
    
    let mut send_frame = LengthDelimitedCodec::builder().new_write(send);
    while let Some(notification) = notifications.next().await {
        send_frame.send(notification).await?;
    }
    
    // take the stream back out of the codec so we can finish it
    send_frame.into_inner().finish()?;
    conn.closed().await;
    
    Ok(())
}

async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let recv = conn.accept_uni().await?;
    let mut recv_frame = LengthDelimitedCodec::builder().new_read(recv);
    
    while let Some(notification) = recv_frame.try_next().await? {
        println!("Received notification: {notification:?}");
    }
    
    conn.close(0u32.into(), b"got everything!");
    
    Ok(())
}

Here we're using LengthDelimitedCodec from tokio-util's codec feature to easily turn the SendStream and RecvStream, which work as streams of bytes, into streams of items, where each item in this case is a Bytes/BytesMut. In practice you would probably add message parsing on top of this code, and you might want to configure the LengthDelimitedCodec.
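
For example, a sketch of what configuring the codec could look like (the 16 MiB frame limit and the u32 length prefix are arbitrary choices):

// cap frames at 16 MiB and use a 4-byte big-endian length prefix
let mut recv_frame = LengthDelimitedCodec::builder()
    .max_frame_length(16 * 1024 * 1024)
    .length_field_type::<u32>()
    .new_read(recv);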

The resulting notifications all arrive in order, since the bytes in a uni-directional stream are received in order, and we process one frame before continuing to read the next bytes off of the QUIC stream.

Request with multiple Responses

If your protocol expects multiple responses for a request, we can implement that with the same primitive we've learned about in the section about multiple ordered notifications: We use framing to segment a single response byte stream into multiple ordered responses:

async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<()> {
    let (mut send, recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    
    let mut recv_frame = LengthDelimitedCodec::builder().new_read(recv);
    while let Some(response) = recv_frame.try_next().await? {
        println!("Received response: {response:?}");
    }
    
    conn.close(0u32.into(), b"thank you!");
    
    Ok(())
}

async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let (send, mut recv) = conn.accept_bi().await?;
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    
    let mut send_frame = LengthDelimitedCodec::builder().new_write(send);
    let mut responses = responses_for_request(&request);
    while let Some(response) = responses.next().await {
        send_frame.send(response).await?;
    }
    
    // finish the stream so the other side's framed read loop ends
    send_frame.into_inner().finish()?;
    
    conn.closed().await;
    
    Ok(())
}

fn responses_for_request(req: &[u8]) -> impl Stream<Item = Bytes> + Unpin {
    // ...
}

This example ends up similar to the one with ordered notifications, except

  1. The roles are reversed: The length-prefix sending happens on the accepting endpoint, and the length-prefix decoding on the connecting endpoint.
  2. We additionally send a request before we start receiving multiple responses.

Request with multiple unordered Responses

The previous example required all responses to arrive in order. What if that's undesirable? What if we want the connecting endpoint to receive each response as soon as it's available? In that case, we need to break up the single response stream into multiple response streams. We can do this by "conceptually" splitting the "single" bi-directional stream into one uni-directional stream for the request and multiple uni-directional streams in the other direction for the responses:

async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<()> {
    let mut send = conn.open_uni().await?;
    send.write_all(request).await?;
    send.finish()?;
    
    let recv_tasks = TaskTracker::new();
    // accept_uni will return `Err` once the connection is closed
    while let Ok(recv) = conn.accept_uni().await {
        recv_tasks.spawn(handle_response(recv));
    }
    recv_tasks.close();
    recv_tasks.wait().await;
    conn.close(0u32.into(), b"Thank you!");
    
    Ok(())
}
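
The accepting endpoint is left out above; here's a hedged sketch of what it could look like. It assumes the accepting side closes the connection once every response stream has been fully acknowledged (SendStream::stopped resolves with Ok(None) once a finished stream was read to completion), which is what makes the connecting side's accept_uni loop above terminate:

async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let mut recv = conn.accept_uni().await?;
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    
    let send_tasks = TaskTracker::new();
    let mut responses = responses_for_request(&request);
    while let Some(response) = responses.next().await {
        let conn = conn.clone();
        send_tasks.spawn(async move {
            // each response gets its own uni-directional stream,
            // so responses don't block each other
            let mut send = conn.open_uni().await?;
            send.write_all(&response).await?;
            send.finish()?;
            // wait until the response was fully received (or stopped)
            send.stopped().await?;
            anyhow::Ok(())
        });
    }
    send_tasks.close();
    send_tasks.wait().await;
    
    // everything was delivered, so closing won't cut off any data
    conn.close(0u32.into(), b"all responses sent");
    Ok(())
}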

Proxying UDP traffic using the unreliable datagram extension

Time-sensitive Real-time interaction

We often see users reaching for the QUIC datagram extension when implementing real-time protocols. In most cases, this is misguided: QUIC datagrams still interact with QUIC's congestion controller, and they are also acknowledged. Traditional protocols implemented on top of QUIC datagrams might thus not perform the way they were designed to. Instead, it's often better to use lots of streams that are then stopped, reset, or prioritized.

A real-world example is the Media over QUIC protocol (MoQ for short): MoQ is used to transfer live video frames, and it uses one QUIC stream for each frame (QUIC streams are cheap to create)!

The receiver then stops streams that are "too old" to be worth delivering, e.g. because it's a live video stream and newer frames were already fully received. Similarly, the sending side resets streams for outdated frames, indicating to the QUIC stack that it doesn't need to keep retrying the transmission of an outdated live video frame. (MoQ actually also uses stream prioritization to make sure the newest video frames get scheduled to be sent first.)
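
A hedged sketch of what the sending half of such a pattern could look like (Frame, its fields, and the FRAME_OUTDATED error code are hypothetical; set_priority and reset are part of the quinn-style stream API):

// One uni-directional stream per video frame. We hold on to the
// SendStream so we can still abandon the frame later.
async fn send_frame(conn: &Connection, frame: &Frame) -> Result<SendStream> {
    let mut send = conn.open_uni().await?;
    // newer frames get higher priority, so they're scheduled first
    send.set_priority(frame.sequence_number as i32)?;
    send.write_all(&frame.data).await?;
    Ok(send)
}

// Once we know whether the frame is still relevant, either finish the
// stream cleanly or reset it so the QUIC stack stops retransmitting.
fn finish_or_abandon(send: &mut SendStream, outdated: bool) -> Result<()> {
    if outdated {
        send.reset(FRAME_OUTDATED.into())?;
    } else {
        send.finish()?;
    }
    Ok(())
}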

Use Streams with stop/reset instead of UDP datagrams

https://discord.com/channels/1161119546170687619/1195362941134962748/1407266901939327007

Use Stream priority to down-regulate secondary data

https://discord.com/channels/976380008299917365/1063547094863978677/1248723504246030336

Closing Connections

Gracefully closing connections can be tricky to get right when first working with the QUIC API. If you don't close connections gracefully, you'll see the connection timing out on one endpoint, usually after 30s, even though the other endpoint finished promptly without errors. This happens when the endpoint that finishes doesn't notify the other endpoint about having finished its operations. There are mainly two reasons this happens:

  1. The protocol doesn't call Connection::close at the right moment.
  2. The endpoint that closes the connection is immediately dropped afterwards without waiting for Endpoint::close.

To make sure that you're not hitting (2), simply always make sure to wait for Endpoint::close to resolve, on both endpoints, if you can afford it.

Getting (1) right is harder. We might accidentally close connections too early, because we accidentally drop the Connection (which implicitly calls close). Instead, we should always keep the connection around and either wait for Connection::closed to resolve or call Connection::close ourselves at the right moment. When that is depends on what kind of protocol you're implementing:
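
As a minimal sketch of a clean shutdown sequence (assuming the awaitable Endpoint::close used throughout this document):

async fn shutdown(endpoint: Endpoint, conn: Connection) -> Result<()> {
    // notify the other side (or wait for Connection::closed instead,
    // depending on whose turn it is to close)
    conn.close(0u32.into(), b"done");
    // don't drop the endpoint before the notification actually went out
    endpoint.close().await;
    Ok(())
}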

After a single Interaction

Protocols that implement a single interaction want to keep their connection alive only for the duration of that interaction. In this case, the endpoint that receives application data last is the one that calls Connection::close at that point in time. Conversely, the other endpoint should wait for Connection::closed to resolve before ending its operations. An example of this can be seen in the Request and Response section above: the connecting side closes the connection once it has received the response, and the accepting side waits for the connection to be closed after having sent off the response.

During continuous Interaction

Sometimes we want to keep connections open for as long as the user is actively working with the application, so we don't needlessly run handshakes or try to hole-punch repeatedly. In these cases, the protocol flow doesn't indicate which endpoint of the connection will be the one that closes it. Instead, both endpoints should concurrently monitor Connection::closed while they're running the protocol:

async fn handle_connection(conn: Connection) -> Result<()> {
    futures_lite::future::race(
        run_protocol(conn.clone()),
        async move {
            conn.closed().await;
            anyhow::Ok(())
        },
    )
    .await?;
    Ok(())
}

async fn run_protocol(conn: Connection) -> Result<()> {
    // run the normal protocol flow here...
    // ...and once we realize we want to abort the connection flow:
    conn.close(0u32.into(), b"ah sorry, have to go!");
    
    Ok(())
}

And again, after handle_connection we need to make sure to wait for Endpoint::close to resolve.

Aborting Streams

https://discord.com/channels/949724860232392765/1399719019292000329/1399721482522984502

As Sender

As Receiver

QUIC 0-RTT features

Server-side 0.5-RTT

  • always works
  • gotcha: Request can be replayed

Client-side 0-RTT

Gotchas

Streams are opened "lazily"

Stream flushing vs. finishing

Dropping the endpoint without Endpoint.close().await

Dropping all Connections and Streams closes the connection

Knowing Idempotency