trillium_http/h3/connection.rs
1mod peer_settings_wait;
2
3use super::{
4 H3Error,
5 frame::{Frame, FrameDecodeError, UniStreamType},
6 quic_varint::{self, QuicVarIntError},
7 settings::H3Settings,
8};
9use crate::{
10 Buffer, Conn, HttpContext,
11 conn::H3FirstFrame,
12 h3::{H3ErrorCode, MAX_BUFFER_SIZE},
13 headers::qpack::{DecoderDynamicTable, EncoderDynamicTable, FieldSection},
14};
15use event_listener::Event;
16use futures_lite::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
17use std::{
18 future::Future,
19 io::{self, ErrorKind},
20 pin::Pin,
21 sync::{
22 Arc, OnceLock,
23 atomic::{AtomicBool, AtomicU64, Ordering},
24 },
25 task::{Context, Poll},
26};
27use swansong::{ShutdownCompletion, Swansong};
28
/// The result of processing an HTTP/3 bidirectional stream.
///
/// Produced by [`H3Connection::process_inbound_bidi_with_reset`] once the first frame on
/// the stream has determined what kind of stream it is.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Request is the hot path; boxing it would add an allocation per request"
)]
pub enum H3StreamResult<Transport> {
    /// The stream carried a normal HTTP/3 request. The contained [`Conn`] is the one
    /// returned by the caller's handler, after its response has been sent.
    Request(Conn<Transport>),

    /// The stream carries a WebTransport bidirectional data stream. The `session_id` identifies
    /// the associated WebTransport session.
    WebTransport {
        /// The WebTransport session ID (stream ID of the CONNECT request).
        session_id: u64,
        /// The underlying transport, ready for application data.
        transport: Transport,
        /// Any bytes buffered after the session ID during stream negotiation.
        buffer: Buffer,
    },
}
50
/// Inner-loop result of [`H3Connection::process_inbound_uni_with_close`] before the recv
/// stream is reattached. Decouples the inner async block (which only borrows the stream)
/// from the caller-visible [`UniStreamResult`] (which returns the stream by value on
/// non-`Handled` variants), so the function can keep ownership of `stream` long enough to
/// fire its close callback before `stream` drops.
///
/// Each variant mirrors the [`UniStreamResult`] variant of the same name, minus the stream.
enum UniInnerResult {
    /// The stream was one of the internal types (control, QPACK encoder/decoder) and has
    /// already been run.
    Handled,
    /// A WebTransport uni stream: the decoded session ID plus whatever bytes were read
    /// past it.
    WebTransport { session_id: u64, buffer: Buffer },
    /// A push stream or a stream type this crate does not recognise; carries the raw type.
    Unknown { stream_type: u64 },
}
61
/// The result of processing an HTTP/3 unidirectional stream.
///
/// `T` is the caller's receive-stream type. It is handed back by value in every variant
/// except [`Handled`][Self::Handled], where the stream has already been consumed.
#[derive(Debug)]
pub enum UniStreamResult<T> {
    /// The stream was a known internal type (control, QPACK encoder/decoder) and was handled
    /// automatically.
    Handled,

    /// A WebTransport unidirectional data stream. The `session_id` identifies the associated
    /// WebTransport session.
    WebTransport {
        /// The WebTransport session ID.
        session_id: u64,
        /// The receive stream, ready for application data.
        stream: T,
        /// Any bytes buffered after the session ID during stream negotiation.
        buffer: Buffer,
    },

    /// A stream whose type is recognized but unsupported (e.g. `Push`) or not recognized
    /// at all by this crate.
    ///
    /// The caller is responsible for disposing of the stream — the in-tree consumers RST
    /// it with `H3_STREAM_CREATION_ERROR`. `process_inbound_uni` deliberately does *not*
    /// close the stream itself: handing it back gives a downstream extension the option to
    /// implement a stream type trillium-http doesn't know about (a future RFC, an
    /// experiment, etc.) without forking the codec.
    Unknown {
        /// The raw stream type value.
        stream_type: u64,
        /// The stream.
        stream: T,
    },
}
95
/// Shared state for a single HTTP/3 QUIC connection.
///
/// Call the appropriate methods on this type for each stream accepted from the QUIC connection.
///
/// # Driver shape (vs h2)
///
/// h2 multiplexes everything onto a single TCP byte stream, so a single
/// [`H2Driver`][crate::h2::H2Driver] task suffices. h3 instead has the QUIC layer hand us multiple
/// independent streams: an inbound and outbound control stream, an inbound and outbound QPACK
/// encoder stream, an inbound and outbound QPACK decoder stream, and one bidi stream per
/// request. There is no single "h3 driver" — each stream is driven by its own future returned from
/// `H3Connection`'s `run_*` / `process_*` methods, and the caller decides how those futures are
/// scheduled.
///
/// The trillium-http boundary is **runtime-free by design**: this crate hands out anonymous futures
/// and lets the caller pick the executor. The in-tree consumers (`trillium-server-common`,
/// `trillium-client`) follow a task-per-stream pattern — spawn each long-lived control / encoder /
/// decoder future on its own task at connection setup, then spawn one task per accepted request
/// stream. Nothing in this crate requires that pattern; a caller could in principle race all the
/// futures on one task instead, with different perf characteristics.
#[derive(Debug)]
pub struct H3Connection {
    /// Shared configuration across all protocols.
    context: Arc<HttpContext>,

