trillium_server_common/
runtime.rs1use futures_lite::Stream;
2use std::{
3 fmt::{self, Debug, Formatter},
4 future::Future,
5 pin::Pin,
6 sync::Arc,
7 time::Duration,
8};
9
10mod droppable_future;
11pub use droppable_future::DroppableFuture;
12
13mod runtime_trait;
14pub use runtime_trait::RuntimeTrait;
15
16mod fan_out;
17pub use fan_out::FanOut;
18
19mod object_safe_runtime;
20use object_safe_runtime::ObjectSafeRuntime;
21
/// A type-erased runtime handle.
///
/// This is a newtype around `Arc<dyn ObjectSafeRuntime>`. `Clone` is derived,
/// so cloning a `Runtime` clones the inner `Arc` rather than the runtime it
/// points to.
#[derive(Clone)]
pub struct Runtime(Arc<dyn ObjectSafeRuntime>);
25
26impl Debug for Runtime {
27 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
28 f.debug_tuple("Runtime").field(&format_args!("..")).finish()
29 }
30}
31
32impl<R: RuntimeTrait> From<Arc<R>> for Runtime {
33 fn from(value: Arc<R>) -> Self {
34 Self(value)
35 }
36}
37
38impl Runtime {
39 pub fn new(runtime: impl RuntimeTrait) -> Self {
41 runtime.into() }
43
44 pub fn spawn<Output: Send + 'static>(
54 &self,
55 fut: impl Future<Output = Output> + Send + 'static,
56 ) -> DroppableFuture<Pin<Box<dyn Future<Output = Option<Output>> + Send + 'static>>> {
57 let fut = RuntimeTrait::spawn(self, fut).into_inner();
58 DroppableFuture::new(Box::pin(fut))
59 }
60
61 pub async fn delay(&self, duration: Duration) {
63 RuntimeTrait::delay(self, duration).await
64 }
65
66 pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + '_ {
68 RuntimeTrait::interval(self, period)
69 }
70
71 pub fn block_on<Fut>(&self, fut: Fut) -> Fut::Output
73 where
74 Fut: Future,
75 {
76 RuntimeTrait::block_on(self, fut)
77 }
78
79 pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
81 where
82 Fut: Future + Send,
83 Fut::Output: Send + 'static,
84 {
85 RuntimeTrait::timeout(self, duration, fut).await
86 }
87}
88
89impl RuntimeTrait for Runtime {
90 async fn delay(&self, duration: Duration) {
91 self.0.delay(duration).await
92 }
93
94 fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
95 self.0.interval(period)
96 }
97
98 fn spawn<Fut>(
99 &self,
100 fut: Fut,
101 ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
102 where
103 Fut: Future + Send + 'static,
104 Fut::Output: Send + 'static,
105 {
106 let (send, receive) = async_channel::bounded(1);
107 let spawn_fut = self.0.spawn(Box::pin(async move {
108 let _ = send.try_send(fut.await);
109 }));
110 DroppableFuture::new(Box::pin(async move {
111 spawn_fut.await;
112 receive.try_recv().ok()
113 }))
114 }
115
116 fn block_on<Fut>(&self, fut: Fut) -> Fut::Output
117 where
118 Fut: Future,
119 {
120 let (send, receive) = std::sync::mpsc::channel();
121 self.0.block_on(Box::pin(async move {
122 let _ = send.send(fut.await);
123 }));
124 receive.recv().unwrap()
125 }
126
127 fn hook_signals(
128 &self,
129 signals: impl IntoIterator<Item = i32>,
130 ) -> impl Stream<Item = i32> + Send + 'static {
131 self.0.hook_signals(signals.into_iter().collect())
132 }
133}