1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
use crate::{client_receiver::ClientReceiver, subscriptions::Subscriptions, ChannelEvent, Version};
use async_broadcast::{Receiver, Sender as BroadcastSender};
use async_channel::Sender;
use serde::Serialize;
use trillium::log_error;
use trillium_websockets::Message;

/**
# Communicate with the connected client.

A `ChannelClient` is the sending-side handle for one websocket
connection: it can queue events for that connection alone, broadcast
events to every connection, and manage the topics the connection is
subscribed to.

Note that although each client is unique and represents a specific
websocket connection, the ChannelClient can be cloned and moved
elsewhere if needed and any updates to the topic subscriptions
will be kept synchronized across clones.
*/
#[derive(Debug, Clone)]
pub struct ChannelClient {
    /** Topics this client is subscribed to; `new` stores a clone of the value it hands to the `ClientReceiver`. */
    subscriptions: Subscriptions,
    /** Sending half of the unbounded per-client channel created in `new`; used by `send_event`. */
    sender: Sender<ChannelEvent>,
    /** Sending half of the broadcast channel supplied to `new`; used by `broadcast`. */
    broadcast_sender: BroadcastSender<ChannelEvent>,
    /** Version passed to `ChannelEvent::deserialize` for incoming messages and to the `ClientReceiver` in `new`. */
    version: Version,
}

impl ChannelClient {
    /**
    Build a client handle together with the [`ClientReceiver`] paired
    with it.

    A fresh unbounded channel is created for events addressed to this
    client alone: its sending half is stored on the client and its
    receiving half is given to the receiver. Both values are built from
    the same [`Subscriptions`] (the client stores a clone) and the same
    `version`.
    */
    pub(crate) fn new(
        broadcast_sender: BroadcastSender<ChannelEvent>,
        broadcast_receiver: Receiver<ChannelEvent>,
        version: Version,
    ) -> (Self, ClientReceiver) {
        let subscriptions = Subscriptions::default();
        let (sender, individual_receiver) = async_channel::unbounded();

        let client = Self {
            subscriptions: subscriptions.clone(),
            sender,
            broadcast_sender,
            version,
        };
        let receiver = ClientReceiver::new(
            individual_receiver,
            broadcast_receiver,
            subscriptions,
            version,
        );

        (client, receiver)
    }

    /**
    Broadcast a [`ChannelEvent`] to every connected client. Only
    clients subscribed to the event's topic will actually receive it.

    The event's `reference` is cleared before it is handed to the
    broadcast channel. This method does not wait: the event is offered
    with `try_broadcast`, and a failure is passed to `log_error!`
    instead of being returned.
    */
    pub fn broadcast(&self, event: impl Into<ChannelEvent>) {
        let mut outgoing = event.into();
        outgoing.reference = None;
        let outcome = self.broadcast_sender.try_broadcast(outgoing);
        log_error!(outcome);
    }

    /**
    Deliver a [`ChannelEvent`] to this client only. The client will
    only receive it if it is subscribed to the event's topic.

    A failed send is passed to `log_error!` instead of being returned.
    */
    pub async fn send_event(&self, event: impl Into<ChannelEvent>) {
        let outcome = self.sender.send(event.into()).await;
        log_error!(outcome);
    }

    /**
    Reply to `event` with a success response carrying `payload`.

    The reply is built with [`ChannelEvent::build_reply`] using the
    event name `"phx_reply"` and a payload of the shape
    `{"status": "ok", "response": payload}`, and is then sent to this
    client with [`ChannelClient::send_event`].
    */
    pub async fn reply_ok(&self, event: &ChannelEvent, payload: &impl Serialize) {
        // Field names are part of the wire format: they become the JSON keys.
        #[derive(serde::Serialize)]
        struct OkReply<'p, T> {
            status: &'static str,
            response: &'p T,
        }

        let body = OkReply {
            status: "ok",
            response: payload,
        };
        let reply = event.build_reply("phx_reply", &body);
        self.send_event(reply).await
    }

    /**
    Reply to `event` with an error response carrying `error`.

    The reply is built with [`ChannelEvent::build_reply`] using the
    event name `"phx_error"` and `error` as the payload, and is then
    sent to this client with [`ChannelClient::send_event`].
    */
    pub async fn reply_error(&self, event: &ChannelEvent, error: &impl Serialize) {
        let reply = event.build_reply("phx_error", &error);
        self.send_event(reply).await
    }

    /**
    Accept a join request: subscribe this client to the event's topic
    and then send an ok reply containing `payload`.

    If `event` is not a `"phx_join"` event, an error is logged and
    nothing else happens: no subscription is added and no reply is sent.

    Use `&()` as the payload if no payload is needed.
    */
    pub async fn allow_join(&self, event: &ChannelEvent, payload: &impl Serialize) {
        if event.event() == "phx_join" {
            let topic = event.topic.to_string();
            self.subscriptions.join(topic);
            self.reply_ok(event, payload).await;
        } else {
            log::error!(
                "allow_join called with an event other than phx_join: {:?}",
                event
            );
        }
    }

    /**
    Accept a leave request: unsubscribe this client from the event's
    topic and then send an ok reply containing `payload`.

    If `event` is not a `"phx_leave"` event, an error is logged and
    nothing else happens: no subscription is removed and no reply is
    sent.

    Use `&()` as the payload if no payload is needed.
    */
    pub async fn allow_leave(&self, event: &ChannelEvent, payload: &impl Serialize) {
        if event.event() == "phx_leave" {
            self.subscriptions.leave(&event.topic);
            self.reply_ok(event, payload).await;
        } else {
            log::error!(
                "allow_leave called with an event other than phx_leave: {:?}",
                event
            );
        }
    }

    /**
    Borrow the [`Subscriptions`] held by this client.
    */
    pub fn subscriptions(&self) -> &Subscriptions {
        let Self { subscriptions, .. } = self;
        subscriptions
    }

    /**
    Try to parse an incoming websocket [`Message`] as a
    [`ChannelEvent`] using this client's version.

    Returns `None` when the message cannot be read as text or when
    [`ChannelEvent::deserialize`] fails; the underlying error is
    discarded in both cases.
    */
    pub(crate) fn deserialize(&self, message: Message) -> Option<ChannelEvent> {
        message
            .to_text()
            .ok()
            .and_then(|text| ChannelEvent::deserialize(text, self.version).ok())
    }
}