    /// Connection-scoped shutdown signal, created as a child of the context's swansong. Shut
    /// down when we receive GOAWAY from the peer or when the server-level Swansong shuts down.
    /// Request stream tasks use this to interrupt in-progress work.
    swansong: Swansong,

    /// The peer's H3 settings, received on their control stream. Request streams may need to
    /// consult these (e.g. max field section size).
    pub(super) peer_settings: OnceLock<H3Settings>,

    /// Multi-listener wake source for
    /// [`PeerSettingsReady`][peer_settings_wait::PeerSettingsReady]. Notified by
    /// `run_inbound_control` after applying peer SETTINGS, and again on connection
    /// close, so any number of concurrently-parked futures all unblock together.
    pub(super) peer_settings_event: Event,

    /// The highest bidirectional stream ID we have accepted. Used to compute the GOAWAY value
    /// (this + 4) to tell the peer which requests we saw. Starts at 0 and is only meaningful
    /// once `has_accepted_stream` is set. Updated through `record_accepted_stream`, which
    /// `process_inbound_bidi_with_reset` calls for every stream it is given.
    max_accepted_stream_id: AtomicU64,

    /// Whether we have accepted any streams yet. Needed because `max_accepted_stream_id`
    /// starts at 0, which is also a valid stream ID.
    has_accepted_stream: AtomicBool,

    /// The decoder-side QPACK dynamic table for this connection.
    decoder_dynamic_table: DecoderDynamicTable,

