1use crate::{Headers, h2::H2Body, h3::H3Body};
2use BodyType::{Empty, Static, Streaming};
3use futures_lite::{AsyncRead, AsyncReadExt, io::Cursor, ready};
4use pin_project_lite::pin_project;
5use std::{
6 borrow::Cow,
7 fmt::{self, Debug, Formatter},
8 io::{Error, Result},
9 pin::Pin,
10 task::{Context, Poll},
11};
12use sync_wrapper::SyncWrapper;
13
14pub trait BodySource: AsyncRead + Send + 'static {
21 fn trailers(self: Pin<&mut Self>) -> Option<Headers>;
26}
27
28pin_project! {
29 struct PlainBody<T> {
30 #[pin]
31 async_read: T,
32 }
33}
34
35impl<T: AsyncRead> AsyncRead for PlainBody<T> {
36 fn poll_read(
37 self: Pin<&mut Self>,
38 cx: &mut Context<'_>,
39 buf: &mut [u8],
40 ) -> Poll<Result<usize>> {
41 self.project().async_read.poll_read(cx, buf)
42 }
43}
44
45impl<T: AsyncRead + Send + 'static> BodySource for PlainBody<T> {
46 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
47 None
48 }
49}
50
51#[derive(Debug, Default)]
55pub struct Body(pub(crate) BodyType);
56
57impl Body {
58 pub fn new_streaming(async_read: impl AsyncRead + Send + 'static, len: Option<u64>) -> Self {
62 Self::new_with_trailers(PlainBody { async_read }, len)
63 }
64
65 pub fn new_with_trailers(body: impl BodySource, len: Option<u64>) -> Self {
71 Self(Streaming {
72 async_read: SyncWrapper::new(Box::pin(body)),
73 len,
74 done: false,
75 progress: 0,
76 chunked_framing: true,
77 })
78 }
79
80 #[doc(hidden)]
87 #[cfg(feature = "unstable")]
88 #[must_use]
89 pub fn without_chunked_framing(mut self) -> Self {
90 if let Streaming {
91 ref mut chunked_framing,
92 ..
93 } = self.0
94 {
95 *chunked_framing = false;
96 }
97 self
98 }
99
100 pub(crate) fn ensure_chunked_framing(&mut self) -> &mut Self {
101 if let Streaming {
102 ref mut chunked_framing,
103 ..
104 } = self.0
105 {
106 *chunked_framing = true;
107 }
108
109 self
110 }
111
112 #[doc(hidden)]
118 pub fn trailers(&mut self) -> Option<Headers> {
119 match &mut self.0 {
120 Streaming {
121 async_read, done, ..
122 } if *done => async_read.get_mut().as_mut().trailers(),
123 _ => None,
124 }
125 }
126
127 pub fn new_static(content: impl Into<Cow<'static, [u8]>>) -> Self {
130 Self(Static {
131 content: content.into(),
132 cursor: 0,
133 })
134 }
135
136 pub fn static_bytes(&self) -> Option<&[u8]> {
140 match &self.0 {
141 Static { content, .. } => Some(content.as_ref()),
142 _ => None,
143 }
144 }
145
146 pub fn into_reader(self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> {
150 match self.0 {
151 Streaming { async_read, .. } => Box::pin(SyncAsyncReader(async_read)),
152 Static { content, .. } => Box::pin(Cursor::new(content)),
153 Empty => Box::pin(Cursor::new("")),
154 }
155 }
156
157 pub async fn into_bytes(self) -> Result<Cow<'static, [u8]>> {
166 match self.0 {
167 Static { content, .. } => Ok(content),
168
169 Streaming {
170 async_read,
171 len,
172 progress: 0,
173 done: false,
174 ..
175 } => {
176 let mut async_read = async_read.into_inner();
177 let mut buf = len
178 .and_then(|c| c.try_into().ok())
179 .map(Vec::with_capacity)
180 .unwrap_or_default();
181
182 async_read.read_to_end(&mut buf).await?;
183
184 Ok(Cow::Owned(buf))
185 }
186
187 Empty => Ok(Cow::Borrowed(b"")),
188
189 Streaming { .. } => Err(Error::other("body already read to completion")),
190 }
191 }
192
193 pub fn bytes_read(&self) -> u64 {
196 self.0.bytes_read()
197 }
198
199 pub fn len(&self) -> Option<u64> {
202 self.0.len()
203 }
204
205 pub fn is_empty(&self) -> bool {
207 self.0.is_empty()
208 }
209
210 pub fn is_static(&self) -> bool {
212 matches!(self.0, Static { .. })
213 }
214
215 pub fn is_streaming(&self) -> bool {
217 matches!(self.0, Streaming { .. })
218 }
219
220 #[doc(hidden)]
227 #[cfg(feature = "unstable")]
228 pub fn try_clone(&self) -> Option<Self> {
229 match &self.0 {
230 Empty => Some(Self::default()),
231 Static { content, .. } => Some(Self(Static {
232 content: content.clone(),
233 cursor: 0,
234 })),
235 Streaming { .. } => None,
236 }
237 }
238
239 #[cfg(feature = "unstable")]
241 pub fn into_h3(self) -> H3Body {
242 H3Body::new(self)
243 }
244
245 #[cfg(not(feature = "unstable"))]
247 pub(crate) fn into_h3(self) -> H3Body {
248 H3Body::new(self)
249 }
250
251 pub(crate) fn into_h2(self) -> H2Body {
258 H2Body::new(self)
259 }
260}
261
262#[allow(
263 clippy::cast_sign_loss,
264 clippy::cast_possible_truncation,
265 clippy::cast_precision_loss,
266 reason = "buffers are well below petabyte scale; log2/4 of a usize stays in f64 range, and \
267 the subtraction always yields a non-negative usize-representable value"
268)]
269fn max_bytes_to_read(buf_len: usize) -> usize {
270 assert!(
271 buf_len >= 6,
272 "buffers of length {buf_len} are too small for this implementation.
273 if this is a problem for you, please open an issue"
274 );
275
276 let bytes_remaining_after_two_cr_lns = (buf_len - 4) as f64;
277 let max_bytes_of_hex_framing = (bytes_remaining_after_two_cr_lns).log2() / 4f64;
279 (bytes_remaining_after_two_cr_lns - max_bytes_of_hex_framing.ceil()) as usize
280}
281
282impl AsyncRead for Body {
283 fn poll_read(
284 mut self: Pin<&mut Self>,
285 cx: &mut Context<'_>,
286 buf: &mut [u8],
287 ) -> Poll<Result<usize>> {
288 match &mut self.0 {
289 Empty => Poll::Ready(Ok(0)),
290 Static { content, cursor } => {
291 let length = content.len();
292 if length == *cursor {
293 return Poll::Ready(Ok(0));
294 }
295 let bytes = (length - *cursor).min(buf.len());
296 buf[0..bytes].copy_from_slice(&content[*cursor..*cursor + bytes]);
297 *cursor += bytes;
298 Poll::Ready(Ok(bytes))
299 }
300
301 Streaming {
302 async_read,
303 len: Some(len),
304 done,
305 progress,
306 ..
307 } => {
308 if *done {
309 return Poll::Ready(Ok(0));
310 }
311
312 let max_bytes_to_read = (*len - *progress)
313 .try_into()
314 .unwrap_or(buf.len())
315 .min(buf.len());
316
317 let bytes = ready!(
318 async_read
319 .get_mut()
320 .as_mut()
321 .poll_read(cx, &mut buf[..max_bytes_to_read])
322 )?;
323
324 if bytes == 0 {
325 *done = true;
326 } else {
327 *progress += bytes as u64;
328 }
329
330 Poll::Ready(Ok(bytes))
331 }
332
333 Streaming {
334 async_read,
335 len: None,
336 done,
337 progress,
338 chunked_framing,
339 } => {
340 if *done {
341 return Poll::Ready(Ok(0));
342 }
343
344 if !*chunked_framing {
345 let bytes = ready!(async_read.get_mut().as_mut().poll_read(cx, buf))?;
346 if bytes == 0 {
347 *done = true;
348 } else {
349 *progress += bytes as u64;
350 }
351 return Poll::Ready(Ok(bytes));
352 }
353
354 let max_bytes_to_read = max_bytes_to_read(buf.len());
355
356 let bytes = ready!(
357 async_read
358 .get_mut()
359 .as_mut()
360 .poll_read(cx, &mut buf[..max_bytes_to_read])
361 )?;
362
363 if bytes == 0 {
364 *done = true;
365 buf[..3].copy_from_slice(b"0\r\n");
371 return Poll::Ready(Ok(3));
372 }
373
374 *progress += bytes as u64;
375
376 let start = format!("{bytes:X}\r\n");
377 let start_length = start.len();
378 let total = bytes + start_length + 2;
379 buf.copy_within(..bytes, start_length);
380 buf[..start_length].copy_from_slice(start.as_bytes());
381 buf[total - 2..total].copy_from_slice(b"\r\n");
382 Poll::Ready(Ok(total))
383 }
384 }
385 }
386}
387
388struct SyncAsyncReader(SyncWrapper<Pin<Box<dyn BodySource>>>);
389impl Debug for SyncAsyncReader {
390 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
391 f.debug_struct("SyncAsyncReader").finish()
392 }
393}
394impl AsyncRead for SyncAsyncReader {
395 fn poll_read(
396 self: Pin<&mut Self>,
397 cx: &mut Context<'_>,
398 buf: &mut [u8],
399 ) -> Poll<Result<usize>> {
400 self.get_mut().0.get_mut().as_mut().poll_read(cx, buf)
401 }
402}
403
404#[derive(Default)]
405pub(crate) enum BodyType {
406 #[default]
407 Empty,
408
409 Static {
410 content: Cow<'static, [u8]>,
411 cursor: usize,
412 },
413
414 Streaming {
415 async_read: SyncWrapper<Pin<Box<dyn BodySource>>>,
416 progress: u64,
417 len: Option<u64>,
418 done: bool,
419 chunked_framing: bool,
423 },
424}
425
426impl Debug for BodyType {
427 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
428 match self {
429 Empty => f.debug_tuple("BodyType::Empty").finish(),
430 Static { content, cursor } => f
431 .debug_struct("BodyType::Static")
432 .field("content", &String::from_utf8_lossy(content))
433 .field("cursor", cursor)
434 .finish(),
435 Streaming {
436 len,
437 done,
438 progress,
439 ..
440 } => f
441 .debug_struct("BodyType::Streaming")
442 .field("async_read", &format_args!(".."))
443 .field("len", &len)
444 .field("done", &done)
445 .field("progress", &progress)
446 .finish(),
447 }
448 }
449}
450
451impl BodyType {
452 fn is_empty(&self) -> bool {
453 match *self {
454 Empty => true,
455 Static { ref content, .. } => content.is_empty(),
456 Streaming { len, .. } => len == Some(0),
457 }
458 }
459
460 fn len(&self) -> Option<u64> {
461 match *self {
462 Empty => Some(0),
463 Static { ref content, .. } => Some(content.len() as u64),
464 Streaming { len, .. } => len,
465 }
466 }
467
468 fn bytes_read(&self) -> u64 {
469 match *self {
470 Empty => 0,
471 Static { cursor, .. } => cursor as u64,
472 Streaming { progress, .. } => progress,
473 }
474 }
475}
476
477impl From<String> for Body {
478 fn from(s: String) -> Self {
479 s.into_bytes().into()
480 }
481}
482
483impl From<&'static str> for Body {
484 fn from(s: &'static str) -> Self {
485 s.as_bytes().into()
486 }
487}
488
489impl From<&'static [u8]> for Body {
490 fn from(content: &'static [u8]) -> Self {
491 Self::new_static(content)
492 }
493}
494
495impl From<Vec<u8>> for Body {
496 fn from(content: Vec<u8>) -> Self {
497 Self::new_static(content)
498 }
499}
500
501impl From<Cow<'static, [u8]>> for Body {
502 fn from(value: Cow<'static, [u8]>) -> Self {
503 Self::new_static(value)
504 }
505}
506
507impl From<Cow<'static, str>> for Body {
508 fn from(value: Cow<'static, str>) -> Self {
509 match value {
510 Cow::Borrowed(b) => b.into(),
511 Cow::Owned(o) => o.into(),
512 }
513 }
514}
515
516#[cfg(test)]
517mod test_bytes_to_read {
518 #[test]
519 fn simple_check_of_known_values() {
520 let values = vec![
529 (6, 1), (7, 2), (20, 15), (21, 15), (22, 16), (23, 17), (260, 254), (261, 254), (262, 255), (263, 256), (4100, 4093), (4101, 4093), (4102, 4094), (4103, 4095), (4104, 4096), ];
545
546 for (input, expected) in values {
547 let actual = super::max_bytes_to_read(input);
548 assert_eq!(
549 actual, expected,
550 "\n\nexpected max_bytes_to_read({input}) to be {expected}, but it was {actual}"
551 );
552
553 let used_bytes = expected + 4 + format!("{expected:X}").len();
555 assert!(
556 used_bytes == input || used_bytes == input - 1,
557 "\n\nfor an input of {}, expected used bytes to be {} or {}, but was {}",
558 input,
559 input,
560 input - 1,
561 used_bytes
562 );
563 }
564 }
565}