Implementing different connection patterns with QUIC
Implementing a new protocol on top of QUIC can be a little daunting at first. Although the API is not that extensive, it's more complicated than e.g. TCP, where you can only send and receive bytes and eventually get an end of stream.
There isn't "one right way" to use the QUIC API. It depends on what interaction pattern your protocol intends to use.
This document is an attempt at categorizing the interaction patterns. Perhaps you find exactly what you want to do here. If not, perhaps the examples give you an idea for how you can utilize the QUIC API for your use case.
One thing to point out is that we're looking at interaction patterns after establishing a connection, i.e. everything that happens after we've connected or accepted incoming connections, so everything that happens once we have a Connection instance.
Overview of the QUIC API
Unlike TCP, in QUIC you can open multiple streams. Either side of a connection can decide to "open" a stream at any time:
impl Connection {
    async fn open_uni(&self) -> Result<SendStream>;
    async fn accept_uni(&self) -> Result<RecvStream>;
    async fn open_bi(&self) -> Result<(SendStream, RecvStream)>;
    async fn accept_bi(&self) -> Result<(SendStream, RecvStream)>;
}
Similar to how bytes written on one side of a TCP connection can be read on the other side, when a protocol opens a stream on one end, the other side of the protocol can accept that stream.
Streams can be either uni-directional (open_uni/accept_uni), or bi-directional (open_bi/accept_bi).
- With uni-directional streams, only the opening side sends bytes to the accepting side. The receiving side can already start consuming bytes before the opening/sending side finishes writing all data. So it supports streaming, as the name suggests.
- With bi-directional streams, both sides can send bytes to each other at the same time. The API supports full duplex streaming.
One bi-directional stream is essentially the closest equivalent to a TCP stream. If your goal is to adapt a TCP protocol to the QUIC API, the easiest way is going to be opening a single bi-directional stream and then essentially using the send and receive stream pair as if it were a TCP stream.
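Concretely, a minimal sketch of that adaptation could look like the following (the 5-byte reply is just for illustration):

async fn tcp_like_protocol(conn: &Connection) -> Result<()> {
    let (mut send, mut recv) = conn.open_bi().await?;
    // use `send` and `recv` like the write and read halves of a TCP stream
    send.write_all(b"hello").await?;
    // ... and once we're done writing data:
    send.finish()?;
    let mut reply = [0u8; 5];
    recv.read_exact(&mut reply).await?;
    Ok(())
}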
Speaking of "finishing writing data", there are some additional ways to communicate information via streams besides sending and receiving bytes!
- The SendStream side can .finish() the stream. This will send something like an "end of stream notification" to the other side, after all pending bytes have been sent on the stream. This "notification" can be received on the other end in various ways:
  - RecvStream::read will return Ok(None) once all pending data was read and the stream was finished. Other methods like read_chunk work similarly.
  - RecvStream::read_to_end will resolve once the finishing notification comes in, returning all pending data. If the sending side never calls .finish(), this will never resolve.
  - RecvStream::received_reset will resolve with Ok(None).
- The SendStream side can also .reset() the stream. This will have the same effect as .finish()ing the stream, except for two differences: Resetting happens immediately and discards any pending bytes that haven't been sent yet, and you can provide an application-specific "error code" to signal the reason for the reset to the other side. This "notification" is received in these ways on the other end:
  - RecvStream::read and other methods like read_exact, read_chunks and read_to_end will return a ReadError::Reset(code) with the error code given on the send side.
  - RecvStream::received_reset will resolve to the error code Ok(Some(code)).
- The other way around, the RecvStream side can notify the sending side that it's not interested in reading any more data by calling RecvStream::stop with an application-specific code. This notification is received on the sending side:
  - SendStream::write and similar methods like write_all, write_chunks etc. will error out with a WriteError::Stopped(code).
  - SendStream::stopped resolves with Ok(Some(code)).
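To make these notifications concrete, here's a small sketch of a sender aborting a stream and a receiver observing the outcome. The error code 42 is an arbitrary, application-defined choice:

async fn resetting_sender(conn: &Connection) -> Result<()> {
    let mut send = conn.open_uni().await?;
    send.write_all(b"partial data").await?;
    // abort the stream: unsent bytes are discarded and the peer
    // receives our application-defined error code
    send.reset(42u32.into())?;
    Ok(())
}

async fn observing_receiver(conn: &Connection) -> Result<()> {
    let mut recv = conn.accept_uni().await?;
    match recv.received_reset().await? {
        Some(code) => println!("stream was reset with code {code}"),
        None => println!("stream was finished cleanly"),
    }
    Ok(())
}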
What is the difference between a bi-directional stream and two uni-directional streams?
- The bi-directional stream establishes the stream pair in a single "open -> accept" interaction. To get two uni-directional streams in both directions, one side would have to open a stream, send data on it, and concurrently accept a stream, while the other side would have to accept a stream and then open one of its own.
- Two uni-directional streams are not associated with each other: one stream might be stopped or reset with one close code while the other is still open, and nothing ties the two together. The two halves of a bi-directional stream share a single stream ID, so they can be treated, and cancelled, as a unit.
These additional "notification" mechanisms are a common source of confusion: Naively, we might expect a networking API to be able to send and receive bytes, and maybe listen for a "stop".
However, it turns out that with the QUIC API, we can notify the other side about newly opened streams, and finish, reset, or even stop them. Additionally, there are two different types of stream-opening (uni-directional and bi-directional).
A bi-directional stream has 3 different ways each side can close some aspect of it: Each side can either .finish() or .reset() its send half, or .stop() its receiving half.
Finally, there's one more important "notification" we have to cover: Closing the connection.
Either end of the connection can decide to close the connection at any point by calling Connection::close with an application-specific error code (and even a bunch of bytes indicating a "reason", possibly some human-readable ASCII, though without a guarantee that it will be delivered).
Once this notification is received on the other end, all stream writes return WriteError::ConnectionLost(ApplicationClose { .. }) and all reads return ReadError::ConnectionLost(ApplicationClose { .. }). It can also be received by waiting for Connection::closed to resolve.
Importantly, this notification interrupts all flows of data:
- On the side that triggers it, it will drop all data that is still queued to be sent.
- On the side that receives it, it will immediately drop all data queued to be sent, and it will stop receiving new data.
What this means is that it's important to carefully close the connection at the right time, either at a point in the protocol where we know that we won't be sending or receiving any more data, or when we're sure we want to interrupt all data flows.
On the other hand, we want to make sure that we end protocols by sending this notification on at least one end of the connection, as just "leaving the connection hanging" on one endpoint causes the other endpoint to needlessly wait for more information, eventually timing out.
Let's look at some interaction pattern examples so we get a feeling for how all of these pieces fit together:
Request and Response
The most common type of protocol interaction. In this case, the connecting endpoint first sends a request. The accepting endpoint will read the full request before starting to send a response. Once the connecting endpoint has read the full response, it will close the connection. The accepting endpoint waits for this close notification before shutting down.
async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<Vec<u8>> {
    let (mut send, mut recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    let response = recv.read_to_end(MAX_RESPONSE_SIZE).await?;
    conn.close(0u32.into(), b"I have everything, thanks!");
    Ok(response)
}
async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let (mut send, mut recv) = conn.accept_bi().await?;
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    let response = compute_response(request);
    send.write_all(&response).await?;
    send.finish()?;
    conn.closed().await;
    Ok(())
}
Full duplex Request & Response streaming
It's possible to start sending a response before the request has finished coming in.
This makes it possible to handle arbitrarily big requests in O(1) memory.
In this toy example, we're reading u64s from the client and sending each of them back doubled.
async fn connecting_endpoint(conn: Connection, mut request: impl Stream<Item = u64>) -> Result<()> {
    let (mut send, mut recv) = conn.open_bi().await?;
    // concurrently read the responses
    let read_task = tokio::spawn(async move {
        let mut buf = [0u8; size_of::<u64>()];
        // read_exact will return `Err` once the other side
        // finishes its stream
        while recv.read_exact(&mut buf).await.is_ok() {
            let number = u64::from_be_bytes(buf);
            println!("Read response: {number}");
        }
    });
    while let Some(number) = request.next().await {
        // write_u64 (big-endian) is provided by tokio::io::AsyncWriteExt
        send.write_u64(number).await?;
    }
    send.finish()?;
    // we close the connection after having read all data
    read_task.await?;
    conn.close(0u32.into(), b"I have everything, thanks!");
    Ok(())
}
async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let (mut send, mut recv) = conn.accept_bi().await?;
    let mut buf = [0u8; size_of::<u64>()];
    while recv.read_exact(&mut buf).await.is_ok() {
        let number = u64::from_be_bytes(buf);
        send.write_u64(number.wrapping_mul(2)).await?;
    }
    send.finish()?;
    // the other side will tell us when it's done reading our data
    conn.closed().await;
    Ok(())
}
Multiple Requests & Responses
This is one of the main use cases QUIC was designed for: multiplexing multiple requests and responses on the same connection. HTTP/3 is an example of a protocol using QUIC's capabilities for this. A single HTTP/3 connection to a server can handle multiple HTTP requests concurrently without the requests blocking each other. This is the main improvement over HTTP/2: a single lost packet no longer stalls every in-flight request, which makes working around head-of-line blocking with connection pools unnecessary.
In HTTP/3, each HTTP request runs on its own bi-directional stream. The request is sent in one direction while the response is received in the other. This way, both stream directions are cancellable as a unit, which makes it possible for the user agent to cancel some HTTP requests without cancelling any others on the same HTTP/3 connection.
Using the QUIC API for this purpose will feel very natural:
// The connecting endpoint can call this multiple times
// for one connection.
// When it doesn't want to do more requests and has all
// responses, it can close the connection.
async fn request(conn: &Connection, request: &[u8]) -> Result<Vec<u8>> {
    let (mut send, mut recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    let response = recv.read_to_end(MAX_RESPONSE_SIZE).await?;
    Ok(response)
}
// The accepting endpoint will call this to handle all
// incoming requests on a single connection.
async fn handle_requests(conn: Connection) -> Result<()> {
    while let Ok((send, recv)) = conn.accept_bi().await {
        tokio::spawn(handle_request(send, recv));
    }
    // conn.accept_bi() will error out once the connection was
    // closed, so we don't need to use conn.closed().
    Ok(())
}
async fn handle_request(mut send: SendStream, mut recv: RecvStream) -> Result<()> {
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    let response = compute_response(request);
    send.write_all(&response).await?;
    send.finish()?;
    Ok(())
}
Please note that, in this case, the client doesn't immediately close the connection after a single request (duh!). Instead, it might want to keep the connection open for some idle time, or until it knows the application won't need to make another request, and only then close the connection. All that said, it's still the connecting side that closes the connection.
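As an aside: cancelling one of these multiplexed requests mid-flight is a per-stream operation, so other requests on the connection are unaffected. A rough sketch (the error code 0 is an arbitrary, application-defined choice):

async fn cancel_request(mut send: SendStream, mut recv: RecvStream) -> Result<()> {
    // we're no longer interested in the response
    recv.stop(0u32.into())?;
    // abort whatever part of the request hasn't been sent yet
    send.reset(0u32.into())?;
    Ok(())
}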
Multiple ordered Notifications
Sending and receiving multiple notifications that can be handled one-by-one can be done by adding framing to the bytes on a uni-directional stream.
async fn connecting_endpoint(conn: Connection, mut notifications: impl Stream<Item = Bytes>) -> Result<()> {
    let send = conn.open_uni().await?;
    let mut send_frame = LengthDelimitedCodec::builder().new_write(send);
    while let Some(notification) = notifications.next().await {
        send_frame.send(notification).await?;
    }
    // take the SendStream back out of the framed writer to finish it
    send_frame.into_inner().finish()?;
    conn.closed().await;
    Ok(())
}
async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let recv = conn.accept_uni().await?;
    let mut recv_frame = LengthDelimitedCodec::builder().new_read(recv);
    while let Some(notification) = recv_frame.try_next().await? {
        println!("Received notification: {notification:?}");
    }
    conn.close(0u32.into(), b"got everything!");
    Ok(())
}
Here we're using LengthDelimitedCodec from tokio-util's codec feature to easily turn the SendStream and RecvStream, which work as streams of bytes, into streams of items, where each item in this case is a Bytes/BytesMut. In practice you would probably add parsing of the frame bytes into actual message types, and you might want to configure the LengthDelimitedCodec.
The resulting notifications are all in order since the bytes in the uni-directional streams are received in-order, and we're processing one frame before continuing to read the next bytes off of the QUIC stream.
There's another somewhat common way of doing this:
The order in which streams arrive at accept_uni matches the order in which open_uni was called on the remote endpoint. (The same goes for bi-directional streams.)
This way you would receive one notification per stream and know the order of notifications from the stream ID, i.e. the order of accepted streams. See the sketch below.
The downside of doing it that way is that you occupy more than one stream. If you want to multiplex other things on the same connection, you'll need to add some signaling.
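A sketch of this stream-per-notification variant might look as follows (MAX_NOTIFICATION_SIZE is a placeholder limit, like the other MAX_* constants in this document):

async fn send_notification(conn: &Connection, notification: &[u8]) -> Result<()> {
    let mut send = conn.open_uni().await?;
    send.write_all(notification).await?;
    send.finish()?;
    Ok(())
}

async fn receive_notifications(conn: Connection) -> Result<()> {
    // streams are accepted in the order the remote opened them
    while let Ok(mut recv) = conn.accept_uni().await {
        let notification = recv.read_to_end(MAX_NOTIFICATION_SIZE).await?;
        println!("Received notification: {notification:?}");
    }
    Ok(())
}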
Request with multiple Responses
If your protocol expects multiple responses for a request, we can implement that with the same primitive we've learned about in the section about multiple ordered notifications: We use framing to segment a single response byte stream into multiple ordered responses:
async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<()> {
    let (mut send, recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    let mut recv_frame = LengthDelimitedCodec::builder().new_read(recv);
    while let Some(response) = recv_frame.try_next().await? {
        println!("Received response: {response:?}");
    }
    conn.close(0u32.into(), b"thank you!");
    Ok(())
}
async fn accepting_endpoint(conn: Connection) -> Result<()> {
    let (send, mut recv) = conn.accept_bi().await?;
    let request = recv.read_to_end(MAX_REQUEST_SIZE).await?;
    let mut send_frame = LengthDelimitedCodec::builder().new_write(send);
    let mut responses = responses_for_request(&request);
    while let Some(response) = responses.next().await {
        send_frame.send(response).await?;
    }
    // finish the stream so the other side's frame reader sees the end
    send_frame.into_inner().finish()?;
    conn.closed().await;
    Ok(())
}

fn responses_for_request(req: &[u8]) -> impl Stream<Item = Bytes> {
    // ...
}
This example ends up similar to the one with ordered notifications, except:
- The roles are reversed: The length-prefix sending happens on the accepting endpoint, and the length-prefix decoding on the connecting endpoint.
- We additionally send a request before we start receiving multiple responses.
At this point you should have a good feel for how to write request/response protocols using the QUIC API. For example, you should be able to piece together a full-duplex request/response protocol, where the request is sent as multiple frames and the response comes back as multiple frames, too, by combining two length-delimited codecs in both directions and taking notes from the full duplex section further above.
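If you want a starting point, here's a sketch of that combination for the connecting endpoint (the accepting endpoint mirrors it with accept_bi):

async fn connecting_endpoint(conn: Connection, mut requests: impl Stream<Item = Bytes>) -> Result<()> {
    let (send, recv) = conn.open_bi().await?;
    let mut send_frame = LengthDelimitedCodec::builder().new_write(send);
    let mut recv_frame = LengthDelimitedCodec::builder().new_read(recv);
    // concurrently read response frames while we keep sending request frames
    let read_task = tokio::spawn(async move {
        while let Ok(Some(response)) = recv_frame.try_next().await {
            println!("Received response: {response:?}");
        }
    });
    while let Some(request) = requests.next().await {
        send_frame.send(request).await?;
    }
    send_frame.into_inner().finish()?;
    read_task.await?;
    conn.close(0u32.into(), b"thank you!");
    Ok(())
}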
Requests with multiple unordered Responses
The previous example required all responses to come in ordered. What if that's undesired? What if we want the connecting endpoint to receive incoming responses as quickly as possible? In that case, we need to break up the single response stream into multiple response streams. We can do this by "conceptually" splitting the "single" bi-directional stream into one uni-directional stream for the request and multiple uni-directional streams in the other direction for all the responses:
async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<()> {
    let mut send = conn.open_uni().await?;
    send.write_all(request).await?;
    send.finish()?;
    let recv_tasks = TaskTracker::new();
    // accept_uni will return `Err` once the connection is closed
    while let Ok(recv) = conn.accept_uni().await {
        recv_tasks.spawn(handle_response(recv));
    }
    // the TaskTracker needs to be closed before wait() can resolve
    recv_tasks.close();
    recv_tasks.wait().await;
    conn.close(0u32.into(), b"Thank you!");
    Ok(())
}
You might've noticed that this destroys the "association" between the two stream directions. This means we can't use tricks similar to what HTTP/3 does, as described above, to multiplex multiple request/response interactions on the same connection. This is unfortunate, but can be fixed by prefixing your requests and responses with a unique ID chosen per request. This ID then associates the responses with the request that used the same ID.

Another thing that might or might not be important for your use case is knowing when the unordered stream of responses is "done": You could introduce another message type that is interpreted as a finishing token, but there's a more elegant way of solving this. Instead of opening only a uni-directional stream for the request, you open a bi-directional one. The response half of that stream is then only used to indicate the final response stream ID. It acts as a sort of "control stream" that provides auxiliary information about the request to the connecting endpoint.
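Here's a sketch of that control stream idea. For simplicity, this variant has the accepting endpoint announce the number of response streams as a big-endian u64 instead of the final stream ID (handle_response is the same undefined helper as in the previous example):

async fn connecting_endpoint(conn: Connection, request: &[u8]) -> Result<()> {
    let (mut send, mut recv) = conn.open_bi().await?;
    send.write_all(request).await?;
    send.finish()?;
    // the control stream tells us how many response streams to expect
    let mut buf = [0u8; 8];
    recv.read_exact(&mut buf).await?;
    let num_responses = u64::from_be_bytes(buf);
    let recv_tasks = TaskTracker::new();
    for _ in 0..num_responses {
        let recv = conn.accept_uni().await?;
        recv_tasks.spawn(handle_response(recv));
    }
    recv_tasks.close();
    recv_tasks.wait().await;
    conn.close(0u32.into(), b"Thank you!");
    Ok(())
}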
Proxying UDP traffic using the unreliable datagram extension
Time-sensitive Real-time interaction
We often see users reaching for the QUIC datagram extension when implementing real-time protocols. In most cases, this is misguided: QUIC datagrams still go through QUIC's congestion controller and are acknowledged at the packet level (they're just never retransmitted). Traditional protocols implemented on top of QUIC datagrams might thus not perform the way they were designed to. Instead, it's often better to use lots of streams that are then stopped, reset, or prioritized.
A real-world example is the Media over QUIC protocol (MoQ for short): MoQ is used to transfer live video frames, and it uses one QUIC stream for each frame (QUIC streams are cheap to create)!
The receiver then stops streams that are "too old" to be worth delivering, e.g. because it's a live video stream and newer frames were already fully received. Similarly, the sending side resets streams at the application level to indicate to the QUIC stack that it doesn't need to keep retrying the transmission of an outdated live video frame. (MoQ actually also uses stream prioritization to make sure the newest video frames get scheduled to be sent first.)
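A sketch of the sending side of this idea; VideoFrame is a hypothetical type, and set_priority is the quinn-style per-stream priority where higher values are scheduled first:

async fn send_video_frame(conn: &Connection, frame: &VideoFrame) -> Result<SendStream> {
    let mut send = conn.open_uni().await?;
    // newer frames get a higher priority, so they win when bandwidth is scarce
    send.set_priority(frame.sequence_number as i32)?;
    send.write_all(&frame.data).await?;
    Ok(send)
}

// later, per stream: finish it if the frame is still relevant, or reset it
// so the QUIC stack stops (re)transmitting data for an outdated frame
fn finish_or_abort(mut send: SendStream, outdated: bool) -> Result<()> {
    if outdated {
        send.reset(0u32.into())?; // arbitrary application error code
    } else {
        send.finish()?;
    }
    Ok(())
}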
Use Streams with stop/reset instead of UDP datagrams
https://discord.com/channels/1161119546170687619/1195362941134962748/1407266901939327007
Use Stream priority to down-regulate secondary data
https://discord.com/channels/976380008299917365/1063547094863978677/1248723504246030336
Closing Connections
Gracefully closing connections can be tricky to get right when first working with the QUIC API. If you don't close connections gracefully, you'll see the connection timing out on one endpoint, usually after 30s, even though the other endpoint finishes promptly without errors. This happens when the endpoint that finishes doesn't notify the other endpoint about having finished operations. There are mainly two reasons this happens:
- The protocol doesn't call Connection::close at the right moment.
- The endpoint that closes the connection is immediately dropped afterwards without waiting for Endpoint::close.

To make sure that you're not hitting (2), always make sure to wait for Endpoint::close to resolve, on both Endpoints, if you can afford it.

Getting (1) right is harder. We might accidentally close connections too early, because we accidentally drop the Connection (which implicitly closes it). Instead, we should keep the connection around and either wait for Connection::closed to resolve or call Connection::close ourselves at the right moment. When that moment is depends on what kind of protocol you're implementing:
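For (2), a minimal sketch of a clean shutdown, assuming an iroh-style Endpoint whose close() is async (with plain quinn you'd call Endpoint::close followed by Endpoint::wait_idle); node_addr and ALPN are placeholders for your dialing information:

async fn run(endpoint: Endpoint, node_addr: NodeAddr) -> Result<()> {
    let conn = endpoint.connect(node_addr, ALPN).await?;
    // ... run the protocol, ending with conn.close() or conn.closed() ...
    // make sure the close notification actually reaches the other side
    endpoint.close().await;
    Ok(())
}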
After a single Interaction
Protocols that implement a single interaction want to keep their connection alive for only the time of this interaction.
In this case, the endpoint that received application data last will be the endpoint that calls Connection::close at that point in time.
Conversely, the other endpoint should wait for Connection::closed to resolve before ending its operations.
An example of this can be seen in the Request and Response section above: The connecting side closes the connection once it received the response and the accepting side waits for the connection to be closed after having sent off the response.
During continuous Interaction
Sometimes we want to keep open connections as long as the user is actively working with the application, so we don't needlessly run handshakes or try to hole-punch repeatedly.
In these cases, the protocol flow doesn't indicate which endpoint of the connection will be the one that closes the connection.
Instead, clients should concurrently monitor Connection::closed while they're running the protocol:
async fn handle_connection(conn: Connection) -> Result<()> {
    futures_lite::future::race(
        run_protocol(conn.clone()),
        async move {
            conn.closed().await;
            anyhow::Ok(())
        },
    )
    .await?;
    Ok(())
}

async fn run_protocol(conn: Connection) -> Result<()> {
    // run normal protocol flow
    // once we realize we want to abort the connection flow
    conn.close(0u32.into(), b"ah sorry, have to go!");
    Ok(())
}
And again, after handle_connection we need to make sure to wait for Endpoint::close to resolve.
Aborting Streams
https://discord.com/channels/949724860232392765/1399719019292000329/1399721482522984502
As Sender
As Receiver
QUIC 0-RTT features
Server-side 0.5-RTT
- always works
- gotcha: Request can be replayed