trillium_sse/lib.rs
1//! # Trillium tools for server sent events
2//!
3//! This primarily provides [`SseConnExt`], an
4//! extension trait for [`trillium::Conn`] that has a
5//! [`with_sse_stream`](crate::SseConnExt::with_sse_stream) chainable
6//! method that takes a [`Stream`] where the `Item`
7//! implements [`Eventable`].
8//!
9//! Often, you will want this stream to be something like a channel, but
10//! the specifics of that are dependent on the event fanout
11//! characteristics of your application.
12//!
13//! This crate implements [`Eventable`] for an [`Event`] type that you can
14//! use in your application, for `String`, and for `&'static str`. You can
15//! also implement [`Eventable`] for any type in your application.
16//!
17//! ## Example usage
18//!
19//! ```
20//! use broadcaster::BroadcastChannel;
21//! use trillium::{Conn, Method, State, conn_try, conn_unwrap, log_error};
22//! use trillium_sse::SseConnExt;
23//! use trillium_static_compiled::static_compiled;
24//!
25//! type Channel = BroadcastChannel<String>;
26//!
27//! fn get_sse(mut conn: Conn) -> Conn {
28//! let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
29//! conn.with_sse_stream(broadcaster)
30//! }
31//!
32//! async fn post_broadcast(mut conn: Conn) -> Conn {
33//! let broadcaster = conn_unwrap!(conn.take_state::<Channel>(), conn);
34//! let body = conn_try!(conn.request_body_string().await, conn);
35//! log_error!(broadcaster.send(&body).await);
36//! conn.ok("sent")
37//! }
38//!
39//! fn main() {
40//! let handler = (
41//! static_compiled!("examples/static").with_index_file("index.html"),
42//! State::new(Channel::new()),
43//! |conn: Conn| async move {
44//! match (conn.method(), conn.path()) {
45//! (Method::Get, "/sse") => get_sse(conn),
46//! (Method::Post, "/broadcast") => post_broadcast(conn).await,
47//! _ => conn,
48//! }
49//! },
50//! );
51//!
52//! // trillium_smol::run(handler);
53//! }
54//! ```
55#![forbid(unsafe_code)]
56#![deny(
57 missing_copy_implementations,
58 rustdoc::missing_crate_level_docs,
59 missing_debug_implementations,
60 nonstandard_style,
61 unused_qualifications
62)]
63#![warn(missing_docs)]
64
// Attaches the contents of README.md as the documentation of an empty module that only
// exists in `cfg(test)` builds.
// NOTE(review): presumably meant to get the README's code samples checked along with the
// test suite — confirm that this cfg is active when doctests are collected.
#[cfg(test)]
#[doc = include_str!("../README.md")]
mod readme {}
68
69use futures_lite::{AsyncRead, stream::Stream};
70use std::{
71 borrow::Cow,
72 fmt::Write,
73 io,
74 marker::PhantomData,
75 pin::Pin,
76 task::{Context, Poll},
77};
78use trillium::{Body, Conn, KnownHeaderName, Status};
79
80struct SseBody<S, E> {
81 stream: S,
82 buffer: Vec<u8>,
83 event: PhantomData<E>,
84}
85
86impl<S, E> SseBody<S, E>
87where
88 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
89 E: Eventable,
90{
91 pub fn new(stream: S) -> Self {
92 Self {
93 stream,
94 buffer: Vec::new(),
95 event: PhantomData,
96 }
97 }
98}
99
100fn encode(event: impl Eventable) -> String {
101 let mut output = String::new();
102 if let Some(event_type) = event.event_type() {
103 writeln!(&mut output, "event: {event_type}").unwrap();
104 }
105
106 if let Some(id) = event.id() {
107 writeln!(&mut output, "id: {id}").unwrap();
108 }
109
110 for part in event.data().lines() {
111 writeln!(&mut output, "data: {part}").unwrap();
112 }
113
114 writeln!(output).unwrap();
115
116 output
117}
118
119impl<S, E> AsyncRead for SseBody<S, E>
120where
121 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
122 E: Eventable,
123{
124 fn poll_read(
125 self: Pin<&mut Self>,
126 cx: &mut Context<'_>,
127 buf: &mut [u8],
128 ) -> Poll<io::Result<usize>> {
129 let Self { buffer, stream, .. } = self.get_mut();
130
131 let buffer_read = buffer.len().min(buf.len());
132 if buffer_read > 0 {
133 buf[0..buffer_read].copy_from_slice(&buffer[0..buffer_read]);
134 buffer.drain(0..buffer_read);
135 return Poll::Ready(Ok(buffer_read));
136 }
137
138 match Pin::new(stream).poll_next(cx) {
139 Poll::Pending => Poll::Pending,
140 Poll::Ready(Some(item)) => {
141 let data = encode(item).into_bytes();
142 let writable_len = data.len().min(buf.len());
143 buf[0..writable_len].copy_from_slice(&data[0..writable_len]);
144 if writable_len < data.len() {
145 buffer.extend_from_slice(&data[writable_len..]);
146 }
147 Poll::Ready(Ok(writable_len))
148 }
149
150 Poll::Ready(None) => Poll::Ready(Ok(0)),
151 }
152 }
153}
154
155impl<S, E> From<SseBody<S, E>> for Body
156where
157 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
158 E: Eventable,
159{
160 fn from(sse_body: SseBody<S, E>) -> Self {
161 Body::new_streaming(sse_body, None)
162 }
163}
164
165/// Extension trait for server sent events
166pub trait SseConnExt {
167 /// builds and sets a streaming response body that conforms to the
168 /// [server-sent-events
169 /// spec](https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events)
170 /// from a Stream of any [`Eventable`] type (such as
171 /// [`Event`], as well as setting appropiate headers for
172 /// this response.
173 fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
174 where
175 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
176 E: Eventable;
177}
178
179impl SseConnExt for Conn {
180 fn with_sse_stream<S, E>(self, sse_stream: S) -> Self
181 where
182 S: Stream<Item = E> + Unpin + Send + Sync + 'static,
183 E: Eventable,
184 {
185 let body = SseBody::new(self.swansong().interrupt(sse_stream));
186 self.with_response_header(KnownHeaderName::ContentType, "text/event-stream")
187 .with_response_header(KnownHeaderName::CacheControl, "no-cache")
188 // Close-delimited framing: the event stream carries neither `Content-Length`
189 // nor `Transfer-Encoding`, running until the connection closes. Chunked
190 // transfer-encoding can disrupt event delivery timing for this protocol.
191 .with_response_header(KnownHeaderName::Connection, "close")
192 .with_body(body)
193 .with_status(Status::Ok)
194 .halt()
195 }
196}
197
/// A type that can be sent to a client as a server-sent event.
///
/// Implementors must be `Send + Sync + Unpin + 'static`. For a concrete implementation of
/// this trait, you can use [`Event`]; it is also implemented for [`String`] and
/// `&'static str`.
pub trait Eventable: Send + Sync + Unpin + 'static {
    /// Returns the data for this event. Unlike the other methods, this one is required.
    fn data(&self) -> &str;

    /// Returns the event type, if there is one. The default implementation returns `None`.
    fn event_type(&self) -> Option<&str> {
        None
    }

    /// Returns a unique id for this event, if there is one. The default implementation
    /// returns `None`.
    fn id(&self) -> Option<&str> {
        None
    }
}
216
217impl Eventable for Event {
218 fn data(&self) -> &str {
219 Event::data(self)
220 }
221
222 fn event_type(&self) -> Option<&str> {
223 Event::event_type(self)
224 }
225}
226
227impl Eventable for &'static str {
228 fn data(&self) -> &str {
229 self
230 }
231}
232
233impl Eventable for String {
234 fn data(&self) -> &str {
235 self
236 }
237}
238
/// The concrete [`Eventable`] implementation provided by this crate: event data plus an
/// optional event type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Event {
    /// The data of this event, either borrowed for `'static` or owned.
    data: Cow<'static, str>,
    /// The type of this event, if one has been set.
    event_type: Option<Cow<'static, str>>,
}
245
246impl From<&'static str> for Event {
247 fn from(s: &'static str) -> Self {
248 Self::from(Cow::Borrowed(s))
249 }
250}
251
252impl From<String> for Event {
253 fn from(s: String) -> Self {
254 Self::from(Cow::Owned(s))
255 }
256}
257
258impl From<Cow<'static, str>> for Event {
259 fn from(data: Cow<'static, str>) -> Self {
260 Event {
261 data,
262 event_type: None,
263 }
264 }
265}
266
267impl Event {
268 /// builds a new [`Event`]
269 ///
270 /// by default, this event has no event type. to set an event type,
271 /// use [`Event::with_type`] or [`Event::set_type`]
272 pub fn new(data: impl Into<Cow<'static, str>>) -> Self {
273 Self::from(data.into())
274 }
275
276 /// chainable constructor to set the type on an event
277 ///
278 /// ```
279 /// let event = trillium_sse::Event::new("event data").with_type("userdata");
280 /// assert_eq!(event.event_type(), Some("userdata"));
281 /// assert_eq!(event.data(), "event data");
282 /// ```
283 pub fn with_type(mut self, event_type: impl Into<Cow<'static, str>>) -> Self {
284 self.set_type(event_type);
285 self
286 }
287
288 /// set the event type for this Event. The default is None.
289 ///
290 /// ```
291 /// let mut event = trillium_sse::Event::new("event data");
292 /// assert_eq!(event.event_type(), None);
293 /// event.set_type("userdata");
294 /// assert_eq!(event.event_type(), Some("userdata"));
295 /// ```
296 pub fn set_type(&mut self, event_type: impl Into<Cow<'static, str>>) {
297 self.event_type = Some(event_type.into());
298 }
299
300 /// returns this Event's data as a &str
301 pub fn data(&self) -> &str {
302 &self.data
303 }
304
305 /// returns this Event's type as a str, if set
306 pub fn event_type(&self) -> Option<&str> {
307 self.event_type.as_deref()
308 }
309}