trillium_http/conn.rs
1use crate::{
2 Body, Buffer, Headers, HttpContext,
3 KnownHeaderName::Host,
4 Method, ReceivedBody, Status, Swansong, TypeSet, Version,
5 after_send::{AfterSend, SendStatus},
6 h3::H3Connection,
7 liveness::{CancelOnDisconnect, LivenessFut},
8 received_body::ReceivedBodyState,
9 util::encoding,
10};
11use encoding_rs::Encoding;
12use futures_lite::{
13 future,
14 io::{AsyncRead, AsyncWrite},
15};
16use std::{
17 borrow::Cow,
18 fmt::{self, Debug, Formatter},
19 future::Future,
20 net::IpAddr,
21 pin::pin,
22 str,
23 sync::Arc,
24 time::Instant,
25};
26mod h1;
27mod h3;
28
/// An HTTP connection
///
/// Unlike in other rust http implementations, this struct represents both
/// the request and the response, and holds the transport over which the
/// response will be sent.
///
/// Accessors for most fields are generated by the `fieldwork` derive, as configured by the
/// `#[field(..)]` attribute on each field.
#[derive(fieldwork::Fieldwork)]
pub struct Conn<Transport> {
    #[field(get)]
    /// the shared [`HttpContext`]
    pub(crate) context: Arc<HttpContext>,

    /// request [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) request_headers: Headers,

    /// response [headers](Headers)
    #[field(get, get_mut)]
    pub(crate) response_headers: Headers,

    /// the request path, including the query string if there is one
    ///
    /// [`Conn::path`] and [`Conn::querystring`] split this value on the first `?`, and
    /// [`Conn::path_and_query`] returns it whole.
    pub(crate) path: Cow<'static, str>,

    /// the http method for this conn's request
    ///
    /// ```
    /// # use trillium_http::{Conn, Method};
    /// let mut conn = Conn::new_synthetic(Method::Get, "/some/path?and&a=query", ());
    /// assert_eq!(conn.method(), Method::Get);
    /// ```
    #[field(get, set, copy)]
    pub(crate) method: Method,

    /// the http status for this conn, if set
    #[field(get, copy)]
    pub(crate) status: Option<Status>,

    #[field(get = http_version, copy)]
    /// the http version for this conn
    pub(crate) version: Version,

    /// the [state typemap](TypeSet) for this conn
    #[field(get, get_mut)]
    pub(crate) state: TypeSet,

    /// the response [body](Body)
    ///
    /// ```
    /// # use trillium_testing::HttpTest;
    /// HttpTest::new(|conn| async move { conn.with_response_body("hello") })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("hello");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(String::from("world")) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("world");
    ///
    /// HttpTest::new(|conn| async move { conn.with_response_body(vec![99, 97, 116]) })
    ///     .get("/")
    ///     .block()
    ///     .assert_body("cat");
    /// ```
    #[field(get, set, into, option_set_some, take, with)]
    pub(crate) response_body: Option<Body>,

    /// the transport
    ///
    /// This should only be used to call your own custom methods on the transport that do not read
    /// or write any data. Calling any method that reads from or writes to the transport will
    /// disrupt the HTTP protocol. If you're looking to transition from HTTP to another protocol,
    /// use an HTTP upgrade.
    #[field(get, get_mut)]
    pub(crate) transport: Transport,

    /// the internal [`Buffer`] for this conn
    // NOTE(review): presumably holds bytes read from the transport that have not been consumed
    // yet — confirm against the h1/h3 submodules.
    pub(crate) buffer: Buffer,

    /// the [`ReceivedBodyState`] for this conn's request body
    pub(crate) request_body_state: ReceivedBodyState,

    /// hooks registered through [`Conn::after_send`], to be called once the response has been sent
    pub(crate) after_send: AfterSend,

    /// whether the connection is secure
    ///
    /// note that this does not necessarily indicate that the transport itself is secure, as it may
    /// indicate that `trillium_http` is behind a trusted reverse proxy that has terminated tls and
    /// provided appropriate headers to indicate this.
    #[field(get, set, rename_predicates)]
    pub(crate) secure: bool,

    /// The [`Instant`] that the first header bytes for this conn were
    /// received, before any processing or parsing has been performed.
    #[field(get, copy)]
    pub(crate) start_time: Instant,

    /// The IP Address for the connection, if available
    #[field(set, get, copy, into)]
    pub(crate) peer_ip: Option<IpAddr>,

    /// the :authority http/3 pseudo-header
    #[field(set, get, into)]
    pub(crate) authority: Option<Cow<'static, str>>,

