1use std::{
2 any::Any,
3 collections::HashMap,
4 marker::PhantomData,
5 sync::{Arc, Weak},
6};
7
8use async_channel::{Receiver, Sender};
9use chrono::{DateTime, Utc};
10use futures_util::stream::{FuturesUnordered, StreamExt};
11use log::trace;
12use parking_lot::Mutex;
13use rand::{rngs::OsRng, Rng};
14
15use crate::error::{Error, Result};
16
17const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1000;
19
20pub type EventListenerID = u64;
22
23type Listeners<T> = HashMap<T, HashMap<String, HashMap<EventListenerID, Sender<Event>>>>;
25
26fn random_id() -> EventListenerID {
27 OsRng.gen()
28}
29
30pub struct EventEmitter<T> {
83 listeners: Mutex<Listeners<T>>,
84 listener_buffer_size: usize,
85}
86
87impl<T> EventEmitter<T>
88where
89 T: std::hash::Hash + Eq + std::fmt::Debug + Clone,
90{
91 pub fn new() -> Arc<EventEmitter<T>> {
93 Arc::new(Self {
94 listeners: Mutex::new(HashMap::new()),
95 listener_buffer_size: DEFAULT_CHANNEL_BUFFER_SIZE,
96 })
97 }
98
99 pub fn with_buffer_size(size: usize) -> Arc<EventEmitter<T>> {
110 assert_ne!(size, 0);
111 Arc::new(Self {
112 listeners: Mutex::new(HashMap::new()),
113 listener_buffer_size: size,
114 })
115 }
116
117 pub async fn emit<E: AsEventTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
122 let topic = E::topic();
123 self.emit_by_topic(&topic, value).await
124 }
125
126 pub async fn emit_by_topic<E: AsEventValue + Clone>(&self, topic: &T, value: &E) -> Result<()> {
128 let mut results = self.send(topic, value).await?;
129
130 let mut failed_listeners = vec![];
131 while let Some((id, fut_err)) = results.next().await {
132 if let Err(err) = fut_err {
133 trace!("Failed to emit event for topic {topic:?}: {err}");
134 failed_listeners.push(id);
135 }
136 }
137 drop(results);
138
139 if !failed_listeners.is_empty() {
140 self.remove(topic, E::event_id(), &failed_listeners);
141 }
142
143 Ok(())
144 }
145
146 pub fn register<E: AsEventValue + Clone>(self: &Arc<Self>, topic: &T) -> EventListener<T, E> {
148 let topics = &mut self.listeners.lock();
149
150 let chan = async_channel::bounded(self.listener_buffer_size);
151
152 let event_ids = topics.entry(topic.clone()).or_default();
153 let event_id = E::event_id().to_string();
154
155 let listeners = event_ids.entry(event_id.clone()).or_default();
156
157 let mut listener_id = random_id();
158 while listeners.contains_key(&listener_id) {
160 listener_id = random_id();
161 }
162
163 let listener =
164 EventListener::new(listener_id, Arc::downgrade(self), chan.1, &event_id, topic);
165
166 listeners.insert(listener_id, chan.0);
167
168 listener
169 }
170
171 pub fn clear(self: &Arc<Self>) {
175 self.listeners.lock().clear();
176 }
177
178 pub fn unregister_topic(self: &Arc<Self>, topic: &T) {
180 self.listeners.lock().remove(topic);
181 }
182
183 async fn send<E: AsEventValue + Clone>(
185 &self,
186 topic: &T,
187 value: &E,
188 ) -> Result<
189 FuturesUnordered<
190 impl std::future::Future<
191 Output = (
192 EventListenerID,
193 std::result::Result<(), async_channel::SendError<Event>>,
194 ),
195 > + use<'_, T, E>,
196 >,
197 > {
198 let value: Arc<dyn AsEventValue> = Arc::new(value.clone());
199 let event = Event::new(value);
200
201 let mut topics = self.listeners.lock();
202
203 let results = FuturesUnordered::new();
204 let event_ids = match topics.get_mut(topic) {
205 Some(ids) => ids,
206 None => {
207 trace!("Failed to emit an event to a non-existent topic {topic:?}",);
208 return Err(Error::EventEmitter(format!(
209 "Emit an event to a non-existent topic {topic:?}",
210 )));
211 }
212 };
213
214 let event_id = E::event_id().to_string();
215
216 let Some(listeners) = event_ids.get_mut(&event_id) else {
217 trace!("No listeners found for event '{event_id}' on topic {topic:?}",);
218 return Ok(results);
219 };
220
221 for (listener_id, listener) in listeners {
222 let event = event.clone();
223 let listener = listener.clone();
224 let id = *listener_id;
225 let result = async move { (id, listener.send(event.clone()).await) };
226 results.push(result);
227 }
228 drop(topics);
229
230 Ok(results)
231 }
232
233 fn remove(&self, topic: &T, event_id: &str, listener_ids: &[EventListenerID]) {
235 let topics = &mut self.listeners.lock();
236
237 let Some(event_ids) = topics.get_mut(topic) else {
238 trace!("Failed to remove a non-existent topic");
239 return;
240 };
241
242 let Some(listeners) = event_ids.get_mut(event_id) else {
243 trace!("Failed to remove a non-existent event id");
244 return;
245 };
246
247 for listener_id in listener_ids {
248 if listeners.remove(listener_id).is_none() {
249 trace!("Failed to remove a non-existent event listener");
250 }
251 }
252 }
253}
254
255pub struct EventListener<T, E> {
257 id: EventListenerID,
258 recv_chan: Receiver<Event>,
259 event_emitter: Weak<EventEmitter<T>>,
260 event_id: String,
261 topic: T,
262 phantom: PhantomData<E>,
263}
264
265impl<T, E> EventListener<T, E>
266where
267 T: std::hash::Hash + Eq + Clone + std::fmt::Debug,
268 E: AsEventValue + Clone,
269{
270 fn new(
272 id: EventListenerID,
273 event_emitter: Weak<EventEmitter<T>>,
274 recv_chan: Receiver<Event>,
275 event_id: &str,
276 topic: &T,
277 ) -> EventListener<T, E> {
278 Self {
279 id,
280 recv_chan,
281 event_emitter,
282 event_id: event_id.to_string(),
283 topic: topic.clone(),
284 phantom: PhantomData,
285 }
286 }
287
288 pub async fn recv(&self) -> Result<E> {
293 match self.recv_chan.recv().await {
294 Ok(event) => match (event.value as Arc<dyn Any>).downcast_ref::<E>() {
295 Some(v) => Ok(v.clone()),
296 None => unreachable!("Failed to downcast the event value."),
297 },
298 Err(err) => {
299 trace!("Failed to receive new event: {err}");
300 self.cancel();
301 Err(err.into())
302 }
303 }
304 }
305
306 pub fn cancel(&self) {
308 self.event_emitter()
309 .remove(&self.topic, &self.event_id, &[self.id]);
310 }
311
312 pub fn topic(&self) -> &T {
314 &self.topic
315 }
316
317 pub fn event_id(&self) -> &String {
319 &self.event_id
320 }
321
322 pub fn event_emitter(&self) -> Arc<EventEmitter<T>> {
324 self.event_emitter.upgrade().unwrap()
325 }
326}
327
328#[derive(Clone, Debug)]
330pub struct Event {
331 created_at: DateTime<Utc>,
333 value: Arc<dyn AsEventValue>,
335}
336
337impl Event {
338 pub fn new(value: Arc<dyn AsEventValue>) -> Self {
340 Self {
341 created_at: Utc::now(),
342 value,
343 }
344 }
345}
346
347impl std::fmt::Display for Event {
348 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
349 write!(f, "{}: {:?}", self.created_at, self.value)
350 }
351}
352
353pub trait AsEventValue: Any + Send + Sync + std::fmt::Debug {
358 fn event_id() -> &'static str
359 where
360 Self: Sized;
361}
362
363pub trait AsEventTopic: AsEventValue {
369 type Topic;
370 fn topic() -> Self::Topic
371 where
372 Self: Sized;
373}