Skip to main content

trillium_http/h2/acceptor/
types.rs

1//! Component types used in the definition of `h2::acceptor`
2
3use crate::{
4    Conn, HttpConfig,
5    h2::{
6        H2Driver, H2Error, H2ErrorCode, H2Transport, acceptor::send::SendCursor,
7        transport::StreamState,
8    },
9};
10use futures_lite::{AsyncRead, AsyncWrite, stream::Stream};
11use std::{
12    io,
13    pin::Pin,
14    sync::Arc,
15    task::{Context, Poll},
16};
17
/// h2-relevant configuration extracted from [`HttpConfig`][crate::HttpConfig] at acceptor
/// construction. Carried as a plain value so hot-loop policy checks don't cross the
/// `Arc<HttpContext>` indirection.
///
/// The struct is `Copy` and all of its fields are private; read access goes through the
/// getters derived by `#[fieldwork(get)]`. Each field is a snapshot of the `HttpConfig`
/// getter named in its doc comment, taken once in `AcceptorConfig::from_http_config`.
#[derive(Debug, Clone, Copy, fieldwork::Fieldwork)]
#[fieldwork(get)]
pub(in crate::h2) struct AcceptorConfig {
    /// Snapshot of `HttpConfig::h2_initial_stream_window_size`.
    initial_stream_window_size: u32,
    /// Snapshot of `HttpConfig::h2_max_stream_recv_window_size`.
    max_stream_recv_window_size: u32,
    /// Snapshot of `HttpConfig::h2_initial_connection_window_size`.
    initial_connection_window_size: u32,
    /// Snapshot of `HttpConfig::h2_max_concurrent_streams`.
    max_concurrent_streams: u32,
    /// Snapshot of `HttpConfig::h2_max_frame_size`.
    max_frame_size: u32,
    /// Snapshot of `HttpConfig::max_header_list_size` (not an h2-prefixed setting).
    max_header_list_size: u64,
    /// Snapshot of `HttpConfig::copy_loops_per_yield` (not an h2-prefixed setting).
    copy_loops_per_yield: usize,
    /// Snapshot of `HttpConfig::dynamic_table_capacity`; note the field is named after
    /// HPACK while the source getter is not.
    hpack_table_capacity: usize,
}
33
34impl AcceptorConfig {
35    pub(super) fn from_http_config(config: &HttpConfig) -> Self {
36        Self {
37            initial_stream_window_size: config.h2_initial_stream_window_size(),
38            max_stream_recv_window_size: config.h2_max_stream_recv_window_size(),
39            initial_connection_window_size: config.h2_initial_connection_window_size(),
40            max_concurrent_streams: config.h2_max_concurrent_streams(),
41            max_frame_size: config.h2_max_frame_size(),
42            max_header_list_size: config.max_header_list_size(),
43            copy_loops_per_yield: config.copy_loops_per_yield(),
44            hpack_table_capacity: config.dynamic_table_capacity(),
45        }
46    }
47}
48
/// Position of the connection in its high-level lifecycle.
///
/// Going by the per-variant descriptions below, the variants are declared in the order a
/// connection passes through them; [`Closing`] may also end the connection directly
/// without reaching [`Drained`].
///
/// [`Closing`]: Self::Closing
/// [`Drained`]: Self::Drained
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum DriverState {
    /// The client preface has not been read yet.
    AwaitingPreface,
    /// Preface read; our initial SETTINGS frame still needs to be queued to `write_buf`.
    NeedsServerSettings,
    /// Steady state — read frames from the transport and dispatch.
    Running,
    /// GOAWAY has been queued; drain `write_buf` then transition to [`Drained`] (for
    /// graceful shutdown) or terminate directly (for I/O error paths where the transport
    /// is already untrustworthy).
    ///
    /// [`Drained`]: Self::Drained
    Closing,
    /// Our outbound bytes are on the wire (including our GOAWAY). Now we're waiting for
    /// the peer to close its write half (recv returns 0) so our Drop doesn't look like a
    /// reset to the client — the sequence the h2 spec and most clients assume. Any
    /// inbound bytes the peer happens to send during this window are discarded; we've
    /// already committed to closing.
    Drained,
}
71
/// Where the read cursor is inside the current frame.
///
/// A frame is read in two steps: first its fixed-size header, then its payload. This enum
/// records which of the two the reader is currently waiting on.
#[derive(Debug, Clone, Copy)]
pub(super) enum ReadPhase {
    /// The 9 bytes of the next frame header have not been read yet.
    NeedHeader,
    /// Header read and validated; still collecting payload bytes. `total` is the full target
    /// fill (`FRAME_HEADER_LEN + payload_len`), i.e. it counts the header bytes as well as
    /// the payload. The decoded header itself is cheap enough to re-parse from the buffer
    /// when we dispatch, so we don't stash it here.
    NeedPayload { total: usize },
}
82
/// Why the driver is closing — shaped around what the terminal `drive` result should be.
///
/// Only `Debug` is derived: the [`Io`] variant owns an [`io::Error`], which is not `Clone`.
///
/// [`Io`]: Self::Io
#[derive(Debug)]
pub(super) enum CloseOutcome {
    /// Clean close. `drive` returns `None`.
    Graceful,
    /// Protocol error. `drive` returns `Some(Err(...))`. GOAWAY with this code has been
    /// queued.
    Protocol(H2ErrorCode),
    /// I/O error. GOAWAY was NOT queued (transport is untrustworthy). The wrapped error is
    /// propagated verbatim.
    Io(io::Error),
}
94
/// Driver-side view of a single open stream: the shared state the handler also sees, plus a
/// cache of decisions the driver has made for this stream (which the handler doesn't need
/// to know).
///
/// Only [`shared`](Self::shared) is reference-counted; every other field is plain data
/// stored directly in the entry.
#[derive(Debug)]
pub(super) struct StreamEntry {
    /// Shared state (recv buffer, send slot, handler wakers). Owned by `Arc` so the
    /// handler task can outlive or operate concurrently with the driver's view.
    pub(super) shared: Arc<StreamState>,

