Skip to main content

trillium_client/
conn.rs

1use crate::{
2    Client, ResponseBody,
3    response_body::{CleanupContext, OverrideBody},
4    util::encoding,
5};
6use std::{borrow::Cow, mem, net::SocketAddr, sync::Arc, time::Duration};
7use trillium_http::{
8    Body, Buffer, Error, HeaderName, HeaderValues, Headers, HttpContext, Method, ProtocolSession,
9    ReceivedBody, ReceivedBodyState, Status, TypeSet, Version,
10};
11use trillium_server_common::{Transport, url::Url};
12
13mod h1;
14mod h2;
15mod h3;
16mod shared;
17mod unexpected_status_error;
18
19pub(crate) use h2::H2Pooled;
20#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
21pub use shared::ClientSerdeError;
22pub use unexpected_status_error::UnexpectedStatusError;
23
24/// a client connection, representing both an outbound http request and a
25/// http response
26#[must_use]
27#[derive(fieldwork::Fieldwork)]
28pub struct Conn {
29    pub(crate) protocol_session: ProtocolSession,
30    /// QUIC-connection WebTransport dispatcher slot (lazy-init) and the QUIC connection
31    /// itself, retained on extended-CONNECT-with-`:protocol = webtransport` requests so
32    /// `into_webtransport` can install the router and hand the QUIC connection to the
33    /// returned [`WebTransportConnection`][trillium_webtransport::WebTransportConnection].
34    #[cfg(feature = "webtransport")]
35    pub(crate) wt_pool_entry: Option<crate::h3::H3PoolEntry>,
36    pub(crate) buffer: Buffer,
37    pub(crate) response_body_state: ReceivedBodyState,
38    pub(crate) headers_finalized: bool,
39    pub(crate) max_head_length: usize,
40    pub(crate) state: TypeSet,
41    pub(crate) context: Arc<HttpContext>,
42
43    /// the transport for this conn
44    ///
45    /// This should only be used to call your own custom methods on the transport that do not read
46    /// or write any data. Calling any method that reads from or writes to the transport will
47    /// disrupt the HTTP protocol.
48    #[field(get, get_mut)]
49    pub(crate) transport: Option<Box<dyn Transport>>,
50
51    /// the url for this conn.
52    ///
53    /// ```
54    /// use trillium_client::{Client, Method};
55    /// use trillium_testing::client_config;
56    ///
57    /// let client = Client::from(client_config());
58    ///
59    /// let conn = client.get("http://localhost:9080");
60    ///
61    /// let url = conn.url(); //<-
62    ///
63    /// assert_eq!(url.host_str().unwrap(), "localhost");
64    /// ```
65    #[field(get, set, get_mut)]
66    pub(crate) url: Url,
67
68    /// the method for this conn.
69    ///
70    /// ```
71    /// use trillium_client::{Client, Method};
72    /// use trillium_testing::client_config;
73    ///
74    /// let client = Client::from(client_config());
75    /// let conn = client.get("http://localhost:9080");
76    ///
77    /// let method = conn.method(); //<-
78    ///
79    /// assert_eq!(method, Method::Get);
80    /// ```
81    #[field(get, set, copy)]
82    pub(crate) method: Method,
83
84    /// the request headers
85    #[field(get, get_mut)]
86    pub(crate) request_headers: Headers,
87
88    #[field(get)]
89    /// the response headers
90    pub(crate) response_headers: Headers,
91
92    /// the status code for this conn.
93    ///
94    /// If the conn has not yet been sent, this will be None.
95    ///
96    /// ```
97    /// use trillium_client::{Client, Status};
98    /// use trillium_testing::{client_config, with_server};
99    ///
100    /// async fn handler(conn: trillium::Conn) -> trillium::Conn {
101    ///     conn.with_status(418)
102    /// }
103    ///
104    /// with_server(handler, |url| async move {
105    ///     let client = Client::new(client_config());
106    ///     let conn = client.get(url).await?;
107    ///     assert_eq!(Status::ImATeapot, conn.status().unwrap());
108    ///     Ok(())
109    /// });
110    /// ```
111    #[field(get, copy)]
112    pub(crate) status: Option<Status>,
113
114    /// the request body
115    ///
116    /// ```
117    /// env_logger::init();
118    /// use trillium_client::Client;
119    /// use trillium_testing::{client_config, with_server};
120    ///
121    /// let handler = |mut conn: trillium::Conn| async move {
122    ///     let body = conn.request_body_string().await.unwrap();
123    ///     conn.ok(format!("request body was: {}", body))
124    /// };
125    ///
126    /// with_server(handler, |url| async move {
127    ///     let client = Client::from(client_config());
128    ///     let mut conn = client
129    ///         .post(url)
130    ///         .with_body("body") //<-
131    ///         .await?;
132    ///
133    ///     assert_eq!(
134    ///         conn.response_body().read_string().await?,
135    ///         "request body was: body"
136    ///     );
137    ///     Ok(())
138    /// });
139    /// ```
140    #[field(get, with = with_body, argument = body, set, into, take, option_set_some)]
141    pub(crate) request_body: Option<Body>,
142
143    /// the timeout for this conn
144    ///
145    /// this can also be set on the client with [`Client::set_timeout`](crate::Client::set_timeout)
146    /// and [`Client::with_timeout`](crate::Client::with_timeout)
147    #[field(with, set, get, get_mut, take, copy, option_set_some)]
148    pub(crate) timeout: Option<Duration>,
149
150    /// whether this conn is halted.
151    ///
152    /// When set to `true` before execution, the network round-trip is skipped — the conn is
153    /// returned to the caller with whatever response state has been populated synthetically
154    /// (status, headers, body). Used by client middleware to short-circuit on cache hits,
155    /// mocked responses, or open circuit-breakers. Cleared on egress so the user's conn handle
156    /// never observes residual halt state after the awaited conn returns.
157    ///
158    /// Driven via [`ConnExt`](crate::ConnExt) — `halt` / `set_halted` / `is_halted`.
159    pub(crate) halted: bool,
160
161    /// transport-level error from the round-trip, if any.
162    ///
163    /// When the network call fails (connect refused, TLS handshake error, malformed HTTP frame,
164    /// timeout, etc.) the framework stashes the error here and runs the handler chain's
165    /// [`after_response`](crate::ClientHandler::after_response) anyway. A handler that recovers
166    /// (stale-if-error cache, retry-with-fallback) calls
167    /// [`ConnExt::take_error`](crate::ConnExt::take_error) to clear the error
168    /// and populates response state synthetically; if the error is still present after all
169    /// handlers finish, it propagates as `Err` from the awaited conn.
170    pub(crate) error: Option<Error>,
171
172    /// An override response body installed by middleware via
173    /// [`ConnExt::set_response_body`](crate::ConnExt::set_response_body) or
174    /// [`ConnExt::with_response_body`](crate::ConnExt::with_response_body). When
175    /// set, [`Conn::response_body`] returns a [`ResponseBody`] backed by this body instead of
176    /// the transport.
177    pub(crate) body_override: Option<Body>,
178
179    /// the http version for this conn
180    ///
181    /// Pre-execution this is the version *hint* (prior knowledge), not the version that will
182    /// necessarily be on the wire — the default [`Version::Http1_1`] means "no hint, use
183    /// auto-discovery" rather than "force HTTP/1.1." Post-execution this reflects the version
184    /// the request was actually sent over.
185    ///
186    /// See the crate-level [Protocol selection][crate#protocol-selection] documentation for
187    /// the full hint → behavior table.
188    #[field(get, set, with, copy)]
189    pub(crate) http_version: Version,
190
191    /// the :authority pseudo-header, populated during h2 or h3 header finalization
192    #[field(get)]
193    pub(crate) authority: Option<Cow<'static, str>>,
194    /// the :scheme pseudo-header, populated during h2 or h3 header finalization
195
196    #[field(get)]
197    pub(crate) scheme: Option<Cow<'static, str>>,
198
199    /// the :path pseudo-header, populated during h2 or h3 header finalization
200    #[field(get)]
201    pub(crate) path: Option<Cow<'static, str>>,
202
203    /// an explicit request target override, used only for `OPTIONS *` and `CONNECT host:port`
204    ///
205    /// When set and the method is OPTIONS or CONNECT, this value is used as the HTTP request
206    /// target instead of deriving it from the url. For all other methods, this field is ignored.
207    #[field(with, set, get, option_set_some, into)]
208    pub(crate) request_target: Option<Cow<'static, str>>,
209
210    /// the `:protocol` pseudo-header for an extended-CONNECT bootstrap (RFC 8441 over h2,
211    /// RFC 9220 over h3). Triggers the h2/h3 exec paths to send HEADERS without `END_STREAM`
212    /// and leave the stream open as a bidirectional byte channel.
213    ///
214    /// Only meaningful when method is `CONNECT` and [`http_version`][Self::http_version] is
215    /// `Http2` or `Http3`. h1 and prior-version requests ignore this field.
216    #[field(get)]
217    pub(crate) protocol: Option<Cow<'static, str>>,
218
219    /// trailers sent with the request body, populated after the body has been fully sent.
220    ///
221    /// Only present when the request body was constructed with [`Body::new_with_trailers`] and
222    /// the body has been fully sent.
223    #[field(get)]
224    pub(crate) request_trailers: Option<Headers>,
225
226    /// trailers received with the response body, populated after the response body has been fully
227    /// read.
228    #[field(get)]
229    pub(crate) response_trailers: Option<Headers>,
230
231    /// the [`Client`] that built this conn.
232    #[field(get)]
233    pub(crate) client: Client,
234
235    /// A queued follow-up conn installed by middleware via
236    /// [`ConnExt::set_followup`](crate::ConnExt::set_followup).
237    ///
238    /// When `Some` after the handler chain's `after_response` has fully unwound, the
239    /// [`IntoFuture`][std::future::IntoFuture] loop picks it up: the current conn's response
240    /// body is recycled, then the follow-up is swapped in and runs another full
241    /// `(run → network → after_response)` cycle. Used by re-issuing handlers
242    /// (`FollowRedirects`, retry, auth-refresh) instead of recursing into a nested `.await`.
243    pub(crate) followup: Option<Box<Conn>>,
244}
245
246/// default http user-agent header
247pub const USER_AGENT: &str = concat!("trillium-client/", env!("CARGO_PKG_VERSION"));
248
249impl Conn {
250    /// chainable setter for [`inserting`](Headers::insert) a request header
251    ///
252    /// ```
253    /// use trillium_client::Client;
254    /// use trillium_testing::{client_config, with_server};
255    ///
256    /// let handler = |conn: trillium::Conn| async move {
257    ///     let header = conn
258    ///         .request_headers()
259    ///         .get_str("some-request-header")
260    ///         .unwrap_or_default();
261    ///     let response = format!("some-request-header was {}", header);
262    ///     conn.ok(response)
263    /// };
264    ///
265    /// with_server(handler, |url| async move {
266    ///     let client = Client::new(client_config());
267    ///     let mut conn = client
268    ///         .get(url)
269    ///         .with_request_header("some-request-header", "header-value") // <--
270    ///         .await?;
271    ///     assert_eq!(
272    ///         conn.response_body().read_string().await?,
273    ///         "some-request-header was header-value"
274    ///     );
275    ///     Ok(())
276    /// })
277    /// ```
278    pub fn with_request_header(
279        mut self,
280        name: impl Into<HeaderName<'static>>,
281        value: impl Into<HeaderValues>,
282    ) -> Self {
283        self.request_headers.insert(name, value);
284        self
285    }
286
287    /// chainable setter for `extending` request headers
288    ///
289    /// ```
290    /// use trillium_client::Client;
291    /// use trillium_testing::{client_config, with_server};
292    ///
293    /// let handler = |conn: trillium::Conn| async move {
294    ///     let header = conn
295    ///         .request_headers()
296    ///         .get_str("some-request-header")
297    ///         .unwrap_or_default();
298    ///     let response = format!("some-request-header was {}", header);
299    ///     conn.ok(response)
300    /// };
301    ///
302    /// with_server(handler, move |url| async move {
303    ///     let client = Client::new(client_config());
304    ///     let mut conn = client
305    ///         .get(url)
306    ///         .with_request_headers([
307    ///             ("some-request-header", "header-value"),
308    ///             ("some-other-req-header", "other-header-value"),
309    ///         ])
310    ///         .await?;
311    ///
312    ///     assert_eq!(
313    ///         conn.response_body().read_string().await?,
314    ///         "some-request-header was header-value"
315    ///     );
316    ///     Ok(())
317    /// })
318    /// ```
319    pub fn with_request_headers<HN, HV, I>(mut self, headers: I) -> Self
320    where
321        I: IntoIterator<Item = (HN, HV)> + Send,
322        HN: Into<HeaderName<'static>>,
323        HV: Into<HeaderValues>,
324    {
325        self.request_headers.extend(headers);
326        self
327    }
328
329    /// Chainable method to remove a request header if present
330    pub fn without_request_header(mut self, name: impl Into<HeaderName<'static>>) -> Self {
331        self.request_headers.remove(name);
332        self
333    }
334
335    /// chainable setter for json body. this requires the `serde_json` crate feature to be enabled.
336    #[cfg(feature = "serde_json")]
337    pub fn with_json_body(self, body: &impl serde::Serialize) -> serde_json::Result<Self> {
338        use trillium_http::KnownHeaderName;
339
340        Ok(self
341            .with_body(serde_json::to_string(body)?)
342            .with_request_header(KnownHeaderName::ContentType, "application/json"))
343    }
344
345    /// chainable setter for json body. this requires the `sonic-rs` crate feature to be enabled.
346    #[cfg(feature = "sonic-rs")]
347    pub fn with_json_body(self, body: &impl serde::Serialize) -> sonic_rs::Result<Self> {
348        use trillium_http::KnownHeaderName;
349
350        Ok(self
351            .with_body(sonic_rs::to_string(body)?)
352            .with_request_header(KnownHeaderName::ContentType, "application/json"))
353    }
354
355    /// returns a [`ResponseBody`](crate::ResponseBody) that borrows the connection inside this
356    /// conn.
357    /// ```
358    /// use trillium_client::Client;
359    /// use trillium_testing::{client_config, with_server};
360    ///
361    /// let handler = |mut conn: trillium::Conn| async move { conn.ok("hello from trillium") };
362    ///
363    /// with_server(handler, |url| async move {
364    ///     let client = Client::from(client_config());
365    ///     let mut conn = client.get(url).await?;
366    ///
367    ///     let response_body = conn.response_body(); //<-
368    ///
369    ///     assert_eq!(19, response_body.content_length().unwrap());
370    ///     let string = response_body.read_string().await?;
371    ///     assert_eq!("hello from trillium", string);
372    ///     Ok(())
373    /// });
374    /// ```
375    #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
376    pub fn response_body(&mut self) -> ResponseBody<'_> {
377        let content_length = self.response_content_length();
378        let encoding = encoding(&self.response_headers);
379        if let Some(body) = self.body_override.as_mut() {
380            OverrideBody::new(body, encoding, self.context.config()).into()
381        } else {
382            ReceivedBody::new(
383                content_length,
384                &mut self.buffer,
385                self.transport.as_mut().unwrap(),
386                &mut self.response_body_state,
387                None,
388                encoding,
389            )
390            .with_trailers(&mut self.response_trailers)
391            .with_protocol_session(self.protocol_session.clone())
392            .into()
393        }
394    }
395
396    /// Attempt to deserialize the response body. Note that this consumes the body content.
397    #[cfg(feature = "serde_json")]
398    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
399    where
400        T: serde::de::DeserializeOwned,
401    {
402        let body = self.response_body().read_string().await?;
403        Ok(serde_json::from_str(&body)?)
404    }
405
406    /// Attempt to deserialize the response body. Note that this consumes the body content.
407    #[cfg(feature = "sonic-rs")]
408    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
409    where
410        T: serde::de::DeserializeOwned,
411    {
412        let body = self.response_body().read_string().await?;
413        Ok(sonic_rs::from_str(&body)?)
414    }
415
416    /// Returns the conn or an [`UnexpectedStatusError`] that contains the conn
417    ///
418    /// ```
419    /// use trillium_client::{Client, Status};
420    /// use trillium_testing::{client_config, with_server};
421    ///
422    /// with_server(Status::NotFound, |url| async move {
423    ///     let client = Client::new(client_config());
424    ///     assert_eq!(
425    ///         client.get(url).await?.success().unwrap_err().to_string(),
426    ///         "expected a success (2xx) status code, but got 404 Not Found"
427    ///     );
428    ///     Ok(())
429    /// });
430    ///
431    /// with_server(Status::Ok, |url| async move {
432    ///     let client = Client::new(client_config());
433    ///     assert!(client.get(url).await?.success().is_ok());
434    ///     Ok(())
435    /// });
436    /// ```
437    pub fn success(self) -> Result<Self, UnexpectedStatusError> {
438        match self.status() {
439            Some(status) if status.is_success() => Ok(self),
440            _ => Err(self.into()),
441        }
442    }
443
444    /// Detach the response body as an owned, `'static` value.
445    ///
446    /// Returns `None` if there is no body to take — neither an override has been installed nor
447    /// a transport-backed body is available. Subsequent calls return `None`. Callers who want
448    /// to wrap-and-replace the body (e.g. tee through a cache) compose this with
449    /// [`ConnExt::set_response_body`][crate::ConnExt::set_response_body]; the conn's
450    /// body slot is empty between the two calls.
451    ///
452    /// For a transport-backed body, this moves the transport into the returned
453    /// `ResponseBody<'static>`. Drop on that value drains-and-pools (keepalive) or closes
454    /// (otherwise) the transport via a spawned task; [`ResponseBody::recycle`] is the
455    /// `await`-able variant. For an override body, the inner [`Body`] is moved out and any
456    /// leftover transport on the conn is recycled immediately.
457    #[must_use]
458    pub fn take_response_body(&mut self) -> Option<ResponseBody<'static>> {
459        let encoding = encoding(&self.response_headers);
460        if let Some(body) = self.body_override.take() {
461            return Some(OverrideBody::new(body, encoding, self.context.config()).into());
462        }
463
464        let cleanup = self.build_cleanup_context();
465        let received = self.take_received_body(false)?;
466        Some(ResponseBody::received_owned(received, cleanup))
467    }
468
469    /// Build a [`CleanupContext`] capturing the runtime and (if keepalive + pool configured)
470    /// the pool + origin to insert into. Single source of truth for "what should happen to
471    /// this conn's transport when its body is released" — both the on_completion callback
472    /// wired into the body and the [`ResponseBody::recycle`] / `Drop` paths consume clones
473    /// of this same context, so the user-driven and Drop-driven release paths agree.
474    fn build_cleanup_context(&self) -> CleanupContext {
475        let h1_pool_origin = if self.is_keep_alive()
476            && let Some(pool) = self.client.pool().cloned()
477        {
478            Some((pool, self.url.origin()))
479        } else {
480            None
481        };
482
483        CleanupContext {
484            runtime: self.client.connector().runtime(),
485            h1_pool_origin,
486        }
487    }
488
489    /// Detach the transport-backed receive side of this conn as an owned `ReceivedBody`.
490    ///
491    /// Returns `None` when no transport is attached.
492    ///
493    /// `cleanup: true` wires a spawn-on-End callback inside the body for callers that hand
494    /// the body off without awaiting it (`From<Conn> for Body`). `cleanup: false` is for
495    /// callers that drive the body to End themselves and release the transport inline in
496    /// their own poll loop — `take_response_body` does this so callers get a "transport is
497    /// settled when read_to_end returns Ok(0)" guarantee instead of racing a spawned task.
498    pub(crate) fn take_received_body(
499        &mut self,
500        cleanup: bool,
501    ) -> Option<ReceivedBody<'static, Box<dyn Transport>>> {
502        let _ = self.finalize_headers();
503        let transport = self.transport.take()?;
504
505        let on_completion = cleanup.then(|| {
506            let cleanup = self.build_cleanup_context();
507            Box::new(move |transport| cleanup.handoff(transport))
508                as Box<dyn FnOnce(Box<dyn Transport>) + Send + Sync + 'static>
509        });
510
511        Some(ReceivedBody::new(
512            self.response_content_length(),
513            mem::take(&mut self.buffer),
514            transport,
515            self.response_body_state,
516            on_completion,
517            encoding(&self.response_headers),
518        ))
519    }
520
521    /// Returns this conn to the connection pool if it is keepalive, and
522    /// closes it otherwise. This will happen asynchronously as a spawned
523    /// task when the conn is dropped, but calling it explicitly allows
524    /// you to block on it and control where it happens.
525    pub async fn recycle(mut self) {
526        if let Some(rb) = self.take_response_body() {
527            rb.recycle().await;
528        }
529    }
530
531    /// attempts to retrieve the connected peer address
532    pub fn peer_addr(&self) -> Option<SocketAddr> {
533        self.transport
534            .as_ref()
535            .and_then(|t| t.peer_addr().ok().flatten())
536    }
537
538    /// add state to the client conn and return self
539    pub fn with_state<T: Send + Sync + 'static>(mut self, state: T) -> Self {
540        self.insert_state(state);
541        self
542    }
543
544    /// add state to the client conn, returning any previously set state of this type
545    pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
546        self.state.insert(state)
547    }
548
549    /// borrow state
550    pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
551        self.state.get()
552    }
553
554    /// borrow state mutably
555    pub fn state_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
556        self.state.get_mut()
557    }
558
559    /// take state
560    pub fn take_state<T: Send + Sync + 'static>(&mut self) -> Option<T> {
561        self.state.take()
562    }
563}