trillium_tokio/
runtime.rs1use std::{future::Future, sync::Arc, time::Duration};
2use tokio::{runtime::Handle, time};
3use tokio_stream::{Stream, StreamExt, wrappers::IntervalStream};
4use trillium_server_common::{DroppableFuture, Runtime, RuntimeTrait};
5
6#[derive(Debug, Clone)]
7enum Inner {
8 AlreadyRunning(Handle),
9 Owned(Arc<tokio::runtime::Runtime>),
10}
11
12#[derive(Clone, Debug)]
14pub struct TokioRuntime(Inner);
15
16impl Default for TokioRuntime {
17 fn default() -> Self {
18 match Handle::try_current() {
19 Ok(handle) => Self(Inner::AlreadyRunning(handle)),
20 _ => Self(Inner::Owned(Arc::new(
21 tokio::runtime::Runtime::new().unwrap(),
22 ))),
23 }
24 }
25}
26
27impl RuntimeTrait for TokioRuntime {
28 fn spawn<Fut>(
29 &self,
30 fut: Fut,
31 ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
32 where
33 Fut: Future + Send + 'static,
34 Fut::Output: Send + 'static,
35 {
36 let join_handle = match &self.0 {
37 Inner::AlreadyRunning(handle) => handle.spawn(fut),
38 Inner::Owned(runtime) => runtime.spawn(fut),
39 };
40 DroppableFuture::new(async move { join_handle.await.ok() })
41 }
42
43 async fn delay(&self, duration: Duration) {
44 time::sleep(duration).await;
45 }
46
47 fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
48 IntervalStream::new(time::interval(period)).map(|_| ())
49 }
50
51 fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
52 match &self.0 {
53 Inner::AlreadyRunning(handle) => handle.block_on(fut),
54 Inner::Owned(runtime) => runtime.block_on(fut),
55 }
56 }
57
58 #[cfg(unix)]
59 fn hook_signals(
60 &self,
61 signals: impl IntoIterator<Item = i32>,
62 ) -> impl Stream<Item = i32> + Send + 'static {
63 signal_hook_tokio::Signals::new(signals).unwrap()
64 }
65}
66
67impl TokioRuntime {
68 pub fn spawn<Fut>(
78 &self,
79 fut: Fut,
80 ) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static + use<Fut>>
81 where
82 Fut: Future + Send + 'static,
83 Fut::Output: Send + 'static,
84 {
85 let join_handle = match &self.0 {
86 Inner::AlreadyRunning(handle) => handle.spawn(fut),
87 Inner::Owned(runtime) => runtime.spawn(fut),
88 };
89 DroppableFuture::new(async move { join_handle.await.ok() })
90 }
91
92 pub async fn delay(&self, duration: Duration) {
94 time::sleep(duration).await;
95 }
96
97 pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static + use<> {
99 IntervalStream::new(time::interval(period)).map(|_| ())
100 }
101
102 pub fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
104 match &self.0 {
105 Inner::AlreadyRunning(handle) => handle.block_on(fut),
106 Inner::Owned(runtime) => runtime.block_on(fut),
107 }
108 }
109
110 pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
112 where
113 Fut: Future + Send,
114 Fut::Output: Send + 'static,
115 {
116 RuntimeTrait::timeout(self, duration, fut).await
117 }
118}
119
120impl From<TokioRuntime> for Runtime {
121 fn from(value: TokioRuntime) -> Self {
122 Arc::new(value).into()
123 }
124}