Skip to main content

trillium_http/h2/
transport.rs

1//! Per-stream transport handed to handler tasks.
2//!
3//! [`H2Transport`] is the [`AsyncRead`] + [`AsyncWrite`] view of a single HTTP/2 stream. It is
4//! carried on the emitted [`Conn`][crate::Conn] returned from [`H2Driver::next`], and the
5//! runtime adapter spawns a handler task that consumes it. The transport never touches the
6//! underlying TCP connection directly — all I/O coordinates through shared per-stream state
7//! on the [`H2Connection`] driven by the driver task.
8//!
9//! Two paths reach the impls:
10//!
11//! - **Normal HTTP/2 request/response**: handlers usually don't touch [`H2Transport`] directly
12//!   (same sharp edge h1 and h3 document). [`ReceivedBody`][crate::ReceivedBody] reads request body
13//!   bytes through the transport's `AsyncRead`. Response bytes are handed to the driver as a queue
14//!   of [`OutboundPart`]s via [`H2Connection::submit_send`][submit_send]; the send pump frames them
15//!   without ever touching this `AsyncWrite`.
16//!
17//! - **Extended-CONNECT upgrades** ([RFC 8441] WebSocket-over-h2, plus the in-progress
18//!   `draft-ietf-webtrans-http2` for WebTransport-over-h2): after the handler responds 200 to a
19//!   `CONNECT` request with a `:protocol` pseudo-header, [`Conn::send_h2`][crate::Conn::send_h2]
20//!   routes through [`H2Connection::submit_upgrade`][submit_upgrade], which enqueues HEADERS (and
21//!   an optional prelude body) *without* a terminating [`OutboundPart::Close`], leaving the stream
22//!   open as a bidirectional byte channel. The runtime adapter then dispatches
23//!   [`Handler::upgrade`][trillium::Handler::upgrade], which gets an [`Upgrade`][crate::Upgrade]
24//!   wrapping this transport. `AsyncWrite::poll_write` appends to a per-stream outbound ring
25//!   ([`SendState::outbound`]); the send pump drains it into DATA frames bounded by the per-stream
26//!   and connection send windows. `AsyncWrite::poll_close` enqueues [`OutboundPart::Close`] so the
27//!   driver emits the `END_STREAM` terminator and tears the stream down.
28//!
29//! [`H2Driver::next`]: super::H2Driver::next
30//! [`H2Connection`]: super::H2Connection
31//! [submit_send]: super::H2Connection::submit_send
32//! [submit_upgrade]: super::H2Connection::submit_upgrade
33//! [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
34
35use super::{
36    H2Connection, H2ErrorCode,
37    stream_state::{StreamEvent, StreamLifecycle},
38};
39use crate::{
40    Body, Buffer, Headers,
41    headers::hpack::{FieldSection, PseudoHeaders},
42};
43use atomic_waker::AtomicWaker;
44use futures_lite::io::{AsyncRead, AsyncWrite};
45use std::{
46    collections::VecDeque,
47    fmt, io,
48    pin::Pin,
49    sync::{
50        Arc, Mutex, MutexGuard,
51        atomic::{AtomicBool, AtomicU64, Ordering},
52    },
53    task::{Context, Poll},
54};
55
56/// A single HTTP/2 stream's transport handle.
57///
58/// Carries a backref to the shared [`H2Connection`], the stream id, and the per-stream
59/// `Arc<StreamState>` used by the read and write sides. Normal HTTP/2 operation reads through
60/// [`ReceivedBody`][crate::ReceivedBody] and writes through the driver's send queue; the
61/// `AsyncRead` / `AsyncWrite` impls here are only reached by code that borrows the transport
62/// directly (typically an upgrade handler after extended CONNECT).
63pub struct H2Transport {
64    connection: Arc<H2Connection>,
65    stream_id: u32,
66    state: Arc<StreamState>,
67}
68
69impl fmt::Debug for H2Transport {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("H2Transport")
72            .field("stream_id", &self.stream_id)
73            .finish_non_exhaustive()
74    }
75}
76
77impl H2Transport {
78    /// Create a transport for a stream that has just been opened by the driver.
79    pub(super) fn new(
80        connection: Arc<H2Connection>,
81        stream_id: u32,
82        state: Arc<StreamState>,
83    ) -> Self {
84        Self {
85            connection,
86            stream_id,
87            state,
88        }
89    }
90
91    /// The stream identifier this transport is bound to.
92    pub fn stream_id(&self) -> u32 {
93        self.stream_id
94    }
95
96    /// The shared [`H2Connection`] backing this stream.
97    pub fn connection(&self) -> &Arc<H2Connection> {
98        &self.connection
99    }
100}
101
102impl Drop for H2Transport {
103    /// Application-side release / cancel signal, decided from the stream's protocol state plus
104    /// whether a response has been handed off:
105    ///
106    /// - **Already removed from the shared map** — the driver beat us to cleanup; no-op.
107    /// - **`Closed`** (both halves done on the wire; client-role streams linger in the map for
108    ///   post-EOF trailer/response access) — set `SendState::transport_dropped` so the driver GCs
109    ///   the lingering entry.
110    /// - **Send half still open + a response was submitted** (`submit_resolved`) — a bidirectional
111    ///   upgrade tunnel the handler is dropping; enqueue `OutboundPart::Close` for a graceful
112    ///   `END_STREAM`.
113    /// - **Anything else** (`HalfClosedLocal` awaiting the peer, or send-open with no response ever
114    ///   submitted) — abandoning the stream; enqueue `RST_STREAM(Cancel)`.
115    fn drop(&mut self) {
116        if !self.connection.streams_lock().contains_key(&self.stream_id) {
117            log::trace!(
118                "h2 stream {}: H2Transport dropped on already-released stream",
119                self.stream_id,
120            );
121            return;
122        }
123
124        let lifecycle = *self.state.lifecycle_lock();
125        if lifecycle.is_closed() {
126            log::trace!(
127                "h2 stream {}: H2Transport dropped on wire-closed stream — releasing",
128                self.stream_id,
129            );
130            self.state
131                .send
132                .transport_dropped
133                .store(true, Ordering::Release);
134        } else if !lifecycle.send_closed()
135            && self.state.send.submit_resolved.load(Ordering::Acquire)
136        {
137            // Send half open and a response is on the wire: an upgrade tunnel being dropped.
138            // Graceful close.
139            log::trace!(
140                "h2 stream {}: H2Transport dropped (upgrade tunnel) — scheduling graceful close",
141                self.stream_id,
142            );
143            self.state.request_close();
144        } else {
145            // Send half closed awaiting the peer, or never-responded: abandon the stream.
146            log::debug!(
147                "h2 stream {}: H2Transport dropped mid-stream — RST_STREAM(Cancel)",
148                self.stream_id,
149            );
150            self.state.request_reset(H2ErrorCode::Cancel);
151        }
152        self.state.needs_servicing.store(true, Ordering::Release);
153        self.state.send.outbound_write_waker.wake();
154        self.connection.outbound_waker().wake();
155    }
156}
157
158impl AsyncRead for H2Transport {
159    fn poll_read(
160        self: Pin<&mut Self>,
161        cx: &mut Context<'_>,
162        out: &mut [u8],
163    ) -> Poll<io::Result<usize>> {
164        if out.is_empty() {
165            return Poll::Ready(Ok(0));
166        }
167
168        // The first `poll_read` is the handler's declaration of intent to consume the request
169        // body — until this point, we've advertised a zero recv window and the peer has sent
170        // nothing beyond HEADERS. Tell the driver to top up our per-stream window now. Later
171        // calls CAS-fail silently and don't re-signal.
172        let recv_state = &self.state.recv;
173        let connection = &*self.connection;
174        if !recv_state.is_reading.swap(true, Ordering::AcqRel) {
175            self.state.needs_servicing.store(true, Ordering::Release);
176            connection.outbound_waker().wake();
177        }
178
179        let mut recv = recv_state.buf.lock().expect("recv buf mutex poisoned");
180
181        // Copy as many bytes as fit from the front of the ring into `out`, then advance the
182        // ring's virtual read cursor. `Buffer::ignore_front` truncates the underlying `Vec` to
183        // zero when we drain fully, so capacity stays bounded by peak in-flight bytes rather
184        // than cumulative traffic.
185        let take = out.len().min(recv.len());
186        if take > 0 {
187            out[..take].copy_from_slice(&recv[..take]);
188            recv.ignore_front(take);
189            drop(recv);
190            recv_state
191                .bytes_consumed
192                .fetch_add(take as u64, Ordering::AcqRel);
193            self.state.needs_servicing.store(true, Ordering::Release);
194            connection.outbound_waker().wake();
195            return Poll::Ready(Ok(take));
196        }
197
198        // Buffer empty. Register the waker *before* releasing the buf lock so a driver push
199        // between this poll and the recv-closed check is guaranteed to wake us:
200        //   1. We take buf lock (driver-push blocked).
201        //   2. We register waker.
202        //   3. We drop buf lock (driver-push may now proceed and fire waker).
203        //   4. We read recv-closed from the lifecycle.
204        //   5. Return Pending or Ready(0); if a push raced through step 3, the waker is registered
205        //      and a fresh poll will see the new bytes.
206        recv_state.waker.register(cx.waker());
207        drop(recv);
208        if self.state.lifecycle_lock().recv_closed() {
209            return Poll::Ready(Ok(0));
210        }
211        Poll::Pending
212    }
213}
214
215impl AsyncWrite for H2Transport {
216    fn poll_write(
217        self: Pin<&mut Self>,
218        cx: &mut Context<'_>,
219        buf: &[u8],
220    ) -> Poll<io::Result<usize>> {
221        // Parity with h1/h3: writing to a stream whose send half is already closed (clean
222        // `END_STREAM`, or reset) is a `BrokenPipe` rather than silently swallowed bytes.
223        // Otherwise we always accept the write — the "don't write at an inappropriate time"
224        // contract is the caller's, same as borrowing the raw transport in h1/h3.
225        if self.state.lifecycle_lock().send_closed() {
226            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
227        }
228
229        // Append into the per-stream outbound ring. The send pump drains the same ring into DATA
230        // frames bounded by per-stream + connection send windows. Bounded by
231        // `response_buffer_max_len` (the cap h1 and h3 use for their transit buffers): a stalled
232        // peer window means the driver can't drain, the cap is hit, and we return `Pending` so the
233        // handler is throttled. The drain side wakes `outbound_write_waker` after each take.
234        let send = &self.state.send;
235        let cap = self.connection.context.config.response_buffer_max_len;
236        let mut outbound = send.outbound.lock().expect("outbound buf mutex poisoned");
237        if outbound.len() >= cap {
238            // Register first, then re-check under lock to close the race against the drain side
239            // (which takes the same lock to `ignore_front` and then wakes us).
240            send.outbound_write_waker.register(cx.waker());
241            if outbound.len() >= cap {
242                return Poll::Pending;
243            }
244        }
245        let take = (cap - outbound.len()).min(buf.len());
246        log::trace!(
247            "h2 stream {}: H2Transport::poll_write appending {take}/{} bytes to outbound ring",
248            self.stream_id,
249            buf.len(),
250        );
251        outbound.extend_from_slice(&buf[..take]);
252        drop(outbound);
253
254        // Wake the driver (parked on the connection-level waker).
255        self.connection.outbound_waker().wake();
256        Poll::Ready(Ok(take))
257    }
258
259    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
260        // Best-effort: bytes appended via `poll_write` are already visible to the driver and will
261        // be framed on the next tick. There's no application-level "flushed" state below us.
262        Poll::Ready(Ok(()))
263    }
264
265    fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
266        // Mark the write half closed by enqueuing the `END_STREAM` terminator. Once the driver
267        // drains the remaining outbound ring bytes it frames the terminator and tears the stream
268        // down. Idempotent — `request_close` is a no-op if a terminator is already queued or the
269        // send half is already closed.
270        log::trace!(
271            "h2 stream {}: H2Transport::poll_close enqueuing Close",
272            self.stream_id,
273        );
274        self.state.request_close();
275        self.state.needs_servicing.store(true, Ordering::Release);
276        self.state.send.outbound_write_waker.wake();
277        self.connection.outbound_waker().wake();
278        Poll::Ready(Ok(()))
279    }
280}
281
282/// A unit of outbound work the conn task hands the driver, framed in order by the send pump.
283///
284/// `Headers` opens the response/request; `Body` is an owned source the driver frames lazily under
285/// flow control (no intermediate buffering); `Trailers` and `Close` are alternative `END_STREAM`
286/// terminators; `Reset` abandons the stream (the conn task clears the rest of the queue when it
287/// pushes one — nothing else is valid to send after `RST_STREAM`).
288///
289/// Streaming bytes a handler writes through [`H2Transport`]'s `AsyncWrite` do *not* go here — they
290/// flow through the reused [`SendState::outbound`] ring, which the pump drains before framing a
291/// terminator.
292#[derive(Debug)]
293pub(super) enum OutboundPart {
294    /// Initial HEADERS block — HPACK-encoded by the driver at frame time so the wire order
295    /// matches the dynamic-table mutation order.
296    Headers {
297        pseudos: PseudoHeaders<'static>,
298        headers: Headers,
299    },
300    /// An owned body source, framed directly into DATA frames under flow control.
301    Body(Body),
302    /// Trailing HEADERS — carries `END_STREAM`.
303    Trailers(Headers),
304    /// Empty `DATA(END_STREAM)` terminator.
305    Close,
306    /// `RST_STREAM(code)` — conn-task-initiated reset; clears any preceding queued parts.
307    Reset(H2ErrorCode),
308}
309
310impl OutboundPart {
311    /// `true` for the `END_STREAM`/`RST_STREAM` terminators — used by `request_close` to stay
312    /// idempotent and by the send pump to know the ring must be drained first.
313    pub(super) fn is_terminal(&self) -> bool {
314        matches!(self, Self::Trailers(_) | Self::Close | Self::Reset(_))
315    }
316}
317
318/// Shared per-stream state. Owned by an [`Arc`] held jointly by the driver (via the connection's
319/// stream table) and the handler task (via [`H2Transport`]).
320///
321/// The cross-task-visible protocol state machine is the [`StreamLifecycle`] in [`Self::lifecycle`]
322/// — written only by the driver (which feeds it [`StreamEvent`]s as it frames/receives) and read by
323/// both sides. Everything else is data and mailbox plumbing: the recv ring, the outbound parts
324/// queue + streaming ring, and the conn-task → driver signals.
325#[derive(Debug, Default)]
326pub(super) struct StreamState {
327    /// Protocol state (RFC 9113 §5.1). Driver is the sole writer; the handler side reads it for
328    /// recv-EOF (`poll_read`) and send-closed (`poll_write`) decisions. Held under a `Mutex` so
329    /// observe-then-act sequences are atomic.
330    lifecycle: Mutex<StreamLifecycle>,
331
332    /// Recv side: inbound DATA payloads, handler waker, handler-intent signal, trailers.
333    pub(super) recv: RecvState,
334
335    /// Send side: the outbound parts queue, the streaming ring, the driver→conn completion
336    /// channel, and the application-release signal.
337    pub(super) send: SendState,
338
339    /// Mailbox flag for conn-task → driver work signaling. Set by conn-task code whenever it
340    /// produces work the driver should service (enqueue a part, raise `is_reading`, increment
341    /// `bytes_consumed`, set `transport_dropped`). The driver's `service_handler_signals` consults
342    /// it via `swap(false, AcqRel)` — only streams where it returns `true` pay for the queue-lock
343    /// pickup; idle streams cost a single atomic RMW per tick.
344    ///
345    /// **Setter ordering rule**: write the underlying state first, then store `true` with
346    /// `Release`, then wake the connection's outbound waker. Over-notification is harmless;
347    /// under-notification would lose a signal.
348    pub(super) needs_servicing: AtomicBool,
349}
350
351impl StreamState {
352    /// Lock the per-stream protocol lifecycle. Convenience wrapper — every site treats poisoning as
353    /// a programming error.
354    pub(super) fn lifecycle_lock(&self) -> MutexGuard<'_, StreamLifecycle> {
355        self.lifecycle.lock().expect("lifecycle mutex poisoned")
356    }
357
358    /// Apply a [`StreamEvent`] to the lifecycle. Driver-only — the sole writer of protocol state.
359    pub(super) fn apply_event(
360        &self,
361        event: StreamEvent,
362    ) -> Result<(), super::stream_state::StreamProtocolError> {
363        self.lifecycle_lock().on_event(event)
364    }
365
366    /// Stage a full submission of outbound parts atomically, so the send pump sees a complete
367    /// message rather than a partial one (the `SubmitSend` future keys "done" off the queue
368    /// draining to empty). Raises `needs_servicing`; the caller wakes the driver.
369    pub(super) fn stage(&self, parts: impl IntoIterator<Item = OutboundPart>) {
370        self.send
371            .queue
372            .lock()
373            .expect("send queue mutex poisoned")
374            .extend(parts);
375        self.needs_servicing.store(true, Ordering::Release);
376    }
377
378    /// Enqueue the `END_STREAM` terminator unless one (or a reset) is already queued or the send
379    /// half is already closed. Idempotent — safe to call from repeated `poll_close` / Drop.
380    pub(super) fn request_close(&self) {
381        if self.lifecycle_lock().send_closed() {
382            return;
383        }
384        let mut queue = self.send.queue.lock().expect("send queue mutex poisoned");
385        if queue.back().is_none_or(|p| !p.is_terminal()) {
386            queue.push_back(OutboundPart::Close);
387        }
388        drop(queue);
389        self.needs_servicing.store(true, Ordering::Release);
390    }
391
392    /// Clear any queued parts and request `RST_STREAM(code)`. First-wins: a reset already at the
393    /// back keeps its code. Nothing else is valid to send after a reset, so clearing the queue
394    /// models the §5.1 sequence faithfully. Raises `needs_servicing`; the caller wakes the driver.
395    pub(super) fn request_reset(&self, code: H2ErrorCode) {
396        let mut queue = self.send.queue.lock().expect("send queue mutex poisoned");
397        if matches!(queue.back(), Some(OutboundPart::Reset(_))) {
398            return;
399        }
400        queue.clear();
401        queue.push_back(OutboundPart::Reset(code));
402        drop(queue);
403        self.needs_servicing.store(true, Ordering::Release);
404    }
405}
406
407/// Receive-side per-stream state.
408#[derive(Debug, Default)]
409pub(super) struct RecvState {
410    /// Inbound DATA body bytes awaiting handler read. A single persistent ring (append-at-tail,
411    /// `ignore_front`-at-head): the driver appends via `extend_from_slice` when a DATA frame
412    /// arrives; the handler reads from the front and virtually drops consumed bytes. When
413    /// `ignore_front` catches up to the data end the `Buffer` truncates to zero, so the underlying
414    /// `Vec` capacity stays bounded by peak in-flight bytes rather than cumulative traffic — zero
415    /// amortized allocations per DATA frame.
416    pub(super) buf: Mutex<Buffer>,
417
418    /// Handler-task waker, fired by the driver after pushing DATA into `buf` or after the
419    /// lifecycle transitions to recv-closed. Single-waiter: only one task ever polls a given
420    /// `H2Transport`.
421    pub(super) waker: AtomicWaker,
422
423    /// Set by the handler's first [`H2Transport::poll_read`] to declare intent to consume the
424    /// request body. The driver observes the transition and emits a `WINDOW_UPDATE` for this
425    /// stream, topping its recv window up from `SETTINGS_INITIAL_WINDOW_SIZE` (advertised as `0`)
426    /// to the per-stream maximum. Once set, stays set.
427    pub(super) is_reading: AtomicBool,
428
429    /// Bytes the handler has consumed from `buf` since the driver last sampled this counter.
430    /// Incremented by [`H2Transport::poll_read`] using `fetch_add` after each drain; the driver
431    /// reads it via `swap(0)` per tick and emits stream-level + connection-level `WINDOW_UPDATE`
432    /// for the consumed total.
433    pub(super) bytes_consumed: AtomicU64,
434
435    /// Trailers, populated by the driver if a trailing HEADERS frame arrives for this stream.
436    /// Always written *before* the lifecycle transitions to recv-closed, so once the handler
437    /// observes `Ready(0)` on the recv side, any trailers for this request are guaranteed to
438    /// be in place.
439    pub(super) trailers: Mutex<Option<Headers>>,
440
441    /// Client-role: response HEADERS field section, populated by the driver on the first non-1xx
442    /// HEADERS frame arrival for a client-initiated stream. Server role doesn't use this slot.
443    /// Single-shot: the conn task takes the `FieldSection` once; subsequent HEADERS arrivals are
444    /// interpreted as trailers. Interim 1xx HEADERS frames are discarded by the driver without
445    /// touching this slot or latching `first_response_headers_seen`.
446    pub(super) response_headers: Mutex<Option<FieldSection<'static>>>,
447
448    /// Client-role: latching flag for "first HEADERS arrived for this stream." Distinct from
449    /// `response_headers.is_some()` — the conn task drains that slot when it consumes headers, so
450    /// the driver can't use slot occupancy to distinguish "haven't seen HEADERS yet" from "headers
451    /// seen + already taken." Set inside `finalize_response_headers` before that slot is
452    /// populated; checked by `route_headers` on subsequent HEADERS to route them as trailers.
453    /// Never cleared.
454    pub(super) first_response_headers_seen: AtomicBool,
455
456    /// Client-role: waker the conn task registers via
457    /// [`H2Connection::response_headers`][super::H2Connection]; fired by the driver after stashing
458    /// the `FieldSection` *or* on stream removal (so a parked conn task observing the stream gone
459    /// surfaces `NotConnected` instead of hanging).
460    pub(super) response_headers_waker: AtomicWaker,
461}
462
463/// Send-side per-stream state: the conn-task → driver outbound parts queue, the streaming ring for
464/// bidirectional upgrades, the driver → conn-task completion channel, and the application-release
465/// signal.
466///
467/// **Normal response path**: the conn task `stage`s `[Headers, Body?, Close]` once via
468/// [`H2Connection::submit_send`][submit] and waits on `completion_waker` for `submit_resolved` to
469/// flip. The driver drains the queue into its private send cursor, frames the parts, and on the
470/// queue draining to empty stores `completion_result`, sets `submit_resolved = true`, and wakes the
471/// conn task.
472///
473/// **Extended-CONNECT upgrade path** ([RFC 8441]): the conn task `stage`s `[Headers, Body?]` with
474/// *no* `Close`, so the stream stays open after the prelude frames. `submit_resolved` flips when
475/// the queue first drains (i.e. once the prelude is on the wire) — matching h1/h3, not at
476/// `END_HEADERS` — so `submit_upgrade().await` returns and the runtime can dispatch
477/// [`Handler::upgrade`][trillium::Handler::upgrade]. The handler then writes through
478/// [`H2Transport`]'s `AsyncWrite`, which appends to `outbound`; the send pump drains that ring into
479/// DATA. `poll_close` enqueues [`OutboundPart::Close`], the pump drains the ring, frames the
480/// terminator, and tears the stream down.
481///
482/// [submit]: super::H2Connection::submit_send
483/// [RFC 8441]: https://www.rfc-editor.org/rfc/rfc8441
484#[derive(Debug, Default)]
485pub(super) struct SendState {
486    /// Outbound parts the conn task has handed off, drained by the driver into its private send
487    /// cursor under `needs_servicing`. Cold: a handful of pushes per stream lifetime.
488    pub(super) queue: Mutex<VecDeque<OutboundPart>>,
489
490    /// Streaming bytes for a bidirectional upgrade: [`H2Transport`]'s `AsyncWrite::poll_write`
491    /// appends here, the send pump drains it into DATA frames before framing a terminator. A
492    /// single reused ring (`ignore_front` at head) — empty for normal responses, which frame an
493    /// owned [`OutboundPart::Body`] directly.
494    pub(super) outbound: Mutex<Buffer>,
495
496    /// Reverse-direction backpressure waker: registered by [`H2Transport::poll_write`] when
497    /// `outbound` hits the cap, fired by the send pump after it drains bytes so a parked writer
498    /// can resume. Without it a slow/unresponsive peer's closed window would let `outbound`
499    /// grow unbounded.
500    pub(super) outbound_write_waker: AtomicWaker,
501
502    /// Set to `true` by the driver once the conn task's [`SubmitSend`][super::SubmitSend] future
503    /// may resolve and its `completion_result` is readable. The future polls this atomic and
504    /// registers on `completion_waker`. Flips when the outbound queue first drains to empty —
505    /// `END_STREAM` for a normal response, the prelude-handoff for an upgrade.
506    pub(super) submit_resolved: AtomicBool,
507
508    /// The driver writes the final result here before flipping `submit_resolved`. The conn task
509    /// takes it once `submit_resolved` is observed true.
510    pub(super) completion_result: Mutex<Option<io::Result<()>>>,
511
512    /// The conn task's waker, registered by `SubmitSend::poll` and fired by the driver after
513    /// `submit_resolved` is set.
514    pub(super) completion_waker: AtomicWaker,
515
516    /// Set by [`H2Transport::Drop`] when the lifecycle is already `Closed` and the application is
517    /// releasing a stream that lingered in the map for post-EOF access (client role). The driver
518    /// picks it up and removes the entry from both maps. Not protocol state — a local
519    /// resource-ownership fact the lifecycle can't represent.
520    pub(super) transport_dropped: AtomicBool,
521}
522
#[cfg(test)]
mod tests {
    use super::*;
    use crate::HttpContext;

