Skip to main content

trillium_channels/
channel_client.rs

1use crate::{ChannelEvent, Version, client_receiver::ClientReceiver, subscriptions::Subscriptions};
2use async_broadcast::{Receiver, Sender as BroadcastSender};
3use async_channel::Sender;
4use serde::Serialize;
5use trillium::log_error;
6use trillium_websockets::Message;
7
8/// # Communicate with the connected client.
9///
10/// Note that although each client is unique and represents a specific
11/// websocket connection, the ChannelClient can be cloned and moved
12/// elsewhere if needed and any updates to the topic subscriptions
13/// will be kept synchronized across clones.
14#[derive(Debug, Clone)]
15pub struct ChannelClient {
16    subscriptions: Subscriptions,
17    sender: Sender<ChannelEvent>,
18    broadcast_sender: BroadcastSender<ChannelEvent>,
19    version: Version,
20}
21
22impl ChannelClient {
23    pub(crate) fn new(
24        broadcast_sender: BroadcastSender<ChannelEvent>,
25        broadcast_receiver: Receiver<ChannelEvent>,
26        version: Version,
27    ) -> (Self, ClientReceiver) {
28        let (sender, individual) = async_channel::unbounded();
29        let subscriptions = Subscriptions::default();
30        (
31            Self {
32                subscriptions: subscriptions.clone(),
33                sender,
34                broadcast_sender,
35                version,
36            },
37            ClientReceiver::new(individual, broadcast_receiver, subscriptions, version),
38        )
39    }
40
41    /// Send a [`ChannelEvent`] to all connected clients. Note that
42    /// these messages will only reach clients that subscribe to the
43    /// event's topic.
44    pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
45        let mut event = event.into();
46        event.reference = None;
47        log_error!(self.broadcast_sender.try_broadcast(event));
48    }
49
50    /// Send a [`ChannelEvent`] to this specific client. Note that
51    /// this message will only be received if the client subscribes to
52    /// the event's topic.
53    pub async fn send_event(&self, event: impl Into<ChannelEvent>) {
54        log_error!(self.sender.send(event.into()).await);
55    }
56
57    /// Send an ok reply in reference to the provided ChannelEvent
58    /// with the provided response payload.
59    ///
60    /// Note that this sets the event as `"phx_reply"` and the payload as
61    /// `{"status": "ok", "response": response }`, as well as setting the
62    /// reference field.
63    pub async fn reply_ok(&self, event: &ChannelEvent, payload: &impl Serialize) {
64        #[derive(serde::Serialize)]
65        struct Reply<'a, S> {
66            status: &'static str,
67            response: &'a S,
68        }
69
70        self.send_event(event.build_reply(
71            "phx_reply",
72            &Reply {
73                status: "ok",
74                response: payload,
75            },
76        ))
77        .await
78    }
79
80    /// Send an error reply in reference to the provided ChannelEvent
81    /// with the provided response payload.
82    ///
83    /// Note that this sets the event as `"phx_error"` as well as setting
84    /// the reference field.
85    pub async fn reply_error(&self, event: &ChannelEvent, error: &impl Serialize) {
86        self.send_event(event.build_reply("phx_error", &error))
87            .await
88    }
89
90    /// Join a topic, sending an ok reply with the provided optional
91    /// value. This sends an ok reply to the client as well as adding the
92    /// topic to the client's subscriptions.
93    ///
94    /// Use `&()` as the payload if no payload is needed.
95    pub async fn allow_join(&self, event: &ChannelEvent, payload: &impl Serialize) {
96        if event.event() != "phx_join" {
97            log::error!(
98                "allow_join called with an event other than phx_join: {:?}",
99                event
100            );
101            return;
102        }
103        self.subscriptions.join(event.topic.to_string());
104        self.reply_ok(event, payload).await;
105    }
106
107    /// Leave a topic as requested by the provided channel event,
108    /// including the optional payload. This sends an ok reply to the
109    /// client as well as removing the channel from the client's
110    /// subscriptions.
111    ///
112    /// Use `&()` as the payload if no payload is needed.
113    pub async fn allow_leave(&self, event: &ChannelEvent, payload: &impl Serialize) {
114        if event.event() != "phx_leave" {
115            log::error!(
116                "allow_leave called with an event other than phx_leave: {:?}",
117                event
118            );
119            return;
120        }
121        self.subscriptions.leave(&event.topic);
122        self.reply_ok(event, payload).await;
123    }
124
125    /// Borrow this client's subscriptions
126    pub fn subscriptions(&self) -> &Subscriptions {
127        &self.subscriptions
128    }
129
130    pub(crate) fn deserialize(&self, message: Message) -> Option<ChannelEvent> {
131        let string = message.to_text().ok()?;
132        ChannelEvent::deserialize(string, self.version).ok()
133    }
134}