trillium_client/conn.rs
1use crate::{
2 Client, ResponseBody,
3 response_body::{CleanupContext, OverrideBody},
4 util::encoding,
5};
6use std::{borrow::Cow, mem, net::SocketAddr, sync::Arc, time::Duration};
7use trillium_http::{
8 Body, Buffer, Error, HeaderName, HeaderValues, Headers, HttpContext, Method, ProtocolSession,
9 ReceivedBody, ReceivedBodyState, Status, TypeSet, Version,
10};
11use trillium_server_common::{Transport, url::Url};
12
13mod h1;
14mod h2;
15mod h3;
16mod shared;
17mod unexpected_status_error;
18
19pub(crate) use h2::H2Pooled;
20#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
21pub use shared::ClientSerdeError;
22pub use unexpected_status_error::UnexpectedStatusError;
23
/// a client connection, representing both an outbound http request and an
/// http response
#[must_use]
#[derive(fieldwork::Fieldwork)]
pub struct Conn {
    /// protocol-level session handle; a clone is attached to every transport-backed
    /// response body built from this conn
    pub(crate) protocol_session: ProtocolSession,
    /// QUIC-connection WebTransport dispatcher slot (lazy-init) and the QUIC connection
    /// itself, retained on extended-CONNECT-with-`:protocol = webtransport` requests so
    /// `into_webtransport` can install the router and hand the QUIC connection to the
    /// returned [`WebTransportConnection`][trillium_webtransport::WebTransportConnection].
    #[cfg(feature = "webtransport")]
    pub(crate) wt_pool_entry: Option<crate::h3::H3PoolEntry>,
    /// read buffer lent to (or moved into) the transport-backed response body
    pub(crate) buffer: Buffer,
    /// state handed to [`ReceivedBody`] whenever a transport-backed response body is built
    pub(crate) response_body_state: ReceivedBodyState,
    /// NOTE(review): presumably records whether the request headers have already been
    /// finalized (see `finalize_headers`) — confirm against the protocol drivers
    pub(crate) headers_finalized: bool,
    /// NOTE(review): presumably the maximum accepted length of a response head — confirm
    pub(crate) max_head_length: usize,
    /// type-keyed storage backing [`Conn::with_state`], [`Conn::insert_state`],
    /// [`Conn::state`], [`Conn::state_mut`] and [`Conn::take_state`]
    pub(crate) state: TypeSet,
    /// shared http context; its config is passed to override response bodies
    pub(crate) context: Arc<HttpContext>,

    /// the transport for this conn
    ///
    /// This should only be used to call your own custom methods on the transport that do not read
    /// or write any data. Calling any method that reads from or writes to the transport will
    /// disrupt the HTTP protocol.
    #[field(get, get_mut)]
    pub(crate) transport: Option<Box<dyn Transport>>,

    /// the url for this conn.
    ///
    /// ```
    /// use trillium_client::Client;
    /// use trillium_testing::client_config;
    ///
    /// let client = Client::from(client_config());
    ///
    /// let conn = client.get("http://localhost:9080");
    ///
    /// let url = conn.url(); //<-
    ///
    /// assert_eq!(url.host_str().unwrap(), "localhost");
    /// ```
    #[field(get, set, get_mut)]
    pub(crate) url: Url,

    /// the method for this conn.
    ///
    /// ```
    /// use trillium_client::{Client, Method};
    /// use trillium_testing::client_config;
    ///
    /// let client = Client::from(client_config());
    /// let conn = client.get("http://localhost:9080");
    ///
    /// let method = conn.method(); //<-
    ///
    /// assert_eq!(method, Method::Get);
    /// ```
    #[field(get, set, copy)]
    pub(crate) method: Method,

    /// the request headers
    #[field(get, get_mut)]
    pub(crate) request_headers: Headers,

    /// the response headers
    #[field(get)]
    pub(crate) response_headers: Headers,

