Skip to main content

trillium_http/h3/
connection.rs

1use super::{
2    H3Error,
3    frame::{Frame, FrameDecodeError, UniStreamType},
4    quic_varint::{self, QuicVarIntError},
5    settings::H3Settings,
6};
7use crate::{
8    Buffer, Conn, HttpContext,
9    h3::{H3ErrorCode, MAX_BUFFER_SIZE},
10    headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
11};
12use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
13use std::{
14    future::Future,
15    io::{self, ErrorKind},
16    pin::Pin,
17    sync::{
18        Arc, OnceLock,
19        atomic::{AtomicBool, AtomicU64, Ordering},
20    },
21    task::{Context, Poll},
22};
23use swansong::{ShutdownCompletion, Swansong};
24
25/// The result of processing an HTTP/3 bidirectional stream.
26#[derive(Debug)]
27#[allow(clippy::large_enum_variant)] // Request is the hot path; boxing it would add an allocation per request
28pub enum H3StreamResult<Transport> {
29    /// The stream carried a normal HTTP/3 request.
30    Request(Conn<Transport>),
31
32    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
33    /// the associated WebTransport session.
34    WebTransport {
35        /// The WebTransport session ID (stream ID of the CONNECT request).
36        session_id: u64,
37        /// The underlying transport, ready for application data.
38        transport: Transport,
39        /// Any bytes buffered after the session ID during stream negotiation.
40        buffer: Buffer,
41    },
42}
43
44/// The result of processing an HTTP/3 unidirectional stream.
45#[derive(Debug)]
46pub enum UniStreamResult<T> {
47    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
48    /// automatically.
49    Handled,
50
51    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
52    /// WebTransport session.
53    WebTransport {
54        /// The WebTransport session ID.
55        session_id: u64,
56        /// The receive stream, ready for application data.
57        stream: T,
58        /// Any bytes buffered after the session ID during stream negotiation.
59        buffer: Buffer,
60    },
61
62    /// An unknown or unsupported stream type (e.g. Push). The caller should close or reset
63    /// this stream without processing it.
64    Unknown {
65        /// The raw stream type value.
66        stream_type: u64,
67        /// The stream.
68        stream: T,
69    },
70}
71
72/// Shared state for a single HTTP/3 QUIC connection.
73///
74/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
75#[derive(Debug)]
76pub struct H3Connection {
77    /// Shared configuration for the entire server, including tcp-based listeners
78    context: Arc<HttpContext>,
79
80    /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
81    /// the server-level Swansong shuts down.  Request stream tasks use this to interrupt
82    /// in-progress work.
83    swansong: Swansong,
84
85    /// The peer's H3 settings, received on their control stream.  Request streams may need to
86    /// consult these (e.g. max field section size).
87    peer_settings: OnceLock<H3Settings>,
88
89    /// The highest bidirectional stream ID we have accepted.  Used to compute the GOAWAY value
90    /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
91    /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
92    max_accepted_stream_id: AtomicU64,
93
94    /// Whether we have accepted any streams yet.
95    has_accepted_stream: AtomicBool,
96
97    /// The decoder-side QPACK dynamic table for this connection.
98    decoder_dynamic_table: DecoderDynamicTable,
99
100    /// The encoder-side QPACK dynamic table for this connection.
101    encoder_dynamic_table: EncoderDynamicTable,
102}
103
104impl H3Connection {
105    /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
106    pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
107        let swansong = context.swansong.child();
108        let max_table_capacity = context.config.h3_max_table_capacity;
109        let blocked_streams = context.config.h3_blocked_streams;
110        let encoder_dynamic_table = EncoderDynamicTable::new(&context);
111        Arc::new(Self {
112            context,
113            swansong,
114            peer_settings: OnceLock::new(),
115            max_accepted_stream_id: AtomicU64::new(0),
116            has_accepted_stream: AtomicBool::new(false),
117            decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
118            encoder_dynamic_table,
119        })
120    }
121
122    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
123    /// [`H3Connection::shut_down`]
124    pub fn swansong(&self) -> &Swansong {
125        &self.swansong
126    }
127
128    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
129    ///
130    /// The returned [`ShutdownCompletion`] type can
131    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
132    /// blocking context
133    ///
134    /// Note that this will NOT shut down the server. To shut down the whole server, use
135    /// [`HttpContext::shut_down`]
136    pub fn shut_down(&self) -> ShutdownCompletion {
137        // Wake any in-flight `decode_field_section` calls parked on the decoder
138        // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
139        // from the peer). The encoder table's writer loop is already swansong-
140        // aware, but we mark it failed too for symmetry: any future state
141        // mutations after shutdown are no longer wire-relevant.
142        self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
143        self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
144        self.swansong.shut_down()
145    }
146
147    /// Retrieve the [`HttpContext`] for this server.
148    pub fn context(&self) -> Arc<HttpContext> {
149        self.context.clone()
150    }
151
152    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
153    /// processed.
154    pub fn peer_settings(&self) -> Option<&H3Settings> {
155        self.peer_settings.get()
156    }
157
158    /// Record that we accepted a bidirectional stream with this ID.
159    fn record_accepted_stream(&self, stream_id: u64) {
160        self.max_accepted_stream_id
161            .fetch_max(stream_id, Ordering::Relaxed);
162        self.has_accepted_stream.store(true, Ordering::Relaxed);
163    }
164
165    /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
166    /// haven't accepted any.
167    fn goaway_id(&self) -> u64 {
168        if self.has_accepted_stream.load(Ordering::Relaxed) {
169            self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
170        } else {
171            0
172        }
173    }
174
175    /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
176    ///
177    /// Call this once per accepted bidirectional stream. Returns
178    /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
179    /// a standard HTTP/3 request.
180    ///
181    /// # Errors
182    ///
183    /// Returns an `H3Error` in case of io error or http/3 semantic error.
184    pub async fn process_inbound_bidi<Transport, Handler, Fut>(
185        self: Arc<Self>,
186        transport: Transport,
187        handler: Handler,
188        stream_id: u64,
189    ) -> Result<H3StreamResult<Transport>, H3Error>
190    where
191        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
192        Handler: FnOnce(Conn<Transport>) -> Fut,
193        Fut: Future<Output = Conn<Transport>>,
194    {
195        self.record_accepted_stream(stream_id);
196        let _guard = self.swansong.guard();
197        let buffer = Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
198        match Conn::new_h3(self, transport, buffer, stream_id).await? {
199            H3StreamResult::Request(conn) => Ok(H3StreamResult::Request(
200                handler(conn).await.send_h3().await?,
201            )),
202            wt @ H3StreamResult::WebTransport { .. } => Ok(wt),
203        }
204    }
205
206    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
207    ///
208    /// If the field section's Required Insert Count is greater than zero, waits until the
209    /// dynamic table has received enough entries. Returns an error on protocol violations or
210    /// if the encoder stream fails while waiting.
211    ///
212    /// Duplicate pseudo-headers are silently ignored (first value wins).
213    /// Unknown pseudo-headers are rejected per RFC 9114 §4.1.1.
214    ///
215    /// # Errors
216    ///
217    /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
218    #[cfg(feature = "unstable")]
219    pub async fn decode_field_section(
220        &self,
221        encoded: &[u8],
222        stream_id: u64,
223    ) -> Result<FieldSection<'static>, H3Error> {
224        self.decoder_dynamic_table.decode(encoded, stream_id).await
225    }
226
227    #[cfg(not(feature = "unstable"))]
228    pub(crate) async fn decode_field_section(
229        &self,
230        encoded: &[u8],
231        stream_id: u64,
232    ) -> Result<FieldSection<'static>, H3Error> {
233        self.decoder_dynamic_table.decode(encoded, stream_id).await
234    }
235
236    /// Encode a QPACK field section from pseudo-headers and headers.
237    ///
238    /// This currently uses only the static table (no dynamic table).
239    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
240    ///
241    /// # Errors
242    ///
243    /// Returns an `H3Error` in case of http/3 semantic error.
244    #[cfg(feature = "unstable")]
245    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
246    pub fn encode_field_section(
247        &self,
248        field_section: &FieldSection<'_>,
249        buf: &mut Vec<u8>,
250        stream_id: u64,
251    ) -> Result<(), H3Error> {
252        self.encoder_dynamic_table
253            .encode(field_section, buf, stream_id);
254        Ok(())
255    }
256
257    #[cfg(not(feature = "unstable"))]
258    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
259    pub(crate) fn encode_field_section(
260        &self,
261        field_section: &FieldSection<'_>,
262        buf: &mut Vec<u8>,
263        stream_id: u64,
264    ) -> Result<(), H3Error> {
265        self.encoder_dynamic_table
266            .encode(field_section, buf, stream_id);
267        Ok(())
268    }
269
270    /// Run this server's HTTP/3 outbound control stream.
271    ///
272    /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
273    /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
274    /// (closing a control stream is a connection error per RFC 9114 §6.2.1).
275    ///
276    /// # Errors
277    ///
278    /// Returns an `H3Error` in case of io error or http/3 semantic error.
279    pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
280    where
281        T: AsyncWrite + Unpin + Send,
282    {
283        let mut buf = vec![0; 128];
284
285        // Stream type + SETTINGS frame
286        let settings = Frame::Settings(H3Settings::from(&self.context.config));
287        log::trace!(
288            "H3 outbound control: sending SETTINGS: {:?}",
289            H3Settings::from(&self.context.config)
290        );
291
292        write(&mut buf, &mut stream, |buf| {
293            let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
294            written += settings.encode(&mut buf[written..])?;
295            Some(written)
296        })
297        .await?;
298        log::trace!("H3 outbound control: SETTINGS sent");
299
300        // Wait for shutdown
301        self.swansong.clone().await;
302
303        // Send GOAWAY
304        write(&mut buf, &mut stream, |buf| {
305            Frame::Goaway(self.goaway_id()).encode(buf)
306        })
307        .await?;
308
309        Ok(())
310    }
311
312    /// Run the outbound QPACK encoder stream for the duration of the connection.
313    ///
314    /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
315    /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
316    /// marked failed.
317    ///
318    /// # Errors
319    ///
320    /// Returns an `H3Error` in case of io error.
321    pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
322    where
323        T: AsyncWrite + Unpin + Send,
324    {
325        self.encoder_dynamic_table
326            .run_writer(&mut stream, self.swansong.clone())
327            .await
328    }
329
330    /// Run the outbound QPACK decoder stream for the duration of the connection.
331    ///
332    /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
333    /// Count Increment instructions as they become needed. Returns when the connection
334    /// shuts down.
335    ///
336    /// # Errors
337    ///
338    /// Returns an `H3Error` in case of io error or http/3 semantic error.
339    pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
340    where
341        T: AsyncWrite + Unpin + Send,
342    {
343        self.decoder_dynamic_table
344            .run_writer(&mut stream, self.swansong.clone())
345            .await
346    }
347
348    /// Handle an inbound unidirectional HTTP/3 stream from the peer.
349    ///
350    /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
351    /// application streams are returned via [`UniStreamResult`] for the caller to process.
352    ///
353    /// # Errors
354    ///
355    /// Returns a `H3Error` in case of io error or http/3 semantic error.
356    pub async fn process_inbound_uni<T>(&self, mut stream: T) -> Result<UniStreamResult<T>, H3Error>
357    where
358        T: AsyncRead + Unpin + Send,
359    {
360        self.swansong
361            .interrupt(async move {
362                let mut buf = vec![0; 128];
363                let mut filled = 0;
364
365                // Read stream type varint (decode as raw u64 to handle unknown types)
366                let stream_type =
367                    read(
368                        &mut buf,
369                        &mut filled,
370                        &mut stream,
371                        |data| match quic_varint::decode(data) {
372                            Ok(ok) => Ok(Some(ok)),
373                            Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
374                            // this branch is unreachable because u64 is always From<u64>
375                            Err(QuicVarIntError::UnknownValue { bytes, value }) => {
376                                Ok(Some((value, bytes)))
377                            }
378                        },
379                    )
380                    .await?;
381
382                match UniStreamType::try_from(stream_type) {
383                    Ok(UniStreamType::Control) => {
384                        log::trace!("H3 inbound uni: control stream");
385                        self.run_inbound_control(&mut buf, &mut filled, &mut stream)
386                            .await?;
387                        Ok(UniStreamResult::Handled)
388                    }
389
390                    Ok(UniStreamType::QpackEncoder) => {
391                        log::trace!(
392                            "H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)"
393                        );
394                        let mut reader = Prepended {
395                            head: &buf[..filled],
396                            tail: stream,
397                        };
398
399                        log::trace!("QPACK encoder stream: started");
400                        self.decoder_dynamic_table.run_reader(&mut reader).await?;
401
402                        Ok(UniStreamResult::Handled)
403                    }
404
405                    Ok(UniStreamType::QpackDecoder) => {
406                        log::trace!(
407                            "H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)"
408                        );
409                        let mut reader = Prepended {
410                            head: &buf[..filled],
411                            tail: stream,
412                        };
413                        self.encoder_dynamic_table.run_reader(&mut reader).await?;
414                        Ok(UniStreamResult::Handled)
415                    }
416
417                    Ok(UniStreamType::WebTransport) => {
418                        log::trace!("H3 inbound uni: WebTransport stream");
419                        let session_id = read(&mut buf, &mut filled, &mut stream, |data| {
420                            match quic_varint::decode(data) {
421                                Ok(ok) => Ok(Some(ok)),
422                                Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
423                                Err(QuicVarIntError::UnknownValue { bytes, value }) => {
424                                    Ok(Some((value, bytes)))
425                                }
426                            }
427                        })
428                        .await?;
429
430                        buf.truncate(filled);
431
432                        Ok(UniStreamResult::WebTransport {
433                            session_id,
434                            stream,
435                            buffer: buf.into(),
436                        })
437                    }
438
439                    Ok(UniStreamType::Push) | Err(_) => {
440                        log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
441                        Ok(UniStreamResult::Unknown {
442                            stream_type,
443                            stream,
444                        })
445                    }
446                }
447            })
448            .await
449            .unwrap_or(Ok(UniStreamResult::Handled)) // interrupted
450    }
451
452    /// Handle the http/3 peer's inbound control stream.
453    ///
454    /// # Errors
455    ///
456    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
457    // The first frame must be SETTINGS. After that, watches for
458    // GOAWAY to initiate connection shutdown.
459    async fn run_inbound_control<T>(
460        &self,
461        buf: &mut Vec<u8>,
462        filled: &mut usize,
463        stream: &mut T,
464    ) -> Result<(), H3Error>
465    where
466        T: AsyncRead + Unpin + Send,
467    {
468        // First frame must be SETTINGS (§6.2.1)
469        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
470            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
471            Ok(_) => Err(H3ErrorCode::FrameUnexpected),
472            Err(FrameDecodeError::Incomplete) => Ok(None),
473            Err(FrameDecodeError::Error(code)) => Err(code),
474        })
475        .await?;
476
477        log::trace!("H3 peer settings: {settings:?}");
478
479        self.peer_settings
480            .set(settings)
481            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
482
483        self.encoder_dynamic_table
484            .initialize_from_peer_settings(settings);
485
486        // Read subsequent frames, watching for GOAWAY
487        loop {
488            let frame = self
489                .swansong
490                .interrupt(read(buf, filled, stream, |data| {
491                    match Frame::decode(data) {
492                        Ok((frame, consumed)) => Ok(Some((frame, consumed))),
493                        Err(FrameDecodeError::Incomplete) => Ok(None),
494                        Err(FrameDecodeError::Error(code)) => Err(code),
495                    }
496                }))
497                .await
498                .transpose()?;
499
500            match frame {
501                None => {
502                    log::trace!("H3 control stream: interrupted by shutdown");
503                    return Ok(());
504                }
505
506                Some(Frame::Goaway(id)) => {
507                    log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
508                    self.swansong.shut_down();
509                    return Ok(());
510                }
511
512                Some(Frame::Settings(_)) => {
513                    return Err(H3ErrorCode::FrameUnexpected.into());
514                }
515
516                Some(Frame::Unknown(n)) => {
517                    // RFC 9114 §7.2.8: unknown frame types MUST be ignored.
518                    // We must also consume the payload bytes so the stream stays synchronized.
519                    log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
520                    let n = usize::try_from(n).unwrap_or(usize::MAX);
521                    let in_buf = n.min(*filled);
522                    buf.copy_within(in_buf..*filled, 0);
523                    *filled -= in_buf;
524                    let mut todo = n - in_buf;
525                    let mut scratch = [0u8; 256];
526                    while todo > 0 {
527                        let to_read = todo.min(scratch.len());
528                        let n = stream
529                            .read(&mut scratch[..to_read])
530                            .await
531                            .map_err(H3Error::Io)?;
532                        if n == 0 {
533                            return Err(H3ErrorCode::ClosedCriticalStream.into());
534                        }
535                        todo -= n;
536                    }
537                }
538                other => {
539                    log::trace!("H3 control stream: ignoring {other:?}");
540                }
541            }
542        }
543    }
544}
545
546async fn write(
547    buf: &mut Vec<u8>,
548    mut stream: impl AsyncWrite + Unpin + Send,
549    mut f: impl FnMut(&mut [u8]) -> Option<usize>,
550) -> io::Result<usize> {
551    let written = loop {
552        if let Some(w) = f(buf) {
553            break w;
554        }
555        if buf.len() >= MAX_BUFFER_SIZE {
556            return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
557        }
558        buf.resize(buf.len() * 2, 0);
559    };
560
561    stream.write_all(&buf[..written]).await?;
562    stream.flush().await?;
563    Ok(written)
564}
565
/// An `AsyncRead` adapter that yields a byte slice first and then reads from an inner stream.
///
/// `process_inbound_uni` uses it to replay bytes that were read ahead while parsing the
/// stream-type varint, before handing the stream to a QPACK table's `run_reader`.
struct Prepended<'a, T> {
    /// Read-ahead bytes that have not been replayed yet.
    head: &'a [u8],
    /// The underlying stream, read once `head` is exhausted.
    tail: T,
}
574
575impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
576    fn poll_read(
577        self: Pin<&mut Self>,
578        cx: &mut Context<'_>,
579        out: &mut [u8],
580    ) -> Poll<io::Result<usize>> {
581        let this = self.get_mut();
582        if !this.head.is_empty() {
583            let n = this.head.len().min(out.len());
584            out[..n].copy_from_slice(&this.head[..n]);
585            this.head = &this.head[n..];
586            return Poll::Ready(Ok(n));
587        }
588        Pin::new(&mut this.tail).poll_read(cx, out)
589    }
590}
591
592/// Read from `stream` into `buf` until `f` can decode a value.
593///
594/// `f` receives the filled portion of the buffer and returns:
595/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
596/// - `Ok(None)` — need more data; reads more bytes and retries
597/// - `Err(e)` — unrecoverable error; propagated to caller
598async fn read<R>(
599    buf: &mut Vec<u8>,
600    filled: &mut usize,
601    stream: &mut (impl AsyncRead + Unpin + Send),
602    f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
603) -> Result<R, H3Error> {
604    loop {
605        if let Some((result, consumed)) = f(&buf[..*filled])? {
606            buf.copy_within(consumed..*filled, 0);
607            *filled -= consumed;
608            return Ok(result);
609        }
610
611        if *filled >= buf.len() {
612            if buf.len() >= MAX_BUFFER_SIZE {
613                return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
614            }
615            buf.resize(buf.len() * 2, 0);
616        }
617
618        let n = stream.read(&mut buf[*filled..]).await?;
619        if n == 0 {
620            return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
621        }
622        *filled += n;
623    }
624}