trillium_server_common/listener_config.rs
1use crate::{
2 Acceptor, BoxedAcceptor, BoxedQuicConfig, QuicConfig, RuntimeTrait, Server, ServerHandle,
3 config::{init_shared, spawn_signals_loop},
4 server::PreboundListener,
5};
6use async_cell::sync::AsyncCell;
7use std::{
8 cell::OnceCell,
9 fmt::{self, Debug, Formatter},
10 future::Future,
11 io,
12 net::{SocketAddr, TcpListener as StdTcpListener, UdpSocket as StdUdpSocket},
13 pin::Pin,
14 sync::Arc,
15 thread::JoinHandle,
16};
17#[cfg(unix)]
18use std::{os::unix::net::UnixListener, path::Path};
19use trillium::Handler;
20use trillium_http::HttpContext;
21
22mod helpers;
23mod into_listen_addr;
24#[cfg(reuseport)]
25mod reuseport;
26mod run;
27
28use helpers::{resolve_env_listener, take_inherited_fd};
29pub use into_listen_addr::IntoListenAddr;
30use run::{Resolved, Shared};
31
/// Type-erased body of one reuseport worker.
///
/// Called with a worker index, it returns the boxed future that builds and drives that worker's
/// accept loops for a reuseport listener group. The handler type is erased so the closure can be
/// stored and passed around without a handler type parameter; it is built inside
/// [`ListenerConfig::run_async`] (where the handler type is known) and invoked once per worker
/// thread.
type BoxedWorker = Arc<
    dyn Fn(usize) -> Pin<Box<dyn Future<Output = ()>>>
        + Send
        + Sync,
