trillium_http/h2/acceptor/types.rs
1//! Component types used in the definition of `h2::acceptor`
2
3use crate::{
4 Conn, HttpConfig,
5 h2::{
6 H2Driver, H2Error, H2ErrorCode, H2Transport, acceptor::send::SendCursor,
7 transport::StreamState,
8 },
9};
10use futures_lite::{AsyncRead, AsyncWrite, stream::Stream};
11use std::{
12 io,
13 pin::Pin,
14 sync::Arc,
15 task::{Context, Poll},
16};
17
18/// h2-relevant configuration extracted from [`HttpConfig`][crate::HttpConfig] at acceptor
19/// construction. Carried as a plain value so hot-loop policy checks don't cross the
20/// `Arc<HttpContext>` indirection.
21#[derive(Debug, Clone, Copy, fieldwork::Fieldwork)]
22#[fieldwork(get)]
23pub(in crate::h2) struct AcceptorConfig {
24 initial_stream_window_size: u32,
25 max_stream_recv_window_size: u32,
26 initial_connection_window_size: u32,
27 max_concurrent_streams: u32,
28 max_frame_size: u32,
29 copy_loops_per_yield: usize,
30 hpack_table_capacity: usize,
31}
32
33impl AcceptorConfig {
34 pub(super) fn from_http_config(config: &HttpConfig) -> Self {
35 Self {
36 initial_stream_window_size: config.h2_initial_stream_window_size(),
37 max_stream_recv_window_size: config.h2_max_stream_recv_window_size(),
38 initial_connection_window_size: config.h2_initial_connection_window_size(),
39 max_concurrent_streams: config.h2_max_concurrent_streams(),
40 max_frame_size: config.h2_max_frame_size(),
41 copy_loops_per_yield: config.copy_loops_per_yield(),
42 hpack_table_capacity: config.dynamic_table_capacity(),
43 }
44 }
45}
46
47/// Position of the connection in its high-level lifecycle.
48#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub(super) enum DriverState {
50 /// Haven't read the client preface yet.
51 AwaitingPreface,
52 /// Preface read; need to queue our initial SETTINGS frame to `write_buf`.
53 NeedsServerSettings,
54 /// Steady state — read frames from the transport and dispatch.
55 Running,
56 /// GOAWAY has been queued; drain `write_buf` then transition to [`Drained`] (for
57 /// graceful shutdown) or terminate directly (for I/O error paths where the transport
58 /// is already untrustworthy).
59 ///
60 /// [`Drained`]: Self::Drained
61 Closing,
62 /// Our outbound bytes are on the wire (including our GOAWAY). Now we're waiting for
63 /// the peer to close its write half (recv returns 0) so our Drop doesn't look like a
64 /// reset to the client — the sequence the h2 spec and most clients (hyper-h2 in
65 /// particular) assume. Any inbound bytes the peer happens to send during this window
66 /// are discarded; we've already committed to closing.
67 Drained,
68}
69
70/// Where the read cursor is inside the current frame.
71#[derive(Debug, Clone, Copy)]
72pub(super) enum ReadPhase {
73 /// Not yet read the 9 bytes of the next frame header.
74 NeedHeader,
75 /// Header read and validated; still collecting payload bytes. `total` is the full target
76 /// fill (`FRAME_HEADER_LEN + payload_len`). The decoded header itself is cheap enough to
77 /// re-parse from the buffer when we dispatch, so we don't stash it here.
78 NeedPayload { total: usize },
79}
80
81/// Why the driver is closing — shaped around what the terminal `drive` result should be.
82#[derive(Debug)]
83pub(super) enum CloseOutcome {
84 /// Clean close. `drive` returns `None`.
85 Graceful,
86 /// Protocol error. `drive` returns `Some(Err(...))`. GOAWAY with this code has been
87 /// queued.
88 Protocol(H2ErrorCode),
89 /// I/O error. GOAWAY was NOT queued (transport is untrustworthy). Propagated verbatim.
90 Io(io::Error),
91}
92
93/// Driver-side view of a single open stream: the shared state the handler also sees, plus a
94/// cache of decisions the driver has made for this stream (which the handler doesn't need
95/// to know).
96#[derive(Debug)]
97pub(super) struct StreamEntry {
98 /// Shared state (recv buffer, send slot, handler wakers). Owned by `Arc` so the
99 /// handler task can outlive or operate concurrently with the driver's view.
100 pub(super) shared: Arc<StreamState>,
101
102 /// Driver-private send-side state for an in-progress response. `None` until the conn
103 /// task submits a response via [`H2Connection::submit_send`] and the driver picks it
104 /// up on its next `service_handler_signals` tick.
105 ///
106 /// [`H2Connection::submit_send`]: super::H2Connection::submit_send
107 pub(super) send: Option<SendCursor>,
108
109 /// Per-stream send flow-control window (RFC 9113 §6.9). Seeded from
110 /// `peer_settings.effective_initial_window_size()` when the stream is opened;
111 /// decremented as we emit DATA frames; incremented by peer
112 /// `WINDOW_UPDATE(stream_id, inc)`; adjusted by `SETTINGS_INITIAL_WINDOW_SIZE` delta on
113 /// mid-connection SETTINGS change (§6.9.2 — may drive negative). Overflow past
114 /// [`MAX_FLOW_CONTROL_WINDOW`] is a stream-level `FLOW_CONTROL_ERROR` (→ `RST_STREAM`).
115 pub(super) send_window: i64,
116
117 /// Per-stream recv flow-control window (RFC 9113 §6.9) — how many bytes we've told
118 /// the peer it may still send on this stream. Starts at the server's advertised
119 /// `SETTINGS_INITIAL_WINDOW_SIZE` (currently 0 — lazy-WU pattern); decremented as the
120 /// peer's DATA frames arrive; incremented as we emit stream-level `WINDOW_UPDATE`
121 /// (both the initial raise on the handler's `is_reading` signal and every subsequent
122 /// refill crediting bytes the handler has consumed). A negative value means the peer
123 /// overran the window — connection-level `FLOW_CONTROL_ERROR`.
124 pub(super) peer_recv_window: i64,
125}
126
127impl StreamEntry {
128 pub(super) fn new(shared: Arc<StreamState>, send_window: i64, peer_recv_window: i64) -> Self {
129 Self {
130 shared,
131 send: None,
132 send_window,
133 peer_recv_window,
134 }
135 }
136}
137
138/// Result of dispatching one decoded frame.
139pub(super) enum Action {
140 /// Frame handled; continue the main loop.
141 Continue,
142 /// A stream just opened and the request validated — return the [`Conn`] to the caller;
143 /// the runtime adapter spawns a handler task per emitted Conn. Boxed to keep the enum
144 /// small — `Conn` is over 500 bytes and most dispatches return `Continue`.
145 Emit(Box<Conn<H2Transport>>),
146 /// Begin graceful or erroring close with this outcome.
147 Close(CloseOutcome),
148}
149
150/// Future returned by [`H2Driver::next`]. Resolves to `None` on graceful close, `Some(Ok)`
151/// when a new request stream opens, or `Some(Err)` on a fatal protocol or I/O error.
152#[must_use = "futures do nothing unless awaited"]
153#[derive(Debug)]
154pub struct Next<'a, T> {
155 pub(super) driver: &'a mut H2Driver<T>,
156}
157
158impl<T> Future for Next<'_, T>
159where
160 T: AsyncRead + AsyncWrite + Unpin + Send,
161{
162 type Output = Option<Result<Conn<H2Transport>, H2Error>>;
163
164 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
165 self.driver.drive(cx)
166 }
167}
168
169impl<T> Stream for H2Driver<T>
170where
171 T: AsyncRead + AsyncWrite + Unpin + Send,
172{
173 type Item = Result<Conn<H2Transport>, H2Error>;
174
175 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
176 self.get_mut().drive(cx)
177 }
178}
179
180/// Slice the interesting bytes out of a just-read frame. Bounds-checks to defend against a
181/// payload length on the wire that disagrees with a body-bearing frame's declared inner
182/// length.
183pub(super) fn frame_slice(
184 buf: &[u8],
185 start: usize,
186 length: u32,
187 total: usize,
188) -> Result<&[u8], CloseOutcome> {
189 let length =
190 usize::try_from(length).map_err(|_| CloseOutcome::Protocol(H2ErrorCode::FrameSizeError))?;
191 let end = start
192 .checked_add(length)
193 .ok_or(CloseOutcome::Protocol(H2ErrorCode::FrameSizeError))?;
194 if end > total {
195 return Err(CloseOutcome::Protocol(H2ErrorCode::FrameSizeError));
196 }
197 Ok(&buf[start..end])
198}