trillium_http/h3/connection.rs
1use super::{
2 H3Error,
3 frame::{Frame, FrameDecodeError, UniStreamType},
4 quic_varint::{self, QuicVarIntError},
5 settings::H3Settings,
6};
7use crate::{
8 Buffer, Conn, HttpContext,
9 h3::{H3ErrorCode, MAX_BUFFER_SIZE},
10 headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
11};
12use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
13use std::{
14 future::Future,
15 io::{self, ErrorKind},
16 pin::Pin,
17 sync::{
18 Arc, OnceLock,
19 atomic::{AtomicBool, AtomicU64, Ordering},
20 },
21 task::{Context, Poll},
22};
23use swansong::{ShutdownCompletion, Swansong};
24
25/// The result of processing an HTTP/3 bidirectional stream.
26#[derive(Debug)]
27#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
28pub enum H3StreamResult<Transport> {
29 /// The stream carried a normal HTTP/3 request.
30 Request(Conn<Transport>),
31
32 /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
33 /// the associated WebTransport session.
34 WebTransport {
35 /// The WebTransport session ID (stream ID of the CONNECT request).
36 session_id: u64,
37 /// The underlying transport, ready for application data.
38 transport: Transport,
39 /// Any bytes buffered after the session ID during stream negotiation.
40 buffer: Buffer,
41 },
42}
43
44/// The result of processing an HTTP/3 unidirectional stream.
45#[derive(Debug)]
46pub enum UniStreamResult<T> {
47 /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
48 /// automatically.
49 Handled,
50
51 /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
52 /// WebTransport session.
53 WebTransport {
54 /// The WebTransport session ID.
55 session_id: u64,
56 /// The receive stream, ready for application data.
57 stream: T,
58 /// Any bytes buffered after the session ID during stream negotiation.
59 buffer: Buffer,
60 },
61
62 /// An unknown or unsupported stream type (e.g. Push). The caller should close or reset
63 /// this stream without processing it.
64 Unknown {
65 /// The raw stream type value.
66 stream_type: u64,
67 /// The stream.
68 stream: T,
69 },
70}
71
72/// Shared state for a single HTTP/3 QUIC connection.
73///
74/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
75#[derive(Debug)]
76pub struct H3Connection {
77 /// Shared configuration for the entire server, including tcp-based listeners
78 context: Arc<HttpContext>,
79
80 /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
81 /// the server-level Swansong shuts down. Request stream tasks use this to interrupt
82 /// in-progress work.
83 swansong: Swansong,
84
85 /// The peer's H3 settings, received on their control stream. Request streams may need to
86 /// consult these (e.g. max field section size).
87 peer_settings: OnceLock<H3Settings>,
88
89 /// The highest bidirectional stream ID we have accepted. Used to compute the GOAWAY value
90 /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
91 /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
92 max_accepted_stream_id: AtomicU64,
93
94 /// Whether we have accepted any streams yet.
95 has_accepted_stream: AtomicBool,
96
97 /// The decoder-side QPACK dynamic table for this connection.
98 decoder_dynamic_table: DecoderDynamicTable,
99
100 /// The encoder-side QPACK dynamic table for this connection.
101 encoder_dynamic_table: EncoderDynamicTable,
102}
103
104impl H3Connection {
105 /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
106 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
107 let swansong = context.swansong.child();
108 let max_table_capacity = context.config.h3_max_table_capacity;
109 let blocked_streams = context.config.h3_blocked_streams;
110 let encoder_dynamic_table = EncoderDynamicTable::new(&context);
111 Arc::new(Self {
112 context,
113 swansong,
114 peer_settings: OnceLock::new(),
115 max_accepted_stream_id: AtomicU64::new(0),
116 has_accepted_stream: AtomicBool::new(false),
117 decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
118 encoder_dynamic_table,
119 })
120 }
121
122 /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
123 /// [`H3Connection::shut_down`]
124 pub fn swansong(&self) -> &Swansong {
125 &self.swansong
126 }
127
128 /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
129 ///
130 /// The returned [`ShutdownCompletion`] type can
131 /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
132 /// blocking context
133 ///
134 /// Note that this will NOT shut down the server. To shut down the whole server, use
135 /// [`HttpContext::shut_down`]
136 pub fn shut_down(&self) -> ShutdownCompletion {
137 // Wake any in-flight `decode_field_section` calls parked on the decoder
138 // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
139 // from the peer). The encoder table's writer loop is already swansong-
140 // aware, but we mark it failed too for symmetry: any future state
141 // mutations after shutdown are no longer wire-relevant.
142 self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
143 self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
144 self.swansong.shut_down()
145 }
146
147 /// Retrieve the [`HttpContext`] for this server.
148 pub fn context(&self) -> Arc<HttpContext> {
149 self.context.clone()
150 }
151
152 /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
153 /// processed.
154 pub fn peer_settings(&self) -> Option<&H3Settings> {
155 self.peer_settings.get()
156 }
157
158 /// Record that we accepted a bidirectional stream with this ID.
159 fn record_accepted_stream(&self, stream_id: u64) {
160 self.max_accepted_stream_id
161 .fetch_max(stream_id, Ordering::Relaxed);
162 self.has_accepted_stream.store(true, Ordering::Relaxed);
163 }
164
165 /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
166 /// haven't accepted any.
167 fn goaway_id(&self) -> u64 {
168 if self.has_accepted_stream.load(Ordering::Relaxed) {
169 self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
170 } else {
171 0
172 }
173 }
174
175 /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
176 ///
177 /// Call this once per accepted bidirectional stream. Returns
178 /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
179 /// a standard HTTP/3 request.
180 ///
181 /// # Errors
182 ///
183 /// Returns an `H3Error` in case of io error or http/3 semantic error.
184 pub async fn process_inbound_bidi<Transport, Handler, Fut>(
185 self: Arc<Self>,
186 transport: Transport,
187 handler: Handler,
188 stream_id: u64,
189 ) -> Result<H3StreamResult<Transport>, H3Error>
190 where
191 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
192 Handler: FnOnce(Conn<Transport>) -> Fut,
193 Fut: Future<Output = Conn<Transport>>,
194 {
195 self.record_accepted_stream(stream_id);
196 let _guard = self.swansong.guard();
197 let buffer = Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
198 match Conn::new_h3(self, transport, buffer, stream_id).await? {
199 H3StreamResult::Request(conn) => Ok(H3StreamResult::Request(
200 handler(conn).await.send_h3().await?,
201 )),
202 wt @ H3StreamResult::WebTransport { .. } => Ok(wt),
203 }
204 }
205
206 /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
207 ///
208 /// If the field section's Required Insert Count is greater than zero, waits until the
209 /// dynamic table has received enough entries. Returns an error on protocol violations or
210 /// if the encoder stream fails while waiting.
211 ///
212 /// Duplicate pseudo-headers are silently ignored (first value wins).
213 /// Unknown pseudo-headers are rejected per RFC 9114 §4.1.1.
214 ///
215 /// # Errors
216 ///
217 /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
218 #[cfg(feature = "unstable")]
219 pub async fn decode_field_section(
220 &self,
221 encoded: &[u8],
222 stream_id: u64,
223 ) -> Result<FieldSection<'static>, H3Error> {
224 self.decoder_dynamic_table.decode(encoded, stream_id).await
225 }
226
227 #[cfg(not(feature = "unstable"))]
228 pub(crate) async fn decode_field_section(
229 &self,
230 encoded: &[u8],
231 stream_id: u64,
232 ) -> Result<FieldSection<'static>, H3Error> {
233 self.decoder_dynamic_table.decode(encoded, stream_id).await
234 }
235
236 /// Encode a QPACK field section from pseudo-headers and headers.
237 ///
238 /// This currently uses only the static table (no dynamic table).
239 /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
240 ///
241 /// # Errors
242 ///
243 /// Returns an `H3Error` in case of http/3 semantic error.
244 #[cfg(feature = "unstable")]
245 #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
246 pub fn encode_field_section(
247 &self,
248 field_section: &FieldSection<'_>,
249 buf: &mut Vec<u8>,
250 stream_id: u64,
251 ) -> Result<(), H3Error> {
252 self.encoder_dynamic_table
253 .encode(field_section, buf, stream_id);
254 Ok(())
255 }
256
257 #[cfg(not(feature = "unstable"))]
258 #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
259 pub(crate) fn encode_field_section(
260 &self,
261 field_section: &FieldSection<'_>,
262 buf: &mut Vec<u8>,
263 stream_id: u64,
264 ) -> Result<(), H3Error> {
265 self.encoder_dynamic_table
266 .encode(field_section, buf, stream_id);
267 Ok(())
268 }
269
270 /// Run this server's HTTP/3 outbound control stream.
271 ///
272 /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
273 /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
274 /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
275 ///
276 /// # Errors
277 ///
278 /// Returns an `H3Error` in case of io error or http/3 semantic error.
279 pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
280 where
281 T: AsyncWrite + Unpin + Send,
282 {
283 let mut buf = vec![0; 128];
284
285 // Stream type + SETTINGS frame
286 let settings = Frame::Settings(H3Settings::from(&self.context.config));
287 log::trace!(
288 "H3 outbound control: sending SETTINGS: {:?}",
289 H3Settings::from(&self.context.config)
290 );
291
292 write(&mut buf, &mut stream, |buf| {
293 let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
294 written += settings.encode(&mut buf[written..])?;
295 Some(written)
296 })
297 .await?;
298 log::trace!("H3 outbound control: SETTINGS sent");
299
300 // Wait for shutdown
301 self.swansong.clone().await;
302
303 // Send GOAWAY
304 write(&mut buf, &mut stream, |buf| {
305 Frame::Goaway(self.goaway_id()).encode(buf)
306 })
307 .await?;
308
309 Ok(())
310 }
311
312 /// Run the outbound QPACK encoder stream for the duration of the connection.
313 ///
314 /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
315 /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
316 /// marked failed.
317 ///
318 /// # Errors
319 ///
320 /// Returns an `H3Error` in case of io error.
321 pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
322 where
323 T: AsyncWrite + Unpin + Send,
324 {
325 self.encoder_dynamic_table
326 .run_writer(&mut stream, self.swansong.clone())
327 .await
328 }
329
330 /// Run the outbound QPACK decoder stream for the duration of the connection.
331 ///
332 /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
333 /// Count Increment instructions as they become needed. Returns when the connection
334 /// shuts down.
335 ///
336 /// # Errors
337 ///
338 /// Returns an `H3Error` in case of io error or http/3 semantic error.
339 pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
340 where
341 T: AsyncWrite + Unpin + Send,
342 {
343 self.decoder_dynamic_table
344 .run_writer(&mut stream, self.swansong.clone())
345 .await
346 }
347
348 /// Handle an inbound unidirectional HTTP/3 stream from the peer.
349 ///
350 /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
351 /// application streams are returned via [`UniStreamResult`] for the caller to process.
352 ///
353 /// # Errors
354 ///
355 /// Returns a `H3Error` in case of io error or http/3 semantic error.
356 pub async fn process_inbound_uni<T>(&self, mut stream: T) -> Result<UniStreamResult<T>, H3Error>
357 where
358 T: AsyncRead + Unpin + Send,
359 {
360 self.swansong
361 .interrupt(async move {
362 let mut buf = vec![0; 128];
363 let mut filled = 0;
364
365 // Read stream type varint (decode as raw u64 to handle unknown types)
366 let stream_type =
367 read(
368 &mut buf,
369 &mut filled,
370 &mut stream,
371 |data| match quic_varint::decode(data) {
372 Ok(ok) => Ok(Some(ok)),
373 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
374 // this branch is unreachable because u64 is always From<u64>
375 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
376 Ok(Some((value, bytes)))
377 }
378 },
379 )
380 .await?;
381
382 match UniStreamType::try_from(stream_type) {
383 Ok(UniStreamType::Control) => {
384 log::trace!("H3 inbound uni: control stream");
385 self.run_inbound_control(&mut buf, &mut filled, &mut stream)
386 .await?;
387 Ok(UniStreamResult::Handled)
388 }
389
390 Ok(UniStreamType::QpackEncoder) => {
391 log::trace!(
392 "H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)"
393 );
394 let mut reader = Prepended {
395 head: &buf[..filled],
396 tail: stream,
397 };
398
399 log::trace!("QPACK encoder stream: started");
400 self.decoder_dynamic_table.run_reader(&mut reader).await?;
401
402 Ok(UniStreamResult::Handled)
403 }
404
405 Ok(UniStreamType::QpackDecoder) => {
406 log::trace!(
407 "H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)"
408 );
409 let mut reader = Prepended {
410 head: &buf[..filled],
411 tail: stream,
412 };
413 self.encoder_dynamic_table.run_reader(&mut reader).await?;
414 Ok(UniStreamResult::Handled)
415 }
416
417 Ok(UniStreamType::WebTransport) => {
418 log::trace!("H3 inbound uni: WebTransport stream");
419 let session_id = read(&mut buf, &mut filled, &mut stream, |data| {
420 match quic_varint::decode(data) {
421 Ok(ok) => Ok(Some(ok)),
422 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
423 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
424 Ok(Some((value, bytes)))
425 }
426 }
427 })
428 .await?;
429
430 buf.truncate(filled);
431
432 Ok(UniStreamResult::WebTransport {
433 session_id,
434 stream,
435 buffer: buf.into(),
436 })
437 }
438
439 Ok(UniStreamType::Push) | Err(_) => {
440 log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
441 Ok(UniStreamResult::Unknown {
442 stream_type,
443 stream,
444 })
445 }
446 }
447 })
448 .await
449 .unwrap_or(Ok(UniStreamResult::Handled)) // interrupted
450 }
451
452 /// Handle the http/3 peer's inbound control stream.
453 ///
454 /// # Errors
455 ///
456 /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
457 // The first frame must be SETTINGS. After that, watches for
458 // GOAWAY to initiate connection shutdown.
459 async fn run_inbound_control<T>(
460 &self,
461 buf: &mut Vec<u8>,
462 filled: &mut usize,
463 stream: &mut T,
464 ) -> Result<(), H3Error>
465 where
466 T: AsyncRead + Unpin + Send,
467 {
468 // First frame must be SETTINGS (§6.2.1)
469 let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
470 Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
471 Ok(_) => Err(H3ErrorCode::FrameUnexpected),
472 Err(FrameDecodeError::Incomplete) => Ok(None),
473 Err(FrameDecodeError::Error(code)) => Err(code),
474 })
475 .await?;
476
477 log::trace!("H3 peer settings: {settings:?}");
478
479 self.peer_settings
480 .set(settings)
481 .map_err(|_| H3ErrorCode::FrameUnexpected)?;
482
483 self.encoder_dynamic_table
484 .initialize_from_peer_settings(settings);
485
486 // Read subsequent frames, watching for GOAWAY
487 loop {
488 let frame = self
489 .swansong
490 .interrupt(read(buf, filled, stream, |data| {
491 match Frame::decode(data) {
492 Ok((frame, consumed)) => Ok(Some((frame, consumed))),
493 Err(FrameDecodeError::Incomplete) => Ok(None),
494 Err(FrameDecodeError::Error(code)) => Err(code),
495 }
496 }))
497 .await
498 .transpose()?;
499
500 match frame {
501 None => {
502 log::trace!("H3 control stream: interrupted by shutdown");
503 return Ok(());
504 }
505
506 Some(Frame::Goaway(id)) => {
507 log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
508 self.swansong.shut_down();
509 return Ok(());
510 }
511
512 Some(Frame::Settings(_)) => {
513 return Err(H3ErrorCode::FrameUnexpected.into());
514 }
515
516 Some(Frame::Unknown(n)) => {
517 // RFC 9114 §7.2.8: unknown frame types MUST be ignored.
518 // We must also consume the payload bytes so the stream stays synchronized.
519 log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
520 let n = usize::try_from(n).unwrap_or(usize::MAX);
521 let in_buf = n.min(*filled);
522 buf.copy_within(in_buf..*filled, 0);
523 *filled -= in_buf;
524 let mut todo = n - in_buf;
525 let mut scratch = [0u8; 256];
526 while todo > 0 {
527 let to_read = todo.min(scratch.len());
528 let n = stream
529 .read(&mut scratch[..to_read])
530 .await
531 .map_err(H3Error::Io)?;
532 if n == 0 {
533 return Err(H3ErrorCode::ClosedCriticalStream.into());
534 }
535 todo -= n;
536 }
537 }
538 other => {
539 log::trace!("H3 control stream: ignoring {other:?}");
540 }
541 }
542 }
543 }
544}
545
546async fn write(
547 buf: &mut Vec<u8>,
548 mut stream: impl AsyncWrite + Unpin + Send,
549 mut f: impl FnMut(&mut [u8]) -> Option<usize>,
550) -> io::Result<usize> {
551 let written = loop {
552 if let Some(w) = f(buf) {
553 break w;
554 }
555 if buf.len() >= MAX_BUFFER_SIZE {
556 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
557 }
558 buf.resize(buf.len() * 2, 0);
559 };
560
561 stream.write_all(&buf[..written]).await?;
562 stream.flush().await?;
563 Ok(written)
564}
565
/// An `AsyncRead` adapter that yields the bytes of a slice first and only then reads from
/// an inner stream.
///
/// `process_inbound_uni` uses it to replay bytes that were read ahead while parsing the
/// stream-type varint, before handing the stream to a QPACK table's reader.
struct Prepended<'a, T> {
    /// Already-read bytes that have not been replayed yet.
    head: &'a [u8],
    /// The stream to read from once `head` is empty.
    tail: T,
}
574
575impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
576 fn poll_read(
577 self: Pin<&mut Self>,
578 cx: &mut Context<'_>,
579 out: &mut [u8],
580 ) -> Poll<io::Result<usize>> {
581 let this = self.get_mut();
582 if !this.head.is_empty() {
583 let n = this.head.len().min(out.len());
584 out[..n].copy_from_slice(&this.head[..n]);
585 this.head = &this.head[n..];
586 return Poll::Ready(Ok(n));
587 }
588 Pin::new(&mut this.tail).poll_read(cx, out)
589 }
590}
591
592/// Read from `stream` into `buf` until `f` can decode a value.
593///
594/// `f` receives the filled portion of the buffer and returns:
595/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
596/// - `Ok(None)` — need more data; reads more bytes and retries
597/// - `Err(e)` — unrecoverable error; propagated to caller
598async fn read<R>(
599 buf: &mut Vec<u8>,
600 filled: &mut usize,
601 stream: &mut (impl AsyncRead + Unpin + Send),
602 f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
603) -> Result<R, H3Error> {
604 loop {
605 if let Some((result, consumed)) = f(&buf[..*filled])? {
606 buf.copy_within(consumed..*filled, 0);
607 *filled -= consumed;
608 return Ok(result);
609 }
610
611 if *filled >= buf.len() {
612 if buf.len() >= MAX_BUFFER_SIZE {
613 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
614 }
615 buf.resize(buf.len() * 2, 0);
616 }
617
618 let n = stream.read(&mut buf[*filled..]).await?;
619 if n == 0 {
620 return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
621 }
622 *filled += n;
623 }
624}