Skip to main content

trillium_http/
http_context.rs

1use crate::{
2    Buffer, Conn, ConnectionStatus, HttpConfig, Result, TypeSet, Upgrade,
3    headers::header_observer::HeaderObserver,
4};
5use fieldwork::Fieldwork;
6use futures_lite::{AsyncRead, AsyncWrite};
7use std::{future::Future, sync::Arc};
8use swansong::{ShutdownCompletion, Swansong};
9/// Shared configuration and context for an http server.
10///
11/// Contains tunable parameters in a [`HttpConfig`], the [`Swansong`] graceful shutdown control
12/// interface, and a shared [`TypeSet`] that contains application-specific information about the
13/// running server.
14#[derive(Default, Debug, Fieldwork)]
15#[fieldwork(get, set, get_mut, with)]
16pub struct HttpContext {
17    /// [`HttpConfig`] performance and security parameters
18    pub(crate) config: HttpConfig,
19
20    /// [`Swansong`] graceful shutdown interface
21    pub(crate) swansong: Swansong,
22
23    /// [`TypeSet`] shared state
24    pub(crate) shared_state: TypeSet,
25
26    /// Per-listener QPACK header-frequency observer. Shared by `Arc` across all connections
27    /// a given listener accepts; runtime adapters isolate it per hop-and-direction via
28    /// [`__isolate_qpack_observer`](Self::__isolate_qpack_observer).
29    #[cfg_attr(not(feature = "unstable"), field = false)]
30    pub(crate) observer: Arc<HeaderObserver>,
31}
32impl AsRef<TypeSet> for HttpContext {
33    fn as_ref(&self) -> &TypeSet {
34        &self.shared_state
35    }
36}
37
38impl AsMut<TypeSet> for HttpContext {
39    fn as_mut(&mut self) -> &mut TypeSet {
40        &mut self.shared_state
41    }
42}
43
44impl AsRef<Swansong> for HttpContext {
45    fn as_ref(&self) -> &Swansong {
46        &self.swansong
47    }
48}
49
50impl AsRef<HttpConfig> for HttpContext {
51    fn as_ref(&self) -> &HttpConfig {
52        &self.config
53    }
54}
55
56impl HttpContext {
57    /// Construct a new `HttpContext`
58    pub fn new() -> Self {
59        Self::default()
60    }
61
62    /// Perform HTTP on the provided transport, applying the provided `async Conn -> Conn` handler
63    /// function for every distinct http request-response.
64    ///
65    /// For any given invocation of `HttpContext::run`, the handler function may run any number of
66    /// times, depending on whether the connection is reused by the client.
67    ///
68    /// # Errors
69    ///
70    /// This function will return an [`Error`](crate::Error) if any of the http requests is
71    /// irrecoverably malformed or otherwise noncompliant.
72    pub async fn run<Transport, Handler, Fut>(
73        self: Arc<Self>,
74        transport: Transport,
75        handler: Handler,
76    ) -> Result<Option<Upgrade<Transport>>>
77    where
78        Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
79        Handler: FnMut(Conn<Transport>) -> Fut,
80        Fut: Future<Output = Conn<Transport>>,
81    {
82        let initial_bytes = Vec::with_capacity(self.config.request_buffer_initial_len);
83        run_with_initial_bytes(self, transport, initial_bytes, handler).await
84    }
85
86    /// Attempt graceful shutdown of this server.
87    ///
88    /// The returned [`ShutdownCompletion`] type can
89    /// either be awaited in an async context or blocked on with [`ShutdownCompletion::block`] in a
90    /// blocking context
91    pub fn shut_down(&self) -> ShutdownCompletion {
92        self.swansong.shut_down()
93    }
94
95    /// Replace this context's QPACK header observer with a fresh, empty one.
96    ///
97    /// Adapter crates call this during listener setup so each hop-and-direction pair in a
98    /// deployment gets its own observer. A reverse proxy's inbound server observer is distinct
99    /// from its outbound client observer by construction, so header values one hop forwards
100    /// (e.g. `authorization`, `cookie`) cannot reach the QPACK state of unrelated clients on
101    /// the other hop.
102    ///
103    /// Not part of the stable public API; exposed only for adapter crates.
104    #[doc(hidden)]
105    pub fn __isolate_qpack_observer(&mut self) -> &mut Self {
106        self.observer = Arc::new(HeaderObserver::default());
107        log::trace!(
108            target: "qpack_metrics",
109            "isolated fresh QPACK observer for this context (ptr={:p})",
110            Arc::as_ptr(&self.observer),
111        );
112        self
113    }
114}
115
116/// Like [`HttpContext::run`], but starts with the supplied bytes pre-filled into the request
117/// buffer.
118///
119/// For adapters that peek the first few bytes off a cleartext TCP stream to decide between
120/// HTTP/1.1 and HTTP/2 prior-knowledge dispatch, then need to hand those bytes into the HTTP/1
121/// parser without re-reading. Bytes already in the buffer are consumed by the parser before
122/// any transport read happens.
123///
124/// # Errors
125///
126/// Same as [`HttpContext::run`] — any irrecoverably malformed or noncompliant HTTP/1 request
127/// surfaces as an [`Error`](crate::Error).
128pub async fn run_with_initial_bytes<Transport, Handler, Fut>(
129    context: Arc<HttpContext>,
130    transport: Transport,
131    initial_bytes: Vec<u8>,
132    mut handler: Handler,
133) -> Result<Option<Upgrade<Transport>>>
134where
135    Transport: AsyncRead + AsyncWrite + Unpin + Send + Sync + 'static,
136    Handler: FnMut(Conn<Transport>) -> Fut,
137    Fut: Future<Output = Conn<Transport>>,
138{
139    let _guard = context.swansong.guard();
140    let buffer: Buffer = initial_bytes.into();
141    let mut conn = Conn::new_internal(context, transport, buffer).await?;
142
143    loop {
144        conn = match handler(conn).await.send().await? {
145            ConnectionStatus::Upgrade(upgrade) => return Ok(Some(upgrade)),
146            ConnectionStatus::Close => return Ok(None),
147            ConnectionStatus::Conn(next) => next,
148        }
149    }
150}