    /// Default stream state forced into the given lifecycle.
    fn state_in(lifecycle: StreamLifecycle) -> StreamState {
        let state = StreamState::default();
        *state.lifecycle_lock() = lifecycle;
        state
    }

    /// Two `request_close` calls on an open stream leave exactly one `Close` in the queue.
    #[test]
    fn request_close_enqueues_single_close() {
        let state = state_in(StreamLifecycle::Open);
        for _ in 0..2 {
            state.request_close();
        }
        let queue = state.send.queue.lock().unwrap();
        assert_eq!(queue.len(), 1, "second request_close is a no-op");
        assert!(matches!(queue.front(), Some(OutboundPart::Close)));
    }

    /// With the send half already closed, `request_close` queues nothing.
    #[test]
    fn request_close_noop_when_send_closed() {
        let state = state_in(StreamLifecycle::HalfClosedLocal);
        state.request_close();
        let queue_is_empty = state.send.queue.lock().unwrap().is_empty();
        assert!(
            queue_is_empty,
            "no terminator queued once the send half is closed"
        );
    }

    /// A reset wipes previously staged parts, and a second reset does not replace the first code.
    #[test]
    fn request_reset_clears_queue_and_is_first_wins() {
        let state = state_in(StreamLifecycle::Open);
        state.stage([OutboundPart::Body(Body::default()), OutboundPart::Close]);
        state.request_reset(H2ErrorCode::Cancel);
        state.request_reset(H2ErrorCode::InternalError);
        let queue = state.send.queue.lock().unwrap();
        assert_eq!(queue.len(), 1, "queue cleared, single reset");
        let front_is_cancel = matches!(
            queue.front(),
            Some(OutboundPart::Reset(H2ErrorCode::Cancel))
        );
        assert!(front_is_cancel, "first reset code wins",);
    }

    /// A write larger than `response_buffer_max_len` is truncated to exactly the cap.
    #[test]
    fn poll_write_caps_at_response_buffer_max_len() {
        use futures_lite::AsyncWrite;
        use std::task::{Context, Poll, Waker};

        let mut context = HttpContext::new();
        context.config.response_buffer_max_len = 16;
        let connection = H2Connection::new(Arc::new(context));
        let state = Arc::new(state_in(StreamLifecycle::Open));
        let mut transport = H2Transport::new(connection, 1, state);

        let waker = Waker::noop();
        let mut cx = Context::from_waker(waker);
        let buf = [0u8; 32];

        let outcome = Pin::new(&mut transport).poll_write(&mut cx, &buf);
        match outcome {
            Poll::Ready(Ok(n)) => assert_eq!(n, 16, "should accept exactly cap bytes"),
            other => panic!("expected Ready(Ok(16)), got {other:?}"),
        }
    }
}