    /// The encoder-side QPACK dynamic table for this connection.
    encoder_dynamic_table: EncoderDynamicTable,
}
150
151impl H3Connection {
152 /// Construct a new `H3Connection` to manage HTTP/3 for a given peer.
153 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
154 let swansong = context.swansong.child();
155 let max_table_capacity = context.config.dynamic_table_capacity;
156 let blocked_streams = context.config.h3_blocked_streams;
157 let encoder_dynamic_table = EncoderDynamicTable::new(&context);
158 Arc::new(Self {
159 context,
160 swansong,
161 peer_settings: OnceLock::new(),
162 peer_settings_event: Event::new(),
163 max_accepted_stream_id: AtomicU64::new(0),
164 has_accepted_stream: AtomicBool::new(false),
165 decoder_dynamic_table: DecoderDynamicTable::new(max_table_capacity, blocked_streams),
166 encoder_dynamic_table,
167 })
168 }
169
/// Retrieve the [`Swansong`] shutdown handle for this HTTP/3 connection. See also
/// [`H3Connection::shut_down`].
///
/// This is the connection-scoped handle created in [`H3Connection::new`] as a child of the
/// context's swansong, not the context's own handle.
pub fn swansong(&self) -> &Swansong {
    &self.swansong
}
175
/// Attempt graceful shutdown of this HTTP/3 connection (all streams).
///
/// Before signalling the connection's [`Swansong`], this marks both QPACK dynamic tables as
/// failed with `H3ErrorCode::NoError` and wakes anything waiting on the peer's SETTINGS.
///
/// The returned [`ShutdownCompletion`] type can
/// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
/// blocking context.
///
/// Note that this will NOT shut down the server. To shut down the whole server, use
/// [`HttpContext::shut_down`].
pub fn shut_down(&self) -> ShutdownCompletion {
    // Wake any in-flight `decode_field_section` calls parked on the decoder
    // table's `ThresholdWait` (a non-I/O future awaiting dynamic-table inserts
    // from the peer). The encoder table's writer loop is already swansong-
    // aware, but we mark it failed too for symmetry: any future state
    // mutations after shutdown are no longer wire-relevant.
    self.decoder_dynamic_table.fail(H3ErrorCode::NoError);
    self.encoder_dynamic_table.fail(H3ErrorCode::NoError);
    // Waiters on peer SETTINGS would otherwise stay parked if SETTINGS never arrives.
    self.wake_peer_settings_waiters();
    self.swansong.shut_down()
}
195
196 /// Retrieve the [`HttpContext`] for this server.
197 pub fn context(&self) -> Arc<HttpContext> {
198 self.context.clone()
199 }
200
/// Returns the peer's HTTP/3 settings, available once the peer's control stream has been
/// processed.
///
/// Yields `None` until `run_inbound_control` has decoded the peer's SETTINGS frame and
/// stored it; the value never changes afterwards.
pub fn peer_settings(&self) -> Option<&H3Settings> {
    self.peer_settings.get()
}
206
207 /// Record that we accepted a bidirectional stream with this ID.
208 fn record_accepted_stream(&self, stream_id: u64) {
209 self.max_accepted_stream_id
210 .fetch_max(stream_id, Ordering::Relaxed);
211 self.has_accepted_stream.store(true, Ordering::Relaxed);
212 }
213
214 /// The stream ID to send in a GOAWAY frame: one past the highest stream we accepted, or 0 if we
215 /// haven't accepted any.
216 fn goaway_id(&self) -> u64 {
217 if self.has_accepted_stream.load(Ordering::Relaxed) {
218 self.max_accepted_stream_id.load(Ordering::Relaxed) + 4
219 } else {
220 0
221 }
222 }
223
/// Process a single HTTP/3 request-response cycle on a bidirectional stream.
///
/// Call this once per accepted bidirectional stream. Returns
/// [`H3StreamResult::WebTransport`] if the stream opens a WebTransport session rather than
/// a standard HTTP/3 request.
///
/// On a stream-level protocol error (e.g. malformed pseudo-headers,
/// `H3_MESSAGE_ERROR`), this method drops the transport without resetting it. To honour
/// RFC 9114's stream-error MUSTs, callers should use [`process_inbound_bidi_with_reset`]
/// instead and pass a closure that issues a stream RST with the protocol error code.
///
/// [`process_inbound_bidi_with_reset`]: Self::process_inbound_bidi_with_reset
///
/// # Errors
///
/// Returns an `H3Error` in case of io error or http/3 semantic error.
#[deprecated(
    since = "1.2.0",
    note = "use `process_inbound_bidi_with_reset` so stream-level protocol errors RST the \
            stream as required by RFC 9114"
)]
pub async fn process_inbound_bidi<Transport, Handler, Fut>(
    self: Arc<Self>,
    transport: Transport,
    handler: Handler,
    stream_id: u64,
) -> Result<H3StreamResult<Transport>, H3Error>
where
    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
    Handler: FnOnce(Conn<Transport>) -> Fut,
    Fut: Future<Output = Conn<Transport>>,
{
    // The no-op reset closure preserves this method's historical behaviour: on a protocol
    // error the transport is simply dropped.
    self.process_inbound_bidi_with_reset(transport, handler, stream_id, |_, _| {})
        .await
}
259
260 /// Process a single HTTP/3 request-response cycle on a bidirectional stream, calling
261 /// `reset` to issue a stream RST when a stream-level protocol error occurs.
262 ///
263 /// Identical to [`process_inbound_bidi`][Self::process_inbound_bidi] except that on any
264 /// `H3Error::Protocol(code)` produced by first-frame processing (HEADERS decode,
265 /// pseudo-header validation, etc.), `reset` is invoked with the still-owned transport and
266 /// the error code before the error is returned. This lets callers RST both the recv and
267 /// send halves of the bidi stream — required by RFC 9114 for stream errors like
268 /// `H3_MESSAGE_ERROR`. I/O errors and successful runs do not invoke `reset`.
269 ///
270 /// `reset` is a `FnOnce` taking `(&mut Transport, H3ErrorCode)`. trillium-http does not
271 /// itself depend on any reset capability of the transport; callers wire up the actual
272 /// stream-RST mechanism (e.g. quinn's `RecvStream::stop` + `SendStream::reset`) inside
273 /// the closure.
274 ///
275 /// # Errors
276 ///
277 /// Returns an `H3Error` in case of io error or http/3 semantic error.
278 pub async fn process_inbound_bidi_with_reset<Transport, Handler, Fut, Reset>(
279 self: Arc<Self>,
280 mut transport: Transport,
281 handler: Handler,
282 stream_id: u64,
283 reset: Reset,
284 ) -> Result<H3StreamResult<Transport>, H3Error>
285 where
286 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
287 Handler: FnOnce(Conn<Transport>) -> Fut,
288 Fut: Future<Output = Conn<Transport>>,
289 Reset: FnOnce(&mut Transport, H3ErrorCode),
290 {
291 self.record_accepted_stream(stream_id);
292 let _guard = self.swansong.guard();
293 let mut buffer: Buffer =
294 Vec::with_capacity(self.context.config.request_buffer_initial_len).into();
295
296 let outcome =
297 Conn::process_first_frame_h3(&self, &mut transport, &mut buffer, stream_id).await;
298
299 match outcome {
300 Ok(H3FirstFrame::Request {
301 validated,
302 start_time,
303 }) => {
304 let conn =
305 Conn::build_h3(self, transport, buffer, validated, start_time, stream_id);
306 Ok(H3StreamResult::Request(
307 handler(conn).await.send_h3().await?,
308 ))
309 }
310 Ok(H3FirstFrame::WebTransport { session_id }) => Ok(H3StreamResult::WebTransport {
311 session_id,
312 transport,
313 buffer,
314 }),
315 Err(error) => {
316 if let H3Error::Protocol(code) = &error {
317 reset(&mut transport, *code);
318 }
319 Err(error)
320 }
321 }
322 }
323
/// Decode a QPACK-encoded field section, consulting the dynamic table as needed.
///
/// If the field section's Required Insert Count is greater than zero, waits until the
/// dynamic table has received enough entries. Returns an error on protocol violations or
/// if the encoder stream fails while waiting.
///
/// Duplicate pseudo-headers are silently ignored (first value wins). Unknown
/// pseudo-headers are rejected.
///
/// This variant is public and compiled only when the `unstable` feature is enabled.
///
/// # Errors
///
/// Returns an error if the encoded bytes cannot be parsed as a valid field section.
#[cfg(feature = "unstable")]
pub async fn decode_field_section(
    &self,
    encoded: &[u8],
    stream_id: u64,
) -> Result<FieldSection<'static>, H3Error> {
    self.decoder_dynamic_table.decode(encoded, stream_id).await
}

