Skip to main content

trillium_channels/
channel_broadcaster.rs

1use crate::ChannelEvent;
2use async_broadcast::{InactiveReceiver, Receiver as ActiveReceiver, Sender};
3use futures_lite::Stream;
4use std::{
5    mem,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10/// Channel-wide event broadcaster and subscriber
11///
12/// This can be cloned and stored elsewhere in an application in order to
13/// send events to connected channel clients. Retrieve a [`ChannelBroadcaster`] from a
14/// [`Channel`](crate::Channel) by calling
15/// [`Channel::broadcaster`](crate::Channel::broadcaster)
16///
17/// ChannelBroadcaster also implements [`Stream`] so that your application
18/// can listen in on ChannelEvents happening elsewhere. This might be used
19/// for spawning a task to log events, or synchronizing events between
20/// servers.
21#[derive(Clone, Debug)]
22pub struct ChannelBroadcaster {
23    sender: Sender<ChannelEvent>,
24    receiver: Receiver<ChannelEvent>,
25}
26
27#[derive(Debug)]
28enum Receiver<C> {
29    Active(ActiveReceiver<C>),
30    Inactive(InactiveReceiver<C>),
31    Activating,
32}
33
34impl<C> Clone for Receiver<C> {
35    fn clone(&self) -> Self {
36        match self {
37            Self::Active(active) => Self::Inactive(active.clone().deactivate()),
38            Self::Inactive(inactive) => Self::Inactive(inactive.clone()),
39            Self::Activating => Self::Activating, // should not be reachable
40        }
41    }
42}
43
44impl<C> Stream for Receiver<C>
45where
46    C: Clone + std::fmt::Debug,
47{
48    type Item = C;
49
50    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51        self.activate();
52
53        match &mut *self {
54            Receiver::Active(a) => Pin::new(a).poll_next(cx),
55            _ => Poll::Ready(None), // unreachable, but why panic when we can just end the stream?
56        }
57    }
58}
59
60impl<C> Receiver<C>
61where
62    C: Clone,
63{
64    fn activate(&mut self) {
65        if let Receiver::Inactive(_) = self
66            && let Receiver::Inactive(inactive) = mem::replace(self, Self::Activating)
67        {
68            *self = Receiver::Active(inactive.activate());
69        };
70    }
71}
72
73impl ChannelBroadcaster {
74    pub(crate) fn new(
75        sender: Sender<ChannelEvent>,
76        receiver: InactiveReceiver<ChannelEvent>,
77    ) -> Self {
78        Self {
79            sender,
80            receiver: Receiver::Inactive(receiver),
81        }
82    }
83
84    /// Send this ChannelEvent to all subscribed channel clients
85    pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
86        // we don't care about whether there are any connected clients
87        // here, so we ignore error results.
88        self.sender.try_broadcast(event.into()).ok();
89    }
90
91    /// Returns the number of connected clients. Note that the number of
92    /// clients listening on any given channel will likely be smaller than
93    /// this, and currently that number is not available.
94    pub fn connected_clients(&self) -> usize {
95        self.sender.receiver_count()
96    }
97}
98
99impl Stream for ChannelBroadcaster {
100    type Item = ChannelEvent;
101
102    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
103        Pin::new(&mut self.receiver).poll_next(cx)
104    }
105}