Skip to main content

trillium_http/h2/connection/
ping.rs

1//! PING / PING-ACK round-trip tracking.
2//!
3//! [`H2Connection::send_ping`][super::H2Connection::send_ping] returns a [`SendPing`] future
4//! that resolves with the round-trip time once the peer's `PING ACK` arrives. The driver-side
5//! hooks ([`drain_pending_ping_outbound`][super::H2Connection::drain_pending_ping_outbound],
6//! [`complete_pending_ping`][super::H2Connection::complete_pending_ping],
7//! [`fail_pending_pings`][super::H2Connection::fail_pending_pings]) are the in-driver-task
8//! counterparts: queue-drain, ack-arrival, connection-close.
9
10use super::H2Connection;
11use std::{
12    future::Future,
13    io,
14    pin::Pin,
15    task::{Context, Poll, Waker},
16    time::{Duration, Instant},
17};
18
19/// Tracks a single outstanding active PING's lifecycle.
20#[derive(Debug)]
21pub(crate) struct PendingPing {
22    pub(crate) sent_at: Instant,
23    pub(crate) waker: Option<Waker>,
24    pub(crate) completed: Option<io::Result<Duration>>,
25}
26
27/// Future returned by [`H2Connection::send_ping`].
28///
29/// Resolves to the round-trip time once the peer's PING ACK arrives, or to an `io::Error`
30/// if the connection closes first. Dropping the future before completion removes the
31/// pending entry so the [`H2Connection`]'s map doesn't accumulate stale state.
32#[must_use = "futures do nothing unless awaited"]
33#[derive(Debug)]
34pub struct SendPing<'a> {
35    pub(super) connection: &'a H2Connection,
36    pub(super) opaque: [u8; 8],
37    /// `true` while this future still owns an entry in `pending_pings` that `Drop` must
38    /// remove. Set to `false` once registration fails (duplicate opaque) or `poll` returns
39    /// `Ready` with the entry removed.
40    pub(super) needs_cleanup: bool,
41}
42
43impl Future for SendPing<'_> {
44    type Output = io::Result<Duration>;
45
46    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47        let this = self.get_mut();
48        if !this.needs_cleanup {
49            return Poll::Ready(Err(io::Error::new(
50                io::ErrorKind::AlreadyExists,
51                "PING with this opaque payload is already in flight",
52            )));
53        }
54        let mut pending = this
55            .connection
56            .pending_pings
57            .lock()
58            .expect("pending_pings mutex poisoned");
59        let entry = pending
60            .get_mut(&this.opaque)
61            .expect("pending_pings entry removed while SendPing future still pending");
62        if let Some(result) = entry.completed.take() {
63            pending.remove(&this.opaque);
64            this.needs_cleanup = false;
65            return Poll::Ready(result);
66        }
67        entry.waker = Some(cx.waker().clone());
68        Poll::Pending
69    }
70}
71
72impl Drop for SendPing<'_> {
73    fn drop(&mut self) {
74        if self.needs_cleanup
75            && let Ok(mut pending) = self.connection.pending_pings.lock()
76        {
77            pending.remove(&self.opaque);
78        }
79    }
80}
81
82impl H2Connection {
83    /// Send a `PING` frame to the peer and resolve when its `PING ACK` arrives, returning
84    /// the round-trip time.
85    ///
86    /// `opaque` is the 8-byte payload the peer echoes back; the caller picks the value
87    /// (typically a counter or random nonce). A `PING` whose opaque payload is already
88    /// in flight resolves to `io::ErrorKind::AlreadyExists`.
89    ///
90    /// No internal timeout. Wrap the returned future with the runtime's
91    /// `race_with_timeout` (or equivalent) to bound the wait.
92    ///
93    /// # Cancel safety
94    ///
95    /// Dropping the returned future before completion removes the pending entry. The PING
96    /// frame may still go out (or already have gone out) and the peer's ACK is silently
97    /// dropped. Re-using the same `opaque` after drop is safe.
98    ///
99    /// # Panics
100    ///
101    /// Panics if any per-connection mutex is poisoned.
102    pub fn send_ping(&self, opaque: [u8; 8]) -> SendPing<'_> {
103        let mut pending = self
104            .pending_pings
105            .lock()
106            .expect("pending_pings mutex poisoned");
107        if pending.contains_key(&opaque) {
108            return SendPing {
109                connection: self,
110                opaque,
111                needs_cleanup: false,
112            };
113        }
114        pending.insert(
115            opaque,
116            PendingPing {
117                sent_at: Instant::now(),
118                waker: None,
119                completed: None,
120            },
121        );
122        drop(pending);
123        self.pending_ping_outbound
124            .lock()
125            .expect("pending_ping_outbound mutex poisoned")
126            .push_back(opaque);
127        self.outbound_waker.wake();
128        SendPing {
129            connection: self,
130            opaque,
131            needs_cleanup: true,
132        }
133    }
134
135    /// Driver-side: drain the queue of outbound active PING opaque payloads for emission.
136    pub(in crate::h2) fn drain_pending_ping_outbound(&self) -> Vec<[u8; 8]> {
137        let mut queue = self
138            .pending_ping_outbound
139            .lock()
140            .expect("pending_ping_outbound mutex poisoned");
141        queue.drain(..).collect()
142    }
143
144    /// Driver-side: a `PING ACK` for the given opaque payload arrived. Marks the pending
145    /// entry complete with the elapsed RTT and wakes its waker, if any. A no-op if the
146    /// payload doesn't match an outstanding PING (unsolicited ACK).
147    pub(in crate::h2) fn complete_pending_ping(&self, opaque: [u8; 8]) {
148        let mut pending = self
149            .pending_pings
150            .lock()
151            .expect("pending_pings mutex poisoned");
152        if let Some(entry) = pending.get_mut(&opaque) {
153            let elapsed = entry.sent_at.elapsed();
154            entry.completed = Some(Ok(elapsed));
155            if let Some(waker) = entry.waker.take() {
156                waker.wake();
157            }
158        }
159    }
160
161    /// Driver-side: connection is closing. Complete every outstanding PING with the given
162    /// error so awaiting `send_ping` futures don't block forever.
163    pub(in crate::h2) fn fail_pending_pings(
164        &self,
165        error_kind: io::ErrorKind,
166        message: &'static str,
167    ) {
168        let mut pending = self
169            .pending_pings
170            .lock()
171            .expect("pending_pings mutex poisoned");
172        for entry in pending.values_mut() {
173            if entry.completed.is_none() {
174                entry.completed = Some(Err(io::Error::new(error_kind, message)));
175                if let Some(waker) = entry.waker.take() {
176                    waker.wake();
177                }
178            }
179        }
180    }
181}