/// Crate-private form of `decode_field_section`, compiled when the `unstable` feature is
/// disabled. Forwards `encoded` and `stream_id` to the decoder dynamic table's `decode`.
///
/// # Errors
///
/// Returns whatever error the decoder dynamic table's `decode` reports.
#[cfg(not(feature = "unstable"))]
pub(crate) async fn decode_field_section(
    &self,
    encoded: &[u8],
    stream_id: u64,
) -> Result<FieldSection<'static>, H3Error> {
    self.decoder_dynamic_table.decode(encoded, stream_id).await
}
353
/// Encode a QPACK field section from pseudo-headers and headers, consulting the encoder
/// dynamic table to emit literal-with-name-reference or indexed representations as the
/// table's contents allow.
///
/// The encoded bytes are written into `buf`. This variant is public and compiled only when
/// the `unstable` feature is enabled.
///
/// # Errors
///
/// Returns an `H3Error` in case of http/3 semantic error. The current implementation always
/// returns `Ok(())`; the `Result` is kept so the signature can report errors later.
#[cfg(feature = "unstable")]
#[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
pub fn encode_field_section(
    &self,
    field_section: &FieldSection<'_>,
    buf: &mut Vec<u8>,
    stream_id: u64,
) -> Result<(), H3Error> {
    self.encoder_dynamic_table
        .encode(field_section, buf, stream_id);
    Ok(())
}