    /// the status code for this conn.
    ///
    /// If the conn has not yet been sent, this will be None.
    ///
    /// ```
    /// use trillium_client::{Client, Status};
    /// use trillium_testing::{client_config, with_server};
    ///
    /// async fn handler(conn: trillium::Conn) -> trillium::Conn {
    ///     conn.with_status(418)
    /// }
    ///
    /// with_server(handler, |url| async move {
    ///     let client = Client::new(client_config());
    ///     let conn = client.get(url).await?;
    ///     assert_eq!(Status::ImATeapot, conn.status().unwrap());
    ///     Ok(())
    /// });
    /// ```
    #[field(get, copy)]
    pub(crate) status: Option<Status>,

    /// the request body
    ///
    /// ```
    /// env_logger::init();
    /// use trillium_client::Client;
    /// use trillium_testing::{client_config, with_server};
    ///
    /// let handler = |mut conn: trillium::Conn| async move {
    ///     let body = conn.request_body_string().await.unwrap();
    ///     conn.ok(format!("request body was: {}", body))
    /// };
    ///
    /// with_server(handler, |url| async move {
    ///     let client = Client::from(client_config());
    ///     let mut conn = client
    ///         .post(url)
    ///         .with_body("body") //<-
    ///         .await?;
    ///
    ///     assert_eq!(
    ///         conn.response_body().read_string().await?,
    ///         "request body was: body"
    ///     );
    ///     Ok(())
    /// });
    /// ```
    #[field(get, with = with_body, argument = body, set, into, take, option_set_some)]
    pub(crate) request_body: Option<Body>,

    /// the timeout for this conn
    ///
    /// this can also be set on the client with [`Client::set_timeout`](crate::Client::set_timeout)
    /// and [`Client::with_timeout`](crate::Client::with_timeout)
    #[field(with, set, get, get_mut, take, copy, option_set_some)]
    pub(crate) timeout: Option<Duration>,

    /// whether this conn is halted.
    ///
    /// When set to `true` before execution, the network round-trip is skipped — the conn is
    /// returned to the caller with whatever response state has been populated synthetically
    /// (status, headers, body). Used by client middleware to short-circuit on cache hits,
    /// mocked responses, or open circuit-breakers. Cleared on egress so the user's conn handle
    /// never observes residual halt state after the awaited conn returns.
    ///
    /// Driven via [`ConnExt`](crate::ConnExt) — `halt` / `set_halted` / `is_halted`.
    pub(crate) halted: bool,

    /// transport-level error from the round-trip, if any.
    ///
    /// When the network call fails (connect refused, TLS handshake error, malformed HTTP frame,
    /// timeout, etc.) the framework stashes the error here and runs the handler chain's
    /// [`after_response`](crate::ClientHandler::after_response) anyway. A handler that recovers
    /// (stale-if-error cache, retry-with-fallback) calls
    /// [`ConnExt::take_error`](crate::ConnExt::take_error) to clear the error
    /// and populates response state synthetically; if the error is still present after all
    /// handlers finish, it propagates as `Err` from the awaited conn.
    pub(crate) error: Option<Error>,

    /// An override response body installed by middleware via
    /// [`ConnExt::set_response_body`](crate::ConnExt::set_response_body) or
    /// [`ConnExt::with_response_body`](crate::ConnExt::with_response_body). When
    /// set, [`Conn::response_body`] returns a [`ResponseBody`] backed by this body instead of
    /// the transport.
    pub(crate) body_override: Option<Body>,

    /// the http version *hint* for this conn
    ///
    /// Pre-execution this is the prior-knowledge hint, not the version that will necessarily be
    /// on the wire. `None` means "no hint, use auto-discovery" (Alt-Svc h3, ALPN/pooled h2);
    /// any `Some(version)` pins the protocol and suppresses auto-discovery. Post-execution this
    /// is `Some(version)` reflecting the version the request was actually sent over.
    ///
    /// The public [`http_version`](Conn::http_version) accessor resolves `None` to
    /// [`Version::Http1_1`]. See the crate-level [Protocol selection][crate#protocol-selection]
    /// documentation for the full hint → behavior table.
    #[field(set, with, option_set_some)]
    pub(crate) http_version: Option<Version>,

    /// the :authority pseudo-header, populated during h2 or h3 header finalization
    #[field(get)]
    pub(crate) authority: Option<Cow<'static, str>>,

    /// the :scheme pseudo-header, populated during h2 or h3 header finalization
    #[field(get)]
    pub(crate) scheme: Option<Cow<'static, str>>,

