trillium_http/h2/transport.rs
1//! Per-stream transport handed to handler tasks.
2//!
3//! [`H2Transport`] is the [`AsyncRead`] + [`AsyncWrite`] view of a single HTTP/2 stream. It is
4//! carried on the emitted [`Conn`][crate::Conn] returned from [`H2Driver::next`], and the
5//! runtime adapter spawns a handler task that consumes it. The transport never touches the
6//! underlying TCP connection directly — all I/O coordinates through shared per-stream state
7//! on the [`H2Connection`] driven by the driver task.
8//!
9//! Two paths reach the impls:
10//!
11//! - **Normal HTTP/2 request/response**: handlers usually don't touch [`H2Transport`] directly
12//! (same sharp edge h1 and h3 document). [`ReceivedBody`][crate::ReceivedBody] reads request body
13//! bytes through the transport's `AsyncRead`. Response bytes are handed to the driver as a queue
14//! of [`OutboundPart`]s via [`H2Connection::submit_send`][submit_send]; the send pump frames them
15//! without ever touching this `AsyncWrite`.
16//!
17//! - **Extended-CONNECT upgrades** ([RFC 8441] WebSocket-over-h2, plus the in-progress
18//! `draft-ietf-webtrans-http2` for WebTransport-over-h2): after the handler responds 200 to a
19//! `CONNECT` request with a `:protocol` pseudo-header, [`Conn::send_h2`][crate::Conn::send_h2]
20//! routes through [`H2Connection::submit_upgrade`][submit_upgrade], which enqueues HEADERS (and
21//! an optional prelude body) *without* a terminating [`OutboundPart::Close`], leaving the stream
22//! open as a bidirectional byte channel. The runtime adapter then dispatches
23//! [`Handler::upgrade`][trillium::Handler::upgrade], which gets an [`Upgrade`][crate::Upgrade]
24//! wrapping this transport. `AsyncWrite::poll_write` appends to a per-stream outbound ring
25//! ([`SendState::outbound`]); the send pump drains it into DATA frames bounded by the per-stream
26//! and connection send windows. `AsyncWrite::poll_close` enqueues [`OutboundPart::Close`] so the
27//! driver emits the `END_STREAM` terminator and tears the stream down.
28//!
29//! [`H2Driver::next`]: super::H2Driver::next
30//! [`H2Connection`]: super::H2Connection
31//! [submit_send]: super::H2Connection::submit_send
32//! [submit_upgrade]: super::H2Connection::submit_upgrade
33//! [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
34
35use super::{
36 H2Connection, H2ErrorCode,
37 stream_state::{StreamEvent, StreamLifecycle},
38};
39use crate::{
40 Body, Buffer, Headers,
41 headers::hpack::{FieldSection, PseudoHeaders},
42};
43use atomic_waker::AtomicWaker;
44use futures_lite::io::{AsyncRead, AsyncWrite};
45use std::{
46 collections::VecDeque,
47 fmt, io,
48 pin::Pin,
49 sync::{
50 Arc, Mutex, MutexGuard,
51 atomic::{AtomicBool, AtomicU64, Ordering},
52 },
53 task::{Context, Poll},
54};
55
56/// A single HTTP/2 stream's transport handle.
57///
58/// Carries a backref to the shared [`H2Connection`], the stream id, and the per-stream
59/// `Arc<StreamState>` used by the read and write sides. Normal HTTP/2 operation reads through
60/// [`ReceivedBody`][crate::ReceivedBody] and writes through the driver's send queue; the
61/// `AsyncRead` / `AsyncWrite` impls here are only reached by code that borrows the transport
62/// directly (typically an upgrade handler after extended CONNECT).
63pub struct H2Transport {
64 connection: Arc<H2Connection>,
65 stream_id: u32,
66 state: Arc<StreamState>,
67}
68
69impl fmt::Debug for H2Transport {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("H2Transport")
72 .field("stream_id", &self.stream_id)
73 .finish_non_exhaustive()
74 }
75}
76
77impl H2Transport {
78 /// Create a transport for a stream that has just been opened by the driver.
79 pub(super) fn new(
80 connection: Arc<H2Connection>,
81 stream_id: u32,
82 state: Arc<StreamState>,
83 ) -> Self {
84 Self {
85 connection,
86 stream_id,
87 state,
88 }
89 }
90
91 /// The stream identifier this transport is bound to.
92 pub fn stream_id(&self) -> u32 {
93 self.stream_id
94 }
95
96 /// The shared [`H2Connection`] backing this stream.
97 pub fn connection(&self) -> &Arc<H2Connection> {
98 &self.connection
99 }
100}
101
102impl Drop for H2Transport {
103 /// Application-side release / cancel signal, decided from the stream's protocol state plus
104 /// whether a response has been handed off:
105 ///
106 /// - **Already removed from the shared map** — the driver beat us to cleanup; no-op.
107 /// - **`Closed`** (both halves done on the wire; client-role streams linger in the map for
108 /// post-EOF trailer/response access) — set `SendState::transport_dropped` so the driver GCs
109 /// the lingering entry.
110 /// - **Send half still open + a response was submitted** (`submit_resolved`) — a bidirectional
111 /// upgrade tunnel the handler is dropping; enqueue `OutboundPart::Close` for a graceful
112 /// `END_STREAM`.
113 /// - **Anything else** (`HalfClosedLocal` awaiting the peer, or send-open with no response ever
114 /// submitted) — abandoning the stream; enqueue `RST_STREAM(Cancel)`.
115 fn drop(&mut self) {
116 if !self.connection.streams_lock().contains_key(&self.stream_id) {
117 log::trace!(
118 "h2 stream {}: H2Transport dropped on already-released stream",
119 self.stream_id,
120 );
121 return;
122 }
123
124 let lifecycle = *self.state.lifecycle_lock();
125 if lifecycle.is_closed() {
126 log::trace!(
127 "h2 stream {}: H2Transport dropped on wire-closed stream — releasing",
128 self.stream_id,
129 );
130 self.state
131 .send
132 .transport_dropped
133 .store(true, Ordering::Release);
134 } else if !lifecycle.send_closed()
135 && self.state.send.submit_resolved.load(Ordering::Acquire)
136 {
137 // Send half open and a response is on the wire: an upgrade tunnel being dropped.
138 // Graceful close.
139 log::trace!(
140 "h2 stream {}: H2Transport dropped (upgrade tunnel) — scheduling graceful close",
141 self.stream_id,
142 );
143 self.state.request_close();
144 } else {
145 // Send half closed awaiting the peer, or never-responded: abandon the stream.
146 log::debug!(
147 "h2 stream {}: H2Transport dropped mid-stream — RST_STREAM(Cancel)",
148 self.stream_id,
149 );
150 self.state.request_reset(H2ErrorCode::Cancel);
151 }
152 self.state.needs_servicing.store(true, Ordering::Release);
153 self.state.send.outbound_write_waker.wake();
154 self.connection.outbound_waker().wake();
155 }
156}
157
158impl AsyncRead for H2Transport {
159 fn poll_read(
160 self: Pin<&mut Self>,
161 cx: &mut Context<'_>,
162 out: &mut [u8],
163 ) -> Poll<io::Result<usize>> {
164 if out.is_empty() {
165 return Poll::Ready(Ok(0));
166 }
167
168 // The first `poll_read` is the handler's declaration of intent to consume the request
169 // body — until this point, we've advertised a zero recv window and the peer has sent
170 // nothing beyond HEADERS. Tell the driver to top up our per-stream window now. Later
171 // calls CAS-fail silently and don't re-signal.
172 let recv_state = &self.state.recv;
173 let connection = &*self.connection;
174 if !recv_state.is_reading.swap(true, Ordering::AcqRel) {
175 self.state.needs_servicing.store(true, Ordering::Release);
176 connection.outbound_waker().wake();
177 }
178
179 let mut recv = recv_state.buf.lock().expect("recv buf mutex poisoned");
180
181 // Copy as many bytes as fit from the front of the ring into `out`, then advance the
182 // ring's virtual read cursor. `Buffer::ignore_front` truncates the underlying `Vec` to
183 // zero when we drain fully, so capacity stays bounded by peak in-flight bytes rather
184 // than cumulative traffic.
185 let take = out.len().min(recv.len());
186 if take > 0 {
187 out[..take].copy_from_slice(&recv[..take]);
188 recv.ignore_front(take);
189 drop(recv);
190 recv_state
191 .bytes_consumed
192 .fetch_add(take as u64, Ordering::AcqRel);
193 self.state.needs_servicing.store(true, Ordering::Release);
194 connection.outbound_waker().wake();
195 return Poll::Ready(Ok(take));
196 }
197
198 // Buffer empty. Register the waker *before* releasing the buf lock so a driver push
199 // between this poll and the recv-closed check is guaranteed to wake us:
200 // 1. We take buf lock (driver-push blocked).
201 // 2. We register waker.
202 // 3. We drop buf lock (driver-push may now proceed and fire waker).
203 // 4. We read recv-closed from the lifecycle.
204 // 5. Return Pending or Ready(0); if a push raced through step 3, the waker is registered
205 // and a fresh poll will see the new bytes.
206 recv_state.waker.register(cx.waker());
207 drop(recv);
208 if self.state.lifecycle_lock().recv_closed() {
209 return Poll::Ready(Ok(0));
210 }
211 Poll::Pending
212 }
213}
214
215impl AsyncWrite for H2Transport {
216 fn poll_write(
217 self: Pin<&mut Self>,
218 cx: &mut Context<'_>,
219 buf: &[u8],
220 ) -> Poll<io::Result<usize>> {
221 // Parity with h1/h3: writing to a stream whose send half is already closed (clean
222 // `END_STREAM`, or reset) is a `BrokenPipe` rather than silently swallowed bytes.
223 // Otherwise we always accept the write — the "don't write at an inappropriate time"
224 // contract is the caller's, same as borrowing the raw transport in h1/h3.
225 if self.state.lifecycle_lock().send_closed() {
226 return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
227 }
228
229 // Append into the per-stream outbound ring. The send pump drains the same ring into DATA
230 // frames bounded by per-stream + connection send windows. Bounded by
231 // `response_buffer_max_len` (the cap h1 and h3 use for their transit buffers): a stalled
232 // peer window means the driver can't drain, the cap is hit, and we return `Pending` so the
233 // handler is throttled. The drain side wakes `outbound_write_waker` after each take.
234 let send = &self.state.send;
235 let cap = self.connection.context.config.response_buffer_max_len;
236 let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
237 if outbound.len() >= cap {
238 // Register first, then re-check under lock to close the race against the drain side
239 // (which takes the same lock to `ignore_front` and then wakes us).
240 send.outbound_write_waker.register(cx.waker());
241 if outbound.len() >= cap {
242 return Poll::Pending;
243 }
244 }
245 let take = (cap - outbound.len()).min(buf.len());
246 log::trace!(
247 "h2 stream {}: H2Transport::poll_write appending {take}/{} bytes to outbound ring",
248 self.stream_id,
249 buf.len(),
250 );
251 outbound.extend_from_slice(&buf[..take]);
252 drop(outbound);
253
254 // Wake the driver (parked on the connection-level waker).
255 self.connection.outbound_waker().wake();
256 Poll::Ready(Ok(take))
257 }
258
259 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
260 // Best-effort: bytes appended via `poll_write` are already visible to the driver and will
261 // be framed on the next tick. There's no application-level "flushed" state below us.
262 Poll::Ready(Ok(()))
263 }
264
265 fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
266 // Mark the write half closed by enqueuing the `END_STREAM` terminator. Once the driver
267 // drains the remaining outbound ring bytes it frames the terminator and tears the stream
268 // down. Idempotent — `request_close` is a no-op if a terminator is already queued or the
269 // send half is already closed.
270 log::trace!(
271 "h2 stream {}: H2Transport::poll_close enqueuing Close",
272 self.stream_id,
273 );
274 self.state.request_close();
275 self.state.needs_servicing.store(true, Ordering::Release);
276 self.state.send.outbound_write_waker.wake();
277 self.connection.outbound_waker().wake();
278 Poll::Ready(Ok(()))
279 }
280}
281
282/// A unit of outbound work the conn task hands the driver, framed in order by the send pump.
283///
284/// `Headers` opens the response/request; `Body` is an owned source the driver frames lazily under
285/// flow control (no intermediate buffering); `Trailers` and `Close` are alternative `END_STREAM`
286/// terminators; `Reset` abandons the stream (the conn task clears the rest of the queue when it
287/// pushes one — nothing else is valid to send after `RST_STREAM`).
288///
289/// Streaming bytes a handler writes through [`H2Transport`]'s `AsyncWrite` do *not* go here — they
290/// flow through the reused [`SendState::outbound`] ring, which the pump drains before framing a
291/// terminator.
292#[derive(Debug)]
293pub(super) enum OutboundPart {
294 /// Initial HEADERS block — HPACK-encoded by the driver at frame time so the wire order
295 /// matches the dynamic-table mutation order.
296 Headers {
297 pseudos: PseudoHeaders<'static>,
298 headers: Headers,
299 },
300 /// An owned body source, framed directly into DATA frames under flow control.
301 Body(Body),
302 /// Trailing HEADERS — carries `END_STREAM`.
303 Trailers(Headers),
304 /// Empty `DATA(END_STREAM)` terminator.
305 Close,
306 /// `RST_STREAM(code)` — conn-task-initiated reset; clears any preceding queued parts.
307 Reset(H2ErrorCode),
308}
309
310impl OutboundPart {
311 /// `true` for the `END_STREAM`/`RST_STREAM` terminators — used by `request_close` to stay
312 /// idempotent and by the send pump to know the ring must be drained first.
313 pub(super) fn is_terminal(&self) -> bool {
314 matches!(self, Self::Trailers(_) | Self::Close | Self::Reset(_))
315 }
316}
317
318/// Shared per-stream state. Owned by an [`Arc`] held jointly by the driver (via the connection's
319/// stream table) and the handler task (via [`H2Transport`]).
320///
321/// The cross-task-visible protocol state machine is the [`StreamLifecycle`] in [`Self::lifecycle`]
322/// — written only by the driver (which feeds it [`StreamEvent`]s as it frames/receives) and read by
323/// both sides. Everything else is data and mailbox plumbing: the recv ring, the outbound parts
324/// queue + streaming ring, and the conn-task → driver signals.
325#[derive(Debug, Default)]
326pub(super) struct StreamState {
327 /// Protocol state (RFC 9113 §5.1). Driver is the sole writer; the handler side reads it for
328 /// recv-EOF (`poll_read`) and send-closed (`poll_write`) decisions. Held under a `Mutex` so
329 /// observe-then-act sequences are atomic.
330 lifecycle: Mutex<StreamLifecycle>,
331
332 /// Recv side: inbound DATA payloads, handler waker, handler-intent signal, trailers.
333 pub(super) recv: RecvState,
334
335 /// Send side: the outbound parts queue, the streaming ring, the driver→conn completion
336 /// channel, and the application-release signal.
337 pub(super) send: SendState,
338
339 /// Mailbox flag for conn-task → driver work signaling. Set by conn-task code whenever it
340 /// produces work the driver should service (enqueue a part, raise `is_reading`, increment
341 /// `bytes_consumed`, set `transport_dropped`). The driver's `service_handler_signals` consults
342 /// it via `swap(false, AcqRel)` — only streams where it returns `true` pay for the queue-lock
343 /// pickup; idle streams cost a single atomic RMW per tick.
344 ///
345 /// **Setter ordering rule**: write the underlying state first, then store `true` with
346 /// `Release`, then wake the connection's outbound waker. Over-notification is harmless;
347 /// under-notification would lose a signal.
348 pub(super) needs_servicing: AtomicBool,
349}
350
351impl StreamState {
352 /// Lock the per-stream protocol lifecycle. Convenience wrapper — every site treats poisoning as
353 /// a programming error.
354 pub(super) fn lifecycle_lock(&self) -> MutexGuard<'_, StreamLifecycle> {
355 self.lifecycle.lock().expect("lifecycle mutex poisoned")
356 }
357
358 /// Apply a [`StreamEvent`] to the lifecycle. Driver-only — the sole writer of protocol state.
359 pub(super) fn apply_event(
360 &self,
361 event: StreamEvent,
362 ) -> Result<(), super::stream_state::StreamProtocolError> {
363 self.lifecycle_lock().on_event(event)
364 }
365
366 /// Stage a full submission of outbound parts atomically, so the send pump sees a complete
367 /// message rather than a partial one (the `SubmitSend` future keys "done" off the queue
368 /// draining to empty). Raises `needs_servicing`; the caller wakes the driver.
369 pub(super) fn stage(&self, parts: impl IntoIterator<Item = OutboundPart>) {
370 self.send
371 .queue
372 .lock()
373 .expect("send queue mutex poisoned")
374 .extend(parts);
375 self.needs_servicing.store(true, Ordering::Release);
376 }
377
378 /// Enqueue the `END_STREAM` terminator unless one (or a reset) is already queued or the send
379 /// half is already closed. Idempotent — safe to call from repeated `poll_close` / Drop.
380 pub(super) fn request_close(&self) {
381 if self.lifecycle_lock().send_closed() {
382 return;
383 }
384 let mut queue = self.send.queue.lock().expect("send queue mutex poisoned");
385 if queue.back().is_none_or(|p| !p.is_terminal()) {
386 queue.push_back(OutboundPart::Close);
387 }
388 drop(queue);
389 self.needs_servicing.store(true, Ordering::Release);
390 }
391
392 /// Clear any queued parts and request `RST_STREAM(code)`. First-wins: a reset already at the
393 /// back keeps its code. Nothing else is valid to send after a reset, so clearing the queue
394 /// models the §5.1 sequence faithfully. Raises `needs_servicing`; the caller wakes the driver.
395 pub(super) fn request_reset(&self, code: H2ErrorCode) {
396 let mut queue = self.send.queue.lock().expect("send queue mutex poisoned");
397 if matches!(queue.back(), Some(OutboundPart::Reset(_))) {
398 return;
399 }
400 queue.clear();
401 queue.push_back(OutboundPart::Reset(code));
402 drop(queue);
403 self.needs_servicing.store(true, Ordering::Release);
404 }
405}
406
407/// Receive-side per-stream state.
408#[derive(Debug, Default)]
409pub(super) struct RecvState {
410 /// Inbound DATA body bytes awaiting handler read. A single persistent ring (append-at-tail,
411 /// `ignore_front`-at-head): the driver appends via `extend_from_slice` when a DATA frame
412 /// arrives; the handler reads from the front and virtually drops consumed bytes. When
413 /// `ignore_front` catches up to the data end the `Buffer` truncates to zero, so the underlying
414 /// `Vec` capacity stays bounded by peak in-flight bytes rather than cumulative traffic — zero
415 /// amortized allocations per DATA frame.
416 pub(super) buf: Mutex<Buffer>,
417
418 /// Handler-task waker, fired by the driver after pushing DATA into `buf` or after the
419 /// lifecycle transitions to recv-closed. Single-waiter: only one task ever polls a given
420 /// `H2Transport`.
421 pub(super) waker: AtomicWaker,
422
423 /// Set by the handler's first [`H2Transport::poll_read`] to declare intent to consume the
424 /// request body. The driver observes the transition and emits a `WINDOW_UPDATE` for this
425 /// stream, topping its recv window up from `SETTINGS_INITIAL_WINDOW_SIZE` (advertised as `0`)
426 /// to the per-stream maximum. Once set, stays set.
427 pub(super) is_reading: AtomicBool,
428
429 /// Bytes the handler has consumed from `buf` since the driver last sampled this counter.
430 /// Incremented by [`H2Transport::poll_read`] using `fetch_add` after each drain; the driver
431 /// reads it via `swap(0)` per tick and emits stream-level + connection-level `WINDOW_UPDATE`
432 /// for the consumed total.
433 pub(super) bytes_consumed: AtomicU64,
434
435 /// Trailers, populated by the driver if a trailing HEADERS frame arrives for this stream.
436 /// Always written *before* the lifecycle transitions to recv-closed, so once the handler
437 /// observes `Ready(0)` on the recv side, any trailers for this request are guaranteed to
438 /// be in place.
439 pub(super) trailers: Mutex<Option<Headers>>,
440
441 /// Client-role: response HEADERS field section, populated by the driver on the first non-1xx
442 /// HEADERS frame arrival for a client-initiated stream. Server role doesn't use this slot.
443 /// Single-shot: the conn task takes the `FieldSection` once; subsequent HEADERS arrivals are
444 /// interpreted as trailers. Interim 1xx HEADERS frames are discarded by the driver without
445 /// touching this slot or latching `first_response_headers_seen`.
446 pub(super) response_headers: Mutex<Option<FieldSection<'static>>>,
447
448 /// Client-role: latching flag for "first HEADERS arrived for this stream." Distinct from
449 /// `response_headers.is_some()` — the conn task drains that slot when it consumes headers, so
450 /// the driver can't use slot occupancy to distinguish "haven't seen HEADERS yet" from "headers
451 /// seen + already taken." Set inside `finalize_response_headers` before that slot is
452 /// populated; checked by `route_headers` on subsequent HEADERS to route them as trailers.
453 /// Never cleared.
454 pub(super) first_response_headers_seen: AtomicBool,
455
456 /// Client-role: waker the conn task registers via
457 /// [`H2Connection::response_headers`][super::H2Connection]; fired by the driver after stashing
458 /// the `FieldSection` *or* on stream removal (so a parked conn task observing the stream gone
459 /// surfaces `NotConnected` instead of hanging).
460 pub(super) response_headers_waker: AtomicWaker,
461}
462
463/// Send-side per-stream state: the conn-task → driver outbound parts queue, the streaming ring for
464/// bidirectional upgrades, the driver → conn-task completion channel, and the application-release
465/// signal.
466///
467/// **Normal response path**: the conn task `stage`s `[Headers, Body?, Close]` once via
468/// [`H2Connection::submit_send`][submit] and waits on `completion_waker` for `submit_resolved` to
469/// flip. The driver drains the queue into its private send cursor, frames the parts, and on the
470/// queue draining to empty stores `completion_result`, sets `submit_resolved = true`, and wakes the
471/// conn task.
472///
473/// **Extended-CONNECT upgrade path** ([RFC 8441]): the conn task `stage`s `[Headers, Body?]` with
474/// *no* `Close`, so the stream stays open after the prelude frames. `submit_resolved` flips when
475/// the queue first drains (i.e. once the prelude is on the wire) — matching h1/h3, not at
476/// `END_HEADERS` — so `submit_upgrade().await` returns and the runtime can dispatch
477/// [`Handler::upgrade`][trillium::Handler::upgrade]. The handler then writes through
478/// [`H2Transport`]'s `AsyncWrite`, which appends to `outbound`; the send pump drains that ring into
479/// DATA. `poll_close` enqueues [`OutboundPart::Close`], the pump drains the ring, frames the
480/// terminator, and tears the stream down.
481///
482/// [submit]: super::H2Connection::submit_send
483/// [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
484#[derive(Debug, Default)]
485pub(super) struct SendState {
486 /// Outbound parts the conn task has handed off, drained by the driver into its private send
487 /// cursor under `needs_servicing`. Cold: a handful of pushes per stream lifetime.
488 pub(super) queue: Mutex<VecDeque<OutboundPart>>,
489
490 /// Streaming bytes for a bidirectional upgrade: [`H2Transport`]'s `AsyncWrite::poll_write`
491 /// appends here, the send pump drains it into DATA frames before framing a terminator. A
492 /// single reused ring (`ignore_front` at head) — empty for normal responses, which frame an
493 /// owned [`OutboundPart::Body`] directly.
494 pub(super) outbound: Mutex<Buffer>,
495
496 /// Reverse-direction backpressure waker: registered by [`H2Transport::poll_write`] when
497 /// `outbound` hits the cap, fired by the send pump after it drains bytes so a parked writer
498 /// can resume. Without it a slow/unresponsive peer's closed window would let `outbound`
499 /// grow unbounded.
500 pub(super) outbound_write_waker: AtomicWaker,
501
502 /// Set to `true` by the driver once the conn task's [`SubmitSend`][super::SubmitSend] future
503 /// may resolve and its `completion_result` is readable. The future polls this atomic and
504 /// registers on `completion_waker`. Flips when the outbound queue first drains to empty —
505 /// `END_STREAM` for a normal response, the prelude-handoff for an upgrade.
506 pub(super) submit_resolved: AtomicBool,
507
508 /// The driver writes the final result here before flipping `submit_resolved`. The conn task
509 /// takes it once `submit_resolved` is observed true.
510 pub(super) completion_result: Mutex<Option<io::Result<()>>>,
511
512 /// The conn task's waker, registered by `SubmitSend::poll` and fired by the driver after
513 /// `submit_resolved` is set.
514 pub(super) completion_waker: AtomicWaker,
515
516 /// Set by [`H2Transport::Drop`] when the lifecycle is already `Closed` and the application is
517 /// releasing a stream that lingered in the map for post-EOF access (client role). The driver
518 /// picks it up and removes the entry from both maps. Not protocol state — a local
519 /// resource-ownership fact the lifecycle can't represent.
520 pub(super) transport_dropped: AtomicBool,
521}
522
523#[cfg(test)]
524mod tests {
525 use super::*;
526 use crate::HttpContext;
527
528 #[test]
529 fn request_close_enqueues_single_close() {
530 let state = StreamState::default();
531 *state.lifecycle_lock() = StreamLifecycle::Open;
532 state.request_close();
533 state.request_close();
534 let queue = state.send.queue.lock().unwrap();
535 assert_eq!(queue.len(), 1, "second request_close is a no-op");
536 assert!(matches!(queue.front(), Some(OutboundPart::Close)));
537 }
538
539 #[test]
540 fn request_close_noop_when_send_closed() {
541 let state = StreamState::default();
542 *state.lifecycle_lock() = StreamLifecycle::HalfClosedLocal;
543 state.request_close();
544 assert!(
545 state.send.queue.lock().unwrap().is_empty(),
546 "no terminator queued once the send half is closed"
547 );
548 }
549
550 #[test]
551 fn request_reset_clears_queue_and_is_first_wins() {
552 let state = StreamState::default();
553 *state.lifecycle_lock() = StreamLifecycle::Open;
554 state.stage([OutboundPart::Body(Body::default()), OutboundPart::Close]);
555 state.request_reset(H2ErrorCode::Cancel);
556 state.request_reset(H2ErrorCode::InternalError);
557 let queue = state.send.queue.lock().unwrap();
558 assert_eq!(queue.len(), 1, "queue cleared, single reset");
559 assert!(
560 matches!(
561 queue.front(),
562 Some(OutboundPart::Reset(H2ErrorCode::Cancel))
563 ),
564 "first reset code wins",
565 );
566 }
567
568 #[test]
569 fn poll_write_caps_at_response_buffer_max_len() {
570 use futures_lite::AsyncWrite;
571 use std::task::{Context, Poll, Waker};
572 let mut context = HttpContext::new();
573 context.config.response_buffer_max_len = 16;
574 let connection = H2Connection::new(Arc::new(context));
575 let state = Arc::new(StreamState::default());
576 *state.lifecycle_lock() = StreamLifecycle::Open;
577 let mut transport = H2Transport::new(connection, 1, state);
578 let waker = Waker::noop();
579 let mut cx = Context::from_waker(waker);
580 let buf = [0u8; 32];
581 match Pin::new(&mut transport).poll_write(&mut cx, &buf) {
582 Poll::Ready(Ok(n)) => assert_eq!(n, 16, "should accept exactly cap bytes"),
583 other => panic!("expected Ready(Ok(16)), got {other:?}"),
584 }
585 }
586}