Skip to main content

trillium_http/h3/
body_wrapper.rs

1use crate::{Body, Headers, body::BodyType, h3::Frame};
2use futures_lite::{AsyncRead, ready};
3use std::{io, pin::Pin, task::Poll};
4
5/// h3 view over a [`Body`] that prepends HTTP/3 DATA frame headers to the payload.
6///
7/// Each `poll_read` writes a varint-length DATA frame header followed by payload into
8/// the caller's buffer. Known-length bodies open one frame whose payload spans the
9/// whole body and stream into it across polls; unknown-length bodies emit one frame
10/// per `poll_read` — the per-frame length must be known when the header is written,
11/// so a single frame can't be opened ahead of time.
12#[derive(Debug)]
13pub struct H3Body {
14    body: BodyType,
15    /// True once the single DATA frame header has been emitted for a known-length body.
16    /// Unused for unknown-length bodies, which open a new frame per `poll_read`.
17    header_written: bool,
18}
19
20impl From<BodyType> for H3Body {
21    fn from(body: BodyType) -> Self {
22        Self {
23            body,
24            header_written: false,
25        }
26    }
27}
28
29impl H3Body {
30    pub(crate) fn new(body: Body) -> Self {
31        body.0.into()
32    }
33
34    /// Returns trailers from the body source, if any.
35    ///
36    /// Only meaningful after the body has been fully read (`done == true`).
37    pub fn trailers(&mut self) -> Option<Headers> {
38        match &mut self.body {
39            BodyType::Streaming {
40                async_read, done, ..
41            } if *done => async_read.get_mut().as_mut().trailers(),
42            _ => None,
43        }
44    }
45}
46
47impl AsyncRead for H3Body {
48    fn poll_read(
49        self: Pin<&mut Self>,
50        cx: &mut std::task::Context<'_>,
51        buf: &mut [u8],
52    ) -> Poll<io::Result<usize>> {
53        let this = self.get_mut();
54        match &mut this.body {
55            BodyType::Empty => Poll::Ready(Ok(0)),
56
57            BodyType::Static { content, cursor } => {
58                let remaining = content.len() - *cursor;
59                if remaining == 0 {
60                    return Poll::Ready(Ok(0));
61                }
62
63                let mut written = 0;
64                if !this.header_written {
65                    let frame = Frame::Data(remaining as u64);
66                    written += frame.encode(buf).ok_or_else(|| {
67                        io::Error::new(
68                            io::ErrorKind::WriteZero,
69                            "buffer too small for frame header",
70                        )
71                    })?;
72                    this.header_written = true;
73                }
74
75                let bytes = remaining.min(buf.len() - written);
76                buf[written..written + bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
77                *cursor += bytes;
78                Poll::Ready(Ok(written + bytes))
79            }
80
81            BodyType::Streaming {
82                async_read,
83                len: Some(len),
84                done,
85                progress,
86                ..
87            } => {
88                if *done {
89                    return Poll::Ready(Ok(0));
90                }
91
92                let header_len = if this.header_written {
93                    0
94                } else {
95                    Frame::Data(*len).encoded_len()
96                };
97
98                let max_bytes = (*len - *progress)
99                    .try_into()
100                    .unwrap_or(buf.len() - header_len)
101                    .min(buf.len() - header_len);
102
103                let bytes = ready!(
104                    async_read
105                        .get_mut()
106                        .as_mut()
107                        .poll_read(cx, &mut buf[header_len..header_len + max_bytes])
108                )?;
109
110                if !this.header_written {
111                    Frame::Data(*len).encode(buf);
112                    this.header_written = true;
113                }
114
115                if bytes == 0 {
116                    *done = true;
117                } else {
118                    *progress += bytes as u64;
119                }
120
121                Poll::Ready(Ok(header_len + bytes))
122            }
123
124            BodyType::Streaming {
125                async_read,
126                len: None,
127                done,
128                progress,
129                ..
130            } => {
131                if *done {
132                    return Poll::Ready(Ok(0));
133                }
134
135                let reserved = Frame::Data(buf.len() as u64).encoded_len();
136                if buf.len() <= reserved {
137                    return Poll::Ready(Err(io::Error::new(
138                        io::ErrorKind::WriteZero,
139                        "buffer too small for DATA frame",
140                    )));
141                }
142
143                let bytes = ready!(
144                    async_read
145                        .get_mut()
146                        .as_mut()
147                        .poll_read(cx, &mut buf[reserved..])
148                )?;
149
150                if bytes == 0 {
151                    *done = true;
152                    return Poll::Ready(Ok(0));
153                }
154
155                *progress += bytes as u64;
156
157                let frame = Frame::Data(bytes as u64);
158                let header_len = frame.encode(buf).unwrap();
159                if header_len < reserved {
160                    buf.copy_within(reserved..reserved + bytes, header_len);
161                }
162
163                Poll::Ready(Ok(header_len + bytes))
164            }
165        }
166    }
167}