// karyon_core/event.rs

use std::{
    any::Any,
    collections::HashMap,
    marker::PhantomData,
    sync::{Arc, Weak},
};

use async_channel::{Receiver, Sender};
use chrono::{DateTime, Utc};
use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};

use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result};
14
15const CHANNEL_BUFFER_SIZE: usize = 1000;
16
17pub type EventListenerID = u32;
18
19type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
20
21/// EventEmitter emits events to registered listeners based on topics.
22/// # Example
23///
24/// ```
25/// use karyon_core::event::{EventEmitter, EventValueTopic, EventValue};
26///
27///  async {
28///     let event_emitter = EventEmitter::new();
29///
30///     #[derive(Hash, PartialEq, Eq, Debug, Clone)]
31///     enum Topic {
32///         TopicA,
33///         TopicB,
34///     }
35///
36///     #[derive(Clone, Debug, PartialEq)]
37///     struct A(usize);
38///
39///    impl EventValue for A {
40///         fn id() -> &'static str {
41///             "A"
42///         }
43///     }
44///
45///     let listener = event_emitter.register::<A>(&Topic::TopicA).await;
46///
47///     event_emitter.emit_by_topic(&Topic::TopicA, &A(3)) .await;
48///     let msg: A = listener.recv().await.unwrap();
49///
50///     #[derive(Clone, Debug, PartialEq)]
51///     struct B(usize);
52///
53///     impl EventValue for B {
54///         fn id() -> &'static str {
55///             "B"
56///         }
57///     }
58///
59///     impl EventValueTopic for B {
60///         type Topic = Topic;
61///         fn topic() -> Self::Topic{
62///             Topic::TopicB
63///         }
64///     }
65///
66///     let listener = event_emitter.register::<B>(&Topic::TopicB).await;
67///
68///     event_emitter.emit(&B(3)) .await;
69///     let msg: B = listener.recv().await.unwrap();
70///
71///     // ....
72///  };
73///
74/// ```
75///
76pub struct EventEmitter<T> {
77    listeners: Mutex<Listeners<T>>,
78    listener_buffer_size: usize,
79}
80
81impl<T> EventEmitter<T>
82where
83    T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
84{
85    /// Creates a new [`EventEmitter`]
86    pub fn new() -> Arc<EventEmitter<T>> {
87        Arc::new(Self {
88            listeners: Mutex::new(HashMap::new()),
89            listener_buffer_size: CHANNEL_BUFFER_SIZE,
90        })
91    }
92
93    /// Creates a new [`EventEmitter`] with the provided buffer size for the
94    /// [`EventListener`] channel.
95    ///
96    /// This is important to control the memory used by the listener channel.
97    /// If the consumer for the event listener can't keep up with the new events
98    /// coming, then the channel buffer will fill with new events, and if the
99    /// buffer is full, the emit function will block until the listener
100    /// starts to consume the buffered events.
101    ///
102    /// If `size` is zero, this function will panic.
103    pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
104        Arc::new(Self {
105            listeners: Mutex::new(HashMap::new()),
106            listener_buffer_size: size,
107        })
108    }
109
110    /// Emits an event to the listeners.
111    ///
112    /// The event must implement the [`EventValueTopic`] trait to indicate the
113    /// topic of the event. Otherwise, you can use `emit_by_topic()`.
114    pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
115        let topic = E::topic();
116        self.emit_by_topic(&topic, value).await
117    }
118
119    /// Emits an event to the listeners.
120    pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(
121        &self,
122        topic: &T,
123        value: &E,
124    ) -> Result<()> {
125        let value: Arc<dyn EventValueAny> = Arc::new(value.clone());
126        let event = Event::new(value);
127
128        let mut topics = self.listeners.lock().await;
129
130        if !topics.contains_key(topic) {
131            debug!(
132                "Failed to emit an event to a non-existent topic {:?}",
133                topic
134            );
135            return Err(Error::EventEmitError(format!(
136                "Emit an event to a non-existent topic {:?}",
137                topic,
138            )));
139        }
140
141        let event_ids = topics.get_mut(topic).unwrap();
142        let event_id = E::id().to_string();
143
144        if !event_ids.contains_key(&event_id) {
145            debug!("Failed to emit an event: unknown event id {:?}", event_id);
146            return Err(Error::EventEmitError(format!(
147                "Emit an event: unknown event id {}",
148                event_id,
149            )));
150        }
151
152        let mut results = FuturesUnordered::new();
153
154        let listeners = event_ids.get_mut(&event_id).unwrap();
155        for (listener_id, listener) in listeners.iter() {
156            let result = async { (*listener_id, listener.send(event.clone()).await) };
157            results.push(result);
158        }
159
160        let mut failed_listeners = vec![];
161        while let Some((id, fut_err)) = results.next().await {
162            if let Err(err) = fut_err {
163                debug!("Failed to emit event for topic {:?}: {}", topic, err);
164                failed_listeners.push(id);
165            }
166        }
167        drop(results);
168
169        for listener_id in failed_listeners.iter() {
170            listeners.remove(listener_id);
171        }
172
173        Ok(())
174    }
175
176    /// Registers a new event listener for the given topic.
177    pub async fn register<E: EventValueAny + EventValue + Clone>(
178        self: &Arc<Self>,
179        topic: &T,
180    ) -> EventListener<T, E> {
181        let topics = &mut self.listeners.lock().await;
182
183        let chan = async_channel::bounded(self.listener_buffer_size);
184
185        if !topics.contains_key(topic) {
186            topics.insert(topic.clone(), HashMap::new());
187        }
188
189        let event_ids = topics.get_mut(topic).unwrap();
190        let event_id = E::id().to_string();
191
192        if !event_ids.contains_key(&event_id) {
193            event_ids.insert(event_id.clone(), HashMap::new());
194        }
195
196        let listeners = event_ids.get_mut(&event_id).unwrap();
197
198        let mut listener_id = random_32();
199        // Generate a new one if listener_id already exists
200        while listeners.contains_key(&listener_id) {
201            listener_id = random_32();
202        }
203
204        let listener =
205            EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
206
207        listeners.insert(listener_id, chan.0);
208
209        listener
210    }
211
212    /// Remove all topics and event listeners  
213    pub async fn clear(self: &Arc<Self>) {
214        self.listeners.lock().await.clear();
215    }
216
217    /// Unregisters all event listeners for the given topic.
218    pub async fn unregister_topic(self: &Arc<Self>, topic: &T) {
219        self.listeners.lock().await.remove(topic);
220    }
221
222    /// Removes the event listener attached to the given topic.
223    async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) {
224        let topics = &mut self.listeners.lock().await;
225        if !topics.contains_key(topic) {
226            error!("Failed to remove a non-existent topic");
227            return;
228        }
229
230        let event_ids = topics.get_mut(topic).unwrap();
231        if !event_ids.contains_key(event_id) {
232            error!("Failed to remove a non-existent event id");
233            return;
234        }
235
236        let listeners = event_ids.get_mut(event_id).unwrap();
237        if listeners.remove(listener_id).is_none() {
238            error!("Failed to remove a non-existent event listener");
239        }
240    }
241}
242
243/// EventListener listens for and receives events from the [`EventEmitter`].
244pub struct EventListener<T, E> {
245    id: EventListenerID,
246    recv_chan: Receiver<Event>,
247    event_emitter: Weak<EventEmitter<T>>,
248    event_id: String,
249    topic: T,
250    phantom: PhantomData<E>,
251}
252
253impl<T, E> EventListener<T, E>
254where
255    T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
256    E: EventValueAny + Clone + EventValue,
257{
258    /// Creates a new [`EventListener`].
259    fn new(
260        id: EventListenerID,
261        event_emitter: Weak<EventEmitter<T>>,
262        recv_chan: Receiver<Event>,
263        event_id: &str,
264        topic: &T,
265    ) -> EventListener<T, E> {
266        Self {
267            id,
268            recv_chan,
269            event_emitter,
270            event_id: event_id.to_string(),
271            topic: topic.clone(),
272            phantom: PhantomData,
273        }
274    }
275
276    /// Receives the next event.
277    pub async fn recv(&self) -> Result<E> {
278        match self.recv_chan.recv().await {
279            Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() {
280                Some(v) => Ok(v.clone()),
281                None => unreachable!("Failed to downcast the event value."),
282            },
283            Err(err) => {
284                error!("Failed to receive new event: {err}");
285                self.cancel().await;
286                Err(err.into())
287            }
288        }
289    }
290
291    /// Cancels the event listener and removes it from the [`EventEmitter`].
292    pub async fn cancel(&self) {
293        if let Some(es) = self.event_emitter.upgrade() {
294            es.remove(&self.topic, &self.event_id, &self.id).await;
295        }
296    }
297
298    /// Returns the topic for this event listener.
299    pub fn topic(&self) -> &T {
300        &self.topic
301    }
302
303    /// Returns the event id for this event listener.
304    pub fn event_id(&self) -> &String {
305        &self.event_id
306    }
307}
308
309/// An event within the [`EventEmitter`].
310#[derive(Clone, Debug)]
311pub struct Event {
312    /// The time at which the event was created.
313    created_at: DateTime<Utc>,
314    /// The value of the Event.
315    value: Arc<dyn EventValueAny>,
316}
317
318impl Event {
319    /// Creates a new Event.
320    pub fn new(value: Arc<dyn EventValueAny>) -> Self {
321        Self {
322            created_at: Utc::now(),
323            value,
324        }
325    }
326}
327
328impl std::fmt::Display for Event {
329    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
330        write!(f, "{}: {:?}", self.created_at, self.value)
331    }
332}
333
/// A type-erased event value.
///
/// Exposes the value as [`Any`] so that it can be downcast back to its
/// concrete type.
pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug {
    /// Returns the value as `&dyn Any`.
    fn value_as_any(&self) -> &dyn Any;
}
337
338impl<T: Send + Sync + std::fmt::Debug + Any> EventValueAny for T {
339    fn value_as_any(&self) -> &dyn Any {
340        self
341    }
342}
343
344pub trait EventValue: EventValueAny {
345    fn id() -> &'static str
346    where
347        Self: Sized;
348}
349
350pub trait EventValueTopic: EventValueAny + EventValue {
351    type Topic;
352    fn topic() -> Self::Topic
353    where
354        Self: Sized;
355}
356
#[cfg(test)]
mod tests {
    use crate::async_runtime::block_on;

