Skip to main content

trillium_http/h3/
connection.rs

1mod peer_settings_wait;
2
3use super::{
4    H3Error,
5    frame::{Frame, FrameDecodeError, UniStreamType},
6    quic_varint::{self, QuicVarIntError},
7    settings::H3Settings,
8};
9use crate::{
10    Buffer, Conn, HttpContext,
11    conn::H3FirstFrame,
12    h3::{H3ErrorCode, MAX_BUFFER_SIZE},
13    headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
14};
15use event_listener::Event;
16use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
17use std::{
18    future::Future,
19    io::{self, ErrorKind},
20    pin::Pin,
21    sync::{
22        Arc, OnceLock,
23        atomic::{AtomicBool, AtomicU64, Ordering},
24    },
25    task::{Context, Poll},
26};
27use swansong::{ShutdownCompletion, Swansong};
28
29/// The result of processing an HTTP/3 bidirectional stream.
30#[derive(Debug)]
31#[allow(
32    clippy::large_enum_variant,
33    reason = "Request is the hot path; boxing it would add an allocation per request"
34)]
35pub enum H3StreamResult<Transport> {
36    /// The stream carried a normal HTTP/3 request.
37    Request(Conn<Transport>),
38
39    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
40    /// the associated WebTransport session.
41    WebTransport {
42        /// The WebTransport session ID (stream ID of the CONNECT request).
43        session_id: u64,
44        /// The underlying transport, ready for application data.
45        transport: Transport,
46        /// Any bytes buffered after the session ID during stream negotiation.
47        buffer: Buffer,
48    },
49}
50
51/// Inner-loop result of [`H3Connection::process_inbound_uni_with_close`] before the recv
52/// stream is reattached. Decouples the inner async block (which only borrows the stream)
53/// from the caller-visible [`UniStreamResult`] (which returns the stream by value on
54/// non-`Handled` variants), so the function can keep ownership of `stream` long enough to
55/// fire its close callback before `stream` drops.
56enum UniInnerResult {
57    Handled,
58    WebTransport { session_id: u64, buffer: Buffer },
59    Unknown { stream_type: u64 },
60}
61
62/// The result of processing an HTTP/3 unidirectional stream.
63#[derive(Debug)]
64pub enum UniStreamResult<T> {
65    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
66    /// automatically.
67    Handled,
68
69    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
70    /// WebTransport session.
71    WebTransport {
72        /// The WebTransport session ID.
73        session_id: u64,
74        /// The receive stream, ready for application data.
75        stream: T,
76        /// Any bytes buffered after the session ID during stream negotiation.
77        buffer: Buffer,
78    },
79
80    /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
81    /// at all by this crate.
82    ///
83    /// The caller is responsible for disposing of the stream — the in-tree consumers RST
84    /// it with `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not*
85    /// close the stream itself: handing it back gives a downstream extension the option to
86    /// implement a stream type trillium-http doesn't know about (a future RFC, an
87    /// experiment, etc.) without forking the codec.
88    Unknown {
89        /// The raw stream type value.
90        stream_type: u64,
91        /// The stream.
92        stream: T,
93    },
94}
95
96/// Shared state for a single HTTP/3 QUIC connection.
97///
98/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
99///
100/// # Driver shape (vs h2)
101///
102/// h2 multiplexes everything onto a single TCP byte stream, so a single
103/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
104/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
105/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
106/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
107/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
108/// scheduled.
109///
110/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
111/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
112/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
113/// decoder future on its own task at connection setup, then spawn one task per accepted request
114/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
115/// futures on one task instead, with different perf characteristics.
116#[derive(Debug)]
117pub struct H3Connection {
118    /// Shared configuration across all protocols.
119    context: Arc<HttpContext>,
120
121    /// Connection-scoped shutdown signal. Shut down when we receive GOAWAY from the peer or when
122    /// the server-level Swansong shuts down.  Request stream tasks use this to interrupt
123    /// in-progress work.
124    swansong: Swansong,
125
126    /// The peer's H3 settings, received on their control stream.  Request streams may need to
127    /// consult these (e.g. max field section size).
128    pub(super) peer_settings: OnceLock<H3Settings>,
129
130    /// Multi-listener wake source for
131    /// [`PeerSettingsReady`][peer_settings_wait::PeerSettingsReady]. Notified by
132    /// `run_inbound_control` after applying peer SETTINGS, and again on connection
133    /// close, so any number of concurrently-parked futures all unblock together.
134    pub(super) peer_settings_event: Event,
135
136    /// The highest bidirectional stream ID we have accepted.  Used to compute the GOAWAY value
137    /// (this + 4) to tell the peer which requests we saw. None until the first stream is accepted.
138    /// Updated by the runtime adapter's accept loop via [`record_accepted_stream`].
139    max_accepted_stream_id: AtomicU64,
140
141    /// Whether we have accepted any streams yet.
142    has_accepted_stream: AtomicBool,
143
144    /// The decoder-side QPACK dynamic table for this connection.
145    decoder_dynamic_table: DecoderDynamicTable,
146
147    /// The encoder-side QPACK dynamic table for this connection.
148    encoder_dynamic_table: EncoderDynamicTable,
149}
150
151impl H3Connection {
152    /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
153    pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
154        let swansong = context.swansong.child();
155        let max_table_capacity = context.config.dynamic_table_capacity;
156        let blocked_streams = context.config.h3_blocked_streams;
157        let encoder_dynamic_table = EncoderDynamicTable::new(&context);
158        Arc::new(Self {
159            context,
160            swansong,
161            peer_settings: OnceLock::new(),
162            peer_settings_event: Event::new(),
163            max_accepted_stream_id: AtomicU64::new(0),
164            has_accepted_stream: AtomicBool::new(false),
165            decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
166            encoder_dynamic_table,
167        })
168    }
169
170    /// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
171    /// [`H3Connection::shut_down`]
172    pub fn swansong(&self) -> &Swansong {
173        &self.swansong
174    }
175
176    /// Attempt graceful shutdown of this HTTP/3 connection (all streams).
177    ///
178    /// The returned [`ShutdownCompletion`] type can
179    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
180    /// blocking context
181    ///
182    /// Note that this will NOT shut down the server. To shut down the whole server, use
183    /// [`HttpContext::shut_down`]
184    pub fn shut_down(&self) -> ShutdownCompletion {
185        // Wake any in-flight `decode_field_section` calls parked on the decoder
186        // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
187        // from the peer). The encoder table's writer loop is already swansong-
188        // aware, but we mark it failed too for symmetry: any future state
189        // mutations after shutdown are no longer wire-relevant.
190        self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
191        self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
192        self.wake_peer_settings_waiters();
193        self.swansong.shut_down()
194    }
195
196    /// Retrieve the [`HttpContext`] for this server.
197    pub fn context(&self) -> Arc<HttpContext> {
198        self.context.clone()
199    }
200
201    /// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
202    /// processed.
203    pub fn peer_settings(&self) -> Option<&H3Settings> {
204        self.peer_settings.get()
205    }
206
207    /// Record that we accepted a bidirectional stream with this ID.
208    fn record_accepted_stream(&self, stream_id: u64) {
209        self.max_accepted_stream_id
210            .fetch_max(stream_id, Ordering::Relaxed);
211        self.has_accepted_stream.store(true, Ordering::Relaxed);
212    }
213
214    /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
215    /// haven't accepted any.
216    fn goaway_id(&self) -> u64 {
217        if self.has_accepted_stream.load(Ordering::Relaxed) {
218            self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
219        } else {
220            0
221        }
222    }
223
224    /// Process a single HTTP/3 request-response cycle on a bidirectional stream.
225    ///
226    /// Call this once per accepted bidirectional stream. Returns
227    /// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
228    /// a standard HTTP/3 request.
229    ///
230    /// On a stream-level protocol error (e.g. malformed pseudo-headers,
231    /// `H3_MESSAGE_ERROR`), this method drops the transport without resetting it. To honour
232    /// RFC 9114's stream-error MUSTs, callers should use [`process_inbound_bidi_with_reset`]
233    /// instead and pass a closure that issues a stream RST with the protocol error code.
234    ///
235    /// [`process_inbound_bidi_with_reset`]: Self::process_inbound_bidi_with_reset
236    ///
237    /// # Errors
238    ///
239    /// Returns an `H3Error` in case of io error or http/3 semantic error.
240    #[deprecated(
241        since = "1.2.0",
242        note = "use `process_inbound_bidi_with_reset` so stream-level protocol errors RST the \
243                stream as required by RFC 9114"
244    )]
245    pub async fn process_inbound_bidi<Transport, Handler, Fut>(
246        self: Arc<Self>,
247        transport: Transport,
248        handler: Handler,
249        stream_id: u64,
250    ) -> Result<H3StreamResult<Transport>, H3Error>
251    where
252        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
253        Handler: FnOnce(Conn<Transport>) -> Fut,
254        Fut: Future<Output = Conn<Transport>>,
255    {
256        self.process_inbound_bidi_with_reset(transport, handler, stream_id, |_, _| {})
257            .await
258    }
259
260    /// Process a single HTTP/3 request-response cycle on a bidirectional stream, calling
261    /// `reset` to issue a stream RST when a stream-level protocol error occurs.
262    ///
263    /// Identical to [`process_inbound_bidi`][Self::process_inbound_bidi] except that on any
264    /// `H3Error::Protocol(code)` produced by first-frame processing (HEADERS decode,
265    /// pseudo-header validation, etc.), `reset` is invoked with the still-owned transport and
266    /// the error code before the error is returned. This lets callers RST both the recv and
267    /// send halves of the bidi stream — required by RFC 9114 for stream errors like
268    /// `H3_MESSAGE_ERROR`. I/O errors and successful runs do not invoke `reset`.
269    ///
270    /// `reset` is a `FnOnce` taking `(&mut Transport, H3ErrorCode)`. trillium-http does not
271    /// itself depend on any reset capability of the transport; callers wire up the actual
272    /// stream-RST mechanism (e.g. quinn's `RecvStream::stop` + `SendStream::reset`) inside
273    /// the closure.
274    ///
275    /// # Errors
276    ///
277    /// Returns an `H3Error` in case of io error or http/3 semantic error.
278    pub async fn process_inbound_bidi_with_reset<Transport, Handler, Fut, Reset>(
279        self: Arc<Self>,
280        mut transport: Transport,
281        handler: Handler,
282        stream_id: u64,
283        reset: Reset,
284    ) -> Result<H3StreamResult<Transport>, H3Error>
285    where
286        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
287        Handler: FnOnce(Conn<Transport>) -> Fut,
288        Fut: Future<Output = Conn<Transport>>,
289        Reset: FnOnce(&mut Transport, H3ErrorCode),
290    {
291        self.record_accepted_stream(stream_id);
292        let _guard = self.swansong.guard();
293        let mut buffer: Buffer =
294            Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
295
296        let outcome =
297            Conn::process_first_frame_h3(&self, &mut transport, &mut buffer, stream_id).await;
298
299        match outcome {
300            Ok(H3FirstFrame::Request {
301                validated,
302                start_time,
303            }) => {
304                let conn =
305                    Conn::build_h3(self, transport, buffer, validated, start_time, stream_id);
306                Ok(H3StreamResult::Request(
307                    handler(conn).await.send_h3().await?,
308                ))
309            }
310            Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
311                session_id,
312                transport,
313                buffer,
314            }),
315            Err(error) => {
316                if let H3Error::Protocol(code) = &error {
317                    reset(&mut transport, *code);
318                }
319                Err(error)
320            }
321        }
322    }
323
324    /// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
325    ///
326    /// If the field section's Required Insert Count is greater than zero, waits until the
327    /// dynamic table has received enough entries. Returns an error on protocol violations or
328    /// if the encoder stream fails while waiting.
329    ///
330    /// Duplicate pseudo-headers are silently ignored (first value wins). Unknown
331    /// pseudo-headers are rejected.
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the encoded bytes cannot be parsed as a valid field section.
336    #[cfg(feature = "unstable")]
337    pub async fn decode_field_section(
338        &self,
339        encoded: &[u8],
340        stream_id: u64,
341    ) -> Result<FieldSection<'static>, H3Error> {
342        self.decoder_dynamic_table.decode(encoded, stream_id).await
343    }
344
345    #[cfg(not(feature = "unstable"))]
346    pub(crate) async fn decode_field_section(
347        &self,
348        encoded: &[u8],
349        stream_id: u64,
350    ) -> Result<FieldSection<'static>, H3Error> {
351        self.decoder_dynamic_table.decode(encoded, stream_id).await
352    }
353
354    /// Encode a QPACK field section from pseudo-headers and headers, consulting the encoder
355    /// dynamic table to emit literal-with-name-reference or indexed representations as the
356    /// table's contents allow.
357    ///
358    /// # Errors
359    ///
360    /// Returns an `H3Error` in case of http/3 semantic error.
361    #[cfg(feature = "unstable")]
362    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
363    pub fn encode_field_section(
364        &self,
365        field_section: &FieldSection<'_>,
366        buf: &mut Vec<u8>,
367        stream_id: u64,
368    ) -> Result<(), H3Error> {
369        self.encoder_dynamic_table
370            .encode(field_section, buf, stream_id);
371        Ok(())
372    }
373
374    #[cfg(not(feature = "unstable"))]
375    #[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
376    pub(crate) fn encode_field_section(
377        &self,
378        field_section: &FieldSection<'_>,
379        buf: &mut Vec<u8>,
380        stream_id: u64,
381    ) -> Result<(), H3Error> {
382        self.encoder_dynamic_table
383            .encode(field_section, buf, stream_id);
384        Ok(())
385    }
386
387    /// Run this connection's HTTP/3 outbound control stream.
388    ///
389    /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
390    /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
391    /// (closing a control stream is a connection error).
392    ///
393    /// # Errors
394    ///
395    /// Returns an `H3Error` in case of io error or http/3 semantic error.
396    pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
397    where
398        T: AsyncWrite + Unpin + Send,
399    {
400        let mut buf = vec![0; 128];
401
402        let settings = Frame::Settings(H3Settings::from(&self.context.config));
403        log::trace!(
404            "H3 outbound control: sending SETTINGS: {:?}",
405            H3Settings::from(&self.context.config)
406        );
407
408        write(&mut buf, &mut stream, |buf| {
409            let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
410            written += settings.encode(&mut buf[written..])?;
411            Some(written)
412        })
413        .await?;
414        log::trace!("H3 outbound control: SETTINGS sent");
415
416        self.swansong.clone().await;
417
418        write(&mut buf, &mut stream, |buf| {
419            Frame::Goaway(self.goaway_id()).encode(buf)
420        })
421        .await?;
422
423        Ok(())
424    }
425
426    /// Run the outbound QPACK encoder stream for the duration of the connection.
427    ///
428    /// Writes the stream type byte, then drains encoder-stream instructions from the encoder
429    /// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
430    /// marked failed.
431    ///
432    /// # Errors
433    ///
434    /// Returns an `H3Error` in case of io error.
435    pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
436    where
437        T: AsyncWrite + Unpin + Send,
438    {
439        self.encoder_dynamic_table
440            .run_writer(&mut stream, self.swansong.clone())
441            .await
442    }
443
444    /// Run the outbound QPACK decoder stream for the duration of the connection.
445    ///
446    /// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
447    /// Count Increment instructions as they become needed. Returns when the connection
448    /// shuts down.
449    ///
450    /// # Errors
451    ///
452    /// Returns an `H3Error` in case of io error or http/3 semantic error.
453    pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
454    where
455        T: AsyncWrite + Unpin + Send,
456    {
457        self.decoder_dynamic_table
458            .run_writer(&mut stream, self.swansong.clone())
459            .await
460    }
461
462    /// Handle an inbound unidirectional HTTP/3 stream from the peer.
463    ///
464    /// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
465    /// application streams are returned via [`UniStreamResult`] for the caller to process.
466    ///
467    /// On a connection-level protocol error, this method drops the recv stream before
468    /// the caller can react. Quinn's `RecvStream::drop` then sends `STOP_SENDING`, which
469    /// races against the caller's `connection.close` — if the peer responds with a
470    /// malformed `RESET_STREAM` (notably `final_offset = 0`) before our app close is
471    /// applied, the transport-level error overrides our app error code on the wire.
472    /// Use [`process_inbound_uni_with_close`] to thread the close call through the
473    /// function so it fires before the stream drops.
474    ///
475    /// [`process_inbound_uni_with_close`]: Self::process_inbound_uni_with_close
476    ///
477    /// # Errors
478    ///
479    /// Returns a `H3Error` in case of io error or http/3 semantic error.
480    #[deprecated(
481        since = "1.2.0",
482        note = "use `process_inbound_uni_with_close` so connection-level protocol errors close \
483                the QUIC connection before the recv stream drops, avoiding a `FINAL_SIZE_ERROR` \
484                race with the peer's response to STOP_SENDING"
485    )]
486    pub async fn process_inbound_uni<T>(&self, stream: T) -> Result<UniStreamResult<T>, H3Error>
487    where
488        T: AsyncRead + Unpin + Send,
489    {
490        self.process_inbound_uni_with_close(stream, |_| {}).await
491    }
492
493    /// Handle an inbound unidirectional HTTP/3 stream from the peer, calling `on_close` to
494    /// close the QUIC connection if a connection-level protocol error is detected.
495    ///
496    /// Identical to [`process_inbound_uni`][Self::process_inbound_uni] except that on
497    /// any `H3Error::Protocol(code)` whose code is a connection-level error (RFC 9114,
498    /// RFC 9204), `on_close` is invoked with that code while the recv stream is still alive. This
499    /// lets callers send a `CONNECTION_CLOSE` before the stream drops — if the close call sets
500    /// quinn's `conn.error`, quinn's `RecvStream::drop` skips `STOP_SENDING`, eliminating a
501    /// peer race that otherwise causes `FINAL_SIZE_ERROR` to override the app error code.
502    ///
503    /// `on_close` is a `FnOnce` taking `H3ErrorCode`. trillium-http does not itself
504    /// hold the QUIC connection; callers wire up the actual `connection.close()` call
505    /// inside the closure (e.g. quinn's `Connection::close`).
506    ///
507    /// # Errors
508    ///
509    /// Returns a `H3Error` in case of io error or http/3 semantic error.
510    pub async fn process_inbound_uni_with_close<T, OnClose>(
511        &self,
512        mut stream: T,
513        on_close: OnClose,
514    ) -> Result<UniStreamResult<T>, H3Error>
515    where
516        T: AsyncRead + Unpin + Send,
517        OnClose: FnOnce(H3ErrorCode),
518    {
519        let inner = self
520            .swansong
521            .interrupt(self.process_inbound_uni_inner(&mut stream))
522            .await
523            .unwrap_or(Ok(UniInnerResult::Handled)); // interrupted
524
525        match inner {
526            Ok(UniInnerResult::Handled) => Ok(UniStreamResult::Handled),
527            Ok(UniInnerResult::WebTransport { session_id, buffer }) => {
528                Ok(UniStreamResult::WebTransport {
529                    session_id,
530                    stream,
531                    buffer,
532                })
533            }
534            Ok(UniInnerResult::Unknown { stream_type }) => Ok(UniStreamResult::Unknown {
535                stream_type,
536                stream,
537            }),
538            Err(error) => {
539                // Fire `on_close` BEFORE returning so the caller's connection.close
540                // call sets quinn's `conn.error` while `stream` is still alive. When
541                // `stream` then drops at function return, quinn's `RecvStream::drop`
542                // skips STOP_SENDING — preventing the peer-RESET_STREAM race that
543                // otherwise replaces our app close code with FINAL_SIZE_ERROR.
544                if let H3Error::Protocol(code) = &error
545                    && code.is_connection_error()
546                {
547                    on_close(*code);
548                }
549                Err(error)
550            }
551        }
552    }
553
554    /// Inner-loop body of [`process_inbound_uni_with_close`][Self::process_inbound_uni_with_close].
555    /// Borrows `stream` so the outer function can keep ownership of it across the await,
556    /// which lets the caller's close callback fire before the recv stream drops.
557    async fn process_inbound_uni_inner<T>(&self, stream: &mut T) -> Result<UniInnerResult, H3Error>
558    where
559        T: AsyncRead + Unpin + Send,
560    {
561        let mut buf = vec![0; 128];
562        let mut filled = 0;
563
564        // Read stream type varint (decode as raw u64 to handle unknown types)
565        let stream_type = read(&mut buf, &mut filled, stream, |data| {
566            match quic_varint::decode(data) {
567                Ok(ok) => Ok(Some(ok)),
568                Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
569                // this branch is unreachable because u64 is always From<u64>
570                Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
571            }
572        })
573        .await?;
574
575        match UniStreamType::try_from(stream_type) {
576            Ok(UniStreamType::Control) => {
577                log::trace!("H3 inbound uni: control stream");
578                self.run_inbound_control(&mut buf, &mut filled, stream)
579                    .await?;
580                Ok(UniInnerResult::Handled)
581            }
582
583            Ok(UniStreamType::QpackEncoder) => {
584                log::trace!("H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)");
585                let mut reader = Prepended {
586                    head: &buf[..filled],
587                    tail: stream,
588                };
589
590                log::trace!("QPACK encoder stream: started");
591                self.decoder_dynamic_table.run_reader(&mut reader).await?;
592
593                Ok(UniInnerResult::Handled)
594            }
595
596            Ok(UniStreamType::QpackDecoder) => {
597                log::trace!("H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)");
598                let mut reader = Prepended {
599                    head: &buf[..filled],
600                    tail: stream,
601                };
602                self.encoder_dynamic_table.run_reader(&mut reader).await?;
603                Ok(UniInnerResult::Handled)
604            }
605
606            Ok(UniStreamType::WebTransport) => {
607                log::trace!("H3 inbound uni: WebTransport stream");
608                let session_id =
609                    read(
610                        &mut buf,
611                        &mut filled,
612                        stream,
613                        |data| match quic_varint::decode(data) {
614                            Ok(ok) => Ok(Some(ok)),
615                            Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
616                            Err(QuicVarIntError::UnknownValue { bytes, value }) => {
617                                Ok(Some((value, bytes)))
618                            }
619                        },
620                    )
621                    .await?;
622
623                buf.truncate(filled);
624
625                Ok(UniInnerResult::WebTransport {
626                    session_id,
627                    buffer: buf.into(),
628                })
629            }
630
631            Ok(UniStreamType::Push) => {
632                // Trillium does not support HTTP/3 push, so we hand these back as `Unknown`
633                // identically to truly-unknown stream types — the explicit arm exists so
634                // trace output names "push stream" rather than a bare type id.
635                log::trace!("H3 inbound uni: push stream (push not supported)");
636                Ok(UniInnerResult::Unknown { stream_type })
637            }
638
639            Err(_) => {
640                log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
641                Ok(UniInnerResult::Unknown { stream_type })
642            }
643        }
644    }
645
646    /// Handle the http/3 peer's inbound control stream.
647    ///
648    /// # Errors
649    ///
650    /// Returns a `H3Error` in case of io error or HTTP/3 semantic error.
651    async fn run_inbound_control<T>(
652        &self,
653        buf: &mut Vec<u8>,
654        filled: &mut usize,
655        stream: &mut T,
656    ) -> Result<(), H3Error>
657    where
658        T: AsyncRead + Unpin + Send,
659    {
660        // SettingsError takes priority: a SETTINGS frame whose payload is itself invalid
661        // (e.g. forbidden HTTP/2 setting IDs) is reported as SETTINGS_ERROR, not the
662        // MISSING_SETTINGS we report for everything else here.
663        let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
664            Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
665            Err(FrameDecodeError::Incomplete) => Ok(None),
666            Err(FrameDecodeError::Error(H3ErrorCode::SettingsError)) => {
667                Err(H3ErrorCode::SettingsError)
668            }
669            Ok(_) | Err(FrameDecodeError::Error(_)) => Err(H3ErrorCode::MissingSettings),
670        })
671        .await
672        .map_err(map_critical_stream_eof)?;
673
674        log::trace!("H3 peer settings: {settings:?}");
675
676        self.peer_settings
677            .set(settings)
678            .map_err(|_| H3ErrorCode::FrameUnexpected)?;
679        self.wake_peer_settings_waiters();
680
681        self.encoder_dynamic_table
682            .initialize_from_peer_settings(settings);
683
684        loop {
685            let frame = self
686                .swansong
687                .interrupt(read(buf, filled, stream, |data| {
688                    match Frame::decode(data) {
689                        Ok((frame, consumed)) => Ok(Some((frame, consumed))),
690                        Err(FrameDecodeError::Incomplete) => Ok(None),
691                        Err(FrameDecodeError::Error(code)) => Err(code),
692                    }
693                }))
694                .await
695                .transpose()
696                .map_err(map_critical_stream_eof)?;
697
698            match frame {
699                None => {
700                    log::trace!("H3 control stream: interrupted by shutdown");
701                    return Ok(());
702                }
703
704                Some(Frame::Goaway(id)) => {
705                    log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
706                    self.swansong.shut_down();
707                    return Ok(());
708                }
709
710                Some(Frame::Unknown(n)) => {
711                    // Consume the payload bytes so the stream stays synchronized.
712                    log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
713                    let n = usize::try_from(n).unwrap_or(usize::MAX);
714                    let in_buf = n.min(*filled);
715                    buf.copy_within(in_buf..*filled, 0);
716                    *filled -= in_buf;
717                    let mut todo = n - in_buf;
718                    let mut scratch = [0u8; 256];
719                    while todo > 0 {
720                        let to_read = todo.min(scratch.len());
721                        let n = stream
722                            .read(&mut scratch[..to_read])
723                            .await
724                            .map_err(H3Error::Io)?;
725                        if n == 0 {
726                            return Err(H3ErrorCode::ClosedCriticalStream.into());
727                        }
728                        todo -= n;
729                    }
730                }
731
732                Some(
733                    Frame::Settings(_)
734                    | Frame::Data(_)
735                    | Frame::Headers(_)
736                    | Frame::PushPromise { .. }
737                    | Frame::WebTransport(_),
738                ) => {
739                    return Err(H3ErrorCode::FrameUnexpected.into());
740                }
741
742                // Trillium doesn't implement push, so these are ignored rather than acted on.
743                Some(Frame::CancelPush(_) | Frame::MaxPushId(_)) => {
744                    log::trace!("H3 control stream: ignoring {frame:?}");
745                }
746            }
747        }
748    }
749}
750
751/// Map an `UnexpectedEof` I/O error (the `read` helper's "stream FIN'd" signal) to
752/// `H3_CLOSED_CRITICAL_STREAM`. Closure of the control stream or of either QPACK
753/// side-channel is a connection error. Other I/O errors and any protocol error are passed
754/// through unchanged.
755fn map_critical_stream_eof(error: H3Error) -> H3Error {
756    match error {
757        H3Error::Io(e) if e.kind() == ErrorKind::UnexpectedEof => {
758            H3ErrorCode::ClosedCriticalStream.into()
759        }
760        other => other,
761    }
762}
763
764async fn write(
765    buf: &mut Vec<u8>,
766    mut stream: impl AsyncWrite + Unpin + Send,
767    mut f: impl FnMut(&mut [u8]) -> Option<usize>,
768) -> io::Result<usize> {
769    let written = loop {
770        if let Some(w) = f(buf) {
771            break w;
772        }
773        if buf.len() >= MAX_BUFFER_SIZE {
774            return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
775        }
776        buf.resize(buf.len() * 2, 0);
777    };
778
779    stream.write_all(&buf[..written]).await?;
780    stream.flush().await?;
781    Ok(written)
782}
783
784/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
785///
786/// Used to replay bytes that were read ahead while parsing a stream-type varint, before
787/// dispatching to the inner runner that consumes the rest of the stream.
788struct Prepended<'a, T> {
789    head: &'a [u8],
790    tail: T,
791}
792
793impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
794    fn poll_read(
795        self: Pin<&mut Self>,
796        cx: &mut Context<'_>,
797        out: &mut [u8],
798    ) -> Poll<io::Result<usize>> {
799        let this = self.get_mut();
800        if !this.head.is_empty() {
801            let n = this.head.len().min(out.len());
802            out[..n].copy_from_slice(&this.head[..n]);
803            this.head = &this.head[n..];
804            return Poll::Ready(Ok(n));
805        }
806        Pin::new(&mut this.tail).poll_read(cx, out)
807    }
808}
809
810/// Read from `stream` into `buf` until `f` can decode a value.
811///
812/// `f` receives the filled portion of the buffer and returns:
813/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
814/// - `Ok(None)` — need more data; reads more bytes and retries
815/// - `Err(e)` — unrecoverable error; propagated to caller
816async fn read<R>(
817    buf: &mut Vec<u8>,
818    filled: &mut usize,
819    stream: &mut (impl AsyncRead + Unpin + Send),
820    f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
821) -> Result<R, H3Error> {
822    loop {
823        if let Some((result, consumed)) = f(&buf[..*filled])? {
824            buf.copy_within(consumed..*filled, 0);
825            *filled -= consumed;
826            return Ok(result);
827        }
828
829        if *filled >= buf.len() {
830            if buf.len() >= MAX_BUFFER_SIZE {
831                return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
832            }
833            buf.resize(buf.len() * 2, 0);
834        }
835
836        let n = stream.read(&mut buf[*filled..]).await?;
837        if n == 0 {
838            return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
839        }
840        *filled += n;
841    }
842}