karyon_eventemitter/
eventemitter.rs

1use std::{
2    any::Any,
3    collections::HashMap,
4    marker::PhantomData,
5    sync::{Arc, Weak},
6};
7
8use async_channel::{Receiver, Sender};
9use chrono::{DateTime, Utc};
10use futures_util::stream::{FuturesUnordered, StreamExt};
11use log::trace;
12use parking_lot::Mutex;
13use rand::{rngs::OsRng, Rng};
14
15use crate::error::{Error, Result};
16
/// Default capacity of the bounded channel created for each event listener.
///
/// Used by `EventEmitter::new()`.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1000;

/// Unique identifier for an event listener.
pub type EventListenerID = u64;

/// Internal listener registry.
///
/// The nesting is: topic -> event id -> listener id -> sending half of that
/// listener's channel.
type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
25
26fn random_id() -> EventListenerID {
27    OsRng.gen()
28}
29
/// An asynchronous event emitter.
///
/// Allows components to communicate through events organized by topics.
///
/// # Example
///
/// ```
/// use karyon_eventemitter::{EventEmitter, AsEventTopic, EventValue};
///
///  async {
///     let event_emitter = EventEmitter::new();
///
///     #[derive(Hash, PartialEq, Eq, Debug, Clone)]
///     enum Topic {
///         TopicA,
///         TopicB,
///     }
///
///     #[derive(Clone, Debug, PartialEq, EventValue)]
///     struct A(usize);
///
///     #[derive(Clone, Debug, PartialEq, EventValue)]
///     struct B(usize);
///
///     impl AsEventTopic for B {
///         type Topic = Topic;
///         fn topic() -> Self::Topic {
///             Topic::TopicB
///         }
///     }
///
///     #[derive(Clone, Debug, PartialEq, EventValue)]
///     struct C(usize);
///
///     let a_listener = event_emitter.register::<A>(&Topic::TopicA);
///     let b_listener = event_emitter.register::<B>(&Topic::TopicB);
///     // This also listens to Topic B
///     let c_listener = event_emitter.register::<C>(&Topic::TopicB);
///
///     event_emitter.emit_by_topic(&Topic::TopicA, &A(3)).await;
///     event_emitter.emit(&B(3)).await;
///     event_emitter.emit_by_topic(&Topic::TopicB, &C(3)).await;
///
///     let msg: A = a_listener.recv().await.unwrap();
///     let msg: B = b_listener.recv().await.unwrap();
///     let msg: C = c_listener.recv().await.unwrap();
///
///     // ....
///  };
///
/// ```
pub struct EventEmitter<T> {
    /// Registered listener channels, keyed by topic, guarded by a mutex.
    listeners: Mutex<Listeners<T>>,
    /// Capacity of the bounded channel created for each registered listener.
    listener_buffer_size: usize,
}
86
87impl<T> EventEmitter<T>
88where
89    T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
90{
91    /// Creates a new [`EventEmitter`]
92    pub fn new() -> Arc<EventEmitter<T>> {
93        Arc::new(Self {
94            listeners: Mutex::new(HashMap::new()),
95            listener_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
96        })
97    }
98
99    /// Creates a new [`EventEmitter`] with the provided buffer size for the
100    /// [`EventListener`] channel.
101    ///
102    /// This is important to control the memory used by the listener channel.
103    /// If the consumer for the event listener can't keep up with the new events
104    /// coming, then the channel buffer will fill with new events, and if the
105    /// buffer is full, the emit function will block until the listener
106    /// starts to consume the buffered events.
107    ///
108    /// If `size` is zero, this function will panic.
109    pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
110        assert_ne!(size, 0);
111        Arc::new(Self {
112            listeners: Mutex::new(HashMap::new()),
113            listener_buffer_size: size,
114        })
115    }
116
117    /// Emits an event to the listeners.
118    ///
119    /// The event must implement the [`AsEventTopic`] trait to indicate the
120    /// topic of the event. Otherwise, you can use `emit_by_topic()`.
121    pub async fn emit<E: AsEventTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
122        let topic = E::topic();
123        self.emit_by_topic(&topic, value).await
124    }
125
126    /// Emits an event to the listeners.
127    pub async fn emit_by_topic<E: AsEventValue + Clone>(&self, topic: &T, value: &E) -> Result<()> {
128        let mut results = self.send(topic, value).await?;
129
130        let mut failed_listeners = vec![];
131        while let Some((id, fut_err)) = results.next().await {
132            if let Err(err) = fut_err {
133                trace!("Failed to emit event for topic {topic:?}: {err}");
134                failed_listeners.push(id);
135            }
136        }
137        drop(results);
138
139        if !failed_listeners.is_empty() {
140            self.remove(topic, E::event_id(), &failed_listeners);
141        }
142
143        Ok(())
144    }
145
146    /// Registers a new event listener for the given topic.
147    pub fn register<E: AsEventValue + Clone>(self: &Arc<Self>, topic: &T) -> EventListener<T, E> {
148        let topics = &mut self.listeners.lock();
149
150        let chan = async_channel::bounded(self.listener_buffer_size);
151
152        let event_ids = topics.entry(topic.clone()).or_default();
153        let event_id = E::event_id().to_string();
154
155        let listeners = event_ids.entry(event_id.clone()).or_default();
156
157        let mut listener_id = random_id();
158        // Generate a new one if listener_id already exists
159        while listeners.contains_key(&listener_id) {
160            listener_id = random_id();
161        }
162
163        let listener =
164            EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
165
166        listeners.insert(listener_id, chan.0);
167
168        listener
169    }
170
171    /// Removes all topics and event listeners.
172    ///
173    /// This effectively resets the EventEmitter to its initial state.
174    pub fn clear(self: &Arc<Self>) {
175        self.listeners.lock().clear();
176    }
177
178    /// Unregisters all event listeners for the given topic.
179    pub fn unregister_topic(self: &Arc<Self>, topic: &T) {
180        self.listeners.lock().remove(topic);
181    }
182
183    /// Internal method that handles the actual sending of events to listeners.
184    async fn send<E: AsEventValue + Clone>(
185        &self,
186        topic: &T,
187        value: &E,
188    ) -> Result<
189        FuturesUnordered<
190            impl std::future::Future<
191                    Output = (
192                        EventListenerID,
193                        std::result::Result<(), async_channel::SendError<Event>>,
194                    ),
195                > + use<'_, T, E>,
196        >,
197    > {
198        let value: Arc<dyn AsEventValue> = Arc::new(value.clone());
199        let event = Event::new(value);
200
201        let mut topics = self.listeners.lock();
202
203        let results = FuturesUnordered::new();
204        let event_ids = match topics.get_mut(topic) {
205            Some(ids) => ids,
206            None => {
207                trace!("Failed to emit an event to a non-existent topic {topic:?}",);
208                return Err(Error::EventEmitter(format!(
209                    "Emit an event to a non-existent topic {topic:?}",
210                )));
211            }
212        };
213
214        let event_id = E::event_id().to_string();
215
216        let Some(listeners) = event_ids.get_mut(&event_id) else {
217            trace!("No listeners found for event '{event_id}' on topic {topic:?}",);
218            return Ok(results);
219        };
220
221        for (listener_id, listener) in listeners {
222            let event = event.clone();
223            let listener = listener.clone();
224            let id = *listener_id;
225            let result = async move { (id, listener.send(event.clone()).await) };
226            results.push(result);
227        }
228        drop(topics);
229
230        Ok(results)
231    }
232
233    /// Internal method to remove the event listener attached to the given topic.
234    fn remove(&self, topic: &T, event_id: &str, listener_ids: &[EventListenerID]) {
235        let topics = &mut self.listeners.lock();
236
237        let Some(event_ids) = topics.get_mut(topic) else {
238            trace!("Failed to remove a non-existent topic");
239            return;
240        };
241
242        let Some(listeners) = event_ids.get_mut(event_id) else {
243            trace!("Failed to remove a non-existent event id");
244            return;
245        };
246
247        for listener_id in listener_ids {
248            if listeners.remove(listener_id).is_none() {
249                trace!("Failed to remove a non-existent event listener");
250            }
251        }
252    }
253}
254
/// EventListener listens for and receives events from the [`EventEmitter`].
///
/// `T` is the topic type and `E` is the event type this listener yields.
pub struct EventListener<T, E> {
    /// Identifier of this listener inside the emitter.
    id: EventListenerID,
    /// Receiving half of the channel the emitter sends events into.
    recv_chan: Receiver<Event>,
    /// Weak handle to the parent emitter, so the listener does not keep it alive.
    event_emitter: Weak<EventEmitter<T>>,
    /// String identifier of the event type this listener was registered for.
    event_id: String,
    /// Topic this listener was registered for.
    topic: T,
    /// Ties the listener to the event type `E` without storing a value of it.
    phantom: PhantomData<E>,
}
264
265impl<T, E> EventListener<T, E>
266where
267    T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
268    E: AsEventValue + Clone,
269{
270    /// Creates a new [`EventListener`].
271    fn new(
272        id: EventListenerID,
273        event_emitter: Weak<EventEmitter<T>>,
274        recv_chan: Receiver<Event>,
275        event_id: &str,
276        topic: &T,
277    ) -> EventListener<T, E> {
278        Self {
279            id,
280            recv_chan,
281            event_emitter,
282            event_id: event_id.to_string(),
283            topic: topic.clone(),
284            phantom: PhantomData,
285        }
286    }
287
288    /// Receives the next event from the emitter.
289    ///
290    /// This method blocks until an event is available or the channel is closed.
291    /// Events are automatically type-cast to the expected type E.
292    pub async fn recv(&self) -> Result<E> {
293        match self.recv_chan.recv().await {
294            Ok(event) => match (event.value as Arc<dyn Any>).downcast_ref::<E>() {
295                Some(v) => Ok(v.clone()),
296                None => unreachable!("Failed to downcast the event value."),
297            },
298            Err(err) => {
299                trace!("Failed to receive new event: {err}");
300                self.cancel();
301                Err(err.into())
302            }
303        }
304    }
305
306    /// Cancels the event listener and removes it from the [`EventEmitter`].
307    pub fn cancel(&self) {
308        self.event_emitter()
309            .remove(&self.topic, &self.event_id, &[self.id]);
310    }
311
312    /// Returns the topic this listener is registered for.
313    pub fn topic(&self) -> &T {
314        &self.topic
315    }
316
317    /// Returns the event id for this event listener.
318    pub fn event_id(&self) -> &String {
319        &self.event_id
320    }
321
322    /// Returns a reference to the parent EventEmitter.
323    pub fn event_emitter(&self) -> Arc<EventEmitter<T>> {
324        self.event_emitter.upgrade().unwrap()
325    }
326}
327
/// A timestamped event container.
///
/// The payload is stored behind an `Arc`, so cloning an `Event` does not
/// clone the payload itself.
#[derive(Clone, Debug)]
pub struct Event {
    /// The time at which the event was created.
    created_at: DateTime<Utc>,
    /// The type-erased value of the Event.
    value: Arc<dyn AsEventValue>,
}
336
337impl Event {
338    /// Creates a new Event.
339    pub fn new(value: Arc<dyn AsEventValue>) -> Self {
340        Self {
341            created_at: Utc::now(),
342            value,
343        }
344    }
345}
346
347impl std::fmt::Display for Event {
348    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
349        write!(f, "{}: {:?}", self.created_at, self.value)
350    }
351}
352
/// Trait for event types that can be emitted.
///
/// This trait provides a string identifier for each event type,
/// used internally for routing and type checking.
pub trait AsEventValue: Any + Send + Sync + std::fmt::Debug {
    /// Returns the string identifier of this event type.
    ///
    /// Listeners are stored and looked up under this identifier. The method
    /// is only available on sized types, so it cannot be called on
    /// `dyn AsEventValue`.
    fn event_id() -> &'static str
    where
        Self: Sized;
}
362
/// Trait for events that define their own topic.
///
/// This trait allows events to specify which topic they belong to,
/// enabling the use of the convenient `emit()` method instead of
/// requiring explicit topic specification with `emit_by_topic()`.
pub trait AsEventTopic: AsEventValue {
    /// The topic type; `emit()` requires it to match the emitter's topic type.
    type Topic;

    /// Returns the topic this event type is emitted on.
    fn topic() -> Self::Topic
    where
        Self: Sized;
}