>;
36
37/// A monomorphized [`FanOut::thread_per_core`](crate::FanOut::thread_per_core) call for a concrete
38/// runtime, erased to a function pointer. Stored on the builder only when a `bind_reuseport_*`
39/// method runs, so the `FanOut` bound is discharged at the bind site while `run_async` stays free
40/// of it (the handler type isn't known until `spawn`).
41type ThreadPerCoreInvoker<R> = fn(&R, usize, BoxedWorker) -> Vec<JoinHandle<()>>;
42
43/// A registered standard listener, before its accept loop is driven. Either a [`PreboundListener`]
44/// the builder claimed itself (from `bind_tcp`/`bind_tls`/`bind_fd`/`bind_uds`/`bind_env`) and will
45/// adopt into the runtime at spawn, or a [`Server`] the caller already bound and handed over via
46/// [`bind_server`](ListenerConfig::bind_server) (the bridge for [`Config::with_prebound_server`]).
47pub(super) enum ListenerSource<ServerType> {
48 Prebound(PreboundListener),
49 Adopted(ServerType),
50}
51
52/// A reuseport listener registered on the builder: its resolved address (the group's shared port),
53/// the first group member claimed eagerly at bind time, and its erased acceptor.
54type ReuseportListener<ServerType> = (
55 SocketAddr,
56 StdTcpListener,
57 BoxedAcceptor<<ServerType as Server>::Transport>,
58);
59
60/// An advanced listener builder.
61///
62/// Holds the inputs shared across every listener — handler (supplied at [`spawn`](Self::spawn)),
63/// swansong, shared state, HTTP config — and a set of registered listeners. Each `bind_*` method
64/// claims its address eagerly (failing fast) and the listeners are adopted into the runtime when
65/// the server is spawned, after [`Handler::init`] has run exactly once across all of them.
66///
67/// Construct one from a [`Config`](crate::Config) with
68/// [`Config::listeners`](crate::Config::listeners): the global server configuration (swansong,
69/// shared state, HTTP config, nodelay, max-connections, signals) is set on the `Config`, then
70/// carried over; the builder adds listener topology.
71pub struct ListenerConfig<ServerType: Server> {
72 context: HttpContext,
73 context_cell: Arc<AsyncCell<Arc<HttpContext>>>,
74 runtime: ServerType::Runtime,
75 max_connections: Option<usize>,
76 nodelay: bool,
77 register_signals: bool,
78 listeners: Vec<(
79 ListenerSource<ServerType>,
80 BoxedAcceptor<ServerType::Transport>,
81 )>,
82 quic_listeners: Vec<(StdUdpSocket, BoxedQuicConfig<ServerType>)>,
83 alt_svc_pairs: Vec<(u16, u16)>,
84 reuseport_listeners: Vec<ReuseportListener<ServerType>>,
85 reuseport_workers: Option<usize>,
86 thread_per_core: Option<ThreadPerCoreInvoker<ServerType::Runtime>>,
87}
88
89impl<ServerType: Server> Debug for ListenerConfig<ServerType> {
90 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
91 f.debug_struct("ListenerConfig")
92 .field("max_connections", &self.max_connections)
93 .field("nodelay", &self.nodelay)
94 .field("register_signals", &self.register_signals)
95 .field("listeners", &self.listeners.len())
96 .field("quic_listeners", &self.quic_listeners)
97 .field("alt_svc_pairs", &self.alt_svc_pairs)
98 .field("reuseport_listeners", &self.reuseport_listeners)
99 .field("reuseport_workers", &self.reuseport_workers)
100 .finish_non_exhaustive()
101 }
102}
103
104impl<ServerType: Server> ListenerConfig<ServerType> {
105 /// Construct a builder carrying the given global server configuration and no listeners.
106 pub(crate) fn from_global(
107 context: HttpContext,
108 context_cell: Arc<AsyncCell<Arc<HttpContext>>>,
109 runtime: ServerType::Runtime,
110 max_connections: Option<usize>,
111 nodelay: bool,
112 register_signals: bool,
113 ) -> Self {
114 Self {
115 context,
116 context_cell,
117 runtime,
118 max_connections,
119 nodelay,
120 register_signals,
121 listeners: Vec::new(),
122 quic_listeners: Vec::new(),
123 alt_svc_pairs: Vec::new(),
124 reuseport_listeners: Vec::new(),
125 reuseport_workers: None,
126 thread_per_core: None,
127 }
128 }
129
130 /// Register a plaintext TCP listener. The address is resolved and bound immediately; an error
131 /// means the address could not be resolved or the bind failed (e.g. the port is in use).
132 /// Binding to port `0` selects an ephemeral port, which will be reflected in the bound
133 /// address reported after [`spawn`](Self::spawn).
134 pub fn bind_tcp(self, addr: impl IntoListenAddr) -> io::Result<Self> {
135 let listener = StdTcpListener::bind(addr.into_listen_addr()?)?;
136 listener.set_nonblocking(true)?;
137 Ok(self.push_listener(PreboundListener::Tcp(listener), BoxedAcceptor::new(())))
138 }
139
140 /// Register a TLS listener, terminating TLS with the provided acceptor (e.g. from
141 /// `trillium-rustls` or `trillium-native-tls`). The protocols offered (h1, h2) follow the
142 /// acceptor's ALPN configuration. Like [`bind_tcp`](Self::bind_tcp), the address is resolved
143 /// and bound immediately.
144 pub fn bind_tls<A>(self, addr: impl IntoListenAddr, acceptor: A) -> io::Result<Self>
145 where
146 A: Acceptor<ServerType::Transport>,
147 {
148 let listener = StdTcpListener::bind(addr.into_listen_addr()?)?;
149 listener.set_nonblocking(true)?;
150 Ok(self.push_listener(
151 PreboundListener::Tcp(listener),
152 BoxedAcceptor::new(acceptor),
153 ))
154 }
155
156 /// Register a plaintext TCP listener inherited from the environment as the file descriptor at
157 /// `index` (as passed by a socket-activation supervisor such as systemfd or systemd, via the
158 /// `LISTEN_FDS` protocol). The descriptor is claimed immediately; an error here means no
159 /// inherited TCP listener was present at that index.
160 ///
161 /// Unlike the single-listener [`Config`](crate::Config) path, which auto-detects `LISTEN_FD`,
162 /// this is explicit: each inherited descriptor is bound by index, so several can be adopted.
163 pub fn bind_fd(self, index: usize) -> io::Result<Self> {
164 let listener = take_inherited_fd(index)?;
165 listener.set_nonblocking(true)?;
166 Ok(self.push_listener(PreboundListener::Tcp(listener), BoxedAcceptor::new(())))
167 }
168
169 /// Register a single listener resolved from the environment, following trillium's 12-factor
170 /// conventions: `HOST` (default `localhost`) and `PORT` (default `8080`), a listener inherited
171 /// via the `LISTEN_FDS` socket-activation protocol if present, and — on unix — a `HOST`
172 /// beginning with `/`, `.`, or `~` treated as a Unix-domain-socket path (the port is ignored).
173 ///
174 /// This is the explicit, fallible equivalent of the implicit binding that
175 /// [`Config`](crate::Config) performs when no listener is configured. The listener is
176 /// plaintext; for a TLS listener resolved the same way use
177 /// [`bind_env_tls`](Self::bind_env_tls).
178 pub fn bind_env(self) -> io::Result<Self> {
179 Ok(self.push_listener(resolve_env_listener()?, BoxedAcceptor::new(())))
180 }
181
182 /// Register a single TLS listener resolved from the environment, terminating TLS with the
183 /// provided acceptor. The TLS equivalent of [`bind_env`](Self::bind_env); see it for the
184 /// resolution rules.
185 pub fn bind_env_tls<A>(self, acceptor: A) -> io::Result<Self>
186 where
187 A: Acceptor<ServerType::Transport>,
188 {
189 Ok(self.push_listener(resolve_env_listener()?, BoxedAcceptor::new(acceptor)))
190 }
191
192 /// Register a TLS listener over a TCP descriptor inherited from the environment at `index`; see
193 /// [`bind_fd`](Self::bind_fd) for how the descriptor is claimed.
194 pub fn bind_fd_tls<A>(self, index: usize, acceptor: A) -> io::Result<Self>
195 where
196 A: Acceptor<ServerType::Transport>,
197 {
198 let listener = take_inherited_fd(index)?;
199 listener.set_nonblocking(true)?;
200 Ok(self.push_listener(
201 PreboundListener::Tcp(listener),
202 BoxedAcceptor::new(acceptor),
203 ))
204 }
205
206 /// Register a plaintext listener on a Unix-domain socket at `path`. The socket is bound
207 /// immediately; an error here means the bind failed (e.g. the path already exists).
208 #[cfg(unix)]
209 pub fn bind_uds(self, path: impl AsRef<Path>) -> io::Result<Self> {
210 let listener = UnixListener::bind(path)?;
211 listener.set_nonblocking(true)?;
212 Ok(self.push_listener(PreboundListener::Unix(listener), BoxedAcceptor::new(())))
213 }
214
215 /// Register a TLS listener on a Unix-domain socket at `path`.
216 #[cfg(unix)]
217 pub fn bind_uds_tls<A>(self, path: impl AsRef<Path>, acceptor: A) -> io::Result<Self>
218 where
219 A: Acceptor<ServerType::Transport>,
220 {
221 let listener = UnixListener::bind(path)?;
222 listener.set_nonblocking(true)?;
223 Ok(self.push_listener(
224 PreboundListener::Unix(listener),
225 BoxedAcceptor::new(acceptor),
226 ))
227 }
228
229 pub(crate) fn push_listener(
230 mut self,
231 listener: PreboundListener,
232 acceptor: BoxedAcceptor<ServerType::Transport>,
233 ) -> Self {
234 self.listeners
235 .push((ListenerSource::Prebound(listener), acceptor));
236 self
237 }
238
239 /// Register a pre-claimed UDP socket and its QUIC configuration; the public entry point that
240 /// claims the socket itself is [`bind_quic`](Self::bind_quic).
241 pub(crate) fn push_quic_listener(
242 mut self,
243 socket: StdUdpSocket,
244 quic: BoxedQuicConfig<ServerType>,
245 ) -> Self {
246 self.quic_listeners.push((socket, quic));
247 self
248 }
249
250 /// Adopt a server the caller has already bound into the runtime (the multi-listener equivalent
251 /// of [`Config::with_prebound_server`](crate::Config::with_prebound_server)). Plaintext; for a
252 /// TLS-terminating prebound server use [`bind_server_tls`](Self::bind_server_tls).
253 ///
254 /// Infallible — there is no bind to fail. The adopted server's bound address is discovered by
255 /// running its [`Server::init`] hook at spawn.
256 pub fn bind_server(self, server: impl Into<ServerType>) -> Self {
257 self.bind_server_boxed(server.into(), BoxedAcceptor::new(()))
258 }
259
260 /// Adopt an already-bound server terminating TLS with the provided acceptor. The
261 /// TLS equivalent of [`bind_server`](Self::bind_server).
262 pub fn bind_server_tls<A>(self, server: impl Into<ServerType>, acceptor: A) -> Self
263 where
264 A: Acceptor<ServerType::Transport>,
265 {
266 self.bind_server_boxed(server.into(), BoxedAcceptor::new(acceptor))
267 }
268
269 /// Adopt an already-bound server with a pre-erased acceptor.
270 pub(crate) fn bind_server_boxed(
271 mut self,
272 server: ServerType,
273 acceptor: BoxedAcceptor<ServerType::Transport>,
274 ) -> Self {
275 self.listeners
276 .push((ListenerSource::Adopted(server), acceptor));
277 self
278 }
279
280 /// Register a QUIC listener for HTTP/3, using the provided QUIC configuration (e.g. from
281 /// `trillium-quinn`). The address's UDP socket is claimed immediately for fail-fast binding;
282 /// the QUIC endpoint itself is constructed inside the runtime when the server is spawned.
283 ///
284 /// `bind_quic` does not by itself cause an `alt-svc` header to be advertised on any TCP
285 /// listener. If a TCP or TLS listener is bound on the same port as this QUIC listener, the
286 /// builder will auto-pair the two and advertise `alt-svc: h3=":<port>"` on that TCP
287 /// listener's responses. Use [`with_alt_svc`](Self::with_alt_svc) to express any non-matching
288 /// pairing (e.g. h3 on a different port from the TLS port that advertises it).
289 pub fn bind_quic<Q>(mut self, addr: impl IntoListenAddr, quic: Q) -> io::Result<Self>
290 where
291 Q: QuicConfig<ServerType>,
292 {
293 let socket = StdUdpSocket::bind(addr.into_listen_addr()?)?;
294 socket.set_nonblocking(true)?;
295 self.quic_listeners
296 .push((socket, BoxedQuicConfig::new(quic)));
297 Ok(self)
298 }
299
300 /// Advertise an `alt-svc: h3=":<to>"` header on responses from the TCP/TLS listener bound to
301 /// `from`, pointing at the QUIC listener bound to `to`. May be chained for multiple
302 /// alternatives — values sharing a `from` port are merged into one header value.
303 ///
304 /// A matching same-port pair (a `bind_tcp(p)` or `bind_tls(p, _)` together with a
305 /// `bind_quic(p, _)`) is auto-advertised without an explicit call; use this method only for
306 /// pairings the builder cannot infer.
307 ///
308 /// `from` need not be a TLS port — emitting `alt-svc` from a plaintext listener is valid in
309 /// topologies where something upstream (a TLS-terminating proxy or load balancer) provides
310 /// the TLS view a client sees.
311 pub fn with_alt_svc(mut self, from: u16, to: u16) -> Self {
312 self.alt_svc_pairs.push((from, to));
313 self
314 }
315
316 /// Return a [`ServerHandle`] for this builder, usable to await startup, retrieve the bound
317 /// [`BoundInfo`](crate::BoundInfo), or initiate shutdown.
318 pub fn handle(&self) -> ServerHandle {
319 ServerHandle {
320 swansong: self.context.swansong().clone(),
321 context: self.context_cell.clone(),
322 received_context: OnceCell::new(),
323 runtime: self.runtime.clone().into(),
324 }
325 }
326
327 /// Initialize the handler once and drive every registered listener's accept loop until
328 /// shutdown. This is the appropriate entrypoint when embedding in an already-running
329 /// runtime; see [`run`](Self::run) and [`spawn`](Self::spawn) for the terminal forms.
330 pub async fn run_async(self, handler: impl Handler) {
331 let Self {
332 context,
333 context_cell,
334 runtime,
335 max_connections,
336 nodelay,
337 register_signals,
338 listeners,
339 quic_listeners,
340 alt_svc_pairs,
341 reuseport_listeners,
342 reuseport_workers,
343 thread_per_core,
344 } = self;
345
346 // Adopt every registered listener into the runtime and fully populate `Info` — primary
347 // address, public listener set, and resolved alt-svc — before any handler initialization.
348 let Resolved {
349 info,
350 standard,
351 quic,
352 alt_svc,
353 primary_is_secure,
354 } = Resolved::resolve(
355 context,
356 &runtime,
357 listeners,
358 quic_listeners,
359 &reuseport_listeners,
360 &alt_svc_pairs,
361 );
362
363 // Run `Handler::init` exactly once and produce the `Arc`-shared context + handler that
364 // every accept loop — shared-runtime, h3, and reuseport worker — drives.
365 let (context, handler, _quic, max_connections) = init_shared::<ServerType, (), _>(
366 info,
367 runtime.clone(),
368 (),
369 max_connections,
370 primary_is_secure,
371 handler,
372 )
373 .await;
374
375 context_cell.set(context.clone());
376 spawn_signals_loop(context.clone(), register_signals, runtime.clone());
377
378 let shared = Shared {
379 context,
380 handler,
381 runtime,
382 max_connections,
383 nodelay,
384 alt_svc,
385 };
386
387 // The reuseport fleet runs on its own per-core worker threads; the returned flag lets the
388 // no-TCP-listener fallback in `drive` distinguish a reuseport-only server from one with
389 // nothing bound.
390 let has_reuseport_workers = shared.spawn_reuseport_fleet_if_configured(
391 thread_per_core,
392 reuseport_listeners,
393 reuseport_workers,
394 );
395
396 shared.drive(standard, quic, has_reuseport_workers).await;
397 }
398
399 /// Spawn the server onto its runtime, returning a [`ServerHandle`] immediately.
400 pub fn spawn(self, handler: impl Handler) -> ServerHandle {
401 let handle = self.handle();
402 let runtime = self.runtime.clone();
403 runtime.spawn(self.run_async(handler));
404 handle
405 }
406
407 /// Start the runtime and block on the server until shutdown.
408 pub fn run(self, handler: impl Handler) {
409 self.runtime.clone().block_on(self.run_async(handler));
410 }
411}