use std::{
any::Any,
collections::HashMap,
marker::PhantomData,
sync::{Arc, Weak},
};
use async_channel::{Receiver, Sender};
use chrono::{DateTime, Utc};
use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};
use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result};
/// Default capacity of the bounded channel created for each listener; this is
/// the value `EventEmitter::new` stores as its listener buffer size.
const CHANNEL_BUFFER_SIZE: usize = 1000;
/// Identifier of a single registered listener.
pub type EventListenerID = u32;
/// Listener registry: topic -> event id -> listener id -> sending half of that
/// listener's channel.
type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
/// Dispatches events to listeners registered under a topic and an event id.
///
/// `T` is the topic type. Every registered listener gets its own bounded
/// channel; the emitter keeps the sending halves in `listeners`.
pub struct EventEmitter<T> {
/// Registry of the currently registered listeners, guarded by an async mutex.
listeners: Mutex<Listeners<T>>,
/// Capacity passed to `async_channel::bounded` for every new listener channel.
listener_buffer_size: usize,
}
impl<T> EventEmitter<T>
where
    T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
{
    /// Creates a new [`EventEmitter`] whose listener channels buffer up to
    /// `CHANNEL_BUFFER_SIZE` pending events.
    pub fn new() -> Arc<EventEmitter<T>> {
        Self::with_buffer_size(CHANNEL_BUFFER_SIZE)
    }

    /// Creates a new [`EventEmitter`] whose listener channels buffer up to
    /// `size` pending events.
    ///
    /// A `size` of zero is raised to one: `async_channel::bounded` panics on a
    /// zero capacity, which would otherwise make every later call to
    /// [`EventEmitter::register`] panic.
    pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
        Arc::new(Self {
            listeners: Mutex::new(HashMap::new()),
            listener_buffer_size: size.max(1),
        })
    }

    /// Emits `value` to the listeners registered on the topic `E::topic()`.
    ///
    /// # Errors
    ///
    /// Returns the same errors as [`EventEmitter::emit_by_topic`].
    pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
        let topic = E::topic();
        self.emit_by_topic(&topic, value).await
    }

    /// Sends a clone of `value`, wrapped in an [`Event`], to every listener
    /// registered for `topic` and the event id `E::id()`.
    ///
    /// Listeners whose channel rejects the event are removed from the
    /// registry.
    ///
    /// The registry lock is held while the sends are awaited, so other calls
    /// that need the registry (`register`, `emit*`, `clear`, listener
    /// cancellation) wait until every send has completed.
    ///
    /// # Errors
    ///
    /// Returns [`Error::EventEmitError`] when the registry has no entry for
    /// `topic`, or no entry for `E::id()` under that topic.
    pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(
        &self,
        topic: &T,
        value: &E,
    ) -> Result<()> {
        let value: Arc<dyn EventValueAny> = Arc::new(value.clone());
        let event = Event::new(value);

        let mut topics = self.listeners.lock().await;

        let event_ids = match topics.get_mut(topic) {
            Some(event_ids) => event_ids,
            None => {
                debug!(
                    "Failed to emit an event to a non-existent topic {:?}",
                    topic
                );
                return Err(Error::EventEmitError(format!(
                    "Emit an event to a non-existent topic {:?}",
                    topic,
                )));
            }
        };

        let event_id = E::id();
        let listeners = match event_ids.get_mut(event_id) {
            Some(listeners) => listeners,
            None => {
                debug!("Failed to emit an event: unknown event id {:?}", event_id);
                return Err(Error::EventEmitError(format!(
                    "Emit an event: unknown event id {}",
                    event_id,
                )));
            }
        };

        // Drive all sends concurrently and remember which listeners failed.
        let mut results = FuturesUnordered::new();
        for (listener_id, listener) in listeners.iter() {
            let result = async { (*listener_id, listener.send(event.clone()).await) };
            results.push(result);
        }

        let mut failed_listeners = vec![];
        while let Some((id, send_result)) = results.next().await {
            if let Err(err) = send_result {
                debug!("Failed to emit event for topic {:?}: {}", topic, err);
                failed_listeners.push(id);
            }
        }
        // The pending futures borrow `listeners`; release them before mutating it.
        drop(results);

        for listener_id in &failed_listeners {
            listeners.remove(listener_id);
        }

        Ok(())
    }

    /// Registers a new listener for events of type `E` on `topic` and returns
    /// it.
    ///
    /// Registry entries for `topic` and for `E::id()` are created on demand.
    /// The listener id is drawn with `random_32` and redrawn until it is not
    /// already used for the same topic and event id.
    pub async fn register<E: EventValueAny + EventValue + Clone>(
        self: &Arc<Self>,
        topic: &T,
    ) -> EventListener<T, E> {
        let (sender, receiver) = async_channel::bounded(self.listener_buffer_size);
        let event_id = E::id();

        let mut topics = self.listeners.lock().await;
        let listeners = topics
            .entry(topic.clone())
            .or_default()
            .entry(event_id.to_string())
            .or_default();

        let mut listener_id = random_32();
        while listeners.contains_key(&listener_id) {
            listener_id = random_32();
        }

        let listener =
            EventListener::new(listener_id, Arc::downgrade(self), receiver, event_id, topic);
        listeners.insert(listener_id, sender);
        listener
    }

    /// Removes every topic from the registry, dropping all stored senders.
    pub async fn clear(self: &Arc<Self>) {
        self.listeners.lock().await.clear();
    }

    /// Removes `topic` from the registry, dropping the senders stored under it.
    pub async fn unregister_topic(self: &Arc<Self>, topic: &T) {
        self.listeners.lock().await.remove(topic);
    }

    /// Removes the listener `listener_id` registered for `topic` and
    /// `event_id`.
    ///
    /// Logs an error and returns when the topic, the event id, or the
    /// listener is not present in the registry.
    async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) {
        let mut topics = self.listeners.lock().await;

        let event_ids = match topics.get_mut(topic) {
            Some(event_ids) => event_ids,
            None => {
                error!("Failed to remove a non-existent topic");
                return;
            }
        };

        let listeners = match event_ids.get_mut(event_id) {
            Some(listeners) => listeners,
            None => {
                error!("Failed to remove a non-existent event id");
                return;
            }
        };

        if listeners.remove(listener_id).is_none() {
            error!("Failed to remove a non-existent event listener");
        }
    }
}
/// Receiving side of a registration made with `EventEmitter::register`.
///
/// `T` is the topic type and `E` the event value type this listener yields.
pub struct EventListener<T, E> {
/// Id under which this listener is stored in the emitter's registry.
id: EventListenerID,
/// Receiving half of the channel the emitter sends events into.
recv_chan: Receiver<Event>,
/// Weak handle to the emitter, used by `cancel` to remove this listener.
event_emitter: Weak<EventEmitter<T>>,
/// Event id (`E::id()`) this listener was registered for.
event_id: String,
/// Topic this listener was registered on.
topic: T,
/// Ties the listener to `E` without storing a value of that type.
phantom: PhantomData<E>,
}
impl<T, E> EventListener<T, E>
where
T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
E: EventValueAny + Clone + EventValue,
{
fn new(
id: EventListenerID,
event_emitter: Weak<EventEmitter<T>>,
recv_chan: Receiver<Event>,
event_id: &str,
topic: &T,
) -> EventListener<T, E> {
Self {
id,
recv_chan,
event_emitter,
event_id: event_id.to_string(),
topic: topic.clone(),
phantom: PhantomData,
}
}
pub async fn recv(&self) -> Result<E> {
match self.recv_chan.recv().await {
Ok(event) => match ((*event.value).value_as_any()).downcast_ref::<E>() {
Some(v) => Ok(v.clone()),
None => unreachable!("Failed to downcast the event value."),
},
Err(err) => {
error!("Failed to receive new event: {err}");
self.cancel().await;
Err(err.into())
}
}
}
pub async fn cancel(&self) {
if let Some(es) = self.event_emitter.upgrade() {
es.remove(&self.topic, &self.event_id, &self.id).await;
}
}
pub fn topic(&self) -> &T {
&self.topic
}
pub fn event_id(&self) -> &String {
&self.event_id
}
}
/// An emitted value together with the time the `Event` was constructed.
#[derive(Clone, Debug)]
pub struct Event {
/// UTC timestamp taken when the event was constructed.
created_at: DateTime<Utc>,
/// The emitted value, type-erased behind `EventValueAny`.
value: Arc<dyn EventValueAny>,
}
impl Event {
pub fn new(value: Arc<dyn EventValueAny>) -> Self {
Self {
created_at: Utc::now(),
value,
}
}
}
impl std::fmt::Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}: {:?}", self.created_at, self.value)
}
}
/// Type-erased view of an event value.
///
/// Implemented for every `Any + Send + Sync + Debug` type by the blanket impl
/// below, so any such value can be stored as `dyn EventValueAny` and later
/// downcast back to its concrete type.
pub trait EventValueAny: Any + Send + Sync + std::fmt::Debug {
    /// Returns `self` as `&dyn Any`, the entry point for downcasting.
    fn value_as_any(&self) -> &dyn Any;
}

