trillium_http/h2/connection.rs
1//! Shared per-connection HTTP/2 state ([`H2Connection`]).
2//!
3//! [`H2Connection`] is `Arc`-shared between the driver task ([`H2Driver`]) and every conn
4//! task that holds an open stream's [`Conn`]. It owns the per-stream `StreamState` map,
5//! the cross-task wake primitive ([`AtomicWaker`]), and the [`HttpContext`] / [`Swansong`]
6//! the broader server stack reaches in through.
7//!
8//! The driver loop itself lives in [`super::acceptor`] — see that module for the
9//! per-connection state machine and how send / receive concerns are split.
10//!
11//! # Module layout
12//!
13//! Conn-task-side primitives are split across child modules so each subsystem reads
14//! independently:
15//!
16//! - [`ping`]: `PING` / `PING ACK` round-trip tracking and the [`SendPing`] future.
17//! - [`peer_settings_wait`]: the [`PeerSettings`] sync primitive that parks until the peer's first
18//! SETTINGS frame is applied.
19//! - [`submit`]: send-staging API ([`submit_send`][H2Connection::submit_send],
20//! [`submit_upgrade`][H2Connection::submit_upgrade]) and client-side stream-open primitives
21//! ([`open_stream`][H2Connection::open_stream] /
22//! [`open_connect_stream`][H2Connection::open_connect_stream]) + the [`SubmitSend`] future.
23//! - [`response`]: client-role recv-side primitives — [`ResponseHeaders`] and
24//! [`take_trailers`][H2Connection::take_trailers].
25//!
26//! [`H2Driver`]: super::H2Driver
27
28mod peer_settings_wait;
29mod ping;
30mod response;
31mod submit;
32
33#[cfg(feature = "unstable")]
34use super::H2Initiator;
35use super::{H2Driver, H2Settings, role::Role, transport::StreamState};
36use crate::{Conn, HttpContext};
37use atomic_waker::AtomicWaker;
38use event_listener::Event;
39use futures_lite::io::{AsyncRead, AsyncWrite};
40use ping::PendingPing;
41#[cfg(feature = "unstable")]
42use std::sync::atomic::Ordering;
43use std::{
44 collections::{HashMap, VecDeque},
45 future::Future,
46 io,
47 sync::{Arc, Mutex, MutexGuard, atomic::AtomicBool},
48};
49use swansong::{ShutdownCompletion, Swansong};
50#[cfg(feature = "unstable")]
51#[allow(unused_imports)]
52// re-exports for h2.rs's `pub use connection::{ResponseHeaders, SubmitSend}`
53pub use {response::ResponseHeaders, submit::SubmitSend};
54
55/// Shared per-connection state for HTTP/2.
56///
57/// Wrapped in an [`Arc`] and held by both the [`H2Driver`] driver and every conn task
58/// that holds an open stream's [`Conn`]. Per-stream `StreamState`, HPACK encoder state, and
59/// connection-level send flow control lives here.
60#[derive(Debug)]
61pub struct H2Connection {
62 pub(super) context: Arc<HttpContext>,
63 pub(super) swansong: Swansong,
64 /// Driver-side waker that conn tasks fire whenever they produce work the driver should
65 /// act on — the is-reading signal on first `H2Transport::poll_read`, and the
66 /// `submit_send` arrival. Single-consumer (the driver); N producers (conn tasks). The
67 /// driver registers its current `drive` waker here each iteration it parks.
68 pub(super) outbound_waker: AtomicWaker,
69 /// Per-stream shared state, keyed by stream id. The driver inserts on stream open and
70 /// removes on close. Conn-task code looks up via private accessors on `H2Connection`
71 /// rather than touching the map directly — `StreamState` stays module-private.
72 pub(super) streams: Mutex<HashMap<u32, Arc<StreamState>>>,
73 /// The peer's most recently announced SETTINGS values. The driver writes on every
74 /// inbound SETTINGS frame and is the only reader, so a plain `Mutex` suffices.
75 /// `H2Settings` is `Copy`, so readers take the guard, copy out, and release.
76 ///
77 /// Default-constructed (all fields `None`) means "peer has not yet sent SETTINGS";
78 /// readers should use [`H2Settings::effective_*`][H2Settings::effective_max_frame_size]
79 /// helpers that apply the RFC defaults to absent fields.
80 pub(super) peer_settings: Mutex<H2Settings>,
81 /// Latch flipped to `true` the first (and every subsequent) time the driver applies
82 /// a peer SETTINGS frame. Distinct from `peer_settings` because an absent field is
83 /// ambiguous between "peer hasn't sent SETTINGS yet" and "peer sent SETTINGS without
84 /// that field" — the latch disambiguates, gating operations that require having seen
85 /// the peer's first SETTINGS (e.g. extended CONNECT).
86 pub(super) peer_settings_received: AtomicBool,
87 /// Multi-listener wake source for [`PeerSettings`]. The driver fires `notify(usize::MAX)`
88 /// after applying peer SETTINGS and again on connection close, so any number of
89 /// concurrently-parked `PeerSettings` futures all unblock together. [`Event`] (rather
90 /// than a single [`AtomicWaker`]) is required because multiple application tasks can
91 /// park on `peer_settings` concurrently — e.g. a fan-out of WebSocket-over-h2 upgrades
92 /// on one pooled connection — and `AtomicWaker`'s last-writer-wins semantics would
93 /// strand all but one.
94 pub(super) peer_settings_event: Event,
95 /// Next stream id to allocate for client-role outbound streams. Starts at 1 and
96 /// `+= 2` per allocation. Capped at `2^31` — once exhausted, `fetch_update`'s closure
97 /// refuses to advance, and `open_stream` returns `None` (the caller is expected to
98 /// fail over to a fresh connection).
99 ///
100 /// Allocation happens **while holding `streams_lock`** (see `open_stream`), so id order
101 /// matches shared-map insertion order — the invariant the BTreeMap-ordered send pump
102 /// relies on to frame opening HEADERS in monotonic order (RFC 9113 §5.1.1). The lock,
103 /// not the atomic, is what orders allocations; the `AtomicU32` is just the interior
104 /// mutability this `Arc`-shared field needs, so `Relaxed` suffices. The one out-of-band
105 /// reader (`can_open_stream`'s exhaustion check) takes `streams_lock` immediately after,
106 /// so its `load` is an advisory fast-path, not a separately-synchronized access.
107 #[cfg(feature = "unstable")]
108 pub(super) next_client_stream_id: std::sync::atomic::AtomicU32,
109 /// Outstanding active PINGs awaiting ACKs, keyed by opaque payload. Completed by the
110 /// driver when a `PING { ack: true }` arrives whose payload matches an entry. Drained
111 /// on connection close so awaiting `send_ping` futures don't leak.
112 pub(super) pending_pings: Mutex<HashMap<[u8; 8], PendingPing>>,
113 /// Opaque payloads queued for outbound `PING { ack: false }` emission. Decoupled from
114 /// `pending_pings` so registration and queuing can happen without holding two locks.
115 pub(super) pending_ping_outbound: Mutex<VecDeque<[u8; 8]>>,
116}
117
118impl H2Connection {
119 /// Construct a new `H2Connection` to manage HTTP/2 for a single peer.
120 pub fn new(context: Arc<HttpContext>) -> Arc<Self> {
121 let swansong = context.swansong().child();
122 Arc::new(Self {
123 context,
124 swansong,
125 outbound_waker: AtomicWaker::new(),
126 streams: Mutex::new(HashMap::new()),
127 peer_settings: Mutex::new(H2Settings::default()),
128 peer_settings_received: AtomicBool::new(false),
129 peer_settings_event: Event::new(),
130 #[cfg(feature = "unstable")]
131 next_client_stream_id: std::sync::atomic::AtomicU32::new(1),
132 pending_pings: Mutex::new(HashMap::new()),
133 pending_ping_outbound: Mutex::new(VecDeque::new()),
134 })
135 }
136
137 /// The [`HttpContext`] this connection was constructed with.
138 pub fn context(&self) -> Arc<HttpContext> {
139 self.context.clone()
140 }
141
142 /// The connection-scoped [`Swansong`]. Shuts down on peer GOAWAY or when the server-
143 /// level swansong shuts down.
144 pub fn swansong(&self) -> &Swansong {
145 &self.swansong
146 }
147
148 /// Attempt graceful shutdown of this HTTP/2 connection.
149 pub fn shut_down(&self) -> ShutdownCompletion {
150 self.swansong.shut_down()
151 }
152
/// Reports whether a new stream could be opened on this connection at this moment.
///
/// All of the following must hold for `true`:
///
/// - the connection is running (no GOAWAY received and no shutdown requested on its
///   swansong);
/// - the client stream-id space is not used up (the next id is still below `2^31`);
/// - the number of wire-active streams is below the peer's advertised
///   `MAX_CONCURRENT_STREAMS`.
///
/// A `false` result does not imply a dead connection. It may only be saturated and
/// about to free up, so callers should leave saturated connections in their pool and
/// use a separate aliveness check when deciding whether to evict.
///
/// Stream-id exhaustion is the exception: that `false` is permanent. Streams already in
/// flight still complete, but no later `open_stream` call will succeed.
///
/// # Panics
///
/// Panics if any per-connection mutex is poisoned.
#[cfg(feature = "unstable")]
pub fn can_open_stream(&self) -> bool {
    // First value past the usable client stream-id space.
    const STREAM_ID_SPACE_END: u32 = 1 << 31;

    // The id check keeps an exhausted connection from passing the concurrency test
    // below (it has nothing in flight, so it would count as 0) and being picked by the
    // pool as Available, only for `open_stream` to fail at the call site with a
    // misleading "shutting down" error.
    let shutting_down = !self.swansong.state().is_running();
    if shutting_down
        || self.next_client_stream_id.load(Ordering::Relaxed) >= STREAM_ID_SPACE_END
    {
        return false;
    }

    // Only wire-active streams count. After a clean wire-close an entry can stay in the
    // map while the application still holds it, and such entries do not count against
    // the peer's MAX_CONCURRENT_STREAMS.
    let wire_active = {
        let streams = self.streams_lock();
        streams
            .values()
            .filter(|stream| !stream.lifecycle_lock().is_closed())
            .count()
    };
    let inflight = u32::try_from(wire_active).unwrap_or(u32::MAX);

    let peer_limit = self
        .current_peer_settings()
        .effective_max_concurrent_streams();
    inflight < peer_limit
}
198
199 /// Driver-side wake primitive. Fire after producing work the driver should service.
200 pub(super) fn outbound_waker(&self) -> &AtomicWaker {
201 &self.outbound_waker
202 }
203
204 /// Lock the per-stream `StreamState` map.
205 pub(super) fn streams_lock(&self) -> MutexGuard<'_, HashMap<u32, Arc<StreamState>>> {
206 self.streams
207 .lock()
208 .expect("connection streams mutex poisoned")
209 }
210
211 /// Lock the peer's SETTINGS. Cheap; held only as long as the returned guard lives.
212 /// Use the `effective_*` helpers on [`H2Settings`] to get a value with RFC defaults
213 /// applied for fields the peer hasn't set; typical callers copy out via `*guard` and
214 /// release immediately.
215 pub(super) fn current_peer_settings(&self) -> MutexGuard<'_, H2Settings> {
216 self.peer_settings
217 .lock()
218 .expect("peer_settings mutex poisoned")
219 }
220
221 /// Request that the driver emit `RST_STREAM` on this stream with the given error code and clean
222 /// up. Clears any queued outbound parts and enqueues an [`OutboundPart::Reset`][reset] —
223 /// nothing else is valid to send after a reset — then wakes the driver, which frames the
224 /// `RST_STREAM` and tears the stream down.
225 ///
226 /// First-wins idempotent: a stream already reset-requested keeps its original code. No-op if
227 /// the stream is already gone from the shared map.
228 ///
229 /// [reset]: super::transport::OutboundPart::Reset
230 pub(crate) fn stream_error(&self, stream_id: u32, code: super::H2ErrorCode) {
231 let Some(stream) = self.streams_lock().get(&stream_id).cloned() else {
232 return;
233 };
234 stream.request_reset(code);
235 self.outbound_waker.wake();
236 }
237
238 /// Bind this `H2Connection` to a TCP transport and return an [`H2Driver`] that drives
239 /// the connection.
240 ///
241 /// The driver must be polled to completion via repeated calls to
242 /// [`H2Driver::next`] (or its [`Stream`][futures_lite::stream::Stream] impl); each returned
243 /// [`Conn`] should be spawned on its own task.
244 pub fn run<T>(self: Arc<Self>, transport: T) -> H2Driver<T>
245 where
246 T: AsyncRead + AsyncWrite + Unpin + Send,
247 {
248 H2Driver::new(self, transport, Role::Server)
249 }
250
/// Attach this `H2Connection` to an outbound `transport` in the client role and return
/// an [`H2Initiator`], the future a client spawns as a background task to drive the
/// connection.
///
/// The first poll writes the 24-byte client preface followed by our initial SETTINGS.
/// After that the driver demultiplexes inbound frames (peer SETTINGS, response HEADERS /
/// DATA on our streams, and so on) and pumps outbound bytes (new stream opens, DATA,
/// `WINDOW_UPDATEs`) until the connection closes or fails.
///
/// The future resolves to `Ok(())` after a graceful close and to `Err(H2Error)` after a
/// protocol or I/O failure. It only runs the framing loop: streams are opened by calling
/// the stream-open primitives on `H2Connection`, not through the future.
#[cfg(feature = "unstable")]
pub fn run_client<T>(self: Arc<Self>, transport: T) -> H2Initiator<T>
where
    T: AsyncRead + AsyncWrite + Unpin + Send,
{
    let driver = H2Driver::new(self, transport, Role::Client);
    H2Initiator::new(driver)
}
270
271 /// Per-stream entry point — call from the runtime adapter's spawned task for each
272 /// [`Conn`] returned by [`H2Driver::next`]. Runs `handler` to produce the response,
273 /// then `send_h2` to hand the framed response to the driver.
274 ///
275 /// # Errors
276 ///
277 /// Returns the [`io::Error`] from `send_h2` if the body's `poll_read` errors or the
278 /// underlying transport fails partway through the response.
279 pub async fn process_inbound<Transport, Handler, Fut>(
280 conn: Conn<Transport>,
281 handler: Handler,
282 ) -> io::Result<Conn<Transport>>
283 where
284 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
285 Handler: FnOnce(Conn<Transport>) -> Fut,
286 Fut: Future<Output = Conn<Transport>>,
287 {
288 let _guard = conn.context().swansong().guard();
289 handler(conn).await.send_h2().await
290 }
291}