Skip to main content

trillium_http/h3/
body_wrapper.rs

1use crate::{Body, Headers, body::BodyType, h3::Frame};
2use futures_lite::{AsyncRead, ready};
3use std::{io, pin::Pin, task::Poll};
4
5/// h3 view over a [`Body`] that prepends HTTP/3 DATA frame headers (RFC 9114 §7.2.1)
6/// to the payload yielded by the inner [`BodyType`].
7///
8/// h3 frames DATA inline with the body bytes (unlike h2, which frames at the driver layer
9/// in the send pump): each `poll_read` writes a varint-length DATA frame header followed
10/// by payload into the caller's buffer. Known-length bodies emit one frame whose payload
11/// spans the whole body and stream into it across polls; unknown-length bodies emit one
12/// frame per `poll_read` (the per-frame length must be known when the header is written,
13/// so we can't open a single frame ahead of time).
14///
15/// Mirrors [`H2Body`][crate::h2::H2Body] in role — both peel off the chunked-transfer
16/// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
17/// unknown length — and adds the h3-specific DATA framing on top.
18#[derive(Debug)]
19pub struct H3Body {
20    body: BodyType,
21    /// Whether the single DATA frame header has been written. Only meaningful for
22    /// known-length bodies, which open one frame whose payload spans the whole body
23    /// and stream into it across polls. Stays false for unknown-length bodies (each
24    /// poll opens a new frame, so there is no persistent "header already written" state).
25    header_written: bool,
26}
27
28impl From<BodyType> for H3Body {
29    fn from(body: BodyType) -> Self {
30        Self {
31            body,
32            header_written: false,
33        }
34    }
35}
36
37impl H3Body {
38    pub(crate) fn new(body: Body) -> Self {
39        body.0.into()
40    }
41
42    /// Returns trailers from the body source, if any.
43    ///
44    /// Only meaningful after the body has been fully read (`done == true`).
45    pub fn trailers(&mut self) -> Option<Headers> {
46        match &mut self.body {
47            BodyType::Streaming {
48                async_read, done, ..
49            } if *done => async_read.get_mut().as_mut().trailers(),
50            _ => None,
51        }
52    }
53}
54
55impl AsyncRead for H3Body {
56    fn poll_read(
57        self: Pin<&mut Self>,
58        cx: &mut std::task::Context<'_>,
59        buf: &mut [u8],
60    ) -> Poll<io::Result<usize>> {
61        let this = self.get_mut();
62        // Each branch encodes the body as one or more HTTP/3 DATA frames (RFC 9114 §7.2.1).
63        // Known-length bodies (Static, Streaming { len: Some(_) }) emit one DATA frame whose
64        // payload length spans the whole body; later polls deliver more bytes into that
65        // already-opened frame. Unknown-length bodies (Streaming { len: None }) emit one
66        // DATA frame per poll — the per-frame length must be known when the header is
67        // written, so we can't open a single frame ahead of time.
68        match &mut this.body {
69            BodyType::Empty => Poll::Ready(Ok(0)),
70
71            BodyType::Static { content, cursor } => {
72                let remaining = content.len() - *cursor;
73                if remaining == 0 {
74                    return Poll::Ready(Ok(0));
75                }
76
77                let mut written = 0;
78                if !this.header_written {
79                    let frame = Frame::Data(remaining as u64);
80                    written += frame.encode(buf).ok_or_else(|| {
81                        io::Error::new(
82                            io::ErrorKind::WriteZero,
83                            "buffer too small for frame header",
84                        )
85                    })?;
86                    this.header_written = true;
87                }
88
89                let bytes = remaining.min(buf.len() - written);
90                buf[written..written + bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
91                *cursor += bytes;
92                Poll::Ready(Ok(written + bytes))
93            }
94
95            BodyType::Streaming {
96                async_read,
97                len: Some(len),
98                done,
99                progress,
100            } => {
101                if *done {
102                    return Poll::Ready(Ok(0));
103                }
104
105                let header_len = if this.header_written {
106                    0
107                } else {
108                    Frame::Data(*len).encoded_len()
109                };
110
111                let max_bytes = (*len - *progress)
112                    .try_into()
113                    .unwrap_or(buf.len() - header_len)
114                    .min(buf.len() - header_len);
115
116                let bytes = ready!(
117                    async_read
118                        .get_mut()
119                        .as_mut()
120                        .poll_read(cx, &mut buf[header_len..header_len + max_bytes])
121                )?;
122
123                if !this.header_written {
124                    Frame::Data(*len).encode(buf);
125                    this.header_written = true;
126                }
127
128                if bytes == 0 {
129                    *done = true;
130                } else {
131                    *progress += bytes as u64;
132                }
133
134                Poll::Ready(Ok(header_len + bytes))
135            }
136
137            BodyType::Streaming {
138                async_read,
139                len: None,
140                done,
141                progress,
142            } => {
143                if *done {
144                    return Poll::Ready(Ok(0));
145                }
146
147                let reserved = Frame::Data(buf.len() as u64).encoded_len();
148                if buf.len() <= reserved {
149                    return Poll::Ready(Err(io::Error::new(
150                        io::ErrorKind::WriteZero,
151                        "buffer too small for DATA frame",
152                    )));
153                }
154
155                let bytes = ready!(
156                    async_read
157                        .get_mut()
158                        .as_mut()
159                        .poll_read(cx, &mut buf[reserved..])
160                )?;
161
162                if bytes == 0 {
163                    *done = true;
164                    return Poll::Ready(Ok(0));
165                }
166
167                *progress += bytes as u64;
168
169                let frame = Frame::Data(bytes as u64);
170                let header_len = frame.encode(buf).unwrap();
171                if header_len < reserved {
172                    buf.copy_within(reserved..reserved + bytes, header_len);
173                }
174
175                Poll::Ready(Ok(header_len + bytes))
176            }
177        }
178    }
179}