    /// the :path pseudo-header, populated during h2 or h3 header finalization
    #[field(get)]
    pub(crate) path: Option<Cow<'static, str>>,

    /// an explicit request target override, used only for `OPTIONS *` and `CONNECT host:port`
    ///
    /// When set and the method is OPTIONS or CONNECT, this value is used as the HTTP request
    /// target instead of deriving it from the url. For all other methods, this field is ignored.
    #[field(with, set, get, option_set_some, into)]
    pub(crate) request_target: Option<Cow<'static, str>>,

    /// the `:protocol` pseudo-header for an extended-CONNECT bootstrap (RFC 8441 over h2,
    /// RFC 9220 over h3). Triggers the h2/h3 exec paths to send HEADERS without `END_STREAM`
    /// and leave the stream open as a bidirectional byte channel.
    ///
    /// Only meaningful when method is `CONNECT` and [`http_version`][Self::http_version] is
    /// `Http2` or `Http3`. h1 and prior-version requests ignore this field.
    #[field(get)]
    pub(crate) protocol: Option<Cow<'static, str>>,

    /// trailers sent with the request body, populated after the body has been fully sent.
    ///
    /// Only present when the request body was constructed with [`Body::new_with_trailers`] and
    /// the body has been fully sent.
    #[field(get)]
    pub(crate) request_trailers: Option<Headers>,

    /// trailers received with the response body, populated after the response body has been fully
    /// read.
    #[field(get)]
    pub(crate) response_trailers: Option<Headers>,

    /// the [`Client`] that built this conn.
    #[field(get)]
    pub(crate) client: Client,

    /// A queued follow-up conn installed by middleware via
    /// [`ConnExt::set_followup`](crate::ConnExt::set_followup).
    ///
    /// When `Some` after the handler chain's `after_response` has fully unwound, the
    /// [`IntoFuture`][std::future::IntoFuture] loop picks it up: the current conn's response
    /// body is recycled, then the follow-up is swapped in and runs another full
    /// `(run → network → after_response)` cycle. Used by re-issuing handlers
    /// (`FollowRedirects`, retry, auth-refresh) instead of recursing into a nested `.await`.
    pub(crate) followup: Option<Box<Conn>>,

