// trillium_server_common/listener_config.rs

1use crate::{
2    Acceptor, BoxedAcceptor, BoxedQuicConfig, QuicConfig, RuntimeTrait, Server, ServerHandle,
3    config::{init_shared, spawn_signals_loop},
4    server::PreboundListener,
5};
6use async_cell::sync::AsyncCell;
7use std::{
8    cell::OnceCell,
9    fmt::{self, Debug, Formatter},
10    future::Future,
11    io,
12    net::{SocketAddr, TcpListener as StdTcpListener, UdpSocket as StdUdpSocket},
13    pin::Pin,
14    sync::Arc,
15    thread::JoinHandle,
16};
17#[cfg(unix)]
18use std::{os::unix::net::UnixListener, path::Path};
19use trillium::Handler;
20use trillium_http::HttpContext;
21
22mod helpers;
23mod into_listen_addr;
24#[cfg(reuseport)]
25mod reuseport;
26mod run;
27
28use helpers::{resolve_env_listener, take_inherited_fd};
29pub use into_listen_addr::IntoListenAddr;
30use run::{Resolved, Shared};
31
/// A worker body, type-erased over the handler, that builds and drives a reuseport listener
/// group's accept loops for one worker index. Built inside [`ListenerConfig::run_async`] (where
/// the handler type is known) and invoked once per worker thread.
///
/// The closure itself is `Send + Sync` so a single `Arc` can be shared across worker threads;
/// the future it returns carries no `Send` bound, so each future stays on the thread that
/// created it.
type BoxedWorker = Arc<dyn Fn(usize) -> Pin<Box<dyn Future<Output = ()>>> + Send + Sync>;
36
37/// A monomorphized [`FanOut::thread_per_core`](crate::FanOut::thread_per_core) call for a concrete
38/// runtime, erased to a function pointer. Stored on the builder only when a `bind_reuseport_*`
39/// method runs, so the `FanOut` bound is discharged at the bind site while `run_async` stays free
40/// of it (the handler type isn't known until `spawn`).
41type ThreadPerCoreInvoker<R> = fn(&R, usize, BoxedWorker) -> Vec<JoinHandle<()>>;
42
43/// A registered standard listener, before its accept loop is driven. Either a [`PreboundListener`]
44/// the builder claimed itself (from `bind_tcp`/`bind_tls`/`bind_fd`/`bind_uds`/`bind_env`) and will
45/// adopt into the runtime at spawn, or a [`Server`] the caller already bound and handed over via
46/// [`bind_server`](ListenerConfig::bind_server) (the bridge for [`Config::with_prebound_server`]).
47pub(super) enum ListenerSource<ServerType> {
48    Prebound(PreboundListener),
49    Adopted(ServerType),
50}
51
52/// A reuseport listener registered on the builder: its resolved address (the group's shared port),
53/// the first group member claimed eagerly at bind time, and its erased acceptor.
54type ReuseportListener<ServerType> = (
55    SocketAddr,
56    StdTcpListener,
57    BoxedAcceptor<<ServerType as Server>::Transport>,
58);
59
60/// An advanced listener builder.
61///
62/// Holds the inputs shared across every listener — handler (supplied at [`spawn`](Self::spawn)),
63/// swansong, shared state, HTTP config — and a set of registered listeners. Each `bind_*` method
64/// claims its address eagerly (failing fast) and the listeners are adopted into the runtime when
65/// the server is spawned, after [`Handler::init`] has run exactly once across all of them.
66///
67/// Construct one from a [`Config`](crate::Config) with
68/// [`Config::listeners`](crate::Config::listeners): the global server configuration (swansong,
69/// shared state, HTTP config, nodelay, max-connections, signals) is set on the `Config`, then
70/// carried over; the builder adds listener topology.
71pub struct ListenerConfig<ServerType: Server> {
72    context: HttpContext,
73    context_cell: Arc<AsyncCell<Arc<HttpContext>>>,
74    runtime: ServerType::Runtime,
75    max_connections: Option<usize>,
76    nodelay: bool,
77    register_signals: bool,
78    listeners: Vec<(
79        ListenerSource<ServerType>,
80        BoxedAcceptor<ServerType::Transport>,
81    )>,
82    quic_listeners: Vec<(StdUdpSocket, BoxedQuicConfig<ServerType>)>,
83    alt_svc_pairs: Vec<(u16, u16)>,
84    reuseport_listeners: Vec<ReuseportListener<ServerType>>,
85    reuseport_workers: Option<usize>,
86    thread_per_core: Option<ThreadPerCoreInvoker<ServerType::Runtime>>,
87}
88
89impl<ServerType: Server> Debug for ListenerConfig<ServerType> {
90    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91        f.debug_struct("ListenerConfig")
92            .field("max_connections", &self.max_connections)
93            .field("nodelay", &self.nodelay)
94            .field("register_signals", &self.register_signals)
95            .field("listeners", &self.listeners.len())
96            .field("quic_listeners", &self.quic_listeners)
97            .field("alt_svc_pairs", &self.alt_svc_pairs)
98            .field("reuseport_listeners", &self.reuseport_listeners)
99            .field("reuseport_workers", &self.reuseport_workers)
100            .finish_non_exhaustive()
101    }
102}
103
104impl<ServerType: Server> ListenerConfig<ServerType> {
105    /// Construct a builder carrying the given global server configuration and no listeners.
106    pub(crate) fn from_global(
107        context: HttpContext,
108        context_cell: Arc<AsyncCell<Arc<HttpContext>>>,
109        runtime: ServerType::Runtime,
110        max_connections: Option<usize>,
111        nodelay: bool,
112        register_signals: bool,
113    ) -> Self {
114        Self {
115            context,
116            context_cell,
117            runtime,
118            max_connections,
119            nodelay,
120            register_signals,
121            listeners: Vec::new(),
122            quic_listeners: Vec::new(),
123            alt_svc_pairs: Vec::new(),
124            reuseport_listeners: Vec::new(),
125            reuseport_workers: None,
126            thread_per_core: None,
127        }
128    }
129
130    /// Register a plaintext TCP listener. The address is resolved and bound immediately; an error
131    /// means the address could not be resolved or the bind failed (e.g. the port is in use).
132    /// Binding to port `0` selects an ephemeral port, which will be reflected in the bound
133    /// address reported after [`spawn`](Self::spawn).
134    pub fn bind_tcp(self, addr: impl IntoListenAddr) -> io::Result<Self> {
135        let listener = StdTcpListener::bind(addr.into_listen_addr()?)?;
136        listener.set_nonblocking(true)?;
137        Ok(self.push_listener(PreboundListener::Tcp(listener), BoxedAcceptor::new(())))
138    }
139
140    /// Register a TLS listener, terminating TLS with the provided acceptor (e.g. from
141    /// `trillium-rustls` or `trillium-native-tls`). The protocols offered (h1, h2) follow the
142    /// acceptor's ALPN configuration. Like [`bind_tcp`](Self::bind_tcp), the address is resolved
143    /// and bound immediately.
144    pub fn bind_tls<A>(self, addr: impl IntoListenAddr, acceptor: A) -> io::Result<Self>
145    where
146        A: Acceptor<ServerType::Transport>,
147    {
148        let listener = StdTcpListener::bind(addr.into_listen_addr()?)?;
149        listener.set_nonblocking(true)?;
150        Ok(self.push_listener(
151            PreboundListener::Tcp(listener),
152            BoxedAcceptor::new(acceptor),
153        ))
154    }
155
156    /// Register a plaintext TCP listener inherited from the environment as the file descriptor at
157    /// `index` (as passed by a socket-activation supervisor such as systemfd or systemd, via the
158    /// `LISTEN_FDS` protocol). The descriptor is claimed immediately; an error here means no
159    /// inherited TCP listener was present at that index.
160    ///
161    /// Unlike the single-listener [`Config`](crate::Config) path, which auto-detects `LISTEN_FD`,
162    /// this is explicit: each inherited descriptor is bound by index, so several can be adopted.
163    pub fn bind_fd(self, index: usize) -> io::Result<Self> {
164        let listener = take_inherited_fd(index)?;
165        listener.set_nonblocking(true)?;
166        Ok(self.push_listener(PreboundListener::Tcp(listener), BoxedAcceptor::new(())))
167    }
168
169    /// Register a single listener resolved from the environment, following trillium's 12-factor
170    /// conventions: `HOST` (default `localhost`) and `PORT` (default `8080`), a listener inherited
171    /// via the `LISTEN_FDS` socket-activation protocol if present, and — on unix — a `HOST`
172    /// beginning with `/`, `.`, or `~` treated as a Unix-domain-socket path (the port is ignored).
173    ///
174    /// This is the explicit, fallible equivalent of the implicit binding that
175    /// [`Config`](crate::Config) performs when no listener is configured. The listener is
176    /// plaintext; for a TLS listener resolved the same way use
177    /// [`bind_env_tls`](Self::bind_env_tls).
178    pub fn bind_env(self) -> io::Result<Self> {
179        Ok(self.push_listener(resolve_env_listener()?, BoxedAcceptor::new(())))
180    }
181
182    /// Register a single TLS listener resolved from the environment, terminating TLS with the
183    /// provided acceptor. The TLS equivalent of [`bind_env`](Self::bind_env); see it for the
184    /// resolution rules.
185    pub fn bind_env_tls<A>(self, acceptor: A) -> io::Result<Self>
186    where
187        A: Acceptor<ServerType::Transport>,
188    {
189        Ok(self.push_listener(resolve_env_listener()?, BoxedAcceptor::new(acceptor)))
190    }
191
192    /// Register a TLS listener over a TCP descriptor inherited from the environment at `index`; see
193    /// [`bind_fd`](Self::bind_fd) for how the descriptor is claimed.
194    pub fn bind_fd_tls<A>(self, index: usize, acceptor: A) -> io::Result<Self>
195    where
196        A: Acceptor<ServerType::Transport>,
197    {
198        let listener = take_inherited_fd(index)?;
199        listener.set_nonblocking(true)?;
200        Ok(self.push_listener(
201            PreboundListener::Tcp(listener),
202            BoxedAcceptor::new(acceptor),
203        ))
204    }
205
206    /// Register a plaintext listener on a Unix-domain socket at `path`. The socket is bound
207    /// immediately; an error here means the bind failed (e.g. the path already exists).
208    #[cfg(unix)]
209    pub fn bind_uds(self, path: impl AsRef<Path>) -> io::Result<Self> {
210        let listener = UnixListener::bind(path)?;
211        listener.set_nonblocking(true)?;
212        Ok(self.push_listener(PreboundListener::Unix(listener), BoxedAcceptor::new(())))
213    }
214
215    /// Register a TLS listener on a Unix-domain socket at `path`.
216    #[cfg(unix)]
217    pub fn bind_uds_tls<A>(self, path: impl AsRef<Path>, acceptor: A) -> io::Result<Self>
218    where
219        A: Acceptor<ServerType::Transport>,
220    {
221        let listener = UnixListener::bind(path)?;
222        listener.set_nonblocking(true)?;
223        Ok(self.push_listener(
224            PreboundListener::Unix(listener),
225            BoxedAcceptor::new(acceptor),
226        ))
227    }
228
229    pub(crate) fn push_listener(
230        mut self,
231        listener: PreboundListener,
232        acceptor: BoxedAcceptor<ServerType::Transport>,
233    ) -> Self {
234        self.listeners
235            .push((ListenerSource::Prebound(listener), acceptor));
236        self
237    }
238
239    /// Register a pre-claimed UDP socket and its QUIC configuration; the public entry point that
240    /// claims the socket itself is [`bind_quic`](Self::bind_quic).
241    pub(crate) fn push_quic_listener(
242        mut self,
243        socket: StdUdpSocket,
244        quic: BoxedQuicConfig<ServerType>,
245    ) -> Self {
246        self.quic_listeners.push((socket, quic));
247        self
248    }
249
250    /// Adopt a server the caller has already bound into the runtime (the multi-listener equivalent
251    /// of [`Config::with_prebound_server`](crate::Config::with_prebound_server)). Plaintext; for a
252    /// TLS-terminating prebound server use [`bind_server_tls`](Self::bind_server_tls).
253    ///
254    /// Infallible — there is no bind to fail. The adopted server's bound address is discovered by
255    /// running its [`Server::init`] hook at spawn.
256    pub fn bind_server(self, server: impl Into<ServerType>) -> Self {
257        self.bind_server_boxed(server.into(), BoxedAcceptor::new(()))
258    }
259
260    /// Adopt an already-bound server terminating TLS with the provided acceptor. The
261    /// TLS equivalent of [`bind_server`](Self::bind_server).
262    pub fn bind_server_tls<A>(self, server: impl Into<ServerType>, acceptor: A) -> Self
263    where
264        A: Acceptor<ServerType::Transport>,
265    {
266        self.bind_server_boxed(server.into(), BoxedAcceptor::new(acceptor))
267    }
268
269    /// Adopt an already-bound server with a pre-erased acceptor.
270    pub(crate) fn bind_server_boxed(
271        mut self,
272        server: ServerType,
273        acceptor: BoxedAcceptor<ServerType::Transport>,
274    ) -> Self {
275        self.listeners
276            .push((ListenerSource::Adopted(server), acceptor));
277        self
278    }
279
280    /// Register a QUIC listener for HTTP/3, using the provided QUIC configuration (e.g. from
281    /// `trillium-quinn`). The address's UDP socket is claimed immediately for fail-fast binding;
282    /// the QUIC endpoint itself is constructed inside the runtime when the server is spawned.
283    ///
284    /// `bind_quic` does not by itself cause an `alt-svc` header to be advertised on any TCP
285    /// listener. If a TCP or TLS listener is bound on the same port as this QUIC listener, the
286    /// builder will auto-pair the two and advertise `alt-svc: h3=":<port>"` on that TCP
287    /// listener's responses. Use [`with_alt_svc`](Self::with_alt_svc) to express any non-matching
288    /// pairing (e.g. h3 on a different port from the TLS port that advertises it).
289    pub fn bind_quic<Q>(mut self, addr: impl IntoListenAddr, quic: Q) -> io::Result<Self>
290    where
291        Q: QuicConfig<ServerType>,
292    {
293        let socket = StdUdpSocket::bind(addr.into_listen_addr()?)?;
294        socket.set_nonblocking(true)?;
295        self.quic_listeners
296            .push((socket, BoxedQuicConfig::new(quic)));
297        Ok(self)
298    }
299
300    /// Advertise an `alt-svc: h3=":<to>"` header on responses from the TCP/TLS listener bound to
301    /// `from`, pointing at the QUIC listener bound to `to`. May be chained for multiple
302    /// alternatives — values sharing a `from` port are merged into one header value.
303    ///
304    /// A matching same-port pair (a `bind_tcp(p)` or `bind_tls(p, _)` together with a
305    /// `bind_quic(p, _)`) is auto-advertised without an explicit call; use this method only for
306    /// pairings the builder cannot infer.
307    ///
308    /// `from` need not be a TLS port — emitting `alt-svc` from a plaintext listener is valid in
309    /// topologies where something upstream (a TLS-terminating proxy or load balancer) provides
310    /// the TLS view a client sees.
311    pub fn with_alt_svc(mut self, from: u16, to: u16) -> Self {
312        self.alt_svc_pairs.push((from, to));
313        self
314    }
315
316    /// Return a [`ServerHandle`] for this builder, usable to await startup, retrieve the bound
317    /// [`BoundInfo`](crate::BoundInfo), or initiate shutdown.
318    pub fn handle(&self) -> ServerHandle {
319        ServerHandle {
320            swansong: self.context.swansong().clone(),
321            context: self.context_cell.clone(),
322            received_context: OnceCell::new(),
323            runtime: self.runtime.clone().into(),
324        }
325    }
326
327    /// Initialize the handler once and drive every registered listener's accept loop until
328    /// shutdown. This is the appropriate entrypoint when embedding in an already-running
329    /// runtime; see [`run`](Self::run) and [`spawn`](Self::spawn) for the terminal forms.
330    pub async fn run_async(self, handler: impl Handler) {
331        let Self {
332            context,
333            context_cell,
334            runtime,
335            max_connections,
336            nodelay,
337            register_signals,
338            listeners,
339            quic_listeners,
340            alt_svc_pairs,
341            reuseport_listeners,
342            reuseport_workers,
343            thread_per_core,
344        } = self;
345
346        // Adopt every registered listener into the runtime and fully populate `Info` — primary
347        // address, public listener set, and resolved alt-svc — before any handler initialization.
348        let Resolved {
349            info,
350            standard,
351            quic,
352            alt_svc,
353            primary_is_secure,
354        } = Resolved::resolve(
355            context,
356            &runtime,
357            listeners,
358            quic_listeners,
359            &reuseport_listeners,
360            &alt_svc_pairs,
361        );
362
363        // Run `Handler::init` exactly once and produce the `Arc`-shared context + handler that
364        // every accept loop — shared-runtime, h3, and reuseport worker — drives.
365        let (context, handler, _quic, max_connections) = init_shared::<ServerType, (), _>(
366            info,
367            runtime.clone(),
368            (),
369            max_connections,
370            primary_is_secure,
371            handler,
372        )
373        .await;
374
375        context_cell.set(context.clone());
376        spawn_signals_loop(context.clone(), register_signals, runtime.clone());
377
378        let shared = Shared {
379            context,
380            handler,
381            runtime,
382            max_connections,
383            nodelay,
384            alt_svc,
385        };
386
387        // The reuseport fleet runs on its own per-core worker threads; the returned flag lets the
388        // no-TCP-listener fallback in `drive` distinguish a reuseport-only server from one with
389        // nothing bound.
390        let has_reuseport_workers = shared.spawn_reuseport_fleet_if_configured(
391            thread_per_core,
392            reuseport_listeners,
393            reuseport_workers,
394        );
395
396        shared.drive(standard, quic, has_reuseport_workers).await;
397    }
398
399    /// Spawn the server onto its runtime, returning a [`ServerHandle`] immediately.
400    pub fn spawn(self, handler: impl Handler) -> ServerHandle {
401        let handle = self.handle();
402        let runtime = self.runtime.clone();
403        runtime.spawn(self.run_async(handler));
404        handle
405    }
406
407    /// Start the runtime and block on the server until shutdown.
408    pub fn run(self, handler: impl Handler) {
409        self.runtime.clone().block_on(self.run_async(handler));
410    }
411}