trillium_channels/
channel_broadcaster.rs1use crate::ChannelEvent;
2use async_broadcast::{InactiveReceiver, Receiver as ActiveReceiver, Sender};
3use futures_lite::Stream;
4use std::{
5 mem,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10#[derive(Clone, Debug)]
22pub struct ChannelBroadcaster {
23 sender: Sender<ChannelEvent>,
24 receiver: Receiver<ChannelEvent>,
25}
26
27#[derive(Debug)]
28enum Receiver<C> {
29 Active(ActiveReceiver<C>),
30 Inactive(InactiveReceiver<C>),
31 Activating,
32}
33
34impl<C> Clone for Receiver<C> {
35 fn clone(&self) -> Self {
36 match self {
37 Self::Active(active) => Self::Inactive(active.clone().deactivate()),
38 Self::Inactive(inactive) => Self::Inactive(inactive.clone()),
39 Self::Activating => Self::Activating, }
41 }
42}
43
44impl<C> Stream for Receiver<C>
45where
46 C: Clone + std::fmt::Debug,
47{
48 type Item = C;
49
50 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51 self.activate();
52
53 match &mut *self {
54 Receiver::Active(a) => Pin::new(a).poll_next(cx),
55 _ => Poll::Ready(None), }
57 }
58}
59
60impl<C> Receiver<C>
61where
62 C: Clone,
63{
64 fn activate(&mut self) {
65 if let Receiver::Inactive(_) = self
66 && let Receiver::Inactive(inactive) = mem::replace(self, Self::Activating)
67 {
68 *self = Receiver::Active(inactive.activate());
69 };
70 }
71}
72
73impl ChannelBroadcaster {
74 pub(crate) fn new(
75 sender: Sender<ChannelEvent>,
76 receiver: InactiveReceiver<ChannelEvent>,
77 ) -> Self {
78 Self {
79 sender,
80 receiver: Receiver::Inactive(receiver),
81 }
82 }
83
84 pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
86 self.sender.try_broadcast(event.into()).ok();
89 }
90
91 pub fn connected_clients(&self) -> usize {
95 self.sender.receiver_count()
96 }
97}
98
99impl Stream for ChannelBroadcaster {
100 type Item = ChannelEvent;
101
102 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
103 Pin::new(&mut self.receiver).poll_next(cx)
104 }
105}