1use crate::{Body, Buffer, Error, Headers, HttpConfig, MutCow, ProtocolSession, copy};
2use Poll::{Pending, Ready};
3use ReceivedBodyState::{Chunked, End, FixedLength, PartialChunkSize, Start};
4use encoding_rs::Encoding;
5use futures_lite::{AsyncRead, AsyncReadExt, AsyncWrite, ready};
6use std::{
7 fmt::{self, Debug, Formatter},
8 io::{self, ErrorKind},
9 pin::Pin,
10 task::{Context, Poll},
11};
12
13mod chunked;
14mod fixed_length;
15mod h2_data;
16mod h3_data;
17
#[derive(fieldwork::Fieldwork)]
/// A body that is read incrementally from a transport.
///
/// The [`AsyncRead`] implementation drives a small state machine (see [`ReceivedBodyState`]) that
/// picks a framing from `content_length` on the first poll and then delegates to the matching
/// per-framing handler until the state reaches `End`.
///
/// Most fields are [`MutCow`]s so that a body can either borrow its parts from a connection for
/// `'conn` or own them outright.
pub struct ReceivedBody<'conn, Transport> {
    #[field(get)]
    /// The body length declared up front, if any.
    ///
    /// On the first poll, `Some(0)` ends the body immediately, any other `Some(n)` selects
    /// fixed-length reading, and `None` selects chunked reading.
    content_length: Option<u64>,

    /// Buffered bytes that `read_buffered` serves before it polls the transport.
    buffer: MutCow<'conn, Buffer>,

    /// The underlying connection. This is `None` once the transport has been taken, either by
    /// `take_transport` or by being handed to `on_completion`.
    transport: Option<MutCow<'conn, Transport>>,

    /// Current position in the body-reading state machine.
    state: MutCow<'conn, ReceivedBodyState>,

    /// Callback that receives the transport once the body reaches `End`. It is only invoked when
    /// the transport is owned rather than borrowed.
    on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,

    #[field(get)]
    /// The character encoding `read_string` uses to decode the body bytes.
    encoding: &'static Encoding,

    #[field(with, get, set)]
    /// The largest declared content length that will be accepted. `read_bytes` and the initial
    /// state transition both reject a `content_length` above this value.
    max_len: u64,

    #[field(with, get, set)]
    /// Initial `Vec` capacity used by `read_bytes` when no content length is known.
    initial_len: usize,

    #[field(with, get, set)]
    /// Upper bound on state-machine iterations per `poll_read` call before the task wakes itself
    /// and returns `Pending`. Also forwarded to `copy` by `drain`.
    copy_loops_per_yield: usize,

    #[field(with, get, set)]
    /// Upper bound on the capacity `read_bytes` preallocates from a declared content length.
    max_preallocate: usize,

    /// Copied from `HttpConfig::max_header_list_size`. It is not read in this part of the file;
    /// presumably it bounds trailer size in the per-framing handlers (TODO confirm).
    max_header_list_size: u64,

    /// Trailer headers, if any have been received. `poll_read` fills this at end of body from the
    /// h3 trailer future or the h2 connection; `BodySource::trailers` takes the value out.
    trailers: MutCow<'conn, Option<Headers>>,

    /// `Some(n)` while a `100 Continue` interim response still has to be written before reading,
    /// where `n` is the number of bytes of it written so far. `None` when nothing is pending.
    send_100_continue_offset: Option<usize>,

    /// Protocol-specific session handle. When it is an h2 session, trailers are taken from the
    /// h2 connection at end of body.
    protocol_session: ProtocolSession,

    /// When set, a future resolving to the h3 trailers. It is polled once the body reaches `End`.
    h3_trailer_future:
        Option<Pin<Box<dyn Future<Output = io::Result<Headers>> + Send + Sync + 'static>>>,
}
128
/// Returns the non-empty tail of `buf` starting at byte offset `min`.
///
/// Yields `None` when `min` does not fit in a `usize`, when it lies past the end of `buf`, or when
/// nothing remains at or after that offset.
fn slice_from(min: u64, buf: &[u8]) -> Option<&[u8]> {
    // An offset that cannot be represented as `usize` can never be inside a slice.
    let start = usize::try_from(min).ok()?;
    match buf.get(start..) {
        Some(tail) if !tail.is_empty() => Some(tail),
        _ => None,
    }
}
133
134impl<'conn, Transport> ReceivedBody<'conn, Transport>
135where
136 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
137{
138 #[allow(missing_docs)]
139 #[doc(hidden)]
140 pub fn new(
141 content_length: Option<u64>,
142 buffer: impl Into<MutCow<'conn, Buffer>>,
143 transport: impl Into<MutCow<'conn, Transport>>,
144 state: impl Into<MutCow<'conn, ReceivedBodyState>>,
145 on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
146 encoding: &'static Encoding,
147 ) -> Self {
148 Self::new_with_config(
149 content_length,
150 buffer,
151 transport,
152 state,
153 on_completion,
154 encoding,
155 &HttpConfig::DEFAULT,
156 )
157 }
158
159 #[allow(missing_docs)]
160 #[doc(hidden)]
161 pub(crate) fn new_with_config(
162 content_length: Option<u64>,
163 buffer: impl Into<MutCow<'conn, Buffer>>,
164 transport: impl Into<MutCow<'conn, Transport>>,
165 state: impl Into<MutCow<'conn, ReceivedBodyState>>,
166 on_completion: Option<Box<dyn FnOnce(Transport) + Send + Sync + 'static>>,
167 encoding: &'static Encoding,
168 config: &HttpConfig,
169 ) -> Self {
170 Self {
171 content_length,
172 buffer: buffer.into(),
173 transport: Some(transport.into()),
174 state: state.into(),
175 on_completion,
176 encoding,
177 max_len: config.received_body_max_len,
178 initial_len: config.received_body_initial_len,
179 copy_loops_per_yield: config.copy_loops_per_yield,
180 max_preallocate: config.received_body_max_preallocate,
181 max_header_list_size: config.max_header_list_size,
182 trailers: None.into(),
183 send_100_continue_offset: None,
184 protocol_session: ProtocolSession::Http1,
185 h3_trailer_future: None,
186 }
187 }
188
189 #[doc(hidden)]
193 #[must_use]
194 pub fn with_trailers(mut self, trailers: impl Into<MutCow<'conn, Option<Headers>>>) -> Self {
195 self.trailers = trailers.into();
196 self
197 }
198
199 #[doc(hidden)]
205 #[must_use]
206 #[cfg(feature = "unstable")]
207 pub fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
208 self.protocol_session = protocol_session;
209 self
210 }
211
212 #[doc(hidden)]
213 #[must_use]
214 #[cfg(not(feature = "unstable"))]
215 pub(crate) fn with_protocol_session(mut self, protocol_session: ProtocolSession) -> Self {
216 self.protocol_session = protocol_session;
217 self
218 }
219
220 #[must_use]
223 pub(crate) fn with_send_100_continue(mut self) -> Self {
224 self.send_100_continue_offset = Some(0);
225 self
226 }
227
228 pub async fn read_string(self) -> crate::Result<String> {
245 let encoding = self.encoding();
246 let bytes = self.read_bytes().await?;
247 let (s, _, _) = encoding.decode(&bytes);
248 Ok(s.to_string())
249 }
250
251 fn owns_transport(&self) -> bool {
252 self.transport.as_ref().is_some_and(MutCow::is_owned)
253 }
254
255 pub async fn read_bytes(mut self) -> crate::Result<Vec<u8>> {
270 let mut vec = if let Some(len) = self.content_length {
271 if len > self.max_len {
272 return Err(Error::ReceivedBodyTooLong(self.max_len));
273 }
274
275 let len = usize::try_from(len).map_err(|_| Error::ReceivedBodyTooLong(self.max_len))?;
276
277 Vec::with_capacity(len.min(self.max_preallocate))
278 } else {
279 Vec::with_capacity(self.initial_len)
280 };
281
282 self.read_to_end(&mut vec).await?;
283 Ok(vec)
284 }
285
286 fn read_raw(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
287 if let Some(transport) = self.transport.as_deref_mut() {
288 read_buffered(&mut self.buffer, transport, cx, buf)
289 } else {
290 Ready(Err(ErrorKind::NotConnected.into()))
291 }
292 }
293
294 #[allow(
303 clippy::missing_errors_doc,
304 reason = "errors are documented above; clippy doesn't detect the section"
305 )]
306 pub async fn drain(self) -> io::Result<u64> {
307 let copy_loops_per_yield = self.copy_loops_per_yield;
308 copy(self, futures_lite::io::sink(), copy_loops_per_yield).await
309 }
310}
311
312impl<T> ReceivedBody<'static, T> {
313 pub fn take_transport(&mut self) -> Option<T> {
315 self.transport.take().map(MutCow::unwrap_owned)
316 }
317
318 #[doc(hidden)]
319 #[cfg(feature = "unstable")]
320 pub fn state(&self) -> ReceivedBodyState {
321 *self.state
322 }
323}
324
325impl<T> ReceivedBody<'_, T> {
326 #[doc(hidden)]
333 #[cfg(feature = "unstable")]
334 pub fn try_into_owned(self) -> Option<ReceivedBody<'static, T>> {
335 let Self {
336 content_length,
337 buffer,
338 transport,
339 state,
340 on_completion,
341 encoding,
342 max_len,
343 initial_len,
344 copy_loops_per_yield,
345 max_preallocate,
346 max_header_list_size,
347 trailers,
348 send_100_continue_offset,
349 protocol_session,
350 h3_trailer_future,
351 } = self;
352
353 let transport = match transport {
354 None => None,
355 Some(t) => Some(t.try_into_owned()?),
356 };
357
358 Some(ReceivedBody {
359 content_length,
360 buffer: buffer.try_into_owned()?,
361 transport,
362 state: state.try_into_owned()?,
363 on_completion,
364 encoding,
365 max_len,
366 initial_len,
367 copy_loops_per_yield,
368 max_preallocate,
369 max_header_list_size,
370 trailers: trailers.try_into_owned()?,
371 send_100_continue_offset,
372 protocol_session,
373 h3_trailer_future,
374 })
375 }
376}
377
378pub(crate) fn read_buffered<Transport>(
379 buffer: &mut Buffer,
380 transport: &mut Transport,
381 cx: &mut Context<'_>,
382 buf: &mut [u8],
383) -> Poll<io::Result<usize>>
384where
385 Transport: AsyncRead + Unpin,
386{
387 if buffer.is_empty() {
388 Pin::new(transport).poll_read(cx, buf)
389 } else if buffer.len() >= buf.len() {
390 let len = buf.len();
391 buf.copy_from_slice(&buffer[..len]);
392 buffer.ignore_front(len);
393 Ready(Ok(len))
394 } else {
395 let self_buffer_len = buffer.len();
396 buf[..self_buffer_len].copy_from_slice(buffer);
397 buffer.truncate(0);
398 match Pin::new(transport).poll_read(cx, &mut buf[self_buffer_len..]) {
399 Ready(Ok(additional)) => Ready(Ok(additional + self_buffer_len)),
400 Pending => Ready(Ok(self_buffer_len)),
401 other @ Ready(_) => other,
402 }
403 }
404}
405
/// Outcome of one state-handler step: the next [`ReceivedBodyState`] together with the number of
/// bytes `poll_read` should report as read, or `Pending` / an I/O error.
type StateOutput = Poll<io::Result<(ReceivedBodyState, usize)>>;
407
408impl<Transport> ReceivedBody<'_, Transport>
409where
410 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
411{
412 #[inline]
413 fn handle_start(&mut self) -> StateOutput {
414 Ready(Ok((
415 match self.content_length {
416 Some(0) => End,
417
418 Some(total_length) if total_length <= self.max_len => FixedLength {
419 current_index: 0,
420 total: total_length,
421 },
422
423 Some(_) => {
424 return Ready(Err(io::Error::new(
425 ErrorKind::Unsupported,
426 "content too long",
427 )));
428 }
429
430 None => Chunked {
431 remaining: 0,
432 total: 0,
433 },
434 },
435 0,
436 )))
437 }
438}
439
440impl<Transport> AsyncRead for ReceivedBody<'_, Transport>
441where
442 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
443{
444 fn poll_read(
445 mut self: Pin<&mut Self>,
446 cx: &mut Context<'_>,
447 buf: &mut [u8],
448 ) -> Poll<io::Result<usize>> {
449 const CONTINUE: &[u8] = b"HTTP/1.1 100 Continue\r\n\r\n";
450 while let Some(offset) = self.send_100_continue_offset {
451 let n = {
452 let Some(transport) = self.transport.as_deref_mut() else {
453 return Ready(Err(ErrorKind::NotConnected.into()));
454 };
455 if offset == 0 {
456 log::trace!("sending 100-continue");
457 }
458 ready!(Pin::new(transport).poll_write(cx, &CONTINUE[offset..]))?
459 };
460 if n == 0 {
461 return Ready(Err(ErrorKind::WriteZero.into()));
462 }
463 let new_offset = offset + n;
464 self.send_100_continue_offset = if new_offset >= CONTINUE.len() {
465 None
466 } else {
467 Some(new_offset)
468 };
469 }
470
471 for _ in 0..self.copy_loops_per_yield {
472 let (new_body_state, bytes) = ready!(match *self.state {
473 Start => self.handle_start(),
474 Chunked { remaining, total } => self.handle_chunked(cx, buf, remaining, total),
475 PartialChunkSize { total } => self.handle_partial(cx, buf, total),
476 FixedLength {
477 current_index,
478 total,
479 } => self.handle_fixed_length(cx, buf, current_index, total),
480 ReceivedBodyState::H2Data { total } => self.handle_h2_data(cx, buf, total),
481 ReceivedBodyState::H3Data {
482 remaining_in_frame,
483 total,
484 frame_type,
485 partial_frame_header,
486 } => self.handle_h3_data(
487 cx,
488 buf,
489 remaining_in_frame,
490 total,
491 frame_type,
492 partial_frame_header,
493 ),
494 ReceivedBodyState::ReadingH1Trailers { total } => {
495 self.handle_reading_h1_trailers(cx, buf, total)
496 }
497 End => Ready(Ok((End, 0))),
498 })?;
499
500 *self.state = new_body_state;
501
502 if *self.state == End {
503 if bytes == 0
504 && let Some(h3_trailer_future) = &mut self.h3_trailer_future
505 {
506 let trailers = ready!(h3_trailer_future.as_mut().poll(cx))?;
507 *self.trailers = Some(trailers);
508 self.h3_trailer_future = None;
509 }
510
511 if bytes == 0
515 && let Some((h2_connection, stream_id)) =
516 std::mem::replace(&mut self.protocol_session, ProtocolSession::Http1)
517 .as_h2()
518 && let Some(trailers) = h2_connection.take_trailers(stream_id)
519 {
520 *self.trailers = Some(trailers);
521 }
522
523 if self.on_completion.is_some() && self.owns_transport() {
524 let transport = self.transport.take().unwrap().unwrap_owned();
525 let on_completion = self.on_completion.take().unwrap();
526 on_completion(transport);
527 }
528 return Ready(Ok(bytes));
529 } else if bytes != 0 {
530 return Ready(Ok(bytes));
531 }
532 }
533
534 cx.waker().wake_by_ref();
535 Pending
536 }
537}
538
539impl<Transport> crate::BodySource for ReceivedBody<'static, Transport>
540where
541 Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
542{
543 fn trailers(self: Pin<&mut Self>) -> Option<Headers> {
544 self.get_mut().trailers.take()
545 }
546}
547
548impl<Transport> Debug for ReceivedBody<'_, Transport> {
549 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
550 f.debug_struct("ReceivedBody")
551 .field("state", &*self.state)
552 .field("content_length", &self.content_length)
553 .field("buffer", &format_args!(".."))
554 .field("on_completion", &self.on_completion.is_some())
555 .finish()
556 }
557}
558
/// The kind of h3 frame the body reader is currently inside, as tracked by
/// [`ReceivedBodyState::H3Data`].
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum H3BodyFrameType {
    /// The default; this is the frame type [`ReceivedBodyState::new_h3`] starts with.
    #[default]
    Start,
    /// Presumably a DATA frame whose payload is body content (TODO confirm in the h3 handler).
    Data,
    /// Presumably a frame of a type the body reader does not interpret (TODO confirm).
    Unknown,
    /// Presumably a frame carrying trailer headers (TODO confirm).
    Trailers,
}
574
/// The state machine that `ReceivedBody::poll_read` steps through. Each variant is dispatched to
/// its own handler, and every handler returns the next state.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)]
#[allow(missing_docs)]
#[doc(hidden)]
pub enum ReceivedBodyState {
    /// The default. On the first poll a framing is chosen from the content length: `End` for
    /// zero, `FixedLength` for a known length, `Chunked` when there is none.
    #[default]
    Start,

