Skip to main content

trillium_client/
conn.rs

1use crate::{
2    Client, ResponseBody,
3    response_body::{CleanupContext, OverrideBody},
4    util::encoding,
5};
6use std::{borrow::Cow, mem, net::SocketAddr, sync::Arc, time::Duration};
7use trillium_http::{
8    Body, Buffer, Error, HeaderName, HeaderValues, Headers, HttpContext, Method, ProtocolSession,
9    ReceivedBody, ReceivedBodyState, Status, TypeSet, Version,
10};
11use trillium_server_common::{Transport, url::Url};
12
13mod h1;
14mod h2;
15mod h3;
16mod shared;
17mod unexpected_status_error;
18
19pub(crate) use h2::H2Pooled;
20#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
21pub use shared::ClientSerdeError;
22pub use unexpected_status_error::UnexpectedStatusError;
23
24/// a client connection, representing both an outbound http request and a
25/// http response
26#[must_use]
27#[derive(fieldwork::Fieldwork)]
28pub struct Conn {
29    pub(crate) protocol_session: ProtocolSession,
30    /// QUIC-connection WebTransport dispatcher slot (lazy-init) and the QUIC connection
31    /// itself, retained on extended-CONNECT-with-`:protocol = webtransport` requests so
32    /// `into_webtransport` can install the router and hand the QUIC connection to the
33    /// returned [`WebTransportConnection`][trillium_webtransport::WebTransportConnection].
34    #[cfg(feature = "webtransport")]
35    pub(crate) wt_pool_entry: Option<crate::h3::H3PoolEntry>,
36    pub(crate) buffer: Buffer,
37    pub(crate) response_body_state: ReceivedBodyState,
38    pub(crate) headers_finalized: bool,
39    pub(crate) max_head_length: usize,
40    pub(crate) state: TypeSet,
41    pub(crate) context: Arc<HttpContext>,
42
43    /// the transport for this conn
44    ///
45    /// This should only be used to call your own custom methods on the transport that do not read
46    /// or write any data. Calling any method that reads from or writes to the transport will
47    /// disrupt the HTTP protocol.
48    #[field(get, get_mut)]
49    pub(crate) transport: Option<Box<dyn Transport>>,
50
51    /// the url for this conn.
52    ///
53    /// ```
54    /// use trillium_client::{Client, Method};
55    /// use trillium_testing::client_config;
56    ///
57    /// let client = Client::from(client_config());
58    ///
59    /// let conn = client.get("http://localhost:9080");
60    ///
61    /// let url = conn.url(); //<-
62    ///
63    /// assert_eq!(url.host_str().unwrap(), "localhost");
64    /// ```
65    #[field(get, set, get_mut)]
66    pub(crate) url: Url,
67
68    /// the method for this conn.
69    ///
70    /// ```
71    /// use trillium_client::{Client, Method};
72    /// use trillium_testing::client_config;
73    ///
74    /// let client = Client::from(client_config());
75    /// let conn = client.get("http://localhost:9080");
76    ///
77    /// let method = conn.method(); //<-
78    ///
79    /// assert_eq!(method, Method::Get);
80    /// ```
81    #[field(get, set, copy)]
82    pub(crate) method: Method,
83
84    /// the request headers
85    #[field(get, get_mut)]
86    pub(crate) request_headers: Headers,
87
88    #[field(get)]
89    /// the response headers
90    pub(crate) response_headers: Headers,
91
92    /// the status code for this conn.
93    ///
94    /// If the conn has not yet been sent, this will be None.
95    ///
96    /// ```
97    /// use trillium_client::{Client, Status};
98    /// use trillium_testing::{client_config, with_server};
99    ///
100    /// async fn handler(conn: trillium::Conn) -> trillium::Conn {
101    ///     conn.with_status(418)
102    /// }
103    ///
104    /// with_server(handler, |url| async move {
105    ///     let client = Client::new(client_config());
106    ///     let conn = client.get(url).await?;
107    ///     assert_eq!(Status::ImATeapot, conn.status().unwrap());
108    ///     Ok(())
109    /// });
110    /// ```
111    #[field(get, copy)]
112    pub(crate) status: Option<Status>,
113
114    /// the request body
115    ///
116    /// ```
117    /// env_logger::init();
118    /// use trillium_client::Client;
119    /// use trillium_testing::{client_config, with_server};
120    ///
121    /// let handler = |mut conn: trillium::Conn| async move {
122    ///     let body = conn.request_body_string().await.unwrap();
123    ///     conn.ok(format!("request body was: {}", body))
124    /// };
125    ///
126    /// with_server(handler, |url| async move {
127    ///     let client = Client::from(client_config());
128    ///     let mut conn = client
129    ///         .post(url)
130    ///         .with_body("body") //<-
131    ///         .await?;
132    ///
133    ///     assert_eq!(
134    ///         conn.response_body().read_string().await?,
135    ///         "request body was: body"
136    ///     );
137    ///     Ok(())
138    /// });
139    /// ```
140    #[field(get, with = with_body, argument = body, set, into, take, option_set_some)]
141    pub(crate) request_body: Option<Body>,
142
143    /// the timeout for this conn
144    ///
145    /// this can also be set on the client with [`Client::set_timeout`](crate::Client::set_timeout)
146    /// and [`Client::with_timeout`](crate::Client::with_timeout)
147    #[field(with, set, get, get_mut, take, copy, option_set_some)]
148    pub(crate) timeout: Option<Duration>,
149
150    /// whether this conn is halted.
151    ///
152    /// When set to `true` before execution, the network round-trip is skipped — the conn is
153    /// returned to the caller with whatever response state has been populated synthetically
154    /// (status, headers, body). Used by client middleware to short-circuit on cache hits,
155    /// mocked responses, or open circuit-breakers. Cleared on egress so the user's conn handle
156    /// never observes residual halt state after the awaited conn returns.
157    ///
158    /// Driven via [`ConnExt`](crate::ConnExt) — `halt` / `set_halted` / `is_halted`.
159    pub(crate) halted: bool,
160
161    /// transport-level error from the round-trip, if any.
162    ///
163    /// When the network call fails (connect refused, TLS handshake error, malformed HTTP frame,
164    /// timeout, etc.) the framework stashes the error here and runs the handler chain's
165    /// [`after_response`](crate::ClientHandler::after_response) anyway. A handler that recovers
166    /// (stale-if-error cache, retry-with-fallback) calls
167    /// [`ConnExt::take_error`](crate::ConnExt::take_error) to clear the error
168    /// and populates response state synthetically; if the error is still present after all
169    /// handlers finish, it propagates as `Err` from the awaited conn.
170    pub(crate) error: Option<Error>,
171
172    /// An override response body installed by middleware via
173    /// [`ConnExt::set_response_body`](crate::ConnExt::set_response_body) or
174    /// [`ConnExt::with_response_body`](crate::ConnExt::with_response_body). When
175    /// set, [`Conn::response_body`] returns a [`ResponseBody`] backed by this body instead of
176    /// the transport.
177    pub(crate) body_override: Option<Body>,
178
179    /// the http version *hint* for this conn
180    ///
181    /// Pre-execution this is the prior-knowledge hint, not the version that will necessarily be
182    /// on the wire. `None` means "no hint, use auto-discovery" (Alt-Svc h3, ALPN/pooled h2);
183    /// any `Some(version)` pins the protocol and suppresses auto-discovery. Post-execution this
184    /// is `Some(version)` reflecting the version the request was actually sent over.
185    ///
186    /// The public [`http_version`](Conn::http_version) accessor resolves `None` to
187    /// [`Version::Http1_1`]. See the crate-level [Protocol selection][crate#protocol-selection]
188    /// documentation for the full hint → behavior table.
189    #[field(set, with, option_set_some)]
190    pub(crate) http_version: Option<Version>,
191
192    /// the :authority pseudo-header, populated during h2 or h3 header finalization
193    #[field(get)]
194    pub(crate) authority: Option<Cow<'static, str>>,
195    /// the :scheme pseudo-header, populated during h2 or h3 header finalization
196
197    #[field(get)]
198    pub(crate) scheme: Option<Cow<'static, str>>,
199
200    /// the :path pseudo-header, populated during h2 or h3 header finalization
201    #[field(get)]
202    pub(crate) path: Option<Cow<'static, str>>,
203
204    /// an explicit request target override, used only for `OPTIONS *` and `CONNECT host:port`
205    ///
206    /// When set and the method is OPTIONS or CONNECT, this value is used as the HTTP request
207    /// target instead of deriving it from the url. For all other methods, this field is ignored.
208    #[field(with, set, get, option_set_some, into)]
209    pub(crate) request_target: Option<Cow<'static, str>>,
210
211    /// the `:protocol` pseudo-header for an extended-CONNECT bootstrap (RFC 8441 over h2,
212    /// RFC 9220 over h3). Triggers the h2/h3 exec paths to send HEADERS without `END_STREAM`
213    /// and leave the stream open as a bidirectional byte channel.
214    ///
215    /// Only meaningful when method is `CONNECT` and [`http_version`][Self::http_version] is
216    /// `Http2` or `Http3`. h1 and prior-version requests ignore this field.
217    #[field(get)]
218    pub(crate) protocol: Option<Cow<'static, str>>,
219
220    /// trailers sent with the request body, populated after the body has been fully sent.
221    ///
222    /// Only present when the request body was constructed with [`Body::new_with_trailers`] and
223    /// the body has been fully sent.
224    #[field(get)]
225    pub(crate) request_trailers: Option<Headers>,
226
227    /// trailers received with the response body, populated after the response body has been fully
228    /// read.
229    #[field(get)]
230    pub(crate) response_trailers: Option<Headers>,
231
232    /// the [`Client`] that built this conn.
233    #[field(get)]
234    pub(crate) client: Client,
235
236    /// A queued follow-up conn installed by middleware via
237    /// [`ConnExt::set_followup`](crate::ConnExt::set_followup).
238    ///
239    /// When `Some` after the handler chain's `after_response` has fully unwound, the
240    /// [`IntoFuture`][std::future::IntoFuture] loop picks it up: the current conn's response
241    /// body is recycled, then the follow-up is swapped in and runs another full
242    /// `(run → network → after_response)` cycle. Used by re-issuing handlers
243    /// (`FollowRedirects`, retry, auth-refresh) instead of recursing into a nested `.await`.
244    pub(crate) followup: Option<Box<Conn>>,
245
246    /// Whether this conn is armed for an upgrade. When set, the protocol drivers
247    /// transmit only request headers and leave the outbound direction open. Armed via
248    /// [`ConnExt::upgrade`](crate::ConnExt::upgrade).
249    pub(crate) upgrade: bool,
250}
251
252/// default http user-agent header
253pub const USER_AGENT: &str = concat!("trillium-client/", env!("CARGO_PKG_VERSION"));
254
255impl Conn {
256    /// the http version for this conn
257    ///
258    /// Pre-execution this resolves the version *hint* — the default (no hint) reports
259    /// [`Version::Http1_1`], which means "use auto-discovery," not "force HTTP/1.1." Setting any
260    /// explicit version via [`with_http_version`](Conn::with_http_version) pins the protocol and
261    /// suppresses auto-discovery. Post-execution this reflects the version the request was actually
262    /// sent over.
263    ///
264    /// See the crate-level [Protocol selection][crate#protocol-selection] documentation for the
265    /// full hint → behavior table.
266    #[must_use]
267    pub fn http_version(&self) -> Version {
268        self.http_version.unwrap_or(Version::Http1_1)
269    }
270
271    /// chainable setter for [`inserting`](Headers::insert) a request header
272    ///
273    /// ```
274    /// use trillium_client::Client;
275    /// use trillium_testing::{client_config, with_server};
276    ///
277    /// let handler = |conn: trillium::Conn| async move {
278    ///     let header = conn
279    ///         .request_headers()
280    ///         .get_str("some-request-header")
281    ///         .unwrap_or_default();
282    ///     let response = format!("some-request-header was {}", header);
283    ///     conn.ok(response)
284    /// };
285    ///
286    /// with_server(handler, |url| async move {
287    ///     let client = Client::new(client_config());
288    ///     let mut conn = client
289    ///         .get(url)
290    ///         .with_request_header("some-request-header", "header-value") // <--
291    ///         .await?;
292    ///     assert_eq!(
293    ///         conn.response_body().read_string().await?,
294    ///         "some-request-header was header-value"
295    ///     );
296    ///     Ok(())
297    /// })
298    /// ```
299    pub fn with_request_header(
300        mut self,
301        name: impl Into<HeaderName<'static>>,
302        value: impl Into<HeaderValues>,
303    ) -> Self {
304        self.request_headers.insert(name, value);
305        self
306    }
307
308    /// chainable setter for `extending` request headers
309    ///
310    /// ```
311    /// use trillium_client::Client;
312    /// use trillium_testing::{client_config, with_server};
313    ///
314    /// let handler = |conn: trillium::Conn| async move {
315    ///     let header = conn
316    ///         .request_headers()
317    ///         .get_str("some-request-header")
318    ///         .unwrap_or_default();
319    ///     let response = format!("some-request-header was {}", header);
320    ///     conn.ok(response)
321    /// };
322    ///
323    /// with_server(handler, move |url| async move {
324    ///     let client = Client::new(client_config());
325    ///     let mut conn = client
326    ///         .get(url)
327    ///         .with_request_headers([
328    ///             ("some-request-header", "header-value"),
329    ///             ("some-other-req-header", "other-header-value"),
330    ///         ])
331    ///         .await?;
332    ///
333    ///     assert_eq!(
334    ///         conn.response_body().read_string().await?,
335    ///         "some-request-header was header-value"
336    ///     );
337    ///     Ok(())
338    /// })
339    /// ```
340    pub fn with_request_headers<HN, HV, I>(mut self, headers: I) -> Self
341    where
342        I: IntoIterator<Item = (HN, HV)> + Send,
343        HN: Into<HeaderName<'static>>,
344        HV: Into<HeaderValues>,
345    {
346        self.request_headers.extend(headers);
347        self
348    }
349
350    /// Chainable method to remove a request header if present
351    pub fn without_request_header(mut self, name: impl Into<HeaderName<'static>>) -> Self {
352        self.request_headers.remove(name);
353        self
354    }
355
356    /// chainable setter for json body. this requires the `serde_json` crate feature to be enabled.
357    #[cfg(feature = "serde_json")]
358    pub fn with_json_body(self, body: &impl serde::Serialize) -> serde_json::Result<Self> {
359        use trillium_http::KnownHeaderName;
360
361        Ok(self
362            .with_body(serde_json::to_string(body)?)
363            .with_request_header(KnownHeaderName::ContentType, "application/json"))
364    }
365
366    /// chainable setter for json body. this requires the `sonic-rs` crate feature to be enabled.
367    #[cfg(feature = "sonic-rs")]
368    pub fn with_json_body(self, body: &impl serde::Serialize) -> sonic_rs::Result<Self> {
369        use trillium_http::KnownHeaderName;
370
371        Ok(self
372            .with_body(sonic_rs::to_string(body)?)
373            .with_request_header(KnownHeaderName::ContentType, "application/json"))
374    }
375
376    /// returns a [`ResponseBody`](crate::ResponseBody) that borrows the connection inside this
377    /// conn.
378    /// ```
379    /// use trillium_client::Client;
380    /// use trillium_testing::{client_config, with_server};
381    ///
382    /// let handler = |mut conn: trillium::Conn| async move { conn.ok("hello from trillium") };
383    ///
384    /// with_server(handler, |url| async move {
385    ///     let client = Client::from(client_config());
386    ///     let mut conn = client.get(url).await?;
387    ///
388    ///     let response_body = conn.response_body(); //<-
389    ///
390    ///     assert_eq!(19, response_body.content_length().unwrap());
391    ///     let string = response_body.read_string().await?;
392    ///     assert_eq!("hello from trillium", string);
393    ///     Ok(())
394    /// });
395    /// ```
396    #[allow(clippy::needless_borrow, clippy::needless_borrows_for_generic_args)]
397    pub fn response_body(&mut self) -> ResponseBody<'_> {
398        let content_length = self.response_content_length();
399        let encoding = encoding(&self.response_headers);
400        if let Some(body) = self.body_override.as_mut() {
401            OverrideBody::new(body, encoding, self.context.config()).into()
402        } else {
403            ReceivedBody::new(
404                content_length,
405                &mut self.buffer,
406                self.transport.as_mut().unwrap(),
407                &mut self.response_body_state,
408                None,
409                encoding,
410            )
411            .with_trailers(&mut self.response_trailers)
412            .with_protocol_session(self.protocol_session.clone())
413            .into()
414        }
415    }
416
417    /// Attempt to deserialize the response body. Note that this consumes the body content.
418    #[cfg(feature = "serde_json")]
419    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
420    where
421        T: serde::de::DeserializeOwned,
422    {
423        let body = self.response_body().read_string().await?;
424        Ok(serde_json::from_str(&body)?)
425    }
426
427    /// Attempt to deserialize the response body. Note that this consumes the body content.
428    #[cfg(feature = "sonic-rs")]
429    pub async fn response_json<T>(&mut self) -> Result<T, ClientSerdeError>
430    where
431        T: serde::de::DeserializeOwned,
432    {
433        let body = self.response_body().read_string().await?;
434        Ok(sonic_rs::from_str(&body)?)
435    }
436
437    /// Returns the conn or an [`UnexpectedStatusError`] that contains the conn
438    ///
439    /// ```
440    /// use trillium_client::{Client, Status};
441    /// use trillium_testing::{client_config, with_server};
442    ///
443    /// with_server(Status::NotFound, |url| async move {
444    ///     let client = Client::new(client_config());
445    ///     assert_eq!(
446    ///         client.get(url).await?.success().unwrap_err().to_string(),
447    ///         "expected a success (2xx) status code, but got 404 Not Found"
448    ///     );
449    ///     Ok(())
450    /// });
451    ///
452    /// with_server(Status::Ok, |url| async move {
453    ///     let client = Client::new(client_config());
454    ///     assert!(client.get(url).await?.success().is_ok());
455    ///     Ok(())
456    /// });
457    /// ```
458    pub fn success(self) -> Result<Self, UnexpectedStatusError> {
459        match self.status() {
460            Some(status) if status.is_success() => Ok(self),
461            _ => Err(self.into()),
462        }
463    }
464
465    /// Detach the response body as an owned, `'static` value.
466    ///
467    /// Returns `None` if there is no body to take — neither an override has been installed nor
468    /// a transport-backed body is available. Subsequent calls return `None`. Callers who want
469    /// to wrap-and-replace the body (e.g. tee through a cache) compose this with
470    /// [`ConnExt::set_response_body`][crate::ConnExt::set_response_body]; the conn's
471    /// body slot is empty between the two calls.
472    ///
473    /// For a transport-backed body, this moves the transport into the returned
474    /// `ResponseBody<'static>`. Drop on that value drains-and-pools (keepalive) or closes
475    /// (otherwise) the transport via a spawned task; [`ResponseBody::recycle`] is the
476    /// `await`-able variant. For an override body, the inner [`Body`] is moved out and any
477    /// leftover transport on the conn is recycled immediately.
478    #[must_use]
479    pub fn take_response_body(&mut self) -> Option<ResponseBody<'static>> {
480        let encoding = encoding(&self.response_headers);
481        if let Some(body) = self.body_override.take() {
482            return Some(OverrideBody::new(body, encoding, self.context.config()).into());
483        }
484
485        let cleanup = self.build_cleanup_context();
486        let received = self.take_received_body(false)?;
487        Some(ResponseBody::received_owned(received, cleanup))
488    }
489
490    /// Build a [`CleanupContext`] capturing the runtime and (if keepalive + pool configured)
491    /// the pool + origin to insert into. Single source of truth for "what should happen to
492    /// this conn's transport when its body is released" — both the on_completion callback
493    /// wired into the body and the [`ResponseBody::recycle`] / `Drop` paths consume clones
494    /// of this same context, so the user-driven and Drop-driven release paths agree.
495    fn build_cleanup_context(&self) -> CleanupContext {
496        // Only pool a transport whose response head we actually received (`status.is_some()`): a
497        // conn abandoned before the response — a timeout or transport error mid-request — has an
498        // empty `response_headers`, which `is_keep_alive` would read as persistent and recycle a
499        // half-spent connection into the pool, poisoning the next request that reuses it.
500        let h1_pool_origin = if self.status.is_some()
501            && self.is_keep_alive()
502            && let Some(pool) = self.client.pool().cloned()
503        {
504            Some((pool, self.url.origin()))
505        } else {
506            None
507        };
508
509        CleanupContext {
510            runtime: self.client.connector().runtime(),
511            h1_pool_origin,
512        }
513    }
514
515    /// Detach the transport-backed receive side of this conn as an owned `ReceivedBody`.
516    ///
517    /// Returns `None` when no transport is attached.
518    ///
519    /// `cleanup: true` wires a spawn-on-End callback inside the body for callers that hand
520    /// the body off without awaiting it (`From<Conn> for Body`). `cleanup: false` is for
521    /// callers that drive the body to End themselves and release the transport inline in
522    /// their own poll loop — `take_response_body` does this so callers get a "transport is
523    /// settled when read_to_end returns Ok(0)" guarantee instead of racing a spawned task.
524    pub(crate) fn take_received_body(
525        &mut self,
526        cleanup: bool,
527    ) -> Option<ReceivedBody<'static, Box<dyn Transport>>> {
528        let _ = self.finalize_headers();
529        let transport = self.transport.take()?;
530
531        let on_completion = cleanup.then(|| {
532            let cleanup = self.build_cleanup_context();
533            Box::new(move |transport| cleanup.handoff(transport))
534                as Box<dyn FnOnce(Box<dyn Transport>) + Send + Sync + 'static>
535        });
536
537        Some(
538            ReceivedBody::new(
539                self.response_content_length(),
540                mem::take(&mut self.buffer),
541                transport,
542                self.response_body_state,
543                on_completion,
544                encoding(&self.response_headers),
545            )
546            .with_protocol_session(self.protocol_session.clone()),
547        )
548    }
549
550    /// Returns this conn to the connection pool if it is keepalive, and
551    /// closes it otherwise. This will happen asynchronously as a spawned
552    /// task when the conn is dropped, but calling it explicitly allows
553    /// you to block on it and control where it happens.
554    pub async fn recycle(mut self) {
555        if let Some(rb) = self.take_response_body() {
556            rb.recycle().await;
557        }
558    }
559
560    /// attempts to retrieve the connected peer address
561    pub fn peer_addr(&self) -> Option<SocketAddr> {
562        self.transport
563            .as_ref()
564            .and_then(|t| t.peer_addr().ok().flatten())
565    }
566
567    /// add state to the client conn and return self
568    pub fn with_state<T: Send + Sync + 'static>(mut self, state: T) -> Self {
569        self.insert_state(state);
570        self
571    }
572
573    /// add state to the client conn, returning any previously set state of this type
574    pub fn insert_state<T: Send + Sync + 'static>(&mut self, state: T) -> Option<T> {
575        self.state.insert(state)
576    }
577
578    /// borrow state
579    pub fn state<T: Send + Sync + 'static>(&self) -> Option<&T> {
580        self.state.get()
581    }
582
583    /// borrow state mutably
584    pub fn state_mut<T: Send + Sync + 'static>(&mut self) -> Option<&mut T> {
585        self.state.get_mut()
586    }
587
588    /// take state
589    pub fn take_state<T: Send + Sync + 'static>(&mut self) -> Option<T> {
590        self.state.take()
591    }
592}