    /// Whether this conn is armed for an upgrade. When set, the protocol drivers
    /// transmit only request headers and leave the outbound direction open. Armed via
    /// [`ConnExt::upgrade`](crate::ConnExt::upgrade).
    pub(crate) upgrade: bool,
}
251
/// default http user-agent header
///
/// Built at compile time as `trillium-client/` followed by this crate's `CARGO_PKG_VERSION`.
pub const USER_AGENT: &str = concat!("trillium-client/", env!("CARGO_PKG_VERSION"));
254
255impl Conn {
256 /// the http version for this conn
257 ///
258 /// Pre-execution this resolves the version *hint* — the default (no hint) reports
259 /// [`Version::Http1_1`], which means "use auto-discovery," not "force HTTP/1.1." Setting any
260 /// explicit version via [`with_http_version`](Conn::with_http_version) pins the protocol and
261 /// suppresses auto-discovery. Post-execution this reflects the version the request was actually
262 /// sent over.
263 ///
264 /// See the crate-level [Protocol selection][crate#protocol-selection] documentation for the
265 /// full hint → behavior table.
266 #[must_use]
267 pub fn http_version(&self) -> Version {
268 self.http_version.unwrap_or(Version::Http1_1)
269 }
270
271 /// chainable setter for [`inserting`](Headers::insert) a request header
272 ///
273 /// ```
274 /// use trillium_client::Client;
275 /// use trillium_testing::{client_config, with_server};
276 ///
277 /// let handler = |conn: trillium::Conn| async move {
278 /// let header = conn
279 /// .request_headers()
280 /// .get_str("some-request-header")
281 /// .unwrap_or_default();
282 /// let response = format!("some-request-header was {}", header);
283 /// conn.ok(response)
284 /// };
285 ///
286 /// with_server(handler, |url| async move {
287 /// let client = Client::new(client_config());
288 /// let mut conn = client
289 /// .get(url)
290 /// .with_request_header("some-request-header", "header-value") // <--
291 /// .await?;
292 /// assert_eq!(
293 /// conn.response_body().read_string().await?,
294 /// "some-request-header was header-value"
295 /// );
296 /// Ok(())
297 /// })
298 /// ```
299 pub fn with_request_header(
300 mut self,
301 name: impl Into<HeaderName<'static>>,
302 value: impl Into<HeaderValues>,
303 ) -> Self {
304 self.request_headers.insert(name, value);
305 self
306 }
307
308 /// chainable setter for `extending` request headers
309 ///
310 /// ```
311 /// use trillium_client::Client;
312 /// use trillium_testing::{client_config, with_server};
313 ///
314 /// let handler = |conn: trillium::Conn| async move {
315 /// let header = conn
316 /// .request_headers()
317 /// .get_str("some-request-header")
318 /// .unwrap_or_default();
319 /// let response = format!("some-request-header was {}", header);
320 /// conn.ok(response)
321 /// };
322 ///
323 /// with_server(handler, move |url| async move {
324 /// let client = Client::new(client_config());
325 /// let mut conn = client
326 /// .get(url)
327 /// .with_request_headers([
328 /// ("some-request-header", "header-value"),
329 /// ("some-other-req-header", "other-header-value"),
330 /// ])
331 /// .await?;
332 ///
333 /// assert_eq!(
334 /// conn.response_body().read_string().await?,
335 /// "some-request-header was header-value"
336 /// );
337 /// Ok(())
338 /// })
339 /// ```
340 pub fn with_request_headers<HN, HV, I>(mut self, headers: I) -> Self
341 where
342 I: IntoIterator<Item = (HN, HV)> + Send,
343 HN: Into<HeaderName<'static>>,
344 HV: Into<HeaderValues>,
345 {
346 self.request_headers.extend(headers);
347 self
348 }
349
350 /// Chainable method to remove a request header if present
351 pub fn without_request_header(mut self, name: impl Into<HeaderName<'static>>) -> Self {
352 self.request_headers.remove(name);
353 self
354 }
355
/// chainable setter for a json request body. Requires the `serde_json` crate feature.
///
/// Serializes `body` with `serde_json`, installs the resulting string as the request body
/// and sets the `Content-Type` request header to `application/json`.
///
/// # Errors
///
/// Returns the `serde_json` error if `body` cannot be serialized.
#[cfg(feature = "serde_json")]
pub fn with_json_body(self, body: &impl serde::Serialize) -> serde_json::Result<Self> {
    // serialize first so a failure returns before the conn is modified
    let serialized = serde_json::to_string(body)?;
    let conn = self.with_body(serialized).with_request_header(
        trillium_http::KnownHeaderName::ContentType,
        "application/json",
    );
    Ok(conn)
}
365
/// chainable setter for a json request body. Requires the `sonic-rs` crate feature.
///
/// Serializes `body` with `sonic_rs`, installs the resulting string as the request body
/// and sets the `Content-Type` request header to `application/json`.
///
/// # Errors
///
/// Returns the `sonic_rs` error if `body` cannot be serialized.
#[cfg(feature = "sonic-rs")]
pub fn with_json_body(self, body: &impl serde::Serialize) -> sonic_rs::Result<Self> {
    // serialize first so a failure returns before the conn is modified
    let serialized = sonic_rs::to_string(body)?;
    let conn = self.with_body(serialized).with_request_header(
        trillium_http::KnownHeaderName::ContentType,
        "application/json",
    );
    Ok(conn)
}
375
376 /// returns a [`ResponseBody`](crate::ResponseBody) that borrows the connection inside this
377 /// conn.
378 /// ```
379 /// use trillium_client::Client;
380 /// use trillium_testing::{client_config, with_server};
381 ///
382 /// let handler = |mut conn: trillium::Conn| async move { conn.ok("hello from trillium") };
383 ///
384 /// with_server(handler, |url| async move {
385 /// let client = Client::from(client_config());
386 /// let mut conn = client.get(url).await?;
387 ///
388 /// let response_body = conn.response_body(); //<-
389 ///
390 /// assert_eq!(19, response_body.content_length().unwrap());
391 /// let string = response_body.read_string().await?;
392 /// assert_eq!("hello from trillium", string);
393 /// Ok(())
394 /// });
395 /// ```
396 #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
397 pub fn response_body(&mut self) -> ResponseBody<'_> {
398 let content_length = self.response_content_length();
399 let encoding = encoding(&self.response_headers);
400 if let Some(body) = self.body_override.as_mut() {
401 OverrideBody::new(body, encoding, self.context.config()).into()
402 } else {
403 ReceivedBody::new(
404 content_length,
405 &mut self.buffer,
406 self.transport.as_mut().unwrap(),
407 &mut self.response_body_state,
408 None,
409 encoding,
410 )
411 .with_trailers(&mut self.response_trailers)
412 .with_protocol_session(self.protocol_session.clone())
413 .into()
414 }
415 }
416
/// Read the whole response body as a string and deserialize it as json with `serde_json`.
///
/// Note that this consumes the body content.
///
/// # Errors
///
/// Returns a [`ClientSerdeError`] if reading the body fails or if the content does not
/// deserialize into `T`.
#[cfg(feature = "serde_json")]
pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
where
    T: serde::de::DeserializeOwned,
{
    let text = self.response_body().read_string().await?;
    let value = serde_json::from_str(&text)?;
    Ok(value)
}
426
/// Read the whole response body as a string and deserialize it as json with `sonic_rs`.
///
/// Note that this consumes the body content.
///
/// # Errors
///
/// Returns a [`ClientSerdeError`] if reading the body fails or if the content does not
/// deserialize into `T`.
#[cfg(feature = "sonic-rs")]
pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
where
    T: serde::de::DeserializeOwned,
{
    let text = self.response_body().read_string().await?;
    let value = sonic_rs::from_str(&text)?;
    Ok(value)
}
436
437 /// Returns the conn or an [`UnexpectedStatusError`] that contains the conn
438 ///
439 /// ```
440 /// use trillium_client::{Client, Status};
441 /// use trillium_testing::{client_config, with_server};
442 ///
443 /// with_server(Status::NotFound, |url| async move {
444 /// let client = Client::new(client_config());
445 /// assert_eq!(
446 /// client.get(url).await?.success().unwrap_err().to_string(),
447 /// "expected a success (2xx) status code, but got 404 Not Found"
448 /// );
449 /// Ok(())
450 /// });
451 ///
452 /// with_server(Status::Ok, |url| async move {
453 /// let client = Client::new(client_config());
454 /// assert!(client.get(url).await?.success().is_ok());
455 /// Ok(())
456 /// });
457 /// ```
458 pub fn success(self) -> Result<Self, UnexpectedStatusError> {
459 match self.status() {
460 Some(status) if status.is_success() => Ok(self),
461 _ => Err(self.into()),
462 }
463 }
464
465 /// Detach the response body as an owned, `'static` value.
466 ///
467 /// Returns `None` if there is no body to take — neither an override has been installed nor
468 /// a transport-backed body is available. Subsequent calls return `None`. Callers who want
469 /// to wrap-and-replace the body (e.g. tee through a cache) compose this with
470 /// [`ConnExt::set_response_body`][crate::ConnExt::set_response_body]; the conn's
471 /// body slot is empty between the two calls.
472 ///
473 /// For a transport-backed body, this moves the transport into the returned
474 /// `ResponseBody<'static>`. Drop on that value drains-and-pools (keepalive) or closes
475 /// (otherwise) the transport via a spawned task; [`ResponseBody::recycle`] is the
476 /// `await`-able variant. For an override body, the inner [`Body`] is moved out and any
477 /// leftover transport on the conn is recycled immediately.
478 #[must_use]
479 pub fn take_response_body(&mut self) -> Option<ResponseBody<'static>> {
480 let encoding = encoding(&self.response_headers);
481 if let Some(body) = self.body_override.take() {
482 return Some(OverrideBody::new(body, encoding, self.context.config()).into());
483 }
484
485 let cleanup = self.build_cleanup_context();
486 let received = self.take_received_body(false)?;
487 Some(ResponseBody::received_owned(received, cleanup))
488 }
489
490 /// Build a [`CleanupContext`] capturing the runtime and (if keepalive + pool configured)
491 /// the pool + origin to insert into. Single source of truth for "what should happen to
492 /// this conn's transport when its body is released" — both the on_completion callback
493 /// wired into the body and the [`ResponseBody::recycle`] / `Drop` paths consume clones
494 /// of this same context, so the user-driven and Drop-driven release paths agree.
495 fn build_cleanup_context(&self) -> CleanupContext {
496 // Only pool a transport whose response head we actually received (`status.is_some()`): a
497 // conn abandoned before the response — a timeout or transport error mid-request — has an
498 // empty `response_headers`, which `is_keep_alive` would read as persistent and recycle a
499 // half-spent connection into the pool, poisoning the next request that reuses it.
500 let h1_pool_origin = if self.status.is_some()
501 && self.is_keep_alive()
502 && let Some(pool) = self.client.pool().cloned()
503 {
504 Some((pool, self.url.origin()))
505 } else {
506 None
507 };
508
509 CleanupContext {
510 runtime: self.client.connector().runtime(),
511 h1_pool_origin,
512 }
513 }
514
515 /// Detach the transport-backed receive side of this conn as an owned `ReceivedBody`.
516 ///
517 /// Returns `None` when no transport is attached.
518 ///
519 /// `cleanup: true` wires a spawn-on-End callback inside the body for callers that hand
520 /// the body off without awaiting it (`From<Conn> for Body`). `cleanup: false` is for
521 /// callers that drive the body to End themselves and release the transport inline in
522 /// their own poll loop — `take_response_body` does this so callers get a "transport is
523 /// settled when read_to_end returns Ok(0)" guarantee instead of racing a spawned task.
524 pub(crate) fn take_received_body(
525 &mut self,
526 cleanup: bool,
527 ) -> Option<ReceivedBody<'static, Box<dyn Transport>>> {
528 let _ = self.finalize_headers();
529 let transport = self.transport.take()?;
530
531 let on_completion = cleanup.then(|| {
532 let cleanup = self.build_cleanup_context();
533 Box::new(move |transport| cleanup.handoff(transport))
534 as Box<dyn FnOnce(Box<dyn Transport>) + Send + Sync + 'static>
535 });
536
537 Some(
538 ReceivedBody::new(
539 self.response_content_length(),
540 mem::take(&mut self.buffer),
541 transport,
542 self.response_body_state,
543 on_completion,
544 encoding(&self.response_headers),
545 )
546 .with_protocol_session(self.protocol_session.clone()),
547 )
548 }
549
550 /// Returns this conn to the connection pool if it is keepalive, and
551 /// closes it otherwise. This will happen asynchronously as a spawned
552 /// task when the conn is dropped, but calling it explicitly allows
553 /// you to block on it and control where it happens.
554 pub async fn recycle(mut self) {
555 if let Some(rb) = self.take_response_body() {
556 rb.recycle().await;
557 }
558 }
559
560 /// attempts to retrieve the connected peer address
561 pub fn peer_addr(&self) -> Option<SocketAddr> {
562 self.transport
563 .as_ref()
564 .and_then(|t| t.peer_addr().ok().flatten())
565 }
566
567 /// add state to the client conn and return self
568 pub fn with_state<T: Send + Sync + 'static>(mut self, state: T) -> Self {
569 self.insert_state(state);
570 self
571 }
572
/// add state to the client conn, returning any previously set state of this type
///
/// State is keyed by its type `T`: whatever the conn's state storage held for `T` before
/// this call is handed back to the caller.
pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
    self.state.insert(state)
}
577
/// borrow state
///
/// Looks up the value of type `T` in the conn's state storage and returns a shared
/// reference to it, or `None` when the lookup finds nothing.
pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
    self.state.get()
}
582
/// borrow state mutably
///
/// Looks up the value of type `T` in the conn's state storage and returns a mutable
/// reference to it, or `None` when the lookup finds nothing.
pub fn state_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
    self.state.get_mut()
}
587
/// take state
///
/// Moves the value of type `T` out of the conn's state storage and returns it by value, or
/// returns `None` when the storage has nothing to give for `T`.
pub fn take_state<T: Send + Sync + 'static>(&mut self) -> Option<T> {
    self.state.take()
}
592}