Skip to main content

karyon_eventemitter/
eventemitter.rs

1use std::{
2    any::Any,
3    collections::HashMap,
4    marker::PhantomData,
5    sync::{Arc, Weak},
6};
7
8use async_channel::{Receiver, Sender};
9use chrono::{DateTime, Utc};
10use futures_util::stream::{FuturesUnordered, StreamExt};
11use log::trace;
12use parking_lot::Mutex;
13use rand::Rng;
14
15use crate::error::{Error, Result};
16
17/// Default buffer size for an event listener channel
18const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1000;
19
20/// Unique identifier for event listeners
21pub type EventListenerID = u64;
22
23/// Internal type alias for the nested HashMap structure that stores listeners
24type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
25
26fn random_id() -> EventListenerID {
27    rand::rng().random()
28}
29
30/// EventEmitter asynchronous event emitter.
31///
32/// Allows components to communicate through events organized by topics.
33///
34/// # Example
35///
36/// ```
37/// use karyon_eventemitter::{EventEmitter, EventTopic, EventValue};
38///
39///  async {
40///     let event_emitter = EventEmitter::new();
41///
42///     #[derive(Hash, PartialEq, Eq, Debug, Clone)]
43///     enum Topic {
44///         TopicA,
45///         TopicB,
46///     }
47///
48///     #[derive(Clone, Debug, PartialEq, EventValue)]
49///     struct A(usize);
50///
51///     #[derive(Clone, Debug, PartialEq, EventValue)]
52///     struct B(usize);
53///
54///     impl EventTopic for B {
55///         type Topic = Topic;
56///         fn topic() -> Self::Topic{
57///             Topic::TopicB
58///         }
59///     }
60///
61///     #[derive(Clone, Debug, PartialEq, EventValue)]
62///     struct C(usize);
63///
64///     let a_listener = event_emitter.register::<A>(&Topic::TopicA);
65///     let b_listener = event_emitter.register::<B>(&Topic::TopicB);
66///     // This also listens to Topic B
67///     let c_listener = event_emitter.register::<C>(&Topic::TopicB);
68///
69///     event_emitter.emit_by_topic(&Topic::TopicA, &A(3)) .await;
70///     event_emitter.emit(&B(3)) .await;
71///     event_emitter.emit_by_topic(&Topic::TopicB, &C(3)) .await;
72///
73///     let msg: A = a_listener.recv().await.unwrap();
74///     let msg: B = b_listener.recv().await.unwrap();
75///     let msg: C = c_listener.recv().await.unwrap();
76///
77///     // ....
78///  };
79///
80/// ```
81///
82pub struct EventEmitter<T> {
83    listeners: Mutex<Listeners<T>>,
84    listener_buffer_size: usize,
85}
86
87impl<T> EventEmitter<T>
88where
89    T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
90{
91    /// Creates a new [`EventEmitter`]
92    pub fn new() -> Arc<EventEmitter<T>> {
93        Arc::new(Self {
94            listeners: Mutex::new(HashMap::new()),
95            listener_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
96        })
97    }
98
99    /// Creates a new [`EventEmitter`] with the provided buffer size for the
100    /// [`EventListener`] channel.
101    ///
102    /// This is important to control the memory used by the listener channel.
103    /// If the consumer for the event listener can't keep up with the new events
104    /// coming, then the channel buffer will fill with new events, and if the
105    /// buffer is full, the emit function will block until the listener
106    /// starts to consume the buffered events.
107    ///
108    /// If `size` is zero, this function will panic.
109    pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
110        assert_ne!(size, 0);
111        Arc::new(Self {
112            listeners: Mutex::new(HashMap::new()),
113            listener_buffer_size: size,
114        })
115    }
116
117    /// Emits an event to the listeners.
118    ///
119    /// The event must implement the [`EventTopic`] trait to indicate the
120    /// topic of the event. Otherwise, you can use `emit_by_topic()`.
121    pub async fn emit<E: EventTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
122        let topic = E::topic();
123        self.emit_by_topic(&topic, value).await
124    }
125
126    /// Emits an event to the listeners.
127    pub async fn emit_by_topic<E: AsEventValue + Clone>(&self, topic: &T, value: &E) -> Result<()> {
128        let mut results = self.send(topic, value).await?;
129
130        let mut failed_listeners = vec![];
131        while let Some((id, fut_err)) = results.next().await {
132            if let Err(err) = fut_err {
133                trace!("Failed to emit event for topic {topic:?}: {err}");
134                failed_listeners.push(id);
135            }
136        }
137        drop(results);
138
139        if !failed_listeners.is_empty() {
140            self.remove(topic, E::event_id(), &failed_listeners);
141        }
142
143        Ok(())
144    }
145
146    /// Registers a new event listener for the given topic.
147    pub fn register<E: AsEventValue + Clone>(self: &Arc<Self>, topic: &T) -> EventListener<T, E> {
148        let topics = &mut self.listeners.lock();
149
150        let chan = async_channel::bounded(self.listener_buffer_size);
151
152        let event_ids = topics.entry(topic.clone()).or_default();
153        let event_id = E::event_id().to_string();
154
155        let listeners = event_ids.entry(event_id.clone()).or_default();
156
157        let mut listener_id = random_id();
158        // Generate a new one if listener_id already exists
159        while listeners.contains_key(&listener_id) {
160            listener_id = random_id();
161        }
162
163        let listener =
164            EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
165
166        listeners.insert(listener_id, chan.0);
167
168        listener
169    }
170
171    /// Returns true if the given topic has any registered listeners.
172    pub fn has_listeners(&self, topic: &T) -> bool {
173        self.listeners.lock().contains_key(topic)
174    }
175
176    /// Removes all topics and event listeners.
177    ///
178    /// This effectively resets the EventEmitter to its initial state.
179    pub fn clear(self: &Arc<Self>) {
180        self.listeners.lock().clear();
181    }
182
183    /// Unregisters all event listeners for the given topic.
184    pub fn unregister_topic(self: &Arc<Self>, topic: &T) {
185        self.listeners.lock().remove(topic);
186    }
187
188    /// Internal method that handles the actual sending of events to listeners.
189    async fn send<E: AsEventValue + Clone>(
190        &self,
191        topic: &T,
192        value: &E,
193    ) -> Result<
194        FuturesUnordered<
195            impl std::future::Future<
196                    Output = (
197                        EventListenerID,
198                        std::result::Result<(), async_channel::SendError<Event>>,
199                    ),
200                > + use<'_, T, E>,
201        >,
202    > {
203        let value: Arc<dyn AsEventValue> = Arc::new(value.clone());
204        let event = Event::new(value);
205
206        let mut topics = self.listeners.lock();
207
208        let results = FuturesUnordered::new();
209        let event_ids = match topics.get_mut(topic) {
210            Some(ids) => ids,
211            None => {
212                trace!("Failed to emit an event to a non-existent topic {topic:?}",);
213                return Err(Error::EventEmitter(format!(
214                    "Emit an event to a non-existent topic {topic:?}",
215                )));
216            }
217        };
218
219        let event_id = E::event_id().to_string();
220
221        let Some(listeners) = event_ids.get_mut(&event_id) else {
222            trace!("No listeners found for event '{event_id}' on topic {topic:?}",);
223            return Ok(results);
224        };
225
226        for (listener_id, listener) in listeners {
227            let event = event.clone();
228            let listener = listener.clone();
229            let id = *listener_id;
230            let result = async move { (id, listener.send(event.clone()).await) };
231            results.push(result);
232        }
233        drop(topics);
234
235        Ok(results)
236    }
237
238    /// Internal method to remove the event listener attached to the given topic.
239    fn remove(&self, topic: &T, event_id: &str, listener_ids: &[EventListenerID]) {
240        let topics = &mut self.listeners.lock();
241
242        let Some(event_ids) = topics.get_mut(topic) else {
243            trace!("Failed to remove a non-existent topic");
244            return;
245        };
246
247        let Some(listeners) = event_ids.get_mut(event_id) else {
248            trace!("Failed to remove a non-existent event id");
249            return;
250        };
251
252        for listener_id in listener_ids {
253            if listeners.remove(listener_id).is_none() {
254                trace!("Failed to remove a non-existent event listener");
255            }
256        }
257    }
258}
259
260/// EventListener listens for and receives events from the [`EventEmitter`].
261pub struct EventListener<T, E> {
262    id: EventListenerID,
263    recv_chan: Receiver<Event>,
264    event_emitter: Weak<EventEmitter<T>>,
265    event_id: String,
266    topic: T,
267    phantom: PhantomData<E>,
268}
269
270impl<T, E> EventListener<T, E>
271where
272    T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
273    E: AsEventValue + Clone,
274{
275    /// Creates a new [`EventListener`].
276    fn new(
277        id: EventListenerID,
278        event_emitter: Weak<EventEmitter<T>>,
279        recv_chan: Receiver<Event>,
280        event_id: &str,
281        topic: &T,
282    ) -> EventListener<T, E> {
283        Self {
284            id,
285            recv_chan,
286            event_emitter,
287            event_id: event_id.to_string(),
288            topic: topic.clone(),
289            phantom: PhantomData,
290        }
291    }
292
293    /// Receives the next event from the emitter.
294    ///
295    /// This method blocks until an event is available or the channel is closed.
296    /// Events are automatically type-cast to the expected type E.
297    pub async fn recv(&self) -> Result<E> {
298        match self.recv_chan.recv().await {
299            Ok(event) => match (event.value as Arc<dyn Any>).downcast_ref::<E>() {
300                Some(v) => Ok(v.clone()),
301                None => unreachable!("Failed to downcast the event value."),
302            },
303            Err(err) => {
304                trace!("Failed to receive new event: {err}");
305                self.cancel();
306                Err(err.into())
307            }
308        }
309    }
310
311    /// Cancels the event listener and removes it from the [`EventEmitter`].
312    pub fn cancel(&self) {
313        self.event_emitter()
314            .remove(&self.topic, &self.event_id, &[self.id]);
315    }
316
317    /// Returns the topic this listener is registered for.
318    pub fn topic(&self) -> &T {
319        &self.topic
320    }
321
322    /// Returns the event id for this event listener.
323    pub fn event_id(&self) -> &String {
324        &self.event_id
325    }
326
327    /// Returns a reference to the parent EventEmitter.
328    pub fn event_emitter(&self) -> Arc<EventEmitter<T>> {
329        self.event_emitter.upgrade().unwrap()
330    }
331}
332
333/// A timestamped event container.
334#[derive(Clone, Debug)]
335pub struct Event {
336    /// The time at which the event was created.
337    created_at: DateTime<Utc>,
338    /// The value of the Event.
339    value: Arc<dyn AsEventValue>,
340}
341
342impl Event {
343    /// Creates a new Event.
344    pub fn new(value: Arc<dyn AsEventValue>) -> Self {
345        Self {
346            created_at: Utc::now(),
347            value,
348        }
349    }
350}
351
352impl std::fmt::Display for Event {
353    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
354        write!(f, "{}: {:?}", self.created_at, self.value)
355    }
356}
357
358/// Trait for event types that can be emitted.
359///
360/// This trait provides a string identifier for each event type,
361/// used internally for routing and type checking.
362pub trait AsEventValue: Any + Send + Sync + std::fmt::Debug {
363    fn event_id() -> &'static str
364    where
365        Self: Sized;
366}
367
368/// Trait for events that define their own topic.
369///
370/// This trait allows events to specify which topic they belong to,
371/// enabling the use of the convenient `emit()` method instead of
372/// requiring explicit topic specification with `emit_by_topic()`.
373pub trait EventTopic: AsEventValue {
374    type Topic;
375    fn topic() -> Self::Topic
376    where
377        Self: Sized;
378}