Skip to main content

trillium_http/
conn.rs

1use crate::{
2    Body, Buffer, Headers, HttpContext,
3    KnownHeaderName::Host,
4    Method, ReceivedBody, Status, Swansong, TypeSet, Version,
5    after_send::{AfterSend, SendStatus},
6    h3::H3Connection,
7    liveness::{CancelOnDisconnect, LivenessFut},
8    received_body::ReceivedBodyState,
9    util::encoding,
10};
11use encoding_rs::Encoding;
12use futures_lite::{
13    future,
14    io::{AsyncRead, AsyncWrite},
15};
16use std::{
17    borrow::Cow,
18    fmt::{self, Debug, Formatter},
19    future::Future,
20    net::IpAddr,
21    pin::pin,
22    str,
23    sync::Arc,
24    time::Instant,
25};
26mod h1;
27mod h3;
28
29/// A http connection
30///
31/// Unlike in other rust http implementations, this struct represents both
32/// the request and the response, and holds the transport over which the
33/// response will be sent.
34#[derive(fieldwork::Fieldwork)]
35pub struct Conn<Transport> {
36    #[field(get)]
37    /// the shared [`HttpContext`]
38    pub(crate) context: Arc<HttpContext>,
39
40    /// request [headers](Headers)
41    #[field(get, get_mut)]
42    pub(crate) request_headers: Headers,
43
44    /// response [headers](Headers)
45    #[field(get, get_mut)]
46    pub(crate) response_headers: Headers,
47
48    pub(crate) path: Cow<'static, str>,
49
50    /// the http method for this conn's request
51    ///
52    /// ```
53    /// # use trillium_http::{Conn, Method};
54    /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
55    /// assert_eq!(conn.method(), Method::Get);
56    /// ```
57    #[field(get, set, copy)]
58    pub(crate) method: Method,
59
60    /// the http status for this conn, if set
61    #[field(get, copy)]
62    pub(crate) status: Option<Status>,
63
64    #[field(get = http_version, copy)]
65    /// the http version for this conn
66    pub(crate) version: Version,
67
68    /// the [state typemap](TypeSet) for this conn
69    #[field(get, get_mut)]
70    pub(crate) state: TypeSet,
71
72    /// the response [body](Body)
73    ///
74    /// ```
75    /// # use trillium_testing::HttpTest;
76    /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
77    ///     .get("/")
78    ///     .block()
79    ///     .assert_body("hello");
80    ///
81    /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
82    ///     .get("/")
83    ///     .block()
84    ///     .assert_body("world");
85    ///
86    /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
87    ///     .get("/")
88    ///     .block()
89    ///     .assert_body("cat");
90    /// ```
91    #[field(get, set, into, option_set_some, take, with)]
92    pub(crate) response_body: Option<Body>,
93
94    /// the transport
95    ///
96    /// This should only be used to call your own custom methods on the transport that do not read
97    /// or write any data. Calling any method that reads from or writes to the transport will
98    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
99    /// use an HTTP upgrade.
100    #[field(get, get_mut)]
101    pub(crate) transport: Transport,
102
103    pub(crate) buffer: Buffer,
104
105    pub(crate) request_body_state: ReceivedBodyState,
106
107    pub(crate) after_send: AfterSend,
108
109    /// whether the connection is secure
110    ///
111    /// note that this does not necessarily indicate that the transport itself is secure, as it may
112    /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
113    /// provided appropriate headers to indicate this.
114    #[field(get, set, rename_predicates)]
115    pub(crate) secure: bool,
116
117    /// The [`Instant`] that the first header bytes for this conn were
118    /// received, before any processing or parsing has been performed.
119    #[field(get, copy)]
120    pub(crate) start_time: Instant,
121
122    /// The IP Address for the connection, if available
123    #[field(set, get, copy, into)]
124    pub(crate) peer_ip: Option<IpAddr>,
125
126    /// the :authority http/3 pseudo-header
127    #[field(set, get, into)]
128    pub(crate) authority: Option<Cow<'static, str>>,
129
130    /// the :scheme http/3 pseudo-header
131    #[field(set, get, into)]
132    pub(crate) scheme: Option<Cow<'static, str>>,
133
134    /// the [`H3Connection`] for this conn, if this is an HTTP/3 request
135    #[field(get)]
136    pub(crate) h3_connection: Option<Arc<H3Connection>>,
137
138    /// stream id
139    pub(crate) h3_stream_id: Option<u64>,
140
141    /// the :protocol http/3 pseudo-header
142    #[field(set, get, into)]
143    pub(crate) protocol: Option<Cow<'static, str>>,
144
145    /// request trailers, populated after the request body has been fully read
146    #[field(get, get_mut)]
147    pub(crate) request_trailers: Option<Headers>,
148}
149
150impl<Transport> Debug for Conn<Transport> {
151    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
152        f.debug_struct("Conn")
153            .field("context", &self.context)
154            .field("request_headers", &self.request_headers)
155            .field("response_headers", &self.response_headers)
156            .field("path", &self.path)
157            .field("method", &self.method)
158            .field("status", &self.status)
159            .field("version", &self.version)
160            .field("state", &self.state)
161            .field("response_body", &self.response_body)
162            .field("transport", &format_args!(".."))
163            .field("buffer", &format_args!(".."))
164            .field("request_body_state", &self.request_body_state)
165            .field("secure", &self.secure)
166            .field("after_send", &format_args!(".."))
167            .field("start_time", &self.start_time)
168            .field("peer_ip", &self.peer_ip)
169            .field("authority", &self.authority)
170            .field("scheme", &self.scheme)
171            .field("protocol", &self.protocol)
172            .field("h3_connection", &self.h3_connection)
173            .field("h3_stream_id", &self.h3_stream_id)
174            .field("request_trailers", &self.request_trailers)
175            .finish()
176    }
177}
178
179impl<Transport> Conn<Transport>
180where
181    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
182{
183    /// Returns the shared state on this conn, if set
184    pub fn shared_state(&self) -> &TypeSet {
185        &self.context.shared_state
186    }
187
188    /// sets the http status code from any `TryInto<Status>`.
189    ///
190    /// ```
191    /// # use trillium_http::Status;
192    /// # trillium_testing::HttpTest::new(|mut conn| async move {
193    /// assert!(conn.status().is_none());
194    ///
195    /// conn.set_status(200); // a status can be set as a u16
196    /// assert_eq!(conn.status().unwrap(), Status::Ok);
197    ///
198    /// conn.set_status(Status::ImATeapot); // or as a Status
199    /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
200    /// conn
201    /// # }).get("/").block().assert_status(Status::ImATeapot);
202    /// ```
203    pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
204        self.status = Some(status.try_into().unwrap_or_else(|_| {
205            log::error!("attempted to set an invalid status code");
206            Status::InternalServerError
207        }));
208        self
209    }
210
211    /// sets the http status code from any `TryInto<Status>`, returning Conn
212    #[must_use]
213    pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
214        self.set_status(status);
215        self
216    }
217
218    /// retrieves the path part of the request url, up to and excluding any query component
219    /// ```
220    /// # use trillium_testing::HttpTest;
221    /// HttpTest::new(|mut conn| async move {
222    ///     assert_eq!(conn.path(), "/some/path");
223    ///     conn.with_status(200)
224    /// })
225    /// .get("/some/path?and&a=query")
226    /// .block()
227    /// .assert_ok();
228    /// ```
229    pub fn path(&self) -> &str {
230        match self.path.split_once('?') {
231            Some((path, _)) => path,
232            None => &self.path,
233        }
234    }
235
236    /// retrieves the combined path and any query
237    pub fn path_and_query(&self) -> &str {
238        &self.path
239    }
240
241    /// retrieves the query component of the path, or an empty &str
242    ///
243    /// ```
244    /// # use trillium_testing::HttpTest;
245    /// let server = HttpTest::new(|conn| async move {
246    ///     let querystring = conn.querystring().to_string();
247    ///     conn.with_response_body(querystring).with_status(200)
248    /// });
249    ///
250    /// server
251    ///     .get("/some/path?and&a=query")
252    ///     .block()
253    ///     .assert_body("and&a=query");
254    ///
255    /// server.get("/some/path").block().assert_body("");
256    /// ```
257    pub fn querystring(&self) -> &str {
258        self.path
259            .split_once('?')
260            .map(|(_, query)| query)
261            .unwrap_or_default()
262    }
263
264    /// get the host for this conn, if it exists
265    pub fn host(&self) -> Option<&str> {
266        self.request_headers.get_str(Host)
267    }
268
269    /// set the host for this conn
270    pub fn set_host(&mut self, host: String) -> &mut Self {
271        self.request_headers.insert(Host, host);
272        self
273    }
274
275    /// Cancels and drops the future if reading from the transport results in an error or empty read
276    ///
277    /// The use of this method is not advised if your connected http client employs pipelining
278    /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
279    /// time
280    ///
281    /// If the client disconnects from the conn's transport, this function will return None. If the
282    /// future completes without disconnection, this future will return Some containing the output
283    /// of the future.
284    ///
285    /// Note that the inner future cannot borrow conn, so you will need to clone or take any
286    /// information needed to execute the future prior to executing this method.
287    ///
288    /// # Example
289    ///
290    /// ```rust
291    /// # use futures_lite::{AsyncRead, AsyncWrite};
292    /// # use trillium_http::{Conn, Method};
293    /// async fn something_slow_and_cancel_safe() -> String {
294    ///     String::from("this was not actually slow")
295    /// }
296    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
297    /// where
298    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
299    /// {
300    ///     let Some(returned_body) = conn
301    ///         .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
302    ///         .await
303    ///     else {
304    ///         return conn;
305    ///     };
306    ///     conn.with_response_body(returned_body).with_status(200)
307    /// }
308    /// ```
309    pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
310    where
311        Fut: Future + Send + 'a,
312    {
313        CancelOnDisconnect(self, pin!(fut)).await
314    }
315
316    /// Check if the transport is connected by attempting to read from the transport
317    ///
318    /// # Example
319    ///
320    /// This is best to use at appropriate points in a long-running handler, like:
321    ///
322    /// ```rust
323    /// # use futures_lite::{AsyncRead, AsyncWrite};
324    /// # use trillium_http::{Conn, Method};
325    /// # async fn something_slow_but_not_cancel_safe() {}
326    /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
327    /// where
328    ///     T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
329    /// {
330    ///     for _ in 0..100 {
331    ///         if conn.is_disconnected().await {
332    ///             return conn;
333    ///         }
334    ///         something_slow_but_not_cancel_safe().await;
335    ///     }
336    ///     conn.with_status(200)
337    /// }
338    /// ```
339    pub async fn is_disconnected(&mut self) -> bool {
340        future::poll_once(LivenessFut::new(self)).await.is_some()
341    }
342
343    /// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
344    /// charset, if available
345    ///
346    /// ```
347    /// # use trillium_testing::HttpTest;
348    /// HttpTest::new(|mut conn| async move {
349    ///     assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
350    ///
351    ///     conn.request_headers_mut()
352    ///         .insert("content-type", "text/plain;charset=utf-16");
353    ///     assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
354    ///
355    ///     conn.with_status(200)
356    /// })
357    /// .get("/")
358    /// .block()
359    /// .assert_ok();
360    /// ```
361    pub fn request_encoding(&self) -> &'static Encoding {
362        encoding(&self.request_headers)
363    }
364
365    /// returns the [`encoding_rs::Encoding`] for this response, as
366    /// determined from the mime-type charset, if available
367    ///
368    /// ```
369    /// # use trillium_testing::HttpTest;
370    /// HttpTest::new(|mut conn| async move {
371    ///     assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
372    ///     conn.response_headers_mut()
373    ///         .insert("content-type", "text/plain;charset=utf-16");
374    ///
375    ///     assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
376    ///
377    ///     conn.with_status(200)
378    /// })
379    /// .get("/")
380    /// .block()
381    /// .assert_ok();
382    /// ```
383    pub fn response_encoding(&self) -> &'static Encoding {
384        encoding(&self.response_headers)
385    }
386
387    /// returns a [`ReceivedBody`] that references this conn. the conn
388    /// retains all data and holds the singular transport, but the
389    /// `ReceivedBody` provides an interface to read body content.
390    ///
391    /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
392    /// lazily on the first read from the returned [`ReceivedBody`].
393    /// ```
394    /// # use trillium_testing::HttpTest;
395    /// let server = HttpTest::new(|mut conn| async move {
396    ///     let request_body = conn.request_body();
397    ///     assert_eq!(request_body.content_length(), Some(5));
398    ///     assert_eq!(request_body.read_string().await.unwrap(), "hello");
399    ///     conn.with_status(200)
400    /// });
401    ///
402    /// server.post("/").with_body("hello").block().assert_ok();
403    /// ```
404    pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
405        let needs_100_continue = self.needs_100_continue();
406        let body = self.build_request_body();
407        if needs_100_continue {
408            body.with_send_100_continue()
409        } else {
410            body
411        }
412    }
413
414    /// returns a clone of the [`swansong::Swansong`] for this Conn. use
415    /// this to gracefully stop long-running futures and streams
416    /// inside of handler functions
417    pub fn swansong(&self) -> Swansong {
418        self.h3_connection
419            .as_ref()
420            .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
421    }
422
423    /// Registers a function to call after the http response has been
424    /// completely transferred. Please note that this is a sync function
425    /// and should be computationally lightweight. If your _application_
426    /// needs additional async processing, use your runtime's task spawn
427    /// within this hook.  If your _library_ needs additional async
428    /// processing in an `after_send` hook, please open an issue. This hook
429    /// is currently designed for simple instrumentation and logging, and
430    /// should be thought of as equivalent to a Drop hook.
431    pub fn after_send<F>(&mut self, after_send: F)
432    where
433        F: FnOnce(SendStatus) + Send + Sync + 'static,
434    {
435        self.after_send.append(after_send);
436    }
437
438    /// applies a mapping function from one transport to another. This
439    /// is particularly useful for boxing the transport. unless you're
440    /// sure this is what you're looking for, you probably don't want
441    /// to be using this
442    pub fn map_transport<NewTransport>(
443        self,
444        f: impl Fn(Transport) -> NewTransport,
445    ) -> Conn<NewTransport>
446    where
447        NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
448    {
449        Conn {
450            context: self.context,
451            request_headers: self.request_headers,
452            response_headers: self.response_headers,
453            method: self.method,
454            response_body: self.response_body,
455            path: self.path,
456            status: self.status,
457            version: self.version,
458            state: self.state,
459            transport: f(self.transport),
460            buffer: self.buffer,
461            request_body_state: self.request_body_state,
462            secure: self.secure,
463            after_send: self.after_send,
464            start_time: self.start_time,
465            peer_ip: self.peer_ip,
466            authority: self.authority,
467            scheme: self.scheme,
468            h3_connection: self.h3_connection,
469            protocol: self.protocol,
470            request_trailers: self.request_trailers,
471            h3_stream_id: self.h3_stream_id,
472        }
473    }
474
475    /// whether this conn is suitable for an http upgrade to another protocol
476    pub fn should_upgrade(&self) -> bool {
477        (self.method() == Method::Connect && self.status == Some(Status::Ok))
478            || self.status == Some(Status::SwitchingProtocols)
479    }
480
481    #[doc(hidden)]
482    pub fn finalize_headers(&mut self) {
483        if self.version == Version::Http3 {
484            self.finalize_response_headers_h3();
485        } else {
486            self.finalize_response_headers_1x();
487        }
488    }
489}