1use std::{
2 any::Any,
3 collections::HashMap,
4 marker::PhantomData,
5 sync::{Arc, Weak},
6};
7
8use async_channel::{Receiver, Sender};
9use chrono::{DateTime, Utc};
10use futures_util::stream::{FuturesUnordered, StreamExt};
11use log::{debug, error};
12
13use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result};
14
15const CHANNEL_BUFFER_SIZE: usize = 1000;
16
17pub type EventListenerID = u32;
18
19type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
20
21pub struct EventEmitter<T> {
77 listeners: Mutex<Listeners<T>>,
78 listener_buffer_size: usize,
79}
80
81impl<T> EventEmitter<T>
82where
83 T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
84{
85 pub fn new() -> Arc<EventEmitter<T>> {
87 Arc::new(Self {
88 listeners: Mutex::new(HashMap::new()),
89 listener_buffer_size: CHANNEL_BUFFER_SIZE,
90 })
91 }
92
93 pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
104 Arc::new(Self {
105 listeners: Mutex::new(HashMap::new()),
106 listener_buffer_size: size,
107 })
108 }
109
110 pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
115 let topic = E::topic();
116 self.emit_by_topic(&topic, value).await
117 }
118
119 pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(
121 &self,
122 topic: &T,
123 value: &E,
124 ) -> Result<()> {
125 let value: Arc<dyn EventValueAny> = Arc::new(value.clone());
126 let event = Event::new(value);
127
128 let mut topics = self.listeners.lock().await;
129
130 if !topics.contains_key(topic) {
131 debug!(
132 "Failed to emit an event to a non-existent topic {:?}",
133 topic
134 );
135 return Err(Error::EventEmitError(format!(
136 "Emit an event to a non-existent topic {:?}",
137 topic,
138 )));
139 }
140
141 let event_ids = topics.get_mut(topic).unwrap();
142 let event_id = E::id().to_string();
143
144 if !event_ids.contains_key(&event_id) {
145 debug!("Failed to emit an event: unknown event id {:?}", event_id);
146 return Err(Error::EventEmitError(format!(
147 "Emit an event: unknown event id {}",
148 event_id,
149 )));
150 }
151
152 let mut results = FuturesUnordered::new();
153
154 let listeners = event_ids.get_mut(&event_id).unwrap();
155 for (listener_id, listener) in listeners.iter() {
156 let result = async { (*listener_id, listener.send(event.clone()).await) };
157 results.push(result);
158 }
159
160 let mut failed_listeners = vec![];
161 while let Some((id, fut_err)) = results.next().await {
162 if let Err(err) = fut_err {
163 debug!("Failed to emit event for topic {:?}: {}", topic, err);
164 failed_listeners.push(id);
165 }
166 }
167 drop(results);
168
169 for listener_id in failed_listeners.iter() {
170 listeners.remove(listener_id);
171 }
172
173 Ok(())
174 }
175
176 pub async fn register<E: EventValueAny + EventValue + Clone>(
178 self: &Arc<Self>,
179 topic: &T,
180 ) -> EventListener<T, E> {
181 let topics = &mut self.listeners.lock().await;
182
183 let chan = async_channel::bounded(self.listener_buffer_size);
184
185 if !topics.contains_key(topic) {
186 topics.insert(topic.clone(), HashMap::new());
187 }
188
189 let event_ids = topics.get_mut(topic).unwrap();
190 let event_id = E::id().to_string();
191
192 if !event_ids.contains_key(&event_id) {
193 event_ids.insert(event_id.clone(), HashMap::new());
194 }
195
196 let listeners = event_ids.get_mut(&event_id).unwrap();
197
198 let mut listener_id = random_32();
199 while listeners.contains_key(&listener_id) {
201 listener_id = random_32();
202 }
203
204 let listener =
205 EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
206
207 listeners.insert(listener_id, chan.0);
208
209 listener
210 }
211
212 pub async fn clear(self: &Arc<Self>) {
214 self.listeners.lock().await.clear();
215 }
216
217 pub async fn unregister_topic(self: &Arc<Self>, topic: &T) {
219 self.listeners.lock().await.remove(topic);
220 }
221
222 async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) {
224 let topics = &mut self.listeners.lock().await;
225 if !topics.contains_key(topic) {
226 error!("Failed to remove a non-existent topic");
227 return;
228 }
229
230 let event_ids = topics.get_mut(topic).unwrap();
231 if !event_ids.contains_key(event_id) {
232 error!("Failed to remove a non-existent event id");
233 return;
234 }
235
236 let listeners = event_ids.get_mut(event_id).unwrap();
237 if listeners.remove(listener_id).is_none() {
238 error!("Failed to remove a non-existent event listener");
239 }
240 }
241}
242
243pub struct EventListener<T, E> {
245 id: EventListenerID,
246 recv_chan: Receiver<Event>,
247 event_emitter: Weak<EventEmitter<T>>,
248 event_id: String,
249 topic: T,
250 phantom: PhantomData<E>,
251}
252
253impl<T, E> EventListener<T, E>
254where
255 T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
256 E: EventValueAny + Clone + EventValue,
257{
258 fn new(
260 id: EventListenerID,
261 event_emitter: Weak<EventEmitter<T>>,
262 recv_chan: Receiver<Event>,
263 event_id: &str,
264 topic: &T,
265 ) -> EventListener<T, E> {
266 Self {
267 id,
268 recv_chan,
269 event_emitter,
270 event_id: event_id.to_string(),
271 topic: topic.clone(),
272 phantom: PhantomData,
273 }
274 }
275
276 pub async fn recv(&self) -> Result<E> {
278 match self.recv_chan.recv().await {
279 Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() {
280 Some(v) => Ok(v.clone()),
281 None => unreachable!("Failed to downcast the event value."),
282 },
283 Err(err) => {
284 error!("Failed to receive new event: {err}");
285 self.cancel().await;
286 Err(err.into())
287 }
288 }
289 }
290
291 pub async fn cancel(&self) {
293 if let Some(es) = self.event_emitter.upgrade() {
294 es.remove(&self.topic, &self.event_id, &self.id).await;
295 }
296 }
297
298 pub fn topic(&self) -> &T {
300 &self.topic
301 }
302
303 pub fn event_id(&self) -> &String {
305 &self.event_id
306 }
307}
308
309#[derive(Clone, Debug)]
311pub struct Event {
312 created_at: DateTime<Utc>,
314 value: Arc<dyn EventValueAny>,
316}
317
318impl Event {
319 pub fn new(value: Arc<dyn EventValueAny>) -> Self {
321 Self {
322 created_at: Utc::now(),
323 value,
324 }
325 }
326}
327
328impl std::fmt::Display for Event {
329 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
330 write!(f, "{}: {:?}", self.created_at, self.value)
331 }
332}
333
/// Object-safe view of an event value that can be turned back into
/// `&dyn Any` for downcasting.
pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug {
    /// Returns the value as `&dyn Any`.
    fn value_as_any(&self) -> &dyn Any;
}