/// Blanket implementation for every sized type that meets the trait's bounds.
impl<T> EventValueAny for T
where
    T: Send + Sync + std::fmt::Debug + Any,
{
    fn value_as_any(&self) -> &dyn Any {
        self as &dyn Any
    }
}
/// An event value type with a static identifier.
///
/// `EventEmitter` uses the identifier as the registry key below the topic, so
/// two event types that report the same id on the same topic end up in the
/// same listener set.
pub trait EventValue: EventValueAny {
/// Returns the identifier of this event type.
fn id() -> &'static str
where
Self: Sized;
}
/// An event value type bound to a fixed topic, which lets
/// `EventEmitter::emit` take the topic from the type instead of an argument.
pub trait EventValueTopic: EventValueAny + EventValue {
/// Topic type; `EventEmitter::emit` requires it to equal the emitter's `T`.
type Topic;
/// Returns the topic events of this type are emitted on.
fn topic() -> Self::Topic
where
Self: Sized;
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_runtime::block_on;

    /// Topics exercised by the test below.
    #[derive(Hash, PartialEq, Eq, Debug, Clone)]
    enum Topic {
        TopicA,
        TopicB,
        TopicC,
        TopicD,
        TopicE,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct A {
        a_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct B {
        b_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct C {
        c_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct E {
        e_value: usize,
    }

    #[derive(Clone, Debug, PartialEq)]
    struct F {
        f_value: usize,
    }

    /// Implements `EventValue` for each listed type with the given id.
    macro_rules! event_value {
        ($($ty:ty => $id:literal),+ $(,)?) => {
            $(
                impl EventValue for $ty {
                    fn id() -> &'static str {
                        $id
                    }
                }
            )+
        };
    }

    event_value!(A => "A", B => "B", C => "C", E => "E", F => "F");

    // `C` is the only event type with a fixed topic, so it can go through `emit`.
    impl EventValueTopic for C {
        type Topic = Topic;

        fn topic() -> Self::Topic {
            Topic::TopicC
        }
    }

    #[test]
    fn test_event_emitter() {
        block_on(async move {
            let emitter = EventEmitter::<Topic>::new();

            // Distinct event types on distinct topics.
            let listener_a = emitter.register::<A>(&Topic::TopicA).await;
            let listener_b = emitter.register::<B>(&Topic::TopicB).await;

            emitter
                .emit_by_topic(&Topic::TopicA, &A { a_value: 3 })
                .await
                .expect("Emit event");
            emitter
                .emit_by_topic(&Topic::TopicB, &B { b_value: 5 })
                .await
                .expect("Emit event");

            assert_eq!(listener_a.recv().await.unwrap(), A { a_value: 3 });
            assert_eq!(listener_b.recv().await.unwrap(), B { b_value: 5 });

            // The same event type registered on two different topics.
            let listener_c = emitter.register::<C>(&Topic::TopicC).await;
            let listener_d = emitter.register::<C>(&Topic::TopicD).await;

            // `emit` resolves the topic through `EventValueTopic`.
            emitter
                .emit(&C { c_value: 10 })
                .await
                .expect("Emit event");
            assert_eq!(listener_c.recv().await.unwrap(), C { c_value: 10 });

            emitter
                .emit_by_topic(&Topic::TopicD, &C { c_value: 10 })
                .await
                .expect("Emit event");
            assert_eq!(listener_d.recv().await.unwrap(), C { c_value: 10 });

            // Two event types sharing one topic.
            let listener_e = emitter.register::<E>(&Topic::TopicE).await;
            let listener_f = emitter.register::<F>(&Topic::TopicE).await;

            emitter
                .emit_by_topic(&Topic::TopicE, &E { e_value: 5 })
                .await
                .expect("Emit event");
            assert_eq!(listener_e.recv().await.unwrap(), E { e_value: 5 });

            emitter
                .emit_by_topic(&Topic::TopicE, &F { f_value: 5 })
                .await
                .expect("Emit event");
            assert_eq!(listener_f.recv().await.unwrap(), F { f_value: 5 });
        });
    }
}