    /// Driver-private send-side state for an in-progress response. `None` until the conn
    /// task submits a response via [`H2Connection::submit_send`] and the driver picks it
    /// up on its next `service_handler_signals` tick.
    ///
    /// [`H2Connection::submit_send`]: super::H2Connection::submit_send
    pub(super) send: Option<SendCursor>,

    /// Per-stream send flow-control window. Seeded from
    /// `peer_settings.effective_initial_window_size()` when the stream is opened;
    /// decremented as we emit DATA frames; incremented by peer
    /// `WINDOW_UPDATE(stream_id, inc)`; adjusted by `SETTINGS_INITIAL_WINDOW_SIZE` delta on
    /// mid-connection SETTINGS change (may drive negative, hence the signed type). Overflow
    /// past `MAX_FLOW_CONTROL_WINDOW` is a stream-level `FLOW_CONTROL_ERROR`
    /// (→ `RST_STREAM`).
    pub(super) send_window: i64,

    /// Per-stream recv flow-control window — how many bytes we've told the peer it may
    /// still send on this stream. Starts at the server's advertised
    /// `SETTINGS_INITIAL_WINDOW_SIZE` (0 in the lazy-WU pattern we use); decremented as
    /// the peer's DATA frames arrive; incremented as we emit stream-level `WINDOW_UPDATE`
    /// (both the initial raise on the handler's `is_reading` signal and every subsequent
    /// refill crediting bytes the handler has consumed). A negative value means the peer
    /// overran the window — connection-level `FLOW_CONTROL_ERROR`.
    pub(super) peer_recv_window: i64,

    /// Declared request-body length from the `content-length` request header, if present and
    /// parseable. The driver tallies inbound DATA octets against this to enforce RFC 9113
    /// §8.1.2.6: a body whose length disagrees with `content-length` is a stream-level
    /// `PROTOCOL_ERROR`. `None` means no declared length, so no check applies.
    pub(super) expected_content_length: Option<u64>,

    /// Running total of inbound DATA payload octets (body bytes only — pad-length byte and
    /// padding excluded) received on this stream, compared against `expected_content_length`.
    /// Starts at 0 for an entry built with `StreamEntry::new`.
    pub(super) received_body_len: u64,
}
138
139impl StreamEntry {
140    pub(super) fn new(
141        shared: Arc<StreamState>,
142        send_window: i64,
143        peer_recv_window: i64,
144        expected_content_length: Option<u64>,
145    ) -> Self {
146        Self {
147            shared,
148            send: None,
149            send_window,
150            peer_recv_window,
151            expected_content_length,
152            received_body_len: 0,
153        }
154    }
155}
156
/// Result of dispatching one decoded frame.
///
/// Tells the main loop what to do next: keep going, hand a new [`Conn`] to the caller, or
/// start closing the connection.
pub(super) enum Action {
    /// Frame handled; continue the main loop.
    Continue,
    /// A stream just opened and the request validated — return the [`Conn`] to the caller;
    /// the runtime adapter spawns a handler task per emitted Conn. Boxed to keep the enum
    /// small — `Conn` is over 500 bytes and most dispatches return `Continue`.
    Emit(Box<Conn<H2Transport>>),
    /// Begin graceful or erroring close; the [`CloseOutcome`] says which.
    Close(CloseOutcome),
}
168
/// Future returned by [`H2Driver::next`]. Resolves to `None` on graceful close, `Some(Ok)`
/// when a new request stream opens, or `Some(Err)` on a fatal protocol or I/O error.
///
/// The future only holds a mutable borrow of the driver for `'a`; it has no state of its
/// own.
#[must_use = "futures do nothing unless awaited"]
#[derive(Debug)]
pub struct Next<'a, T> {
    /// The driver that each poll of this future is forwarded to.
    pub(super) driver: &'a mut H2Driver<T>,
}
176
177impl<T> Future for Next<'_, T>
178where
179    T: AsyncRead + AsyncWrite + Unpin + Send,
180{
181    type Output = Option<Result<Conn<H2Transport>, H2Error>>;
182
183    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
184        self.driver.drive(cx)
185    }
186}
187
188impl<T> Stream for H2Driver<T>
189where
190    T: AsyncRead + AsyncWrite + Unpin + Send,
191{
192    type Item = Result<Conn<H2Transport>, H2Error>;
193
194    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195        self.get_mut().drive(cx)
196    }
197}
198
199/// Slice the interesting bytes out of a just-read frame. Bounds-checks to defend against a
200/// payload length on the wire that disagrees with a body-bearing frame's declared inner
201/// length.
202pub(super) fn frame_slice(
203    buf: &[u8],
204    start: usize,
205    length: u32,
206    total: usize,
207) -> Result<&[u8], CloseOutcome> {
208    let length =
209        usize::try_from(length).map_err(|_| CloseOutcome::Protocol(H2ErrorCode::FrameSizeError))?;
210    let end = start
211        .checked_add(length)
212        .ok_or(CloseOutcome::Protocol(H2ErrorCode::FrameSizeError))?;
213    if end > total {
214        return Err(CloseOutcome::Protocol(H2ErrorCode::FrameSizeError));
215    }
216    Ok(&buf[start..end])
217}