trillium_http/h2/connection/ping.rs
1//! PING / PING-ACK round-trip tracking.
2//!
3//! [`H2Connection::send_ping`][super::H2Connection::send_ping] returns a [`SendPing`] future
4//! that resolves with the round-trip time once the peer's `PING ACK` arrives. The driver-side
5//! hooks ([`drain_pending_ping_outbound`][super::H2Connection::drain_pending_ping_outbound],
6//! [`complete_pending_ping`][super::H2Connection::complete_pending_ping],
7//! [`fail_pending_pings`][super::H2Connection::fail_pending_pings]) are the in-driver-task
8//! counterparts: queue-drain, ack-arrival, connection-close.
9
10use super::H2Connection;
11use std::{
12 future::Future,
13 io,
14 pin::Pin,
15 task::{Context, Poll, Waker},
16 time::{Duration, Instant},
17};
18
/// Bookkeeping for one active PING, from registration until its [`SendPing`] future has
/// consumed the outcome (or been dropped).
#[derive(Debug)]
pub(crate) struct PendingPing {
    /// Instant captured when the entry was registered; the reported round-trip time is
    /// the time elapsed since this point.
    pub(crate) sent_at: Instant,
    /// Waker of the task awaiting this PING. `None` until the future has been polled
    /// once, and taken again when the outcome is published.
    pub(crate) waker: Option<Waker>,
    /// Outcome slot: `None` while the PING is in flight, `Some(Ok(rtt))` once the ACK
    /// arrived, `Some(Err(_))` if the connection closed first.
    pub(crate) completed: Option<io::Result<Duration>>,
}
26
27/// Future returned by [`H2Connection::send_ping`].
28///
29/// Resolves to the round-trip time once the peer's PING ACK arrives, or to an `io::Error`
30/// if the connection closes first. Dropping the future before completion removes the
31/// pending entry so the [`H2Connection`]'s map doesn't accumulate stale state.
32#[must_use = "futures do nothing unless awaited"]
33#[derive(Debug)]
34pub struct SendPing<'a> {
35 pub(super) connection: &'a H2Connection,
36 pub(super) opaque: [u8; 8],
37 /// `true` while this future still owns an entry in `pending_pings` that `Drop` must
38 /// remove. Set to `false` once registration fails (duplicate opaque) or `poll` returns
39 /// `Ready` with the entry removed.
40 pub(super) needs_cleanup: bool,
41}
42
43impl Future for SendPing<'_> {
44 type Output = io::Result<Duration>;
45
46 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
47 let this = self.get_mut();
48 if !this.needs_cleanup {
49 return Poll::Ready(Err(io::Error::new(
50 io::ErrorKind::AlreadyExists,
51 "PING with this opaque payload is already in flight",
52 )));
53 }
54 let mut pending = this
55 .connection
56 .pending_pings
57 .lock()
58 .expect("pending_pings mutex poisoned");
59 let entry = pending
60 .get_mut(&this.opaque)
61 .expect("pending_pings entry removed while SendPing future still pending");
62 if let Some(result) = entry.completed.take() {
63 pending.remove(&this.opaque);
64 this.needs_cleanup = false;
65 return Poll::Ready(result);
66 }
67 entry.waker = Some(cx.waker().clone());
68 Poll::Pending
69 }
70}
71
72impl Drop for SendPing<'_> {
73 fn drop(&mut self) {
74 if self.needs_cleanup
75 && let Ok(mut pending) = self.connection.pending_pings.lock()
76 {
77 pending.remove(&self.opaque);
78 }
79 }
80}
81
82impl H2Connection {
83 /// Send a `PING` frame to the peer and resolve when its `PING ACK` arrives, returning
84 /// the round-trip time.
85 ///
86 /// `opaque` is the 8-byte payload the peer echoes back; the caller picks the value
87 /// (typically a counter or random nonce). A `PING` whose opaque payload is already
88 /// in flight resolves to `io::ErrorKind::AlreadyExists`.
89 ///
90 /// No internal timeout. Wrap the returned future with the runtime's
91 /// `race_with_timeout` (or equivalent) to bound the wait.
92 ///
93 /// # Cancel safety
94 ///
95 /// Dropping the returned future before completion removes the pending entry. The PING
96 /// frame may still go out (or already have gone out) and the peer's ACK is silently
97 /// dropped. Re-using the same `opaque` after drop is safe.
98 ///
99 /// # Panics
100 ///
101 /// Panics if any per-connection mutex is poisoned.
102 pub fn send_ping(&self, opaque: [u8; 8]) -> SendPing<'_> {
103 let mut pending = self
104 .pending_pings
105 .lock()
106 .expect("pending_pings mutex poisoned");
107 if pending.contains_key(&opaque) {
108 return SendPing {
109 connection: self,
110 opaque,
111 needs_cleanup: false,
112 };
113 }
114 pending.insert(
115 opaque,
116 PendingPing {
117 sent_at: Instant::now(),
118 waker: None,
119 completed: None,
120 },
121 );
122 drop(pending);
123 self.pending_ping_outbound
124 .lock()
125 .expect("pending_ping_outbound mutex poisoned")
126 .push_back(opaque);
127 self.outbound_waker.wake();
128 SendPing {
129 connection: self,
130 opaque,
131 needs_cleanup: true,
132 }
133 }
134
135 /// Driver-side: drain the queue of outbound active PING opaque payloads for emission.
136 pub(in crate::h2) fn drain_pending_ping_outbound(&self) -> Vec<[u8; 8]> {
137 let mut queue = self
138 .pending_ping_outbound
139 .lock()
140 .expect("pending_ping_outbound mutex poisoned");
141 queue.drain(..).collect()
142 }
143
144 /// Driver-side: a `PING ACK` for the given opaque payload arrived. Marks the pending
145 /// entry complete with the elapsed RTT and wakes its waker, if any. A no-op if the
146 /// payload doesn't match an outstanding PING (unsolicited ACK).
147 pub(in crate::h2) fn complete_pending_ping(&self, opaque: [u8; 8]) {
148 let mut pending = self
149 .pending_pings
150 .lock()
151 .expect("pending_pings mutex poisoned");
152 if let Some(entry) = pending.get_mut(&opaque) {
153 let elapsed = entry.sent_at.elapsed();
154 entry.completed = Some(Ok(elapsed));
155 if let Some(waker) = entry.waker.take() {
156 waker.wake();
157 }
158 }
159 }
160
161 /// Driver-side: connection is closing. Complete every outstanding PING with the given
162 /// error so awaiting `send_ping` futures don't block forever.
163 pub(in crate::h2) fn fail_pending_pings(
164 &self,
165 error_kind: io::ErrorKind,
166 message: &'static str,
167 ) {
168 let mut pending = self
169 .pending_pings
170 .lock()
171 .expect("pending_pings mutex poisoned");
172 for entry in pending.values_mut() {
173 if entry.completed.is_none() {
174 entry.completed = Some(Err(io::Error::new(error_kind, message)));
175 if let Some(waker) = entry.waker.take() {
176 waker.wake();
177 }
178 }
179 }
180 }
181}