trillium_http/h3/body_wrapper.rs
1use crate::{Body, Headers, body::BodyType, h3::Frame};
2use futures_lite::{AsyncRead, ready};
3use std::{io, pin::Pin, task::Poll};
4
5/// h3 view over a [`Body`] that prepends HTTP/3 DATA frame headers (RFC 9114 §7.2.1)
6/// to the payload yielded by the inner [`BodyType`].
7///
8/// h3 frames DATA inline with the body bytes (unlike h2, which frames at the driver layer
9/// in the send pump): each `poll_read` writes a varint-length DATA frame header followed
10/// by payload into the caller's buffer. Known-length bodies emit one frame whose payload
11/// spans the whole body and stream into it across polls; unknown-length bodies emit one
12/// frame per `poll_read` (the per-frame length must be known when the header is written,
13/// so we can't open a single frame ahead of time).
14///
15/// Mirrors [`H2Body`][crate::h2::H2Body] in role — both peel off the chunked-transfer
16/// wrapping that [`Body::poll_read`] applies for the h1 path on streaming bodies of
17/// unknown length — and adds the h3-specific DATA framing on top.
#[derive(Debug)]
pub struct H3Body {
    /// The wrapped body source. `poll_read` pulls payload bytes from it and adds the
    /// DATA frame header(s) in front of them.
    body: BodyType,
    /// Whether the single DATA frame header has been written. Only meaningful for
    /// known-length bodies, which open one frame whose payload spans the whole body
    /// and stream into it across polls. Stays false for unknown-length bodies (each
    /// poll opens a new frame, so there is no persistent "header already written" state).
    header_written: bool,
}
27
28impl From<BodyType> for H3Body {
29 fn from(body: BodyType) -> Self {
30 Self {
31 body,
32 header_written: false,
33 }
34 }
35}
36
37impl H3Body {
38 pub(crate) fn new(body: Body) -> Self {
39 body.0.into()
40 }
41
42 /// Returns trailers from the body source, if any.
43 ///
44 /// Only meaningful after the body has been fully read (`done == true`).
45 pub fn trailers(&mut self) -> Option<Headers> {
46 match &mut self.body {
47 BodyType::Streaming {
48 async_read, done, ..
49 } if *done => async_read.get_mut().as_mut().trailers(),
50 _ => None,
51 }
52 }
53}
54
55impl AsyncRead for H3Body {
56 fn poll_read(
57 self: Pin<&mut Self>,
58 cx: &mut std::task::Context<'_>,
59 buf: &mut [u8],
60 ) -> Poll<io::Result<usize>> {
61 let this = self.get_mut();
62 // Each branch encodes the body as one or more HTTP/3 DATA frames (RFC 9114 §7.2.1).
63 // Known-length bodies (Static, Streaming { len: Some(_) }) emit one DATA frame whose
64 // payload length spans the whole body; later polls deliver more bytes into that
65 // already-opened frame. Unknown-length bodies (Streaming { len: None }) emit one
66 // DATA frame per poll — the per-frame length must be known when the header is
67 // written, so we can't open a single frame ahead of time.
68 match &mut this.body {
69 BodyType::Empty => Poll::Ready(Ok(0)),
70
71 BodyType::Static { content, cursor } => {
72 let remaining = content.len() - *cursor;
73 if remaining == 0 {
74 return Poll::Ready(Ok(0));
75 }
76
77 let mut written = 0;
78 if !this.header_written {
79 let frame = Frame::Data(remaining as u64);
80 written += frame.encode(buf).ok_or_else(|| {
81 io::Error::new(
82 io::ErrorKind::WriteZero,
83 "buffer too small for frame header",
84 )
85 })?;
86 this.header_written = true;
87 }
88
89 let bytes = remaining.min(buf.len() - written);
90 buf[written..written + bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
91 *cursor += bytes;
92 Poll::Ready(Ok(written + bytes))
93 }
94
95 BodyType::Streaming {
96 async_read,
97 len: Some(len),
98 done,
99 progress,
100 } => {
101 if *done {
102 return Poll::Ready(Ok(0));
103 }
104
105 let header_len = if this.header_written {
106 0
107 } else {
108 Frame::Data(*len).encoded_len()
109 };
110
111 let max_bytes = (*len - *progress)
112 .try_into()
113 .unwrap_or(buf.len() - header_len)
114 .min(buf.len() - header_len);
115
116 let bytes = ready!(
117 async_read
118 .get_mut()
119 .as_mut()
120 .poll_read(cx, &mut buf[header_len..header_len + max_bytes])
121 )?;
122
123 if !this.header_written {
124 Frame::Data(*len).encode(buf);
125 this.header_written = true;
126 }
127
128 if bytes == 0 {
129 *done = true;
130 } else {
131 *progress += bytes as u64;
132 }
133
134 Poll::Ready(Ok(header_len + bytes))
135 }
136
137 BodyType::Streaming {
138 async_read,
139 len: None,
140 done,
141 progress,
142 } => {
143 if *done {
144 return Poll::Ready(Ok(0));
145 }
146
147 let reserved = Frame::Data(buf.len() as u64).encoded_len();
148 if buf.len() <= reserved {
149 return Poll::Ready(Err(io::Error::new(
150 io::ErrorKind::WriteZero,
151 "buffer too small for DATA frame",
152 )));
153 }
154
155 let bytes = ready!(
156 async_read
157 .get_mut()
158 .as_mut()
159 .poll_read(cx, &mut buf[reserved..])
160 )?;
161
162 if bytes == 0 {
163 *done = true;
164 return Poll::Ready(Ok(0));
165 }
166
167 *progress += bytes as u64;
168
169 let frame = Frame::Data(bytes as u64);
170 let header_len = frame.encode(buf).unwrap();
171 if header_len < reserved {
172 buf.copy_within(reserved..reserved + bytes, header_len);
173 }
174
175 Poll::Ready(Ok(header_len + bytes))
176 }
177 }
178 }
179}