Skip to main content

trillium_http/
body.rs

1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6    borrow::Cow,
7    fmt::{self, Debug, Formatter},
8    io::{Error, Result},
9    pin::Pin,
10    task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
/// Streaming body source that can optionally produce trailers.
///
/// Implement this on types that compute trailer headers dynamically as the body
/// is read — for example, a hashing wrapper that produces a `Digest` trailer
/// after all bytes have been streamed. For plain [`AsyncRead`] sources with no
/// trailers, [`Body::new_streaming`] is simpler.
///
/// The trait requires `Send + 'static` but not `Sync`: [`Body`] stores the
/// source boxed and pinned inside a `SyncWrapper`.
pub trait BodySource: AsyncRead + Send + 'static {
    /// Returns the trailers for this body, called after the body has been fully read.
    ///
    /// Implementations may clear internal state on this call; the result is
    /// only meaningful after [`AsyncRead::poll_read`] has returned `Ok(0)`.
    /// [`Body::trailers`] only forwards to this method once [`Body`]'s own
    /// reading has observed that `Ok(0)`.
    ///
    /// Return `None` when there are no trailers to send.
    fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
}
27
28pin_project! {
29    struct PlainBody<T> {
30        #[pin]
31        async_read: T,
32    }
33}
34
35impl<T: AsyncRead> AsyncRead for PlainBody<T> {
36    fn poll_read(
37        self: Pin<&mut Self>,
38        cx: &mut Context<'_>,
39        buf: &mut [u8],
40    ) -> Poll<Result<usize>> {
41        self.project().async_read.poll_read(cx, buf)
42    }
43}
44
45impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
46    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
47        None
48    }
49}
50
51/// The trillium representation of a http body. This can contain
52/// either `&'static [u8]` content, `Vec<u8>` content, or a boxed
53/// [`AsyncRead`]/[`BodySource`] type.
54#[derive(Debug, Default)]
55pub struct Body(pub(crate) BodyType);
56
57impl Body {
58    /// Construct a new body from a streaming [`AsyncRead`] source. If
59    /// you have the body content in memory already, prefer
60    /// [`Body::new_static`] or one of the From conversions.
61    pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
62        Self::new_with_trailers(PlainBody { async_read }, len)
63    }
64
65    /// Construct a new body from a [`BodySource`] that can produce trailers after
66    /// the body has been fully read.
67    ///
68    /// Use this when trailers must be computed dynamically from the body bytes,
69    /// for example to append a content hash.
70    pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
71        Self(Streaming {
72            async_read: SyncWrapper::new(Box::pin(body)),
73            len,
74            done: false,
75            progress: 0,
76            chunked_framing: true,
77        })
78    }
79
80    /// Disable chunked-encoding framing emitted by [`AsyncRead`] for streaming bodies
81    /// of unknown length.
82    ///
83    /// By default, when a streaming body has no known length, this type's [`AsyncRead`]
84    /// implementation emits chunked framing so the h1 codec can write its bytes directly.
85    /// That framing is wrong for any consumer that wants raw body bytes.
86    #[doc(hidden)]
87    #[cfg(feature = "unstable")]
88    #[must_use]
89    pub fn without_chunked_framing(mut self) -> Self {
90        if let Streaming {
91            ref mut chunked_framing,
92            ..
93        } = self.0
94        {
95            *chunked_framing = false;
96        }
97        self
98    }
99
100    pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
101        if let Streaming {
102            ref mut chunked_framing,
103            ..
104        } = self.0
105        {
106            *chunked_framing = true;
107        }
108
109        self
110    }
111
112    /// Returns trailers from the body source, if any.
113    ///
114    /// Only meaningful after the body has been fully read (i.e., [`AsyncRead::poll_read`]
115    /// has returned `Ok(0)`). Returns `None` for bodies constructed with
116    /// [`Body::new_streaming`] or [`Body::new_static`].
117    #[doc(hidden)]
118    pub fn trailers(&mut self) -> Option<Headers> {
119        match &mut self.0 {
120            Streaming {
121                async_read, done, ..
122            } if *done => async_read.get_mut().as_mut().trailers(),
123            _ => None,
124        }
125    }
126
127    /// Construct a fixed-length Body from a `Vec<u8>` or `&'static
128    /// [u8]`.
129    pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
130        Self(Static {
131            content: content.into(),
132            cursor: 0,
133        })
134    }
135
136    /// Retrieve a borrow of the static content in this body. If this
137    /// body is a streaming body or an empty body, this will return
138    /// None.
139    pub fn static_bytes(&self) -> Option<&[u8]> {
140        match &self.0 {
141            Static { content, .. } => Some(content.as_ref()),
142            _ => None,
143        }
144    }
145
146    /// Transform this Body into a dyn [`AsyncRead`], wrapping static content in
147    /// a [`Cursor`]. Unlike reading from the Body directly, this does not apply
148    /// chunked encoding.
149    pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
150        match self.0 {
151            Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
152            Static { content, .. } => Box::pin(Cursor::new(content)),
153            Empty => Box::pin(Cursor::new("")),
154        }
155    }
156
157    /// Consume this body and return the full content. If the body was constructed
158    /// with [`Body::new_streaming`], this will read the entire streaming body into
159    /// memory, awaiting the streaming source's completion.
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if the underlying transport errors, or if a streaming body
164    /// has already been partially or fully read.
165    pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
166        match self.0 {
167            Static { content, .. } => Ok(content),
168
169            Streaming {
170                async_read,
171                len,
172                progress: 0,
173                done: false,
174                ..
175            } => {
176                let mut async_read = async_read.into_inner();
177                let mut buf = len
178                    .and_then(|c| c.try_into().ok())
179                    .map(Vec::with_capacity)
180                    .unwrap_or_default();
181
182                async_read.read_to_end(&mut buf).await?;
183
184                Ok(Cow::Owned(buf))
185            }
186
187            Empty => Ok(Cow::Borrowed(b"")),
188
189            Streaming { .. } => Err(Error::other("body already read to completion")),
190        }
191    }
192
193    /// Retrieve the number of bytes that have been read from this
194    /// body
195    pub fn bytes_read(&self) -> u64 {
196        self.0.bytes_read()
197    }
198
199    /// returns the content length of this body, if known and
200    /// available.
201    pub fn len(&self) -> Option<u64> {
202        self.0.len()
203    }
204
205    /// determine if the this body represents no data
206    pub fn is_empty(&self) -> bool {
207        self.0.is_empty()
208    }
209
210    /// determine if the this body represents static content
211    pub fn is_static(&self) -> bool {
212        matches!(self.0, Static { .. })
213    }
214
215    /// determine if the this body represents streaming content
216    pub fn is_streaming(&self) -> bool {
217        matches!(self.0, Streaming { .. })
218    }
219
220    /// Attempt to clone this body. Returns `None` for streaming bodies, which are one-shot.
221    ///
222    /// Static bodies clone cheaply — a `Cow` clone, which is a pointer copy for borrowed
223    /// `&'static` content and a `Vec` clone for owned content. The clone resets read
224    /// progress, so it can be sent again from the beginning. Empty bodies always clone
225    /// successfully.
226    #[doc(hidden)]
227    #[cfg(feature = "unstable")]
228    pub fn try_clone(&self) -> Option<Self> {
229        match &self.0 {
230            Empty => Some(Self::default()),
231            Static { content, .. } => Some(Self(Static {
232                content: content.clone(),
233                cursor: 0,
234            })),
235            Streaming { .. } => None,
236        }
237    }
238
239    /// Convert this body into an `H3Body` for reading
240    #[cfg(feature = "unstable")]
241    pub fn into_h3(self) -> H3Body {
242        H3Body::new(self)
243    }
244
245    /// Convert this body into an `H3Body` for reading
246    #[cfg(not(feature = "unstable"))]
247    pub(crate) fn into_h3(self) -> H3Body {
248        H3Body::new(self)
249    }
250
251    /// Convert this body into an [`H2Body`] for reading by the h2 send pump.
252    ///
253    /// h2 frames DATA at the connection layer, so the body bytes that reach the send pump
254    /// must be plain payload — not chunk-encoded. [`H2Body`] strips the chunked-transfer
255    /// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
256    /// unknown length, and forwards trailers so the send pump can emit trailing HEADERS.
257    pub(crate) fn into_h2(self) -> H2Body {
258        H2Body::new(self)
259    }
260}
261
/// Largest number of payload bytes that fit in a `buf_len`-byte buffer once
/// wrapped as a single HTTP/1.1 chunk: `<hex size>\r\n<payload>\r\n`.
///
/// Four bytes are reserved for the two CRLFs. The hex size prefix is budgeted
/// as `ceil(log2(buf_len - 4) / 4)` digits. That estimate is never too small,
/// and near powers of sixteen it is one digit generous.
///
/// # Panics
///
/// Panics if `buf_len` is smaller than 6.
#[allow(
    clippy::cast_sign_loss,
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    reason = "buffers are well below petabyte scale; log2/4 of a usize stays in f64 range, and \
              the subtraction always yields a non-negative usize-representable value"
)]
fn max_bytes_to_read(buf_len: usize) -> usize {
    assert!(
        buf_len >= 6,
        "buffers of length {buf_len} are too small for this implementation.
            if this is a problem for you, please open an issue"
    );

    // Space shared by the hex size prefix and the payload itself.
    let prefix_and_payload = (buf_len - 4) as f64;
    // log16(x) == log2(x) / 4; rounding up yields a digit count that never under-counts.
    let hex_digits = (prefix_and_payload.log2() / 4f64).ceil();
    let payload = prefix_and_payload - hex_digits;
    payload as usize
}
281
282impl AsyncRead for Body {
283    fn poll_read(
284        mut self: Pin<&mut Self>,
285        cx: &mut Context<'_>,
286        buf: &mut [u8],
287    ) -> Poll<Result<usize>> {
288        match &mut self.0 {
289            Empty => Poll::Ready(Ok(0)),
290            Static { content, cursor } => {
291                let length = content.len();
292                if length == *cursor {
293                    return Poll::Ready(Ok(0));
294                }
295                let bytes = (length - *cursor).min(buf.len());
296                buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
297                *cursor += bytes;
298                Poll::Ready(Ok(bytes))
299            }
300
301            Streaming {
302                async_read,
303                len: Some(len),
304                done,
305                progress,
306                ..
307            } => {
308                if *done {
309                    return Poll::Ready(Ok(0));
310                }
311
312                let max_bytes_to_read = (*len - *progress)
313                    .try_into()
314                    .unwrap_or(buf.len())
315                    .min(buf.len());
316
317                let bytes = ready!(
318                    async_read
319                        .get_mut()
320                        .as_mut()
321                        .poll_read(cx, &mut buf[..max_bytes_to_read])
322                )?;
323
324                if bytes == 0 {
325                    *done = true;
326                } else {
327                    *progress += bytes as u64;
328                }
329
330                Poll::Ready(Ok(bytes))
331            }
332
333            Streaming {
334                async_read,
335                len: None,
336                done,
337                progress,
338                chunked_framing,
339            } => {
340                if *done {
341                    return Poll::Ready(Ok(0));
342                }
343
344                if !*chunked_framing {
345                    let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
346                    if bytes == 0 {
347                        *done = true;
348                    } else {
349                        *progress += bytes as u64;
350                    }
351                    return Poll::Ready(Ok(bytes));
352                }
353
354                let max_bytes_to_read = max_bytes_to_read(buf.len());
355
356                let bytes = ready!(
357                    async_read
358                        .get_mut()
359                        .as_mut()
360                        .poll_read(cx, &mut buf[..max_bytes_to_read])
361                )?;
362
363                if bytes == 0 {
364                    *done = true;
365                    // Last-chunk marker only; the caller emits the trailer-section
366                    // (possibly empty) followed by the terminating `\r\n`. Trailers come
367                    // from `BodySource::trailers()` as structured `Headers`, not bytes,
368                    // and the caller writes them in one shot so this path doesn't need
369                    // a multi-poll state machine spanning buffers.
370                    buf[..3].copy_from_slice(b"0\r\n");
371                    return Poll::Ready(Ok(3));
372                }
373
374                *progress += bytes as u64;
375
376                let start = format!("{bytes:X}\r\n");
377                let start_length = start.len();
378                let total = bytes + start_length + 2;
379                buf.copy_within(..bytes, start_length);
380                buf[..start_length].copy_from_slice(start.as_bytes());
381                buf[total - 2..total].copy_from_slice(b"\r\n");
382                Poll::Ready(Ok(total))
383            }
384        }
385    }
386}
387
388struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
389impl Debug for SyncAsyncReader {
390    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
391        f.debug_struct("SyncAsyncReader").finish()
392    }
393}
394impl AsyncRead for SyncAsyncReader {
395    fn poll_read(
396        self: Pin<&mut Self>,
397        cx: &mut Context<'_>,
398        buf: &mut [u8],
399    ) -> Poll<Result<usize>> {
400        self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
401    }
402}
403
404#[derive(Default)]
405pub(crate) enum BodyType {
406    #[default]
407    Empty,
408
409    Static {
410        content: Cow<'static, [u8]>,
411        cursor: usize,
412    },
413
414    Streaming {
415        async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
416        progress: u64,
417        len: Option<u64>,
418        done: bool,
419        /// When true (the default), [`Body`]'s [`AsyncRead`] impl emits chunked
420        /// framing for the `len: None` case; when false (via
421        /// [`Body::without_chunked_framing`]), it passes through raw bytes.
422        chunked_framing: bool,
423    },
424}
425
426impl Debug for BodyType {
427    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
428        match self {
429            Empty => f.debug_tuple("BodyType::Empty").finish(),
430            Static { content, cursor } => f
431                .debug_struct("BodyType::Static")
432                .field("content", &String::from_utf8_lossy(content))
433                .field("cursor", cursor)
434                .finish(),
435            Streaming {
436                len,
437                done,
438                progress,
439                ..
440            } => f
441                .debug_struct("BodyType::Streaming")
442                .field("async_read", &format_args!(".."))
443                .field("len", &len)
444                .field("done", &done)
445                .field("progress", &progress)
446                .finish(),
447        }
448    }
449}
450
451impl BodyType {
452    fn is_empty(&self) -> bool {
453        match *self {
454            Empty => true,
455            Static { ref content, .. } => content.is_empty(),
456            Streaming { len, .. } => len == Some(0),
457        }
458    }
459
460    fn len(&self) -> Option<u64> {
461        match *self {
462            Empty => Some(0),
463            Static { ref content, .. } => Some(content.len() as u64),
464            Streaming { len, .. } => len,
465        }
466    }
467
468    fn bytes_read(&self) -> u64 {
469        match *self {
470            Empty => 0,
471            Static { cursor, .. } => cursor as u64,
472            Streaming { progress, .. } => progress,
473        }
474    }
475}
476
477impl From<String> for Body {
478    fn from(s: String) -> Self {
479        s.into_bytes().into()
480    }
481}
482
483impl From<&'static str> for Body {
484    fn from(s: &'static str) -> Self {
485        s.as_bytes().into()
486    }
487}
488
489impl From<&'static [u8]> for Body {
490    fn from(content: &'static [u8]) -> Self {
491        Self::new_static(content)
492    }
493}
494
495impl From<Vec<u8>> for Body {
496    fn from(content: Vec<u8>) -> Self {
497        Self::new_static(content)
498    }
499}
500
501impl From<Cow<'static, [u8]>> for Body {
502    fn from(value: Cow<'static, [u8]>) -> Self {
503        Self::new_static(value)
504    }
505}
506
507impl From<Cow<'static, str>> for Body {
508    fn from(value: Cow<'static, str>) -> Self {
509        match value {
510            Cow::Borrowed(b) => b.into(),
511            Cow::Owned(o) => o.into(),
512        }
513    }
514}
515
#[cfg(test)]
mod test_bytes_to_read {
    #[test]
    fn simple_check_of_known_values() {
        // The marked rows are the most important part of this test, and a
        // nonobvious but intentional consequence of the implementation. In
        // order to avoid overflowing, we must sometimes use one fewer than the
        // available buffer bytes, because increasing the read size by one can
        // increase the number of framed bytes by two. This occurs when the hex
        // representation of the content length is near an increase in order of
        // magnitude (F->10, FF->100, FFF->1000, etc).
        let values = [
            (6, 1),       // 1
            (7, 2),       // 2
            (20, 15),     // F
            (21, 15),     // F <-
            (22, 16),     // 10
            (23, 17),     // 11
            (260, 254),   // FE
            (261, 254),   // FE <-
            (262, 255),   // FF <-
            (263, 256),   // 100
            (4100, 4093), // FFD
            (4101, 4093), // FFD <-
            (4102, 4094), // FFE <-
            (4103, 4095), // FFF <-
            (4104, 4096), // 1000
        ];

        for (input, expected) in values {
            let actual = super::max_bytes_to_read(input);
            assert_eq!(
                actual, expected,
                "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
            );

            // Testing the test: the framed chunk uses either the whole buffer or
            // all but one byte of it.
            let used_bytes = expected + 4 + format!("{expected:X}").len();
            assert!(
                used_bytes == input || used_bytes == input - 1,
                "\n\nfor an input of {input}, expected used bytes to be {input} or {}, but was {used_bytes}",
                input - 1,
            );
        }
    }

    #[test]
    fn framed_chunk_never_overflows_buffer() {
        // Exhaustively check small sizes: payload + hex size + two CRLFs must fit.
        for buf_len in 6..=5000usize {
            let payload = super::max_bytes_to_read(buf_len);
            let framed = payload + format!("{payload:X}").len() + 4;
            assert!(
                framed <= buf_len,
                "buffer of {buf_len} bytes overflowed by framed chunk of {framed} bytes"
            );
        }
    }

    #[test]
    #[should_panic(expected = "too small")]
    fn rejects_buffers_shorter_than_six_bytes() {
        super::max_bytes_to_read(5);
    }
}