Skip to main content

trillium_http/
received_body.rs

1use crate::{Body, Buffer, Error, Headers, HttpConfig, MutCow, ProtocolSession, copy};
2use Poll::{Pending, Ready};
3use ReceivedBodyState::{Chunked, End, FixedLength, PartialChunkSize, Start};
4use encoding_rs::Encoding;
5use futures_lite::{AsyncRead, AsyncReadExt, AsyncWrite, ready};
6use std::{
7    fmt::{self, Debug, Formatter},
8    io::{self, ErrorKind},
9    pin::Pin,
10    task::{Context, Poll},
11};
12
13mod chunked;
14mod fixed_length;
15mod h2_data;
16mod h3_data;
17
/// A received http body
///
/// This type represents a body that will be read from the underlying transport, which it may either
/// borrow from a [`Conn`](crate::Conn) or own.
///
/// ```rust
/// # use trillium_testing::HttpTest;
/// let app = HttpTest::new(|mut conn| async move {
///     let body = conn.request_body();
///     let body_string = body.read_string().await.unwrap();
///     conn.with_response_body(format!("received: {body_string}"))
/// });
///
/// app.get("/").block().assert_body("received: ");
/// app.post("/")
///     .with_body("hello")
///     .block()
///     .assert_body("received: hello");
/// ```
///
/// ## Bounds checking
///
/// Every `ReceivedBody` has a maximum length beyond which it will return an error, expressed as a
/// u64. To override this on the specific `ReceivedBody`, use [`ReceivedBody::with_max_len`] or
/// [`ReceivedBody::set_max_len`].
///
/// The default maximum length is 10mb; see [`HttpConfig::received_body_max_len`] to configure
/// this server-wide.
///
/// ## Large chunks, small read buffers
///
/// Attempting to read a chunked body with a buffer that is shorter than the chunk size in hex will
/// result in an error.
#[derive(fieldwork::Fieldwork)]
pub struct ReceivedBody<'conn, Transport> {
    /// The content-length of this body, derived from the content-length header.
    /// `None` for transfer-encoding chunked bodies.
    ///
    /// ```rust
    /// # use trillium_testing::HttpTest;
    /// HttpTest::new(|mut conn| async move {
    ///     let body = conn.request_body();
    ///     assert_eq!(body.content_length(), Some(5));
    ///     let body_string = body.read_string().await.unwrap();
    ///     conn.with_status(200)
    ///         .with_response_body(format!("received: {body_string}"))
    /// })
    /// .post("/")
    /// .with_body("hello")
    /// .block()
    /// .assert_ok()
    /// .assert_body("received: hello");
    /// ```
    #[field(get)]
    content_length: Option<u64>,

    // Bytes that were already pulled off the transport but not yet handed to the reader;
    // `read_buffered` serves these before touching the transport again.
    buffer: MutCow<'conn, Buffer>,

    // The transport the body is read from, either borrowed or owned. Becomes `None` once it has
    // been taken (`take_transport`) or handed to `on_completion`.
    transport: Option<MutCow<'conn, Transport>>,

    // Current position in the body-reading state machine driven by `poll_read`.
    state: MutCow<'conn, ReceivedBodyState>,

    // Invoked at most once with the owned transport when the body reaches
    // `ReceivedBodyState::End`; never fires for a borrowed transport.
    on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,

    /// the character encoding of this body, usually determined from the content type
    /// (mime-type) of the associated Conn.
    #[field(get)]
    encoding: &'static Encoding,

    /// The maximum length that can be read from this body before error
    ///
    /// See also [`HttpConfig::received_body_max_len`]
    #[field(with, get, set)]
    max_len: u64,

    /// The initial buffer capacity allocated when reading the body to bytes or a string
    ///
    /// See [`HttpConfig::received_body_initial_len`]
    #[field(with, get, set)]
    initial_len: usize,

    /// The maximum number of read loops that reading this received body will perform before
    /// yielding back to the runtime
    ///
    /// See [`HttpConfig::copy_loops_per_yield`]
    #[field(with, get, set)]
    copy_loops_per_yield: usize,

    /// Maximum size to pre-allocate based on content-length for buffering this received body
    ///
    /// See [`HttpConfig::received_body_max_preallocate`]
    #[field(with, get, set)]
    max_preallocate: usize,

    // Copied from `HttpConfig::max_header_list_size` at construction. Not read in this file;
    // NOTE(review): presumably bounds trailer decoding in the framing submodules — confirm.
    max_header_list_size: u64,

    // Where decoded trailers are stored once the body has been fully read; may point at
    // caller-provided storage (see `with_trailers`).
    trailers: MutCow<'conn, Option<Headers>>,

    /// Byte offset into `b"HTTP/1.1 100 Continue\r\n\r\n"` that remains to be written before the
    /// first read. `None` means no pending write.
    send_100_continue_offset: Option<usize>,

    /// Protocol session this body belongs to; used by the `End` transition to pull
    /// driver-decoded trailers (h2 synchronously, h3 asynchronously).
    protocol_session: ProtocolSession,

    /// pending h3 trailer-decode future
    h3_trailer_future:
        Option<Pin<Box<dyn Future<Output = io::Result<Headers>> + Send + Sync + 'static>>>,
}
128
/// Returns the non-empty tail of `buf` beginning at byte offset `min`.
///
/// Yields `None` when `min` is at or beyond the end of `buf` (including offsets that do not fit
/// in a `usize`), so callers never receive an empty slice.
fn slice_from(min: u64, buf: &[u8]) -> Option<&[u8]> {
    // An offset that cannot be represented as `usize` is necessarily past the end of any slice.
    let start = usize::try_from(min).ok()?;
    match buf.get(start..) {
        Some(tail) if !tail.is_empty() => Some(tail),
        _ => None,
    }
}
133
134impl<'conn, Transport> ReceivedBody<'conn, Transport>
135where
136    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
137{
138    #[allow(missing_docs)]
139    #[doc(hidden)]
140    pub fn new(
141        content_length: Option<u64>,
142        buffer: impl Into<MutCow<'conn, Buffer>>,
143        transport: impl Into<MutCow<'conn, Transport>>,
144        state: impl Into<MutCow<'conn, ReceivedBodyState>>,
145        on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
146        encoding: &'static Encoding,
147    ) -> Self {
148        Self::new_with_config(
149            content_length,
150            buffer,
151            transport,
152            state,
153            on_completion,
154            encoding,
155            &HttpConfig::DEFAULT,
156        )
157    }
158
159    #[allow(missing_docs)]
160    #[doc(hidden)]
161    pub(crate) fn new_with_config(
162        content_length: Option<u64>,
163        buffer: impl Into<MutCow<'conn, Buffer>>,
164        transport: impl Into<MutCow<'conn, Transport>>,
165        state: impl Into<MutCow<'conn, ReceivedBodyState>>,
166        on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
167        encoding: &'static Encoding,
168        config: &HttpConfig,
169    ) -> Self {
170        Self {
171            content_length,
172            buffer: buffer.into(),
173            transport: Some(transport.into()),
174            state: state.into(),
175            on_completion,
176            encoding,
177            max_len: config.received_body_max_len,
178            initial_len: config.received_body_initial_len,
179            copy_loops_per_yield: config.copy_loops_per_yield,
180            max_preallocate: config.received_body_max_preallocate,
181            max_header_list_size: config.max_header_list_size,
182            trailers: None.into(),
183            send_100_continue_offset: None,
184            protocol_session: ProtocolSession::Http1,
185            h3_trailer_future: None,
186        }
187    }
188
189    /// Sets the destination for trailers decoded from the request body.
190    ///
191    /// When the body is fully read, any trailers will be written to the provided storage.
192    #[doc(hidden)]
193    #[must_use]
194    pub fn with_trailers(mut self, trailers: impl Into<MutCow<'conn, Option<Headers>>>) -> Self {
195        self.trailers = trailers.into();
196        self
197    }
198
199    /// Associate this body with the [`ProtocolSession`] that produced it. The End
200    /// transition of the body state machine consults this to pull driver-decoded
201    /// trailers into [`Conn::request_trailers`][crate::Conn] (h2 synchronously,
202    /// h3 via a boxed future). For h1 bodies the session is
203    /// [`ProtocolSession::Http1`] and no trailer-driver hook fires.
204    #[doc(hidden)]
205    #[must_use]
206    #[cfg(feature = "unstable")]
207    pub fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
208        self.protocol_session = protocol_session;
209        self
210    }
211
212    #[doc(hidden)]
213    #[must_use]
214    #[cfg(not(feature = "unstable"))]
215    pub(crate) fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
216        self.protocol_session = protocol_session;
217        self
218    }
219
220    /// Arranges for `HTTP/1.1 100 Continue\r\n\r\n` to be written to the transport before the
221    /// first body read. Used to implement lazy 100-continue for HTTP/1.1 request bodies.
222    #[must_use]
223    pub(crate) fn with_send_100_continue(mut self) -> Self {
224        self.send_100_continue_offset = Some(0);
225        self
226    }
227
228    /// # Reads entire body to String.
229    ///
230    /// This uses the encoding determined by the content-type (mime) charset. If an
231    /// encoding problem is encountered, the returned String will contain utf8
232    /// replacement characters.
233    ///
234    /// Can only be performed once per Conn — the body bytes are not cached.
235    ///
236    /// # Errors
237    ///
238    /// This will return an error if there is an IO error on the
239    /// underlying transport such as a disconnect
240    ///
241    /// This will also return an error if the length exceeds the maximum length. To override this
242    /// value on this specific body, use [`ReceivedBody::with_max_len`] or
243    /// [`ReceivedBody::set_max_len`]
244    pub async fn read_string(self) -> crate::Result<String> {
245        let encoding = self.encoding();
246        let bytes = self.read_bytes().await?;
247        let (s, _, _) = encoding.decode(&bytes);
248        Ok(s.to_string())
249    }
250
251    fn owns_transport(&self) -> bool {
252        self.transport.as_ref().is_some_and(MutCow::is_owned)
253    }
254
255    /// Similar to [`ReceivedBody::read_string`], but returns the raw bytes. This is useful for
256    /// bodies that are not text.
257    ///
258    /// You can use this in conjunction with `encoding` if you need different handling of malformed
259    /// character encoding than the lossy conversion provided by [`ReceivedBody::read_string`].
260    ///
261    /// # Errors
262    ///
263    /// This will return an error if there is an IO error on the underlying transport such as a
264    /// disconnect
265    ///
266    /// This will also return an error if the length exceeds
267    /// [`received_body_max_len`][HttpConfig::with_received_body_max_len]. To override this value on
268    /// this specific body, use [`ReceivedBody::with_max_len`] or [`ReceivedBody::set_max_len`]
269    pub async fn read_bytes(mut self) -> crate::Result<Vec<u8>> {
270        let mut vec = if let Some(len) = self.content_length {
271            if len > self.max_len {
272                return Err(Error::ReceivedBodyTooLong(self.max_len));
273            }
274
275            let len = usize::try_from(len).map_err(|_| Error::ReceivedBodyTooLong(self.max_len))?;
276
277            Vec::with_capacity(len.min(self.max_preallocate))
278        } else {
279            Vec::with_capacity(self.initial_len)
280        };
281
282        self.read_to_end(&mut vec).await?;
283        Ok(vec)
284    }
285
286    fn read_raw(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
287        if let Some(transport) = self.transport.as_deref_mut() {
288            read_buffered(&mut self.buffer, transport, cx, buf)
289        } else {
290            Ready(Err(ErrorKind::NotConnected.into()))
291        }
292    }
293
294    /// Consumes the remainder of this body from the underlying transport by reading it to the end
295    /// and discarding the contents. This is important for http1.1 keepalive, but most of the
296    /// time you do not need to directly call this. It returns the number of bytes consumed.
297    ///
298    /// # Errors
299    ///
300    /// This will return an [`std::io::Result::Err`] if there is an io error on the underlying
301    /// transport, such as a disconnect
302    #[allow(
303        clippy::missing_errors_doc,
304        reason = "errors are documented above; clippy doesn't detect the section"
305    )]
306    pub async fn drain(self) -> io::Result<u64> {
307        let copy_loops_per_yield = self.copy_loops_per_yield;
308        copy(self, futures_lite::io::sink(), copy_loops_per_yield).await
309    }
310}
311
312impl<T> ReceivedBody<'static, T> {
313    /// takes the static transport from this received body
314    pub fn take_transport(&mut self) -> Option<T> {
315        self.transport.take().map(MutCow::unwrap_owned)
316    }
317
318    #[doc(hidden)]
319    #[cfg(feature = "unstable")]
320    pub fn state(&self) -> ReceivedBodyState {
321        *self.state
322    }
323}
324
325impl<T> ReceivedBody<'_, T> {
326    /// Retype as `ReceivedBody<'static, T>` if every internal `MutCow` field is `Owned`.
327    ///
328    /// Returns `None` if any field is `Borrowed`, in which case `self` is dropped — the
329    /// borrows can't be extended, and there's no useful way to hand a half-destructured
330    /// body back. For callers whose runtime invariants guarantee ownership but whose
331    /// type-level `'a` parameter the compiler can't see is `'static`.
332    #[doc(hidden)]
333    #[cfg(feature = "unstable")]
334    pub fn try_into_owned(self) -> Option<ReceivedBody<'static, T>> {
335        let Self {
336            content_length,
337            buffer,
338            transport,
339            state,
340            on_completion,
341            encoding,
342            max_len,
343            initial_len,
344            copy_loops_per_yield,
345            max_preallocate,
346            max_header_list_size,
347            trailers,
348            send_100_continue_offset,
349            protocol_session,
350            h3_trailer_future,
351        } = self;
352
353        let transport = match transport {
354            None => None,
355            Some(t) => Some(t.try_into_owned()?),
356        };
357
358        Some(ReceivedBody {
359            content_length,
360            buffer: buffer.try_into_owned()?,
361            transport,
362            state: state.try_into_owned()?,
363            on_completion,
364            encoding,
365            max_len,
366            initial_len,
367            copy_loops_per_yield,
368            max_preallocate,
369            max_header_list_size,
370            trailers: trailers.try_into_owned()?,
371            send_100_continue_offset,
372            protocol_session,
373            h3_trailer_future,
374        })
375    }
376}
377
378pub(crate) fn read_buffered<Transport>(
379    buffer: &mut Buffer,
380    transport: &mut Transport,
381    cx: &mut Context<'_>,
382    buf: &mut [u8],
383) -> Poll<io::Result<usize>>
384where
385    Transport: AsyncRead + Unpin,
386{
387    if buffer.is_empty() {
388        Pin::new(transport).poll_read(cx, buf)
389    } else if buffer.len() >= buf.len() {
390        let len = buf.len();
391        buf.copy_from_slice(&buffer[..len]);
392        buffer.ignore_front(len);
393        Ready(Ok(len))
394    } else {
395        let self_buffer_len = buffer.len();
396        buf[..self_buffer_len].copy_from_slice(buffer);
397        buffer.truncate(0);
398        match Pin::new(transport).poll_read(cx, &mut buf[self_buffer_len..]) {
399            Ready(Ok(additional)) => Ready(Ok(additional + self_buffer_len)),
400            Pending => Ready(Ok(self_buffer_len)),
401            other @ Ready(_) => other,
402        }
403    }
404}
405
/// Outcome of a single state-machine step: the state to move to and the number of bytes the step
/// produced for the caller, or `Pending` / an io error.
type StateOutput = Poll<io::Result<(ReceivedBodyState, usize)>>;
407
408impl<Transport> ReceivedBody<'_, Transport>
409where
410    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
411{
412    #[inline]
413    fn handle_start(&mut self) -> StateOutput {
414        Ready(Ok((
415            match self.content_length {
416                Some(0) => End,
417
418                Some(total_length) if total_length <= self.max_len => FixedLength {
419                    current_index: 0,
420                    total: total_length,
421                },
422
423                Some(_) => {
424                    return Ready(Err(io::Error::new(
425                        ErrorKind::Unsupported,
426                        "content too long",
427                    )));
428                }
429
430                None => Chunked {
431                    remaining: 0,
432                    total: 0,
433                },
434            },
435            0,
436        )))
437    }
438}
439
impl<Transport> AsyncRead for ReceivedBody<'_, Transport>
where
    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
{
    /// Drives the body state machine, yielding decoded body bytes into `buf`.
    ///
    /// In order, each call:
    /// 1. finishes writing any pending `100 Continue` interim response,
    /// 2. steps the state machine up to `copy_loops_per_yield` times, returning as soon as a step
    ///    produces bytes or reaches `End`,
    /// 3. on reaching `End` with no bytes, collects trailers and hands an owned transport to
    ///    `on_completion`,
    /// 4. if the step budget runs out without progress, wakes itself and returns `Pending` so
    ///    other tasks get a turn.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        const CONTINUE: &[u8] = b"HTTP/1.1 100 Continue\r\n\r\n";
        // The offset survives across polls, so a partial or pending write resumes where it
        // stopped instead of resending the prefix.
        while let Some(offset) = self.send_100_continue_offset {
            let n = {
                let Some(transport) = self.transport.as_deref_mut() else {
                    return Ready(Err(ErrorKind::NotConnected.into()));
                };
                if offset == 0 {
                    log::trace!("sending 100-continue");
                }
                ready!(Pin::new(transport).poll_write(cx, &CONTINUE[offset..]))?
            };
            // A zero-length write would otherwise spin this loop forever.
            if n == 0 {
                return Ready(Err(ErrorKind::WriteZero.into()));
            }
            let new_offset = offset + n;
            self.send_100_continue_offset = if new_offset >= CONTINUE.len() {
                None
            } else {
                Some(new_offset)
            };
        }

        // NOTE(review): with `copy_loops_per_yield == 0` this loop body never runs, so every poll
        // falls through to the self-wake below and the read never progresses.
        for _ in 0..self.copy_loops_per_yield {
            let (new_body_state, bytes) = ready!(match *self.state {
                Start => self.handle_start(),
                Chunked { remaining, total } => self.handle_chunked(cx, buf, remaining, total),
                PartialChunkSize { total } => self.handle_partial(cx, buf, total),
                FixedLength {
                    current_index,
                    total,
                } => self.handle_fixed_length(cx, buf, current_index, total),
                ReceivedBodyState::H2Data { total } => self.handle_h2_data(cx, buf, total),
                ReceivedBodyState::H3Data {
                    remaining_in_frame,
                    total,
                    frame_type,
                    partial_frame_header,
                } => self.handle_h3_data(
                    cx,
                    buf,
                    remaining_in_frame,
                    total,
                    frame_type,
                    partial_frame_header,
                ),
                ReceivedBodyState::ReadingH1Trailers { total } => {
                    self.handle_reading_h1_trailers(cx, buf, total)
                }
                End => Ready(Ok((End, 0))),
            })?;

            // The state is stored before any of the `End` work below, so an early return from
            // that work resumes through the `End` arm on the next poll.
            *self.state = new_body_state;

            if *self.state == End {
                // Trailer collection is skipped while the final step still carries bytes; the
                // next poll re-enters via the `End` arm with `bytes == 0` and does it then.
                if bytes == 0
                    && let Some(h3_trailer_future) = &mut self.h3_trailer_future
                {
                    // NOTE(review): on `Err` the `?` returns before the future is cleared, so a
                    // later poll would poll it again — confirm the future tolerates that.
                    let trailers = ready!(h3_trailer_future.as_mut().poll(cx))?;
                    *self.trailers = Some(trailers);
                    self.h3_trailer_future = None;
                }

                // h2 trailers are stashed on the per-stream `StreamState` before EOF, so
                // they're already present at `End` — no boxed future needed. Replacing
                // the session with `Http1` makes subsequent `End` re-entries idempotent.
                if bytes == 0
                    && let Some((h2_connection, stream_id)) =
                        std::mem::replace(&mut self.protocol_session, ProtocolSession::Http1)
                            .as_h2()
                    && let Some(trailers) = h2_connection.take_trailers(stream_id)
                {
                    *self.trailers = Some(trailers);
                }

                // Only an owned transport can be given away; both `take()`s leave `None`
                // behind, so the callback runs at most once. The unwraps are covered by the
                // two checks in the condition.
                if self.on_completion.is_some() && self.owns_transport() {
                    let transport = self.transport.take().unwrap().unwrap_owned();
                    let on_completion = self.on_completion.take().unwrap();
                    on_completion(transport);
                }
                return Ready(Ok(bytes));
            } else if bytes != 0 {
                return Ready(Ok(bytes));
            }
        }

        // Step budget exhausted without producing bytes: reschedule immediately and yield.
        cx.waker().wake_by_ref();
        Pending
    }
}
538
539impl<Transport> crate::BodySource for ReceivedBody<'static, Transport>
540where
541    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
542{
543    fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
544        self.get_mut().trailers.take()
545    }
546}
547
548impl<Transport> Debug for ReceivedBody<'_, Transport> {
549    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
550        f.debug_struct("ReceivedBody")
551            .field("state", &*self.state)
552            .field("content_length", &self.content_length)
553            .field("buffer", &format_args!(".."))
554            .field("on_completion", &self.on_completion.is_some())
555            .finish()
556    }
557}
558
/// The type of H3 frame currently being processed in [`ReceivedBodyState::H3Data`].
///
/// A fresh h3 body (see `ReceivedBodyState::new_h3`) begins in [`H3BodyFrameType::Start`], which
/// is also the `Default`.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum H3BodyFrameType {
    /// Initial state — no frame decoded yet.
    #[default]
    Start,
    /// Inside a DATA frame — body bytes to keep.
    Data,
    /// Inside an unknown frame — payload bytes to discard.
    Unknown,
    /// Inside a trailing HEADERS frame — accumulate into buffer for parsing.
    Trailers,
}
574
/// Position of a [`ReceivedBody`] within its read state machine.
///
/// `poll_read` dispatches on this value once per step and stores the state each step returns.
/// `Start` is the `Default`; `End` is terminal.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum ReceivedBodyState {
    /// initial state
    #[default]
    Start,

    /// read state for a chunked-encoded body: tracks how much of the current chunk is left and
    /// a running total across all chunks.
    Chunked {
        /// remaining indicates the bytes left _in the current
        /// chunk_. initial state is zero.
        remaining: u64,

        /// total indicates the absolute number of bytes read from all chunks
        total: u64,
    },

    /// read state when we have buffered content between subsequent polls because chunk framing
    /// overlapped a buffer boundary
    PartialChunkSize {
        /// running byte total carried through this state. NOTE(review): presumably the same
        /// meaning as `Chunked::total` — confirm in the chunked submodule.
        total: u64,
    },

    /// read state for a fixed-length body.
    FixedLength {
        /// current index represents the bytes that have already been
        /// read. initial state is zero
        current_index: u64,

        /// total length indicates the claimed length, usually
        /// determined by the content-length header
        total: u64,
    },

    /// read state for an H2 body. The h2 driver demuxes DATA frames into a per-stream recv
    /// ring on a separate task before we see them, so there's no frame-boundary state here —
    /// just a running byte total for `max_len` / content-length enforcement. Transitions to
    /// [`End`] when the transport yields `Ready(0)`. Initial state for any h2 body.
    H2Data {
        /// total body bytes read across all DATA frames.
        total: u64,
    },

    /// read state for an H3 body framed as DATA frames.
    H3Data {
        /// bytes remaining in the current frame (DATA, Unknown, or Trailers). zero means we need
        /// to read the next frame header.
        remaining_in_frame: u64,

        /// total body bytes read across all DATA frames.
        total: u64,

        /// what kind of frame we're currently inside.
        frame_type: H3BodyFrameType,

        /// when true, a partial frame header is sitting in `self.buffer` and needs more bytes
        /// before we can decode it.
        partial_frame_header: bool,
    },

    /// accumulating the HTTP/1.1 chunked trailer-section after the last-chunk (`0\r\n`).
    ///
    /// The trailer bytes (including any partially-received trailer headers) live in
    /// `ReceivedBody::buffer` until a final empty line (`\r\n\r\n` or bare `\r\n`) is found.
    ReadingH1Trailers {
        /// total body bytes read across all chunks (for bounds-checking)
        total: u64,
    },

    /// the terminal read state
    End,
}
647
648impl ReceivedBodyState {
649    pub fn new_h2() -> Self {
650        Self::H2Data { total: 0 }
651    }
652
653    pub fn new_h3() -> Self {
654        Self::H3Data {
655            remaining_in_frame: 0,
656            total: 0,
657            frame_type: H3BodyFrameType::Start,
658            partial_frame_header: false,
659        }
660    }
661}
662
663impl<Transport> From<ReceivedBody<'static, Transport>> for Body
664where
665    Transport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
666{
667    fn from(rb: ReceivedBody<'static, Transport>) -> Self {
668        let len = rb.content_length;
669        Body::new_with_trailers(rb, len)
670    }
671}