trillium_channels/
channel_client.rs1use crate::{ChannelEvent, Version, client_receiver::ClientReceiver, subscriptions::Subscriptions};
2use async_broadcast::{Receiver, Sender as BroadcastSender};
3use async_channel::Sender;
4use serde::Serialize;
5use trillium::log_error;
6use trillium_websockets::Message;
7
8#[derive(Debug, Clone)]
15pub struct ChannelClient {
16 subscriptions: Subscriptions,
17 sender: Sender<ChannelEvent>,
18 broadcast_sender: BroadcastSender<ChannelEvent>,
19 version: Version,
20}
21
22impl ChannelClient {
23 pub(crate) fn new(
24 broadcast_sender: BroadcastSender<ChannelEvent>,
25 broadcast_receiver: Receiver<ChannelEvent>,
26 version: Version,
27 ) -> (Self, ClientReceiver) {
28 let (sender, individual) = async_channel::unbounded();
29 let subscriptions = Subscriptions::default();
30 (
31 Self {
32 subscriptions: subscriptions.clone(),
33 sender,
34 broadcast_sender,
35 version,
36 },
37 ClientReceiver::new(individual, broadcast_receiver, subscriptions, version),
38 )
39 }
40
41 pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
45 let mut event = event.into();
46 event.reference = None;
47 log_error!(self.broadcast_sender.try_broadcast(event));
48 }
49
50 pub async fn send_event(&self, event: impl Into<ChannelEvent>) {
54 log_error!(self.sender.send(event.into()).await);
55 }
56
57 pub async fn reply_ok(&self, event: &ChannelEvent, payload: &impl Serialize) {
64 #[derive(serde::Serialize)]
65 struct Reply<'a, S> {
66 status: &'static str,
67 response: &'a S,
68 }
69
70 self.send_event(event.build_reply(
71 "phx_reply",
72 &Reply {
73 status: "ok",
74 response: payload,
75 },
76 ))
77 .await
78 }
79
80 pub async fn reply_error(&self, event: &ChannelEvent, error: &impl Serialize) {
86 self.send_event(event.build_reply("phx_error", &error))
87 .await
88 }
89
90 pub async fn allow_join(&self, event: &ChannelEvent, payload: &impl Serialize) {
96 if event.event() != "phx_join" {
97 log::error!(
98 "allow_join called with an event other than phx_join: {:?}",
99 event
100 );
101 return;
102 }
103 self.subscriptions.join(event.topic.to_string());
104 self.reply_ok(event, payload).await;
105 }
106
107 pub async fn allow_leave(&self, event: &ChannelEvent, payload: &impl Serialize) {
114 if event.event() != "phx_leave" {
115 log::error!(
116 "allow_leave called with an event other than phx_leave: {:?}",
117 event
118 );
119 return;
120 }
121 self.subscriptions.leave(&event.topic);
122 self.reply_ok(event, payload).await;
123 }
124
125 pub fn subscriptions(&self) -> &Subscriptions {
127 &self.subscriptions
128 }
129
130 pub(crate) fn deserialize(&self, message: Message) -> Option<ChannelEvent> {
131 let string = message.to_text().ok()?;
132 ChannelEvent::deserialize(string, self.version).ok()
133 }
134}