trillium_client/conn/
shared.rs1use super::{Body, Conn, ReceivedBody, ReceivedBodyState, Transport, TypeSet, encoding};
2use crate::{Error, Result, Version, pool::PoolEntry};
3use futures_lite::{AsyncWriteExt, io};
4use std::{
5 fmt::{self, Debug, Formatter},
6 future::{Future, IntoFuture},
7 mem,
8 pin::Pin,
9};
10use trillium_http::Upgrade;
11
12#[cfg(any(feature = "serde_json", feature = "sonic-rs"))]
16#[derive(thiserror::Error, Debug)]
17pub enum ClientSerdeError {
18 #[error(transparent)]
20 HttpError(#[from] Error),
21
22 #[cfg(feature = "sonic-rs")]
23 #[error(transparent)]
25 JsonError(#[from] sonic_rs::Error),
26
27 #[cfg(feature = "serde_json")]
28 #[error(transparent)]
30 JsonError(#[from] serde_json::Error),
31}
32
33impl Conn {
34 pub(crate) async fn exec(&mut self) -> Result<()> {
35 if let Some(h3) = self.h3.clone()
36 && self.try_exec_h3(&h3).await?
37 {
38 self.update_alt_svc_from_response(&h3);
39 return Ok(());
40 }
41
42 self.exec_h1().await
43 }
44
45 pub(crate) fn body_len(&self) -> Option<u64> {
46 if let Some(ref body) = self.request_body {
47 body.len()
48 } else {
49 Some(0)
50 }
51 }
52
53 pub(crate) fn finalize_headers(&mut self) -> Result<()> {
54 match self.http_version {
55 Version::Http0_9 | Version::Http1_0 | Version::Http1_1 => self.finalize_headers_h1(),
56 Version::Http3 if self.h3.is_some() => self.finalize_headers_h3(),
57 other => Err(Error::UnsupportedVersion(other)),
58 }
59 }
60}
61
62impl Drop for Conn {
63 fn drop(&mut self) {
64 log::trace!("dropping client conn");
65 let Some(mut transport) = self.transport.take() else {
66 log::trace!("no transport, nothing to do");
67
68 return;
69 };
70
71 if !self.is_keep_alive() {
72 log::trace!("not keep alive, closing");
73
74 self.config
75 .runtime()
76 .clone()
77 .spawn(async move { transport.close().await });
78
79 return;
80 }
81
82 let Ok(Some(peer_addr)) = transport.peer_addr() else {
83 return;
84 };
85 let Some(pool) = self.pool.take() else { return };
86
87 let origin = self.url.origin();
88
89 if self.response_body_state == ReceivedBodyState::End {
90 log::trace!(
91 "response body has been read to completion, checking transport back into pool for \
92 {}",
93 &peer_addr
94 );
95 pool.insert(origin, PoolEntry::new(transport, None));
96 } else {
97 let content_length = self.response_content_length();
98 let buffer = mem::take(&mut self.buffer);
99 let response_body_state = self.response_body_state;
100 let encoding = encoding(&self.response_headers);
101 self.config.runtime().spawn(async move {
102 let mut response_body = ReceivedBody::new(
103 content_length,
104 buffer,
105 transport,
106 response_body_state,
107 None,
108 encoding,
109 );
110
111 match io::copy(&mut response_body, io::sink()).await {
112 Ok(bytes) => {
113 let transport = response_body.take_transport().unwrap();
114 log::trace!(
115 "read {} bytes in order to recycle conn for {}",
116 bytes,
117 &peer_addr
118 );
119 pool.insert(origin, PoolEntry::new(transport, None));
120 }
121
122 Err(ioerror) => log::error!("unable to recycle conn due to {}", ioerror),
123 };
124 });
125 }
126 }
127}
128
129impl From<Conn> for Body {
130 fn from(conn: Conn) -> Body {
131 let received_body: ReceivedBody<'static, _> = conn.into();
132 received_body.into()
133 }
134}
135
136impl From<Conn> for ReceivedBody<'static, Box<dyn Transport>> {
137 fn from(mut conn: Conn) -> Self {
138 let _ = conn.finalize_headers();
139 let runtime = conn.config.runtime();
140 let origin = conn.url.origin();
141
142 let on_completion = if conn.is_keep_alive()
143 && let Some(pool) = conn.pool.take()
144 {
145 Box::new(move |transport: Box<dyn Transport>| {
146 log::trace!("body transferred, returning to pool");
147 pool.insert(origin.clone(), PoolEntry::new(transport, None));
148 }) as Box<dyn FnOnce(Box<dyn Transport>) + Send + Sync + 'static>
149 } else {
150 Box::new(move |mut transport: Box<dyn Transport>| {
151 runtime.spawn(async move { transport.close().await });
152 }) as Box<dyn FnOnce(Box<dyn Transport>) + Send + Sync + 'static>
153 };
154
155 ReceivedBody::new(
156 conn.response_content_length(),
157 mem::take(&mut conn.buffer),
158 conn.transport.take().unwrap(),
159 conn.response_body_state,
160 Some(on_completion),
161 conn.response_encoding(),
162 )
163 }
164}
165
166impl From<Conn> for Upgrade<Box<dyn Transport>> {
167 fn from(mut conn: Conn) -> Self {
168 Upgrade::new(
169 mem::take(&mut conn.request_headers),
170 conn.url.path().to_string(),
171 conn.method,
172 conn.transport.take().unwrap(),
173 mem::take(&mut conn.buffer),
174 conn.http_version(),
175 )
176 }
177}
178
179impl IntoFuture for Conn {
180 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'static>>;
181 type Output = Result<Conn>;
182
183 fn into_future(mut self) -> Self::IntoFuture {
184 Box::pin(async move { (&mut self).await.map(|()| self) })
185 }
186}
187
188impl<'conn> IntoFuture for &'conn mut Conn {
189 type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'conn>>;
190 type Output = Result<()>;
191
192 fn into_future(self) -> Self::IntoFuture {
193 Box::pin(async move {
194 if let Some(duration) = self.timeout {
195 self.config
196 .runtime()
197 .timeout(duration, self.exec())
198 .await
199 .unwrap_or(Err(Error::TimedOut("Conn", duration)))?;
200 } else {
201 self.exec().await?;
202 }
203 Ok(())
204 })
205 }
206}
207
208impl Debug for Conn {
209 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
210 f.debug_struct("Conn")
211 .field("url", &self.url)
212 .field("method", &self.method)
213 .field("request_headers", &self.request_headers)
214 .field("response_headers", &self.response_headers)
215 .field("status", &self.status)
216 .field("request_body", &self.request_body)
217 .field("pool", &self.pool)
218 .field("h3", &self.h3.is_some())
219 .field("buffer", &String::from_utf8_lossy(&self.buffer))
220 .field("response_body_state", &self.response_body_state)
221 .field("config", &self.config)
222 .field("state", &self.state)
223 .field("authority", &self.authority)
224 .field("scheme", &self.scheme)
225 .field("path", &self.path)
226 .field("request_target", &self.request_target)
227 .field("request_trailers", &self.request_trailers)
228 .field("response_trailers", &self.response_trailers)
229 .finish()
230 }
231}
232
233impl AsRef<TypeSet> for Conn {
234 fn as_ref(&self) -> &TypeSet {
235 &self.state
236 }
237}
238
239impl AsMut<TypeSet> for Conn {
240 fn as_mut(&mut self) -> &mut TypeSet {
241 &mut self.state
242 }
243}