Skip to main content

trillium_tokio/
runtime.rs

1use std::{future::Future, sync::Arc, time::Duration};
2use tokio::{runtime::Handle, time};
3use tokio_stream::{Stream, StreamExt, wrappers::IntervalStream};
4use trillium_server_common::{DroppableFuture, Runtime, RuntimeTrait};
5
6#[derive(Debug, Clone)]
7enum Inner {
8    AlreadyRunning(Handle),
9    Owned(Arc<tokio::runtime::Runtime>),
10}
11
12/// tokio runtime
13#[derive(Clone, Debug)]
14pub struct TokioRuntime(Inner);
15
16impl Default for TokioRuntime {
17    fn default() -> Self {
18        match Handle::try_current() {
19            Ok(handle) => Self(Inner::AlreadyRunning(handle)),
20            _ => Self(Inner::Owned(Arc::new(
21                tokio::runtime::Runtime::new().unwrap(),
22            ))),
23        }
24    }
25}
26
27impl RuntimeTrait for TokioRuntime {
28    fn spawn<Fut>(
29        &self,
30        fut: Fut,
31    ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
32    where
33        Fut: Future + Send + 'static,
34        Fut::Output: Send + 'static,
35    {
36        let join_handle = match &self.0 {
37            Inner::AlreadyRunning(handle) => handle.spawn(fut),
38            Inner::Owned(runtime) => runtime.spawn(fut),
39        };
40        DroppableFuture::new(async move { join_handle.await.ok() })
41    }
42
43    async fn delay(&self, duration: Duration) {
44        time::sleep(duration).await;
45    }
46
47    fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
48        IntervalStream::new(time::interval(period)).map(|_| ())
49    }
50
51    fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
52        match &self.0 {
53            Inner::AlreadyRunning(handle) => handle.block_on(fut),
54            Inner::Owned(runtime) => runtime.block_on(fut),
55        }
56    }
57
58    #[cfg(unix)]
59    fn hook_signals(
60        &self,
61        signals: impl IntoIterator<Item = i32>,
62    ) -> impl Stream<Item = i32> + Send + 'static {
63        signal_hook_tokio::Signals::new(signals).unwrap()
64    }
65}
66
67impl TokioRuntime {
68    /// Spawn a future on the runtime, returning a future that has detach-on-drop semantics
69    ///
70    /// Spawned tasks conform to the following behavior:
71    ///
72    /// * detach on drop: If the returned [`DroppableFuture`] is dropped immediately, the task will
73    ///   continue to execute until completion.
74    ///
75    /// * unwinding: If the spawned future panics, this must not propagate to the join handle.
76    ///   Instead, the awaiting the join handle returns None in case of panic.
77    pub fn spawn<Fut>(
78        &self,
79        fut: Fut,
80    ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static + use<Fut>>
81    where
82        Fut: Future + Send + 'static,
83        Fut::Output: Send + 'static,
84    {
85        let join_handle = match &self.0 {
86            Inner::AlreadyRunning(handle) => handle.spawn(fut),
87            Inner::Owned(runtime) => runtime.spawn(fut),
88        };
89        DroppableFuture::new(async move { join_handle.await.ok() })
90    }
91
92    /// wake in this amount of wall time
93    pub async fn delay(&self, duration: Duration) {
94        time::sleep(duration).await;
95    }
96
97    /// Returns a [`Stream`] that yields a `()` on the provided period
98    pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static + use<> {
99        IntervalStream::new(time::interval(period)).map(|_| ())
100    }
101
102    /// Runtime implementation hook for blocking on a top level future.
103    pub fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
104        match &self.0 {
105            Inner::AlreadyRunning(handle) => handle.block_on(fut),
106            Inner::Owned(runtime) => runtime.block_on(fut),
107        }
108    }
109
110    /// Race a future against the provided duration, returning None in case of timeout.
111    pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
112    where
113        Fut: Future + Send,
114        Fut::Output: Send + 'static,
115    {
116        RuntimeTrait::timeout(self, duration, fut).await
117    }
118}
119
120impl From<TokioRuntime> for Runtime {
121    fn from(value: TokioRuntime) -> Self {
122        Arc::new(value).into()
123    }
124}