// trillium_smol/runtime.rs

1use async_executor::StaticLocalExecutor;
2use async_io::Timer;
3use async_task::Task;
4use futures_lite::{FutureExt, Stream, StreamExt};
5use std::{
6    cell::Cell,
7    future::Future,
8    pin::Pin,
9    sync::Arc,
10    task::{Context, Poll},
11    time::Duration,
12};
13use trillium_server_common::{DroppableFuture, Runtime, RuntimeTrait};
14
15thread_local! {
16    /// When a reuseport worker thread installs its own single-threaded executor here, spawns from
17    /// that thread go to it instead of the global executor, keeping each connection's work on the
18    /// core the kernel delivered it to. Empty on every other thread, where spawns fall through to
19    /// the multi-threaded global executor.
20    static WORKER_EXECUTOR: Cell<Option<&'static StaticLocalExecutor>> = const { Cell::new(None) };
21}
22
23fn spawn_on_current_executor<Fut>(fut: Fut) -> Task<Fut::Output>
24where
25    Fut: Future + Send + 'static,
26    Fut::Output: Send + 'static,
27{
28    match WORKER_EXECUTOR.get() {
29        Some(executor) => executor.spawn(fut),
30        None => async_global_executor::spawn(fut),
31    }
32}
33
/// Run `fut` to completion on the calling thread, driven by a dedicated single-threaded executor.
///
/// A new local executor is created, leaked to obtain a `'static` reference, and recorded in
/// [`WORKER_EXECUTOR`] before `fut` is polled. From then on, `spawn_on_current_executor` calls made
/// on this thread are routed to that executor rather than the global one.
///
/// The leak is deliberate: the executor is never freed and the thread-local slot is never reset by
/// this function, so each call permanently dedicates one [`StaticLocalExecutor`] to its thread.
#[cfg(all(
    feature = "reuseport",
    unix,
    not(target_os = "solaris"),
    not(target_os = "illumos"),
    not(target_os = "cygwin"),
    not(target_vendor = "apple")
))]
pub(crate) fn block_on_worker(fut: impl Future<Output = ()>) {
    let local: &'static StaticLocalExecutor = async_executor::LocalExecutor::new().leak();

    // Install the executor before polling so spawns issued by `fut` already see it.
    WORKER_EXECUTOR.with(|slot| slot.set(Some(local)));

    async_io::block_on(local.run(fut))
}
52
/// Runtime handle for the smol family of crates.
///
/// This is a zero-sized, freely copyable marker: it carries no state of its own, and its private
/// unit field means it can only be constructed through [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SmolRuntime(());
56
57struct DetachOnDrop<Output>(Option<Task<Output>>);
58impl<Output> Future for DetachOnDrop<Output> {
59    type Output = Output;
60
61    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62        Pin::new(self.0.as_mut().unwrap()).poll(cx)
63    }
64}
65
66impl<Output> Drop for DetachOnDrop<Output> {
67    fn drop(&mut self) {
68        if let Some(task) = self.0.take() {
69            task.detach();
70        }
71    }
72}
73
74impl RuntimeTrait for SmolRuntime {
75    fn spawn<Fut>(
76        &self,
77        fut: Fut,
78    ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
79    where
80        Fut: Future + Send + 'static,
81        Fut::Output: Send + 'static,
82    {
83        let join_handle = DetachOnDrop(Some(spawn_on_current_executor(fut)));
84        DroppableFuture::new(async move { join_handle.catch_unwind().await.ok() })
85    }
86
87    async fn delay(&self, duration: Duration) {
88        Timer::after(duration).await;
89    }
90
91    fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
92        Timer::interval(period).map(|_| ())
93    }
94
95    fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
96        async_global_executor::block_on(fut)
97    }
98
99    #[cfg(unix)]
100    fn hook_signals(
101        &self,
102        signals: impl IntoIterator<Item = i32>,
103    ) -> impl Stream<Item = i32> + Send + 'static {
104        use async_signal::{Signal, Signals};
105        use signal_hook::consts::signal::*;
106        let signals: Vec<Signal> = signals
107            .into_iter()
108            .filter_map(|n| match n {
109                SIGHUP => Some(Signal::Hup),
110                SIGINT => Some(Signal::Int),
111                SIGQUIT => Some(Signal::Quit),
112                SIGILL => Some(Signal::Ill),
113                SIGTRAP => Some(Signal::Trap),
114                SIGABRT => Some(Signal::Abort),
115                SIGBUS => Some(Signal::Bus),
116                SIGFPE => Some(Signal::Fpe),
117                SIGKILL => Some(Signal::Kill),
118                SIGUSR1 => Some(Signal::Usr1),
119                SIGSEGV => Some(Signal::Segv),
120                SIGUSR2 => Some(Signal::Usr2),
121                SIGPIPE => Some(Signal::Pipe),
122                SIGALRM => Some(Signal::Alarm),
123                SIGTERM => Some(Signal::Term),
124                SIGCHLD => Some(Signal::Child),
125                SIGCONT => Some(Signal::Cont),
126                SIGSTOP => Some(Signal::Stop),
127                SIGTSTP => Some(Signal::Tstp),
128                SIGTTIN => Some(Signal::Ttin),
129                SIGTTOU => Some(Signal::Ttou),
130                SIGURG => Some(Signal::Urg),
131                SIGXCPU => Some(Signal::Xcpu),
132                SIGXFSZ => Some(Signal::Xfsz),
133                SIGVTALRM => Some(Signal::Vtalarm),
134                SIGPROF => Some(Signal::Prof),
135                SIGWINCH => Some(Signal::Winch),
136                SIGIO => Some(Signal::Io),
137                SIGSYS => Some(Signal::Sys),
138                _ => None,
139            })
140            .collect();
141        Signals::new(signals)
142            .unwrap()
143            .filter_map(|r| r.ok().map(|s| s as i32))
144    }
145}
146
147impl SmolRuntime {
148    /// Spawn a future on the runtime, returning a future that has detach-on-drop semantics
149    ///
150    /// Spawned tasks conform to the following behavior:
151    ///
152    /// * detach on drop: If the returned [`DroppableFuture`] is dropped immediately, the task will
153    ///   continue to execute until completion.
154    ///
155    /// * unwinding: If the spawned future panics, this must not propagate to the join handle.
156    ///   Instead, the awaiting the join handle returns None in case of panic.
157    pub fn spawn<Fut>(
158        &self,
159        fut: Fut,
160    ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static + use<Fut>>
161    where
162        Fut: Future + Send + 'static,
163        Fut::Output: Send + 'static,
164    {
165        let join_handle = DetachOnDrop(Some(spawn_on_current_executor(fut)));
166        DroppableFuture::new(async move { join_handle.catch_unwind().await.ok() })
167    }
168
169    /// Wake in this amount of wall time
170    pub async fn delay(&self, duration: Duration) {
171        Timer::after(duration).await;
172    }
173
174    /// Returns a [`Stream`] that yields a `()` on the provided period
175    pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static + use<> {
176        Timer::interval(period).map(|_| ())
177    }
178
179    /// Runtime implementation hook for blocking on a top level future.
180    pub fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
181        RuntimeTrait::block_on(self, fut)
182    }
183
184    /// Race a future against the provided duration, returning None in case of timeout.
185    pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
186    where
187        Fut: Future + Send,
188        Fut::Output: Send + 'static,
189    {
190        RuntimeTrait::timeout(self, duration, fut).await
191    }
192}
193
194impl From<SmolRuntime> for Runtime {
195    fn from(value: SmolRuntime) -> Self {
196        Arc::new(value).into()
197    }
198}