trillium_http/h3/
body_wrapper.rs1use crate::{Body, Headers, body::BodyType, h3::Frame};
2use futures_lite::{AsyncRead, ready};
3use std::{io, pin::Pin, task::Poll};
4
5#[derive(Debug)]
13pub struct H3Body {
14 body: BodyType,
15 header_written: bool,
18}
19
20impl From<BodyType> for H3Body {
21 fn from(body: BodyType) -> Self {
22 Self {
23 body,
24 header_written: false,
25 }
26 }
27}
28
29impl H3Body {
30 pub(crate) fn new(body: Body) -> Self {
31 body.0.into()
32 }
33
34 pub fn trailers(&mut self) -> Option<Headers> {
38 match &mut self.body {
39 BodyType::Streaming {
40 async_read, done, ..
41 } if *done => async_read.get_mut().as_mut().trailers(),
42 _ => None,
43 }
44 }
45}
46
47impl AsyncRead for H3Body {
48 fn poll_read(
49 self: Pin<&mut Self>,
50 cx: &mut std::task::Context<'_>,
51 buf: &mut [u8],
52 ) -> Poll<io::Result<usize>> {
53 let this = self.get_mut();
54 match &mut this.body {
55 BodyType::Empty => Poll::Ready(Ok(0)),
56
57 BodyType::Static { content, cursor } => {
58 let remaining = content.len() - *cursor;
59 if remaining == 0 {
60 return Poll::Ready(Ok(0));
61 }
62
63 let mut written = 0;
64 if !this.header_written {
65 let frame = Frame::Data(remaining as u64);
66 written += frame.encode(buf).ok_or_else(|| {
67 io::Error::new(
68 io::ErrorKind::WriteZero,
69 "buffer too small for frame header",
70 )
71 })?;
72 this.header_written = true;
73 }
74
75 let bytes = remaining.min(buf.len() - written);
76 buf[written..written + bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
77 *cursor += bytes;
78 Poll::Ready(Ok(written + bytes))
79 }
80
81 BodyType::Streaming {
82 async_read,
83 len: Some(len),
84 done,
85 progress,
86 ..
87 } => {
88 if *done {
89 return Poll::Ready(Ok(0));
90 }
91
92 let header_len = if this.header_written {
93 0
94 } else {
95 Frame::Data(*len).encoded_len()
96 };
97
98 let max_bytes = (*len - *progress)
99 .try_into()
100 .unwrap_or(buf.len() - header_len)
101 .min(buf.len() - header_len);
102
103 let bytes = ready!(
104 async_read
105 .get_mut()
106 .as_mut()
107 .poll_read(cx, &mut buf[header_len..header_len + max_bytes])
108 )?;
109
110 if !this.header_written {
111 Frame::Data(*len).encode(buf);
112 this.header_written = true;
113 }
114
115 if bytes == 0 {
116 *done = true;
117 } else {
118 *progress += bytes as u64;
119 }
120
121 Poll::Ready(Ok(header_len + bytes))
122 }
123
124 BodyType::Streaming {
125 async_read,
126 len: None,
127 done,
128 progress,
129 ..
130 } => {
131 if *done {
132 return Poll::Ready(Ok(0));
133 }
134
135 let reserved = Frame::Data(buf.len() as u64).encoded_len();
136 if buf.len() <= reserved {
137 return Poll::Ready(Err(io::Error::new(
138 io::ErrorKind::WriteZero,
139 "buffer too small for DATA frame",
140 )));
141 }
142
143 let bytes = ready!(
144 async_read
145 .get_mut()
146 .as_mut()
147 .poll_read(cx, &mut buf[reserved..])
148 )?;
149
150 if bytes == 0 {
151 *done = true;
152 return Poll::Ready(Ok(0));
153 }
154
155 *progress += bytes as u64;
156
157 let frame = Frame::Data(bytes as u64);
158 let header_len = frame.encode(buf).unwrap();
159 if header_len < reserved {
160 buf.copy_within(reserved..reserved + bytes, header_len);
161 }
162
163 Poll::Ready(Ok(header_len + bytes))
164 }
165 }
166 }
167}