    /// the :scheme http/3 pseudo-header
    #[field(set, get, into)]
    pub(crate) scheme: Option<Cow<'static, str>>,

    /// the [`H3Connection`] for this conn, if this is an HTTP/3 request
    #[field(get)]
    pub(crate) h3_connection: Option<Arc<H3Connection>>,

    /// the HTTP/3 stream id for this conn, if any
    pub(crate) h3_stream_id: Option<u64>,

    /// the :protocol http/3 pseudo-header
    #[field(set, get, into)]
    pub(crate) protocol: Option<Cow<'static, str>>,

    /// request trailers, populated after the request body has been fully read
    #[field(get, get_mut)]
    pub(crate) request_trailers: Option<Headers>,
}
149
150impl<Transport> Debug for Conn<Transport> {
151 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
152 f.debug_struct("Conn")
153 .field("context", &self.context)
154 .field("request_headers", &self.request_headers)
155 .field("response_headers", &self.response_headers)
156 .field("path", &self.path)
157 .field("method", &self.method)
158 .field("status", &self.status)
159 .field("version", &self.version)
160 .field("state", &self.state)
161 .field("response_body", &self.response_body)
162 .field("transport", &format_args!(".."))
163 .field("buffer", &format_args!(".."))
164 .field("request_body_state", &self.request_body_state)
165 .field("secure", &self.secure)
166 .field("after_send", &format_args!(".."))
167 .field("start_time", &self.start_time)
168 .field("peer_ip", &self.peer_ip)
169 .field("authority", &self.authority)
170 .field("scheme", &self.scheme)
171 .field("protocol", &self.protocol)
172 .field("h3_connection", &self.h3_connection)
173 .field("h3_stream_id", &self.h3_stream_id)
174 .field("request_trailers", &self.request_trailers)
175 .finish()
176 }
177}
178
179impl<Transport> Conn<Transport>
180where
181 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
182{
/// Returns the shared-state [`TypeSet`] held by this conn's shared [`HttpContext`].
///
/// This always returns a `TypeSet`; whether a particular type is present in it is up to
/// whatever populated the context.
pub fn shared_state(&self) -> &TypeSet {
    &self.context.shared_state
}
187
188 /// sets the http status code from any `TryInto<Status>`.
189 ///
190 /// ```
191 /// # use trillium_http::Status;
192 /// # trillium_testing::HttpTest::new(|mut conn| async move {
193 /// assert!(conn.status().is_none());
194 ///
195 /// conn.set_status(200); // a status can be set as a u16
196 /// assert_eq!(conn.status().unwrap(), Status::Ok);
197 ///
198 /// conn.set_status(Status::ImATeapot); // or as a Status
199 /// assert_eq!(conn.status().unwrap(), Status::ImATeapot);
200 /// conn
201 /// # }).get("/").block().assert_status(Status::ImATeapot);
202 /// ```
203 pub fn set_status(&mut self, status: impl TryInto<Status>) -> &mut Self {
204 self.status = Some(status.try_into().unwrap_or_else(|_| {
205 log::error!("attempted to set an invalid status code");
206 Status::InternalServerError
207 }));
208 self
209 }
210
/// Chainable, by-value variant of [`Conn::set_status`]: sets the http status code from any
/// `TryInto<Status>` and returns the `Conn`.
///
/// As with `set_status`, a value that cannot be converted into a [`Status`] is logged and
/// replaced with [`Status::InternalServerError`].
#[must_use]
pub fn with_status(mut self, status: impl TryInto<Status>) -> Self {
    self.set_status(status);
    self
}
217
218 /// retrieves the path part of the request url, up to and excluding any query component
219 /// ```
220 /// # use trillium_testing::HttpTest;
221 /// HttpTest::new(|mut conn| async move {
222 /// assert_eq!(conn.path(), "/some/path");
223 /// conn.with_status(200)
224 /// })
225 /// .get("/some/path?and&a=query")
226 /// .block()
227 /// .assert_ok();
228 /// ```
229 pub fn path(&self) -> &str {
230 match self.path.split_once('?') {
231 Some((path, _)) => path,
232 None => &self.path,
233 }
234 }
235
/// Returns the stored request path unchanged, including the `?` and query component when one
/// is present.
///
/// Use [`Conn::path`] or [`Conn::querystring`] for just one of the two parts.
pub fn path_and_query(&self) -> &str {
    &self.path
}
240
241 /// retrieves the query component of the path, or an empty &str
242 ///
243 /// ```
244 /// # use trillium_testing::HttpTest;
245 /// let server = HttpTest::new(|conn| async move {
246 /// let querystring = conn.querystring().to_string();
247 /// conn.with_response_body(querystring).with_status(200)
248 /// });
249 ///
250 /// server
251 /// .get("/some/path?and&a=query")
252 /// .block()
253 /// .assert_body("and&a=query");
254 ///
255 /// server.get("/some/path").block().assert_body("");
256 /// ```
257 pub fn querystring(&self) -> &str {
258 self.path
259 .split_once('?')
260 .map(|(_, query)| query)
261 .unwrap_or_default()
262 }
263
/// Returns the host for this conn, read from the `Host` request header.
///
/// Returns `None` when [`Headers::get_str`] yields no value for that header.
pub fn host(&self) -> Option<&str> {
    self.request_headers.get_str(Host)
}
268
/// Sets the host for this conn by inserting `host` as the `Host` request header.
///
/// Returns `&mut Self` so that calls can be chained.
// NOTE(review): presumably `Headers::insert` replaces any existing `Host` value — confirm.
pub fn set_host(&mut self, host: String) -> &mut Self {
    self.request_headers.insert(Host, host);
    self
}
274
275 /// Cancels and drops the future if reading from the transport results in an error or empty read
276 ///
277 /// The use of this method is not advised if your connected http client employs pipelining
278 /// (rarely seen in the wild), as it will buffer an unbounded number of requests one byte at a
279 /// time
280 ///
281 /// If the client disconnects from the conn's transport, this function will return None. If the
282 /// future completes without disconnection, this future will return Some containing the output
283 /// of the future.
284 ///
285 /// Note that the inner future cannot borrow conn, so you will need to clone or take any
286 /// information needed to execute the future prior to executing this method.
287 ///
288 /// # Example
289 ///
290 /// ```rust
291 /// # use futures_lite::{AsyncRead, AsyncWrite};
292 /// # use trillium_http::{Conn, Method};
293 /// async fn something_slow_and_cancel_safe() -> String {
294 /// String::from("this was not actually slow")
295 /// }
296 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
297 /// where
298 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
299 /// {
300 /// let Some(returned_body) = conn
301 /// .cancel_on_disconnect(async { something_slow_and_cancel_safe().await })
302 /// .await
303 /// else {
304 /// return conn;
305 /// };
306 /// conn.with_response_body(returned_body).with_status(200)
307 /// }
308 /// ```
309 pub async fn cancel_on_disconnect<'a, Fut>(&'a mut self, fut: Fut) -> Option<Fut::Output>
310 where
311 Fut: Future + Send + 'a,
312 {
313 CancelOnDisconnect(self, pin!(fut)).await
314 }
315
316 /// Check if the transport is connected by attempting to read from the transport
317 ///
318 /// # Example
319 ///
320 /// This is best to use at appropriate points in a long-running handler, like:
321 ///
322 /// ```rust
323 /// # use futures_lite::{AsyncRead, AsyncWrite};
324 /// # use trillium_http::{Conn, Method};
325 /// # async fn something_slow_but_not_cancel_safe() {}
326 /// async fn handler<T>(mut conn: Conn<T>) -> Conn<T>
327 /// where
328 /// T: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
329 /// {
330 /// for _ in 0..100 {
331 /// if conn.is_disconnected().await {
332 /// return conn;
333 /// }
334 /// something_slow_but_not_cancel_safe().await;
335 /// }
336 /// conn.with_status(200)
337 /// }
338 /// ```
339 pub async fn is_disconnected(&mut self) -> bool {
340 future::poll_once(LivenessFut::new(self)).await.is_some()
341 }
342
/// returns the [`encoding_rs::Encoding`] for this request, as determined from the mime-type
/// charset, if available
///
/// The lookup is performed against the request headers; as the example below shows, the
/// result is `WINDOWS_1252` when no charset has been provided.
///
/// ```
/// # use trillium_testing::HttpTest;
/// HttpTest::new(|mut conn| async move {
///     assert_eq!(conn.request_encoding(), encoding_rs::WINDOWS_1252); // the default
///
///     conn.request_headers_mut()
///         .insert("content-type", "text/plain;charset=utf-16");
///     assert_eq!(conn.request_encoding(), encoding_rs::UTF_16LE);
///
///     conn.with_status(200)
/// })
/// .get("/")
/// .block()
/// .assert_ok();
/// ```
pub fn request_encoding(&self) -> &'static Encoding {
    encoding(&self.request_headers)
}
364
/// returns the [`encoding_rs::Encoding`] for this response, as
/// determined from the mime-type charset, if available
///
/// The lookup is performed against the response headers; as the example below shows, the
/// result is `WINDOWS_1252` when no charset has been provided.
///
/// ```
/// # use trillium_testing::HttpTest;
/// HttpTest::new(|mut conn| async move {
///     assert_eq!(conn.response_encoding(), encoding_rs::WINDOWS_1252); // the default
///     conn.response_headers_mut()
///         .insert("content-type", "text/plain;charset=utf-16");
///
///     assert_eq!(conn.response_encoding(), encoding_rs::UTF_16LE);
///
///     conn.with_status(200)
/// })
/// .get("/")
/// .block()
/// .assert_ok();
/// ```
pub fn response_encoding(&self) -> &'static Encoding {
    encoding(&self.response_headers)
}
386
387 /// returns a [`ReceivedBody`] that references this conn. the conn
388 /// retains all data and holds the singular transport, but the
389 /// `ReceivedBody` provides an interface to read body content.
390 ///
391 /// If the request included an `Expect: 100-continue` header, the 100 Continue response is sent
392 /// lazily on the first read from the returned [`ReceivedBody`].
393 /// ```
394 /// # use trillium_testing::HttpTest;
395 /// let server = HttpTest::new(|mut conn| async move {
396 /// let request_body = conn.request_body();
397 /// assert_eq!(request_body.content_length(), Some(5));
398 /// assert_eq!(request_body.read_string().await.unwrap(), "hello");
399 /// conn.with_status(200)
400 /// });
401 ///
402 /// server.post("/").with_body("hello").block().assert_ok();
403 /// ```
404 pub fn request_body(&mut self) -> ReceivedBody<'_, Transport> {
405 let needs_100_continue = self.needs_100_continue();
406 let body = self.build_request_body();
407 if needs_100_continue {
408 body.with_send_100_continue()
409 } else {
410 body
411 }
412 }
413
414 /// returns a clone of the [`swansong::Swansong`] for this Conn. use
415 /// this to gracefully stop long-running futures and streams
416 /// inside of handler functions
417 pub fn swansong(&self) -> Swansong {
418 self.h3_connection
419 .as_ref()
420 .map_or_else(|| self.context.swansong.clone(), |h| h.swansong().clone())
421 }
422
/// Registers a function to call after the http response has been
/// completely transferred. Please note that this is a sync function
/// and should be computationally lightweight. If your _application_
/// needs additional async processing, use your runtime's task spawn
/// within this hook. If your _library_ needs additional async
/// processing in an `after_send` hook, please open an issue. This hook
/// is currently designed for simple instrumentation and logging, and
/// should be thought of as equivalent to a Drop hook.
///
/// The hook is called with a [`SendStatus`], and is appended to this conn's existing
/// after-send hooks.
pub fn after_send<F>(&mut self, after_send: F)
where
    F: FnOnce(SendStatus) + Send + Sync + 'static,
{
    self.after_send.append(after_send);
}
437
438 /// applies a mapping function from one transport to another. This
439 /// is particularly useful for boxing the transport. unless you're
440 /// sure this is what you're looking for, you probably don't want
441 /// to be using this
442 pub fn map_transport<NewTransport>(
443 self,
444 f: impl Fn(Transport) -> NewTransport,
445 ) -> Conn<NewTransport>
446 where
447 NewTransport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
448 {
449 Conn {
450 context: self.context,
451 request_headers: self.request_headers,
452 response_headers: self.response_headers,
453 method: self.method,
454 response_body: self.response_body,
455 path: self.path,
456 status: self.status,
457 version: self.version,
458 state: self.state,
459 transport: f(self.transport),
460 buffer: self.buffer,
461 request_body_state: self.request_body_state,
462 secure: self.secure,
463 after_send: self.after_send,
464 start_time: self.start_time,
465 peer_ip: self.peer_ip,
466 authority: self.authority,
467 scheme: self.scheme,
468 h3_connection: self.h3_connection,
469 protocol: self.protocol,
470 request_trailers: self.request_trailers,
471 h3_stream_id: self.h3_stream_id,
472 }
473 }
474
475 /// whether this conn is suitable for an http upgrade to another protocol
476 pub fn should_upgrade(&self) -> bool {
477 (self.method() == Method::Connect && self.status == Some(Status::Ok))
478 || self.status == Some(Status::SwitchingProtocols)
479 }
480
481 #[doc(hidden)]
482 pub fn finalize_headers(&mut self) {
483 if self.version == Version::Http3 {
484 self.finalize_response_headers_h3();
485 } else {
486 self.finalize_response_headers_1x();
487 }
488 }
489}