/// Crate-private form of `encode_field_section`, compiled when the `unstable` feature is
/// disabled. Forwards to the encoder dynamic table's `encode`, which writes into `buf`.
///
/// # Errors
///
/// Currently always returns `Ok(())`; the `Result` is kept so the signature can report
/// errors later.
#[cfg(not(feature = "unstable"))]
#[allow(clippy::unnecessary_wraps, reason = "future-proofing api")]
pub(crate) fn encode_field_section(
    &self,
    field_section: &FieldSection<'_>,
    buf: &mut Vec<u8>,
    stream_id: u64,
) -> Result<(), H3Error> {
    self.encoder_dynamic_table
        .encode(field_section, buf, stream_id);
    Ok(())
}
386
387 /// Run this connection's HTTP/3 outbound control stream.
388 ///
389 /// Sends the initial SETTINGS frame, then sends GOAWAY when the connection shuts down.
390 /// Returns after GOAWAY is sent; keep the stream open until the QUIC connection closes
391 /// (closing a control stream is a connection error).
392 ///
393 /// # Errors
394 ///
395 /// Returns an `H3Error` in case of io error or http/3 semantic error.
396 pub async fn run_outbound_control<T>(&self, mut stream: T) -> Result<(), H3Error>
397 where
398 T: AsyncWrite + Unpin + Send,
399 {
400 let mut buf = vec![0; 128];
401
402 let settings = Frame::Settings(H3Settings::from(&self.context.config));
403 log::trace!(
404 "H3 outbound control: sending SETTINGS: {:?}",
405 H3Settings::from(&self.context.config)
406 );
407
408 write(&mut buf, &mut stream, |buf| {
409 let mut written = quic_varint::encode(UniStreamType::Control, buf)?;
410 written += settings.encode(&mut buf[written..])?;
411 Some(written)
412 })
413 .await?;
414 log::trace!("H3 outbound control: SETTINGS sent");
415
416 self.swansong.clone().await;
417
418 write(&mut buf, &mut stream, |buf| {
419 Frame::Goaway(self.goaway_id()).encode(buf)
420 })
421 .await?;
422
423 Ok(())
424 }
425
/// Run the outbound QPACK encoder stream for the duration of the connection.
///
/// Writes the stream type byte, then drains encoder-stream instructions from the encoder
/// dynamic table as they are enqueued. Returns when the connection shuts down or the table is
/// marked failed.
///
/// This is a thin wrapper: the work is done by the encoder dynamic table's `run_writer`,
/// which is given the stream and a clone of this connection's [`Swansong`].
///
/// # Errors
///
/// Returns an `H3Error` in case of io error.
pub async fn run_encoder<T>(&self, mut stream: T) -> Result<(), H3Error>
where
    T: AsyncWrite + Unpin + Send,
{
    self.encoder_dynamic_table
        .run_writer(&mut stream, self.swansong.clone())
        .await
}
443
/// Run the outbound QPACK decoder stream for the duration of the connection.
///
/// Writes the stream type byte, then loops sending Section Acknowledgement and Insert
/// Count Increment instructions as they become needed. Returns when the connection
/// shuts down.
///
/// This is a thin wrapper: the work is done by the decoder dynamic table's `run_writer`,
/// which is given the stream and a clone of this connection's [`Swansong`].
///
/// # Errors
///
/// Returns an `H3Error` in case of io error or http/3 semantic error.
pub async fn run_decoder<T>(&self, mut stream: T) -> Result<(), H3Error>
where
    T: AsyncWrite + Unpin + Send,
{
    self.decoder_dynamic_table
        .run_writer(&mut stream, self.swansong.clone())
        .await
}
461
/// Handle an inbound unidirectional HTTP/3 stream from the peer.
///
/// Internal stream types (control, QPACK encoder/decoder) are handled automatically;
/// application streams are returned via [`UniStreamResult`] for the caller to process.
///
/// On a connection-level protocol error, this method drops the recv stream before
/// the caller can react. Quinn's `RecvStream::drop` then sends `STOP_SENDING`, which
/// races against the caller's `connection.close` — if the peer responds with a
/// malformed `RESET_STREAM` (notably `final_offset = 0`) before our app close is
/// applied, the transport-level error overrides our app error code on the wire.
/// Use [`process_inbound_uni_with_close`] to thread the close call through the
/// function so it fires before the stream drops.
///
/// [`process_inbound_uni_with_close`]: Self::process_inbound_uni_with_close
///
/// # Errors
///
/// Returns a `H3Error` in case of io error or http/3 semantic error.
#[deprecated(
    since = "1.2.0",
    note = "use `process_inbound_uni_with_close` so connection-level protocol errors close \
            the QUIC connection before the recv stream drops, avoiding a `FINAL_SIZE_ERROR` \
            race with the peer's response to STOP_SENDING"
)]
pub async fn process_inbound_uni<T>(&self, stream: T) -> Result<UniStreamResult<T>, H3Error>
where
    T: AsyncRead + Unpin + Send,
{
    // The no-op close callback preserves this method's historical behaviour: nothing is
    // done about the connection before the stream drops.
    self.process_inbound_uni_with_close(stream, |_| {}).await
}
492
493 /// Handle an inbound unidirectional HTTP/3 stream from the peer, calling `on_close` to
494 /// close the QUIC connection if a connection-level protocol error is detected.
495 ///
496 /// Identical to [`process_inbound_uni`][Self::process_inbound_uni] except that on
497 /// any `H3Error::Protocol(code)` whose code is a connection-level error (RFC 9114,
498 /// RFC 9204), `on_close` is invoked with that code while the recv stream is still alive. This
499 /// lets callers send a `CONNECTION_CLOSE` before the stream drops — if the close call sets
500 /// quinn's `conn.error`, quinn's `RecvStream::drop` skips `STOP_SENDING`, eliminating a
501 /// peer race that otherwise causes `FINAL_SIZE_ERROR` to override the app error code.
502 ///
503 /// `on_close` is a `FnOnce` taking `H3ErrorCode`. trillium-http does not itself
504 /// hold the QUIC connection; callers wire up the actual `connection.close()` call
505 /// inside the closure (e.g. quinn's `Connection::close`).
506 ///
507 /// # Errors
508 ///
509 /// Returns a `H3Error` in case of io error or http/3 semantic error.
510 pub async fn process_inbound_uni_with_close<T, OnClose>(
511 &self,
512 mut stream: T,
513 on_close: OnClose,
514 ) -> Result<UniStreamResult<T>, H3Error>
515 where
516 T: AsyncRead + Unpin + Send,
517 OnClose: FnOnce(H3ErrorCode),
518 {
519 let inner = self
520 .swansong
521 .interrupt(self.process_inbound_uni_inner(&mut stream))
522 .await
523 .unwrap_or(Ok(UniInnerResult::Handled)); // interrupted
524
525 match inner {
526 Ok(UniInnerResult::Handled) => Ok(UniStreamResult::Handled),
527 Ok(UniInnerResult::WebTransport { session_id, buffer }) => {
528 Ok(UniStreamResult::WebTransport {
529 session_id,
530 stream,
531 buffer,
532 })
533 }
534 Ok(UniInnerResult::Unknown { stream_type }) => Ok(UniStreamResult::Unknown {
535 stream_type,
536 stream,
537 }),
538 Err(error) => {
539 // Fire `on_close` BEFORE returning so the caller's connection.close
540 // call sets quinn's `conn.error` while `stream` is still alive. When
541 // `stream` then drops at function return, quinn's `RecvStream::drop`
542 // skips STOP_SENDING — preventing the peer-RESET_STREAM race that
543 // otherwise replaces our app close code with FINAL_SIZE_ERROR.
544 if let H3Error::Protocol(code) = &error
545 && code.is_connection_error()
546 {
547 on_close(*code);
548 }
549 Err(error)
550 }
551 }
552 }
553
554 /// Inner-loop body of [`process_inbound_uni_with_close`][Self::process_inbound_uni_with_close].
555 /// Borrows `stream` so the outer function can keep ownership of it across the await,
556 /// which lets the caller's close callback fire before the recv stream drops.
557 async fn process_inbound_uni_inner<T>(&self, stream: &mut T) -> Result<UniInnerResult, H3Error>
558 where
559 T: AsyncRead + Unpin + Send,
560 {
561 let mut buf = vec![0; 128];
562 let mut filled = 0;
563
564 // Read stream type varint (decode as raw u64 to handle unknown types)
565 let stream_type = read(&mut buf, &mut filled, stream, |data| {
566 match quic_varint::decode(data) {
567 Ok(ok) => Ok(Some(ok)),
568 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
569 // this branch is unreachable because u64 is always From<u64>
570 Err(QuicVarIntError::UnknownValue { bytes, value }) => Ok(Some((value, bytes))),
571 }
572 })
573 .await?;
574
575 match UniStreamType::try_from(stream_type) {
576 Ok(UniStreamType::Control) => {
577 log::trace!("H3 inbound uni: control stream");
578 self.run_inbound_control(&mut buf, &mut filled, stream)
579 .await?;
580 Ok(UniInnerResult::Handled)
581 }
582
583 Ok(UniStreamType::QpackEncoder) => {
584 log::trace!("H3 inbound uni: QPACK encoder stream ({filled} bytes pre-read)");
585 let mut reader = Prepended {
586 head: &buf[..filled],
587 tail: stream,
588 };
589
590 log::trace!("QPACK encoder stream: started");
591 self.decoder_dynamic_table.run_reader(&mut reader).await?;
592
593 Ok(UniInnerResult::Handled)
594 }
595
596 Ok(UniStreamType::QpackDecoder) => {
597 log::trace!("H3 inbound uni: QPACK decoder stream ({filled} bytes pre-read)");
598 let mut reader = Prepended {
599 head: &buf[..filled],
600 tail: stream,
601 };
602 self.encoder_dynamic_table.run_reader(&mut reader).await?;
603 Ok(UniInnerResult::Handled)
604 }
605
606 Ok(UniStreamType::WebTransport) => {
607 log::trace!("H3 inbound uni: WebTransport stream");
608 let session_id =
609 read(
610 &mut buf,
611 &mut filled,
612 stream,
613 |data| match quic_varint::decode(data) {
614 Ok(ok) => Ok(Some(ok)),
615 Err(QuicVarIntError::UnexpectedEnd) => Ok(None),
616 Err(QuicVarIntError::UnknownValue { bytes, value }) => {
617 Ok(Some((value, bytes)))
618 }
619 },
620 )
621 .await?;
622
623 buf.truncate(filled);
624
625 Ok(UniInnerResult::WebTransport {
626 session_id,
627 buffer: buf.into(),
628 })
629 }
630
631 Ok(UniStreamType::Push) => {
632 // Trillium does not support HTTP/3 push, so we hand these back as `Unknown`
633 // identically to truly-unknown stream types — the explicit arm exists so
634 // trace output names "push stream" rather than a bare type id.
635 log::trace!("H3 inbound uni: push stream (push not supported)");
636 Ok(UniInnerResult::Unknown { stream_type })
637 }
638
639 Err(_) => {
640 log::trace!("H3 inbound uni: unknown stream type {stream_type:#x}");
641 Ok(UniInnerResult::Unknown { stream_type })
642 }
643 }
644 }
645
/// Handle the http/3 peer's inbound control stream.
///
/// Expects a SETTINGS frame first, stores it in `peer_settings`, wakes anyone waiting on
/// it and initialises the encoder dynamic table from it. It then loops over subsequent
/// frames until the connection's swansong interrupts the read or the peer sends GOAWAY
/// (which also shuts the swansong down).
///
/// `buf` / `filled` carry any bytes the caller already read past the stream-type varint;
/// they are reused as the read buffer for the lifetime of the stream.
///
/// # Errors
///
/// Returns a `H3Error` in case of io error or HTTP/3 semantic error. A stream that ends
/// while a frame is being read is reported as `ClosedCriticalStream`.
async fn run_inbound_control<T>(
    &self,
    buf: &mut Vec<u8>,
    filled: &mut usize,
    stream: &mut T,
) -> Result<(), H3Error>
where
    T: AsyncRead + Unpin + Send,
{
    // SettingsError takes priority: a SETTINGS frame whose payload is itself invalid
    // (e.g. forbidden HTTP/2 setting IDs) is reported as SETTINGS_ERROR, not the
    // MISSING_SETTINGS we report for everything else here.
    let settings = read(buf, filled, stream, |data| match Frame::decode(data) {
        Ok((Frame::Settings(s), consumed)) => Ok(Some((s, consumed))),
        Err(FrameDecodeError::Incomplete) => Ok(None),
        Err(FrameDecodeError::Error(H3ErrorCode::SettingsError)) => {
            Err(H3ErrorCode::SettingsError)
        }
        Ok(_) | Err(FrameDecodeError::Error(_)) => Err(H3ErrorCode::MissingSettings),
    })
    .await
    .map_err(map_critical_stream_eof)?;

    log::trace!("H3 peer settings: {settings:?}");

    // `OnceLock::set` fails only if settings were already stored for this connection,
    // i.e. a SETTINGS frame has been processed before.
    self.peer_settings
        .set(settings)
        .map_err(|_| H3ErrorCode::FrameUnexpected)?;
    self.wake_peer_settings_waiters();

    self.encoder_dynamic_table
        .initialize_from_peer_settings(settings);

    loop {
        // `interrupt` yields `None` on shutdown; `transpose` turns
        // `Option<Result<Frame, _>>` into `Result<Option<Frame>, _>` so `?` handles errors.
        let frame = self
            .swansong
            .interrupt(read(buf, filled, stream, |data| {
                match Frame::decode(data) {
                    Ok((frame, consumed)) => Ok(Some((frame, consumed))),
                    Err(FrameDecodeError::Incomplete) => Ok(None),
                    Err(FrameDecodeError::Error(code)) => Err(code),
                }
            }))
            .await
            .transpose()
            .map_err(map_critical_stream_eof)?;

        match frame {
            None => {
                log::trace!("H3 control stream: interrupted by shutdown");
                return Ok(());
            }

            Some(Frame::Goaway(id)) => {
                log::trace!("H3 control stream: peer sent GOAWAY(stream_id={id})");
                self.swansong.shut_down();
                return Ok(());
            }

            Some(Frame::Unknown(n)) => {
                // Consume the payload bytes so the stream stays synchronized.
                log::trace!("H3 control stream: skipping unknown frame (payload {n} bytes)");
                let n = usize::try_from(n).unwrap_or(usize::MAX);
                // Part (or all) of the payload may already be sitting in `buf`: drop that
                // prefix first, then discard the remainder straight from the stream.
                let in_buf = n.min(*filled);
                buf.copy_within(in_buf..*filled, 0);
                *filled -= in_buf;
                let mut todo = n - in_buf;
                // Fixed-size scratch keeps memory bounded regardless of the declared
                // payload length.
                let mut scratch = [0u8; 256];
                while todo > 0 {
                    let to_read = todo.min(scratch.len());
                    let n = stream
                        .read(&mut scratch[..to_read])
                        .await
                        .map_err(H3Error::Io)?;
                    // A zero-length read means the stream ended mid-payload.
                    if n == 0 {
                        return Err(H3ErrorCode::ClosedCriticalStream.into());
                    }
                    todo -= n;
                }
            }

            // None of these frame types is accepted on the control stream once SETTINGS
            // has been received (a second SETTINGS included).
            Some(
                Frame::Settings(_)
                | Frame::Data(_)
                | Frame::Headers(_)
                | Frame::PushPromise { .. }
                | Frame::WebTransport(_),
            ) => {
                return Err(H3ErrorCode::FrameUnexpected.into());
            }

            // Trillium doesn't implement push, so these are ignored rather than acted on.
            Some(Frame::CancelPush(_) | Frame::MaxPushId(_)) => {
                log::trace!("H3 control stream: ignoring {frame:?}");
            }
        }
    }
}
749}
750
751/// Map an `UnexpectedEof` I/O error (the `read` helper's "stream FIN'd" signal) to
752/// `H3_CLOSED_CRITICAL_STREAM`. Closure of the control stream or of either QPACK
753/// side-channel is a connection error. Other I/O errors and any protocol error are passed
754/// through unchanged.
755fn map_critical_stream_eof(error: H3Error) -> H3Error {
756 match error {
757 H3Error::Io(e) if e.kind() == ErrorKind::UnexpectedEof => {
758 H3ErrorCode::ClosedCriticalStream.into()
759 }
760 other => other,
761 }
762}
763
764async fn write(
765 buf: &mut Vec<u8>,
766 mut stream: impl AsyncWrite + Unpin + Send,
767 mut f: impl FnMut(&mut [u8]) -> Option<usize>,
768) -> io::Result<usize> {
769 let written = loop {
770 if let Some(w) = f(buf) {
771 break w;
772 }
773 if buf.len() >= MAX_BUFFER_SIZE {
774 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation"));
775 }
776 buf.resize(buf.len() * 2, 0);
777 };
778
779 stream.write_all(&buf[..written]).await?;
780 stream.flush().await?;
781 Ok(written)
782}
783
/// An `AsyncRead` adapter that drains a byte slice before reading from an inner stream.
///
/// Used to replay bytes that were read ahead while parsing a stream-type varint, before
/// dispatching to the inner runner that consumes the rest of the stream.
struct Prepended<'a, T> {
    /// Bytes still to be replayed; shrinks from the front as they are read.
    head: &'a [u8],
    /// The underlying stream, read only once `head` is empty.
    tail: T,
}
792
793impl<T: AsyncRead + Unpin> AsyncRead for Prepended<'_, T> {
794 fn poll_read(
795 self: Pin<&mut Self>,
796 cx: &mut Context<'_>,
797 out: &mut [u8],
798 ) -> Poll<io::Result<usize>> {
799 let this = self.get_mut();
800 if !this.head.is_empty() {
801 let n = this.head.len().min(out.len());
802 out[..n].copy_from_slice(&this.head[..n]);
803 this.head = &this.head[n..];
804 return Poll::Ready(Ok(n));
805 }
806 Pin::new(&mut this.tail).poll_read(cx, out)
807 }
808}
809
810/// Read from `stream` into `buf` until `f` can decode a value.
811///
812/// `f` receives the filled portion of the buffer and returns:
813/// - `Ok(Some((value, consumed)))` — success; consumed bytes are removed from the front
814/// - `Ok(None)` — need more data; reads more bytes and retries
815/// - `Err(e)` — unrecoverable error; propagated to caller
816async fn read<R>(
817 buf: &mut Vec<u8>,
818 filled: &mut usize,
819 stream: &mut (impl AsyncRead + Unpin + Send),
820 f: impl Fn(&[u8]) -> Result<Option<(R, usize)>, H3ErrorCode>,
821) -> Result<R, H3Error> {
822 loop {
823 if let Some((result, consumed)) = f(&buf[..*filled])? {
824 buf.copy_within(consumed..*filled, 0);
825 *filled -= consumed;
826 return Ok(result);
827 }
828
829 if *filled >= buf.len() {
830 if buf.len() >= MAX_BUFFER_SIZE {
831 return Err(io::Error::new(ErrorKind::OutOfMemory, "runaway allocation").into());
832 }
833 buf.resize(buf.len() * 2, 0);
834 }
835
836 let n = stream.read(&mut buf[*filled..]).await?;
837 if n == 0 {
838 return Err(io::Error::new(ErrorKind::UnexpectedEof, "stream closed").into());
839 }
840 *filled += n;
841 }
842}