/// Every `Send + Sync + Debug` type that implements `Any` is an
/// [`EventValueAny`].
impl<T> EventValueAny for T
where
    T: Any + Send + Sync + std::fmt::Debug,
{
    fn value_as_any(&self) -> &dyn Any {
        self
    }
}
343
344pub trait EventValue: EventValueAny {
345 fn id() -> &'static str
346 where
347 Self: Sized;
348}
349
350pub trait EventValueTopic: EventValueAny + EventValue {
351 type Topic;
352 fn topic() -> Self::Topic
353 where
354 Self: Sized;
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_runtime::block_on;

    /// Topics used to route the test events.
    #[derive(Clone, Debug, PartialEq, Eq, Hash)]
    enum Topic {
        TopicA,
        TopicB,
        TopicC,
        TopicD,
        TopicE,
    }

    // Each payload type is declared next to its `EventValue` impl.

    #[derive(Clone, Debug, PartialEq)]
    struct A {
        a_value: usize,
    }

    impl EventValue for A {
        fn id() -> &'static str {
            "A"
        }
    }

    #[derive(Clone, Debug, PartialEq)]
    struct B {
        b_value: usize,
    }

    impl EventValue for B {
        fn id() -> &'static str {
            "B"
        }
    }

    #[derive(Clone, Debug, PartialEq)]
    struct C {
        c_value: usize,
    }

    impl EventValue for C {
        fn id() -> &'static str {
            "C"
        }
    }

    // `C` is the only payload with a built-in topic.
    impl EventValueTopic for C {
        type Topic = Topic;

        fn topic() -> Self::Topic {
            Topic::TopicC
        }
    }

    #[derive(Clone, Debug, PartialEq)]
    struct E {
        e_value: usize,
    }

    impl EventValue for E {
        fn id() -> &'static str {
            "E"
        }
    }

    #[derive(Clone, Debug, PartialEq)]
    struct F {
        f_value: usize,
    }

    impl EventValue for F {
        fn id() -> &'static str {
            "F"
        }
    }

    #[test]
    fn test_event_emitter() {
        block_on(async move {
            let emitter = EventEmitter::<Topic>::new();

            // Two topics, each with its own event type.
            let listener_a = emitter.register::<A>(&Topic::TopicA).await;
            let listener_b = emitter.register::<B>(&Topic::TopicB).await;

            emitter
                .emit_by_topic(&Topic::TopicA, &A { a_value: 3 })
                .await
                .expect("Emit event");
            emitter
                .emit_by_topic(&Topic::TopicB, &B { b_value: 5 })
                .await
                .expect("Emit event");

            let received = listener_a.recv().await.unwrap();
            assert_eq!(received, A { a_value: 3 });

            let received = listener_b.recv().await.unwrap();
            assert_eq!(received, B { b_value: 5 });

            // The same event type registered on its own topic and on another one.
            let listener_c = emitter.register::<C>(&Topic::TopicC).await;
            let listener_d = emitter.register::<C>(&Topic::TopicD).await;

            emitter
                .emit(&C { c_value: 10 })
                .await
                .expect("Emit event");
            let received = listener_c.recv().await.unwrap();
            assert_eq!(received, C { c_value: 10 });

            emitter
                .emit_by_topic(&Topic::TopicD, &C { c_value: 10 })
                .await
                .expect("Emit event");
            let received = listener_d.recv().await.unwrap();
            assert_eq!(received, C { c_value: 10 });

            // Two different event types sharing a single topic.
            let listener_e = emitter.register::<E>(&Topic::TopicE).await;
            let listener_f = emitter.register::<F>(&Topic::TopicE).await;

            emitter
                .emit_by_topic(&Topic::TopicE, &E { e_value: 5 })
                .await
                .expect("Emit event");

            let received = listener_e.recv().await.unwrap();
            assert_eq!(received, E { e_value: 5 });

            emitter
                .emit_by_topic(&Topic::TopicE, &F { f_value: 5 })
                .await
                .expect("Emit event");

            let received = listener_f.recv().await.unwrap();
            assert_eq!(received, F { f_value: 5 });
        });
    }
}