Skip to main content

trillium_sse/
lib.rs

1//! # Trillium tools for server sent events
2//!
3//! This primarily provides [`SseConnExt`], an
4//! extension trait for [`trillium::Conn`] that has a
5//! [`with_sse_stream`](crate::SseConnExt::with_sse_stream) chainable
6//! method that takes a [`Stream`] where the `Item`
7//! implements [`Eventable`].
8//!
9//! Often, you will want this stream to be something like a channel, but
10//! the specifics of that are dependent on the event fanout
11//! characteristics of your application.
12//!
13//! This crate implements [`Eventable`] for an [`Event`] type that you can
14//! use in your application, for `String`, and for `&'static str`. You can
15//! also implement [`Eventable`] for any type in your application.
16//!
17//! ## Example usage
18//!
19//! ```
20//! use broadcaster::BroadcastChannel;
21//! use trillium::{Conn, Method, State, conn_try, conn_unwrap, log_error};
22//! use trillium_sse::SseConnExt;
23//! use trillium_static_compiled::static_compiled;
24//!
25//! type Channel = BroadcastChannel<String>;
26//!
27//! fn get_sse(mut conn: Conn) -> Conn {
28//!     let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
29//!     conn.with_sse_stream(broadcaster)
30//! }
31//!
32//! async fn post_broadcast(mut conn: Conn) -> Conn {
33//!     let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
34//!     let body = conn_try!(conn.request_body_string().await, conn);
35//!     log_error!(broadcaster.send(&body).await);
36//!     conn.ok("sent")
37//! }
38//!
39//! fn main() {
40//!     let handler = (
41//!         static_compiled!("examples/static").with_index_file("index.html"),
42//!         State::new(Channel::new()),
43//!         |conn: Conn| async move {
44//!             match (conn.method(), conn.path()) {
45//!                 (Method::Get, "/sse") => get_sse(conn),
46//!                 (Method::Post, "/broadcast") => post_broadcast(conn).await,
47//!                 _ => conn,
48//!             }
49//!         },
50//!     );
51//!
52//!     // trillium_smol::run(handler);
53//! }
54//! ```
55#![forbid(unsafe_code)]
56#![deny(
57    missing_copy_implementations,
58    rustdoc::missing_crate_level_docs,
59    missing_debug_implementations,
60    nonstandard_style,
61    unused_qualifications
62)]
63#![warn(missing_docs)]
64
// Attaches the README as documentation to an empty module that only exists
// when `cfg(test)` is set. Because of `include_str!`, such a build fails if
// `../README.md` is missing.
// NOTE(review): presumably meant to exercise the README's examples as
// doctests — confirm that it does, since rustdoc's doctest pass sets
// `cfg(doctest)` rather than `cfg(test)`.
#[cfg(test)]
#[doc = include_str!("../README.md")]
mod readme {}
68
69use futures_lite::{AsyncRead, stream::Stream};
70use std::{
71    borrow::Cow,
72    fmt::Write,
73    io,
74    marker::PhantomData,
75    pin::Pin,
76    task::{Context, Poll},
77};
78use trillium::{Body, Conn, KnownHeaderName, Status};
79
/// Adapter that exposes a stream of events as a byte source, so it can be
/// used as a streaming response body.
struct SseBody<S, E> {
    /// The source of events; polled only when `buffer` has been drained.
    stream: S,
    /// Encoded bytes of an event that did not fit into the reader's buffer
    /// and are waiting to be handed out by later reads.
    buffer: Vec<u8>,
    /// Records the event type `E` without storing a value of it.
    event: PhantomData<E>,
}
85
86impl<S, E> SseBody<S, E>
87where
88    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
89    E: Eventable,
90{
91    pub fn new(stream: S) -> Self {
92        Self {
93            stream,
94            buffer: Vec::new(),
95            event: PhantomData,
96        }
97    }
98}
99
100fn encode(event: impl Eventable) -> String {
101    let mut output = String::new();
102    if let Some(event_type) = event.event_type() {
103        writeln!(&mut output, "event: {event_type}").unwrap();
104    }
105
106    if let Some(id) = event.id() {
107        writeln!(&mut output, "id: {id}").unwrap();
108    }
109
110    for part in event.data().lines() {
111        writeln!(&mut output, "data: {part}").unwrap();
112    }
113
114    writeln!(output).unwrap();
115
116    output
117}
118
119impl<S, E> AsyncRead for SseBody<S, E>
120where
121    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
122    E: Eventable,
123{
124    fn poll_read(
125        self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127        buf: &mut [u8],
128    ) -> Poll<io::Result<usize>> {
129        let Self { buffer, stream, .. } = self.get_mut();
130
131        let buffer_read = buffer.len().min(buf.len());
132        if buffer_read > 0 {
133            buf[0..buffer_read].copy_from_slice(&buffer[0..buffer_read]);
134            buffer.drain(0..buffer_read);
135            return Poll::Ready(Ok(buffer_read));
136        }
137
138        match Pin::new(stream).poll_next(cx) {
139            Poll::Pending => Poll::Pending,
140            Poll::Ready(Some(item)) => {
141                let data = encode(item).into_bytes();
142                let writable_len = data.len().min(buf.len());
143                buf[0..writable_len].copy_from_slice(&data[0..writable_len]);
144                if writable_len < data.len() {
145                    buffer.extend_from_slice(&data[writable_len..]);
146                }
147                Poll::Ready(Ok(writable_len))
148            }
149
150            Poll::Ready(None) => Poll::Ready(Ok(0)),
151        }
152    }
153}
154
155impl<S, E> From<SseBody<S, E>> for Body
156where
157    S: Stream<Item = E> + Unpin + Send + Sync + 'static,
158    E: Eventable,
159{
160    fn from(sse_body: SseBody<S, E>) -> Self {
161        Body::new_streaming(sse_body, None)
162    }
163}
164
/// Extension trait for server sent events
pub trait SseConnExt {
    /// builds and sets a streaming response body that conforms to the
    /// [server-sent-events
    /// spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events)
    /// from a Stream of any [`Eventable`] type (such as
    /// [`Event`]), as well as setting appropriate headers for
    /// this response.
    ///
    /// The implementation for [`Conn`] sets `Content-Type: text/event-stream`
    /// and `Cache-Control: no-cache`, sets the status to 200 and halts the
    /// conn.
    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
    where
        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
        E: Eventable;
}
178
179impl SseConnExt for Conn {
180    fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
181    where
182        S: Stream<Item = E> + Unpin + Send + Sync + 'static,
183        E: Eventable,
184    {
185        let body = SseBody::new(self.swansong().interrupt(sse_stream));
186        self.with_response_header(KnownHeaderName::ContentType, "text/event-stream")
187            .with_response_header(KnownHeaderName::CacheControl, "no-cache")
188            // Close-delimited framing: the event stream carries neither `Content-Length`
189            // nor `Transfer-Encoding`, running until the connection closes. Chunked
190            // transfer-encoding can disrupt event delivery timing for this protocol.
191            .with_response_header(KnownHeaderName::Connection, "close")
192            .with_body(body)
193            .with_status(Status::Ok)
194            .halt()
195    }
196}
197
/// A trait that allows any Unpin + Send + Sync type to act as an event.
///
/// For a concrete implementation of this trait, you can use [`Event`],
/// but it is also implemented for [`String`] and `&'static str`.
pub trait Eventable: Unpin + Send + Sync + 'static {
    /// return the data for this event. non-optional.
    ///
    /// each line of the returned text is sent as a separate `data:` field.
    fn data(&self) -> &str;

    /// return the event type, optionally
    ///
    /// when this is `Some`, it is sent as the `event:` field. the default
    /// implementation returns `None`.
    fn event_type(&self) -> Option<&str> {
        None
    }

    /// return a unique event id, optionally
    ///
    /// when this is `Some`, it is sent as the `id:` field. the default
    /// implementation returns `None`.
    fn id(&self) -> Option<&str> {
        None
    }
}
216
217impl Eventable for Event {
218    fn data(&self) -> &str {
219        Event::data(self)
220    }
221
222    fn event_type(&self) -> Option<&str> {
223        Event::event_type(self)
224    }
225}
226
227impl Eventable for &'static str {
228    fn data(&self) -> &str {
229        self
230    }
231}
232
233impl Eventable for String {
234    fn data(&self) -> &str {
235        self
236    }
237}
238
/// Events are a concrete implementation of the [`Eventable`] trait.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Event {
    // The event's payload; either a borrowed static string or an owned one.
    data: Cow<'static, str>,
    // The optional event type; `None` until set through `with_type`/`set_type`.
    event_type: Option<Cow<'static, str>>,
}
245
246impl From<&'static str> for Event {
247    fn from(s: &'static str) -> Self {
248        Self::from(Cow::Borrowed(s))
249    }
250}
251
252impl From<String> for Event {
253    fn from(s: String) -> Self {
254        Self::from(Cow::Owned(s))
255    }
256}
257
258impl From<Cow<'static, str>> for Event {
259    fn from(data: Cow<'static, str>) -> Self {
260        Event {
261            data,
262            event_type: None,
263        }
264    }
265}
266
267impl Event {
268    /// builds a new [`Event`]
269    ///
270    /// by default, this event has no event type. to set an event type,
271    /// use [`Event::with_type`] or [`Event::set_type`]
272    pub fn new(data: impl Into<Cow<'static, str>>) -> Self {
273        Self::from(data.into())
274    }
275
276    /// chainable constructor to set the type on an event
277    ///
278    /// ```
279    /// let event = trillium_sse::Event::new("event data").with_type("userdata");
280    /// assert_eq!(event.event_type(), Some("userdata"));
281    /// assert_eq!(event.data(), "event data");
282    /// ```
283    pub fn with_type(mut self, event_type: impl Into<Cow<'static, str>>) -> Self {
284        self.set_type(event_type);
285        self
286    }
287
288    /// set the event type for this Event. The default is None.
289    ///
290    /// ```
291    /// let mut event = trillium_sse::Event::new("event data");
292    /// assert_eq!(event.event_type(), None);
293    /// event.set_type("userdata");
294    /// assert_eq!(event.event_type(), Some("userdata"));
295    /// ```
296    pub fn set_type(&mut self, event_type: impl Into<Cow<'static, str>>) {
297        self.event_type = Some(event_type.into());
298    }
299
300    /// returns this Event's data as a &str
301    pub fn data(&self) -> &str {
302        &self.data
303    }
304
305    /// returns this Event's type as a str, if set
306    pub fn event_type(&self) -> Option<&str> {
307        self.event_type.as_deref()
308    }
309}