Skip to main content

trillium_server_common/
runtime.rs

1use futures_lite::Stream;
2use std::{
3    fmt::{self, Debug, Formatter},
4    future::Future,
5    pin::Pin,
6    sync::Arc,
7    time::Duration,
8};
9
10mod droppable_future;
11pub use droppable_future::DroppableFuture;
12
13mod runtime_trait;
14pub use runtime_trait::RuntimeTrait;
15
16mod fan_out;
17pub use fan_out::FanOut;
18
19mod object_safe_runtime;
20use object_safe_runtime::ObjectSafeRuntime;
21
/// A type-erased [`RuntimeTrait`] implementation. Think of this as an `Arc<dyn RuntimeTrait>`.
///
/// Internally this wraps an `Arc<dyn ObjectSafeRuntime>`, so cloning a `Runtime` clones the
/// [`Arc`] handle rather than the underlying runtime.
#[derive(Clone)]
pub struct Runtime(Arc<dyn ObjectSafeRuntime>);
25
26impl Debug for Runtime {
27    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
28        f.debug_tuple("Runtime").field(&format_args!("..")).finish()
29    }
30}
31
32impl<R: RuntimeTrait> From<Arc<R>> for Runtime {
33    fn from(value: Arc<R>) -> Self {
34        Self(value)
35    }
36}
37
38impl Runtime {
39    /// Construct a new type-erased runtime object from any [`RuntimeTrait`] implementation.
40    pub fn new(runtime: impl RuntimeTrait) -> Self {
41        runtime.into() // we avoid re-arcing a Runtime by using Into::into
42    }
43
44    /// Spawn a future on the runtime, returning a future that has detach-on-drop semantics
45    ///
46    /// Spawned tasks conform to the following behavior:
47    ///
48    /// * detach on drop: If the returned [`DroppableFuture`] is dropped immediately, the task will
49    ///   continue to execute until completion.
50    ///
51    /// * unwinding: If the spawned future panics, this must not propagate to the join handle.
52    ///   Instead, the awaiting the join handle returns None in case of panic.
53    pub fn spawn<Output: Send + 'static>(
54        &self,
55        fut: impl Future<Output = Output> + Send + 'static,
56    ) -> DroppableFuture<Pin<Box<dyn Future<Output = Option<Output>> + Send + 'static>>> {
57        let fut = RuntimeTrait::spawn(self, fut).into_inner();
58        DroppableFuture::new(Box::pin(fut))
59    }
60
61    /// Wake in this amount of wall time
62    pub async fn delay(&self, duration: Duration) {
63        RuntimeTrait::delay(self, duration).await
64    }
65
66    /// Returns a [`Stream`] that yields a `()` on the provided period
67    pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + '_ {
68        RuntimeTrait::interval(self, period)
69    }
70
71    /// Runtime implementation hook for blocking on a top level future.
72    pub fn block_on<Fut>(&self, fut: Fut) -> Fut::Output
73    where
74        Fut: Future,
75    {
76        RuntimeTrait::block_on(self, fut)
77    }
78
79    /// Race a future against the provided duration, returning None in case of timeout.
80    pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
81    where
82        Fut: Future + Send,
83        Fut::Output: Send + 'static,
84    {
85        RuntimeTrait::timeout(self, duration, fut).await
86    }
87}
88
89impl RuntimeTrait for Runtime {
90    async fn delay(&self, duration: Duration) {
91        self.0.delay(duration).await
92    }
93
94    fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
95        self.0.interval(period)
96    }
97
98    fn spawn<Fut>(
99        &self,
100        fut: Fut,
101    ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
102    where
103        Fut: Future + Send + 'static,
104        Fut::Output: Send + 'static,
105    {
106        let (send, receive) = async_channel::bounded(1);
107        let spawn_fut = self.0.spawn(Box::pin(async move {
108            let _ = send.try_send(fut.await);
109        }));
110        DroppableFuture::new(Box::pin(async move {
111            spawn_fut.await;
112            receive.try_recv().ok()
113        }))
114    }
115
116    fn block_on<Fut>(&self, fut: Fut) -> Fut::Output
117    where
118        Fut: Future,
119    {
120        let (send, receive) = std::sync::mpsc::channel();
121        self.0.block_on(Box::pin(async move {
122            let _ = send.send(fut.await);
123        }));
124        receive.recv().unwrap()
125    }
126
127    fn hook_signals(
128        &self,
129        signals: impl IntoIterator<Item = i32>,
130    ) -> impl Stream<Item = i32> + Send + 'static {
131        self.0.hook_signals(signals.into_iter().collect())
132    }
133}