1use std::{
2 any::Any,
3 collections::HashMap,
4 marker::PhantomData,
5 sync::{Arc, Weak},
6};
7
8use async_channel::{Receiver, Sender};
9use chrono::{DateTime, Utc};
10use futures_util::stream::{FuturesUnordered, StreamExt};
11use log::trace;
12use parking_lot::Mutex;
13use rand::Rng;
14
15use crate::error::{Error, Result};
16
17const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1000;
19
20pub type EventListenerID = u64;
22
23type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
25
26fn random_id() -> EventListenerID {
27 rand::rng().random()
28}
29
30pub struct EventEmitter<T> {
83 listeners: Mutex<Listeners<T>>,
84 listener_buffer_size: usize,
85}
86
87impl<T> EventEmitter<T>
88where
89 T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
90{
91 pub fn new() -> Arc<EventEmitter<T>> {
93 Arc::new(Self {
94 listeners: Mutex::new(HashMap::new()),
95 listener_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
96 })
97 }
98
99 pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
110 assert_ne!(size, 0);
111 Arc::new(Self {
112 listeners: Mutex::new(HashMap::new()),
113 listener_buffer_size: size,
114 })
115 }
116
117 pub async fn emit<E: EventTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
122 let topic = E::topic();
123 self.emit_by_topic(&topic, value).await
124 }
125
126 pub async fn emit_by_topic<E: AsEventValue + Clone>(&self, topic: &T, value: &E) -> Result<()> {
128 let mut results = self.send(topic, value).await?;
129
130 let mut failed_listeners = vec![];
131 while let Some((id, fut_err)) = results.next().await {
132 if let Err(err) = fut_err {
133 trace!("Failed to emit event for topic {topic:?}: {err}");
134 failed_listeners.push(id);
135 }
136 }
137 drop(results);
138
139 if !failed_listeners.is_empty() {
140 self.remove(topic, E::event_id(), &failed_listeners);
141 }
142
143 Ok(())
144 }
145
146 pub fn register<E: AsEventValue + Clone>(self: &Arc<Self>, topic: &T) -> EventListener<T, E> {
148 let topics = &mut self.listeners.lock();
149
150 let chan = async_channel::bounded(self.listener_buffer_size);
151
152 let event_ids = topics.entry(topic.clone()).or_default();
153 let event_id = E::event_id().to_string();
154
155 let listeners = event_ids.entry(event_id.clone()).or_default();
156
157 let mut listener_id = random_id();
158 while listeners.contains_key(&listener_id) {
160 listener_id = random_id();
161 }
162
163 let listener =
164 EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
165
166 listeners.insert(listener_id, chan.0);
167
168 listener
169 }
170
171 pub fn has_listeners(&self, topic: &T) -> bool {
173 self.listeners.lock().contains_key(topic)
174 }
175
176 pub fn clear(self: &Arc<Self>) {
180 self.listeners.lock().clear();
181 }
182
183 pub fn unregister_topic(self: &Arc<Self>, topic: &T) {
185 self.listeners.lock().remove(topic);
186 }
187
188 async fn send<E: AsEventValue + Clone>(
190 &self,
191 topic: &T,
192 value: &E,
193 ) -> Result<
194 FuturesUnordered<
195 impl std::future::Future<
196 Output = (
197 EventListenerID,
198 std::result::Result<(), async_channel::SendError<Event>>,
199 ),
200 > + use<'_, T, E>,
201 >,
202 > {
203 let value: Arc<dyn AsEventValue> = Arc::new(value.clone());
204 let event = Event::new(value);
205
206 let mut topics = self.listeners.lock();
207
208 let results = FuturesUnordered::new();
209 let event_ids = match topics.get_mut(topic) {
210 Some(ids) => ids,
211 None => {
212 trace!("Failed to emit an event to a non-existent topic {topic:?}",);
213 return Err(Error::EventEmitter(format!(
214 "Emit an event to a non-existent topic {topic:?}",
215 )));
216 }
217 };
218
219 let event_id = E::event_id().to_string();
220
221 let Some(listeners) = event_ids.get_mut(&event_id) else {
222 trace!("No listeners found for event '{event_id}' on topic {topic:?}",);
223 return Ok(results);
224 };
225
226 for (listener_id, listener) in listeners {
227 let event = event.clone();
228 let listener = listener.clone();
229 let id = *listener_id;
230 let result = async move { (id, listener.send(event.clone()).await) };
231 results.push(result);
232 }
233 drop(topics);
234
235 Ok(results)
236 }
237
238 fn remove(&self, topic: &T, event_id: &str, listener_ids: &[EventListenerID]) {
240 let topics = &mut self.listeners.lock();
241
242 let Some(event_ids) = topics.get_mut(topic) else {
243 trace!("Failed to remove a non-existent topic");
244 return;
245 };
246
247 let Some(listeners) = event_ids.get_mut(event_id) else {
248 trace!("Failed to remove a non-existent event id");
249 return;
250 };
251
252 for listener_id in listener_ids {
253 if listeners.remove(listener_id).is_none() {
254 trace!("Failed to remove a non-existent event listener");
255 }
256 }
257 }
258}
259
260pub struct EventListener<T, E> {
262 id: EventListenerID,
263 recv_chan: Receiver<Event>,
264 event_emitter: Weak<EventEmitter<T>>,
265 event_id: String,
266 topic: T,
267 phantom: PhantomData<E>,
268}
269
270impl<T, E> EventListener<T, E>
271where
272 T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
273 E: AsEventValue + Clone,
274{
275 fn new(
277 id: EventListenerID,
278 event_emitter: Weak<EventEmitter<T>>,
279 recv_chan: Receiver<Event>,
280 event_id: &str,
281 topic: &T,
282 ) -> EventListener<T, E> {
283 Self {
284 id,
285 recv_chan,
286 event_emitter,
287 event_id: event_id.to_string(),
288 topic: topic.clone(),
289 phantom: PhantomData,
290 }
291 }
292
293 pub async fn recv(&self) -> Result<E> {
298 match self.recv_chan.recv().await {
299 Ok(event) => match (event.value as Arc<dyn Any>).downcast_ref::<E>() {
300 Some(v) => Ok(v.clone()),
301 None => unreachable!("Failed to downcast the event value."),
302 },
303 Err(err) => {
304 trace!("Failed to receive new event: {err}");
305 self.cancel();
306 Err(err.into())
307 }
308 }
309 }
310
311 pub fn cancel(&self) {
313 self.event_emitter()
314 .remove(&self.topic, &self.event_id, &[self.id]);
315 }
316
317 pub fn topic(&self) -> &T {
319 &self.topic
320 }
321
322 pub fn event_id(&self) -> &String {
324 &self.event_id
325 }
326
327 pub fn event_emitter(&self) -> Arc<EventEmitter<T>> {
329 self.event_emitter.upgrade().unwrap()
330 }
331}
332
333#[derive(Clone, Debug)]
335pub struct Event {
336 created_at: DateTime<Utc>,
338 value: Arc<dyn AsEventValue>,
340}
341
342impl Event {
343 pub fn new(value: Arc<dyn AsEventValue>) -> Self {
345 Self {
346 created_at: Utc::now(),
347 value,
348 }
349 }
350}
351
352impl std::fmt::Display for Event {
353 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
354 write!(f, "{}: {:?}", self.created_at, self.value)
355 }
356}
357
/// Implemented by types that can be sent through an `EventEmitter`.
pub trait AsEventValue: Any + Send + Sync + std::fmt::Debug {
    /// Returns the static string under which listeners for this value type
    /// are grouped inside an emitter.
    fn event_id() -> &'static str where Self: Sized;
}
367
368pub trait EventTopic: AsEventValue {
374 type Topic;
375 fn topic() -> Self::Topic
376 where
377 Self: Sized;
378}