    use super::*;

    #[derive(Hash, PartialEq, Eq, Debug, Clone)]
    enum Topic {
        TopicA,
        TopicB,
        TopicC,
        TopicD,
        TopicE,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct A {
        a_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct B {
        b_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct C {
        c_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct E {
        e_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct F {
        f_value: usize,
    }

    impl EventValue for A {
        fn id() -> &'static str {
            "A"
        }
    }

    impl EventValue for B {
        fn id() -> &'static str {
            "B"
        }
    }

    impl EventValue for C {
        fn id() -> &'static str {
            "C"
        }
    }

    impl EventValue for E {
        fn id() -> &'static str {
            "E"
        }
    }

    impl EventValue for F {
        fn id() -> &'static str {
            "F"
        }
    }

    // Only `C` carries its own topic, so only `C` can be used with `emit()`.
    impl EventValueTopic for C {
        type Topic = Topic;
        fn topic() -> Self::Topic {
            Topic::TopicC
        }
    }

    #[test]
    fn test_event_emitter() {
        block_on(async move {
            let event_emitter = EventEmitter::<Topic>::new();

            let a_listener = event_emitter.register::<A>(&Topic::TopicA).await;
            let b_listener = event_emitter.register::<B>(&Topic::TopicB).await;

            event_emitter
                .emit_by_topic(&Topic::TopicA, &A { a_value: 3 })
                .await
                .expect("Emit event");
            event_emitter
                .emit_by_topic(&Topic::TopicB, &B { b_value: 5 })
                .await
                .expect("Emit event");

            let msg = a_listener.recv().await.unwrap();
            assert_eq!(msg, A { a_value: 3 });

            let msg = b_listener.recv().await.unwrap();
            assert_eq!(msg, B { b_value: 5 });

            // register the same event type to different topics
            let c_listener = event_emitter.register::<C>(&Topic::TopicC).await;
            let d_listener = event_emitter.register::<C>(&Topic::TopicD).await;

            event_emitter
                .emit(&C { c_value: 10 })
                .await
                .expect("Emit event");
            let msg = c_listener.recv().await.unwrap();
            assert_eq!(msg, C { c_value: 10 });

            event_emitter
                .emit_by_topic(&Topic::TopicD, &C { c_value: 10 })
                .await
                .expect("Emit event");
            let msg = d_listener.recv().await.unwrap();
            assert_eq!(msg, C { c_value: 10 });

            // register different event types to the same topic
            let e_listener = event_emitter.register::<E>(&Topic::TopicE).await;
            let f_listener = event_emitter.register::<F>(&Topic::TopicE).await;

            event_emitter
                .emit_by_topic(&Topic::TopicE, &E { e_value: 5 })
                .await
                .expect("Emit event");

            let msg = e_listener.recv().await.unwrap();
            assert_eq!(msg, E { e_value: 5 });

            event_emitter
                .emit_by_topic(&Topic::TopicE, &F { f_value: 5 })
                .await
                .expect("Emit event");

            let msg = f_listener.recv().await.unwrap();
            assert_eq!(msg, F { f_value: 5 });
        });
    }

    #[test]
    fn test_emit_without_matching_listener() {
        block_on(async move {
            let event_emitter = EventEmitter::<Topic>::new();

            // no listener has been registered for this topic
            assert!(event_emitter
                .emit_by_topic(&Topic::TopicA, &A { a_value: 1 })
                .await
                .is_err());

            // the topic exists now, but only for the event id of `A`
            let _a_listener = event_emitter.register::<A>(&Topic::TopicA).await;
            assert!(event_emitter
                .emit_by_topic(&Topic::TopicA, &B { b_value: 1 })
                .await
                .is_err());
        });
    }
}