    /// Reading a body that had no content length.
    Chunked {
        /// Starts at 0; presumably the bytes left in the current chunk (TODO confirm).
        remaining: u64,

        /// Starts at 0; presumably the body bytes read so far (TODO confirm).
        total: u64,
    },

    /// Handled by `handle_partial`; presumably a chunk-size line that was split across reads
    /// (TODO confirm).
    PartialChunkSize { total: u64 },

    /// Reading a body with a known content length.
    FixedLength {
        /// Starts at 0; presumably the offset of the next byte to read (TODO confirm).
        current_index: u64,

        /// The declared content length.
        total: u64,
    },

    /// Reading an h2 body; see [`ReceivedBodyState::new_h2`].
    H2Data {
        /// Starts at 0; presumably the body bytes read so far (TODO confirm).
        total: u64,
    },

    /// Reading an h3 body; see [`ReceivedBodyState::new_h3`].
    H3Data {
        /// Starts at 0; presumably the payload bytes left in the current frame (TODO confirm).
        remaining_in_frame: u64,

        /// Starts at 0; presumably the body bytes read so far (TODO confirm).
        total: u64,

        /// The kind of frame currently being read.
        frame_type: H3BodyFrameType,

        /// Starts `false`; presumably set while a frame header has been only partially read
        /// (TODO confirm).
        partial_frame_header: bool,
    },

    /// Handled by `handle_reading_h1_trailers`; presumably entered after the final chunk of an
    /// HTTP/1 body (TODO confirm).
    ReadingH1Trailers {
        /// Presumably the body bytes read before the trailers (TODO confirm).
        total: u64,
    },

    /// The body is complete. Further polls report zero bytes.
    End,
}
647
648impl ReceivedBodyState {
649 pub fn new_h2() -> Self {
650 Self::H2Data { total: 0 }
651 }
652
653 pub fn new_h3() -> Self {
654 Self::H3Data {
655 remaining_in_frame: 0,
656 total: 0,
657 frame_type: H3BodyFrameType::Start,
658 partial_frame_header: false,
659 }
660 }
661}
662
663impl<Transport> From<ReceivedBody<'static, Transport>> for Body
664where
665 Transport: AsyncRead + AsyncWrite + Send + Sync + Unpin + 'static,
666{
667 fn from(rb: ReceivedBody<'static, Transport>) -> Self {
668 let len = rb.content_length;
669 Body::new_with_trailers(rb, len)
670 }
671}