Skip to main content

trillium_http/h2/acceptor/
types.rs

1//! Component types used in the definition of `h2::acceptor`
2
3use crate::{
4    Conn, HttpConfig,
5    h2::{
6        H2Driver, H2Error, H2ErrorCode, H2Transport,
7        acceptor::{inflow::Inflow, send::SendCursor},
8        transport::StreamState,
9    },
10};
11use futures_lite::{AsyncRead, AsyncWrite, stream::Stream};
12use std::{
13    io,
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17};
18
19/// h2-relevant configuration extracted from [`HttpConfig`][crate::HttpConfig] at acceptor
20/// construction. Carried as a plain value so hot-loop policy checks don't cross the
21/// `Arc<HttpContext>` indirection.
22#[derive(Debug, Clone, Copy, fieldwork::Fieldwork)]
23#[fieldwork(get)]
24pub(in crate::h2) struct AcceptorConfig {
25    initial_stream_window_size: u32,
26    max_stream_recv_window_size: u32,
27    initial_connection_window_size: u32,
28    max_concurrent_streams: u32,
29    max_frame_size: u32,
30    max_header_list_size: u64,
31    copy_loops_per_yield: usize,
32    hpack_table_capacity: usize,
33}
34
35impl AcceptorConfig {
36    pub(super) fn from_http_config(config: &HttpConfig) -> Self {
37        let initial_stream_window_size = config.h2_initial_stream_window_size();
38        let configured_max = config.h2_max_stream_recv_window_size();
39        // The post-read window target must be at least the advertised initial: a smaller `max`
40        // would leave the stream stuck above its own "max" (promotion via `Inflow::raise_target`
41        // can only grow the window, never shrink it). Coerce it up and warn once so the
42        // misconfiguration is visible without spamming the log on every connection.
43        let max_stream_recv_window_size = configured_max.max(initial_stream_window_size);
44        if max_stream_recv_window_size != configured_max {
45            warn_misordered_stream_window_config(initial_stream_window_size, configured_max);
46        }
47        Self {
48            initial_stream_window_size,
49            max_stream_recv_window_size,
50            initial_connection_window_size: config.h2_initial_connection_window_size(),
51            max_concurrent_streams: config.h2_max_concurrent_streams(),
52            max_frame_size: config.h2_max_frame_size(),
53            max_header_list_size: config.max_header_list_size(),
54            copy_loops_per_yield: config.copy_loops_per_yield(),
55            hpack_table_capacity: config.dynamic_table_capacity(),
56        }
57    }
58}
59
60/// Warn (at most once per process) that `h2_max_stream_recv_window_size` was configured below
61/// `h2_initial_stream_window_size` and has been clamped up to it. Best-effort once — a second
62/// differently-misconfigured `HttpConfig` won't re-warn — which is fine for a setup-time hint.
63fn warn_misordered_stream_window_config(initial: u32, configured_max: u32) {
64    use std::sync::atomic::{AtomicBool, Ordering};
65    static WARNED: AtomicBool = AtomicBool::new(false);
66    if !WARNED.swap(true, Ordering::Relaxed) {
67        log::warn!(
68            "h2_max_stream_recv_window_size ({configured_max}) is below \
69             h2_initial_stream_window_size ({initial}); clamping the per-stream recv window up to \
70             the initial. Set max >= initial to silence this."
71        );
72    }
73}
74
75/// Position of the connection in its high-level lifecycle.
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
77pub(super) enum DriverState {
78    /// Haven't read the client preface yet.
79    AwaitingPreface,
80    /// Preface read; need to queue our initial SETTINGS frame to `write_buf`.
81    NeedsServerSettings,
82    /// Steady state — read frames from the transport and dispatch.
83    Running,
84    /// GOAWAY has been queued; drain `write_buf` then transition to [`Drained`] (for
85    /// graceful shutdown) or terminate directly (for I/O error paths where the transport
86    /// is already untrustworthy).
87    ///
88    /// [`Drained`]: Self::Drained
89    Closing,
90    /// Our outbound bytes are on the wire (including our GOAWAY). Now we're waiting for
91    /// the peer to close its write half (recv returns 0) so our Drop doesn't look like a
92    /// reset to the client — the sequence the h2 spec and most clients assume. Any
93    /// inbound bytes the peer happens to send during this window are discarded; we've
94    /// already committed to closing.
95    Drained,
96}
97
98/// Where the read cursor is inside the current frame.
99#[derive(Debug, Clone, Copy)]
100pub(super) enum ReadPhase {
101    /// Not yet read the 9 bytes of the next frame header.
102    NeedHeader,
103    /// Header read and validated; still collecting payload bytes. `total` is the full target
104    /// fill (`FRAME_HEADER_LEN + payload_len`). The decoded header itself is cheap enough to
105    /// re-parse from the buffer when we dispatch, so we don't stash it here.
106    NeedPayload { total: usize },
107}
108
109/// Why the driver is closing — shaped around what the terminal `drive` result should be.
110#[derive(Debug)]
111pub(super) enum CloseOutcome {
112    /// Clean close. `drive` returns `None`.
113    Graceful,
114    /// Protocol error. `drive` returns `Some(Err(...))`. GOAWAY with this code has been
115    /// queued.
116    Protocol(H2ErrorCode),
117    /// I/O error. GOAWAY was NOT queued (transport is untrustworthy). Propagated verbatim.
118    Io(io::Error),
119}
120
121/// Driver-side view of a single open stream: the shared state the handler also sees, plus a
122/// cache of decisions the driver has made for this stream (which the handler doesn't need
123/// to know).
124#[derive(Debug)]
125pub(super) struct StreamEntry {
126    /// Shared state (recv buffer, send slot, handler wakers). Owned by `Arc` so the
127    /// handler task can outlive or operate concurrently with the driver's view.
128    pub(super) shared: Arc<StreamState>,
129
130    /// Driver-private send-side state for an in-progress response. `None` until the conn
131    /// task submits a response via [`H2Connection::submit_send`] and the driver picks it
132    /// up on its next `service_handler_signals` tick.
133    ///
134    /// [`H2Connection::submit_send`]: super::H2Connection::submit_send
135    pub(super) send: Option<SendCursor>,
136
137    /// Per-stream send flow-control window. Seeded from
138    /// `peer_settings.effective_initial_window_size()` when the stream is opened;
139    /// decremented as we emit DATA frames; incremented by peer
140    /// `WINDOW_UPDATE(stream_id, inc)`; adjusted by `SETTINGS_INITIAL_WINDOW_SIZE` delta on
141    /// mid-connection SETTINGS change (may drive negative). Overflow past
142    /// [`MAX_FLOW_CONTROL_WINDOW`] is a stream-level `FLOW_CONTROL_ERROR` (→ `RST_STREAM`).
143    pub(super) send_window: i64,
144
145    /// Per-stream receive flow-control window (RFC 9113 §6.9). Seeded at the advertised
146    /// `SETTINGS_INITIAL_WINDOW_SIZE`; promoted to `h2_max_stream_recv_window_size` once the
147    /// handler signals it intends to read the body (`is_reading`), then topped up as the handler
148    /// drains. See [`Inflow`] for the accounting model. A peer that sends past the granted window
149    /// earns a connection-level `FLOW_CONTROL_ERROR`.
150    pub(super) stream_inflow: Inflow,
151
152    /// Declared request-body length from the `content-length` request header, if present and
153    /// parseable. The driver tallies inbound DATA octets against this to enforce RFC 9113
154    /// §8.1.2.6: a body whose length disagrees with `content-length` is a stream-level
155    /// `PROTOCOL_ERROR`. `None` means no declared length, so no check applies.
156    pub(super) expected_content_length: Option<u64>,
157
158    /// Running total of inbound DATA payload octets (body bytes only — pad-length byte and
159    /// padding excluded) received on this stream, compared against `expected_content_length`.
160    pub(super) received_body_len: u64,
161}
162
163impl StreamEntry {
164    pub(super) fn new(
165        shared: Arc<StreamState>,
166        send_window: i64,
167        stream_inflow: Inflow,
168        expected_content_length: Option<u64>,
169    ) -> Self {
170        Self {
171            shared,
172            send: None,
173            send_window,
174            stream_inflow,
175            expected_content_length,
176            received_body_len: 0,
177        }
178    }
179}
180
181/// Result of dispatching one decoded frame.
182pub(super) enum Action {
183    /// Frame handled; continue the main loop.
184    Continue,
185    /// A stream just opened and the request validated — return the [`Conn`] to the caller;
186    /// the runtime adapter spawns a handler task per emitted Conn. Boxed to keep the enum
187    /// small — `Conn` is over 500 bytes and most dispatches return `Continue`.
188    Emit(Box<Conn<H2Transport>>),
189    /// Begin graceful or erroring close with this outcome.
190    Close(CloseOutcome),
191}
192
193/// Future returned by [`H2Driver::next`]. Resolves to `None` on graceful close, `Some(Ok)`
194/// when a new request stream opens, or `Some(Err)` on a fatal protocol or I/O error.
195#[must_use = "futures do nothing unless awaited"]
196#[derive(Debug)]
197pub struct Next<'a, T> {
198    pub(super) driver: &'a mut H2Driver<T>,
199}
200
201impl<T> Future for Next<'_, T>
202where
203    T: AsyncRead + AsyncWrite + Unpin + Send,
204{
205    type Output = Option<Result<Conn<H2Transport>, H2Error>>;
206
207    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
208        self.driver.drive(cx)
209    }
210}
211
212impl<T> Stream for H2Driver<T>
213where
214    T: AsyncRead + AsyncWrite + Unpin + Send,
215{
216    type Item = Result<Conn<H2Transport>, H2Error>;
217
218    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
219        self.get_mut().drive(cx)
220    }
221}
222
223/// Slice the interesting bytes out of a just-read frame. Bounds-checks to defend against a
224/// payload length on the wire that disagrees with a body-bearing frame's declared inner
225/// length.
226pub(super) fn frame_slice(
227    buf: &[u8],
228    start: usize,
229    length: u32,
230    total: usize,
231) -> Result<&[u8], CloseOutcome> {
232    let length =
233        usize::try_from(length).map_err(|_| CloseOutcome::Protocol(H2ErrorCode::FrameSizeError))?;
234    let end = start
235        .checked_add(length)
236        .ok_or(CloseOutcome::Protocol(H2ErrorCode::FrameSizeError))?;
237    if end > total {
238        return Err(CloseOutcome::Protocol(H2ErrorCode::FrameSizeError));
239    }
240    Ok(&buf[start..end])
241}