karyon_p2p/monitor/
mod.rs

1mod event;
2
3use std::sync::Arc;
4
5use log::error;
6
7use karyon_eventemitter::{AsEventTopic, AsEventValue, EventEmitter, EventListener, EventValue};
8
9use karyon_net::Endpoint;
10
11pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent};
12
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15
16use crate::{Config, PeerID};
17
18/// Responsible for network and system monitoring.
19///
20/// It use event emitter to notify the registerd listeners about new events.
21///
22/// # Example
23///
24/// ```
25/// use std::sync::Arc;
26///
27/// use smol::Executor;
28///
29/// use karyon_p2p::{
30///     Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent,
31/// };
32///
33/// async {
34///     
35///     // Create a new Executor
36///     let ex = Arc::new(Executor::new());
37///
38///     let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
39///     let backend = Backend::new(&key_pair, Config::default(), ex.into());
40///
41///     // Create a new Subscription
42///     let monitor =  backend.monitor();
43///     
44///     let listener = monitor.register::<ConnectionEvent>();
45///     
46///     let new_event = listener.recv().await;
47/// };
48/// ```
49pub struct Monitor {
50    event_emitter: Arc<EventEmitter<MonitorTopic>>,
51    config: Arc<Config>,
52}
53
54impl Monitor {
55    /// Creates a new Monitor
56    pub(crate) fn new(config: Arc<Config>) -> Self {
57        Self {
58            event_emitter: EventEmitter::new(),
59            config,
60        }
61    }
62
63    /// Sends a new monitor event to subscribers.
64    pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) {
65        if self.config.enable_monitor {
66            let event = event.to_struct();
67            if let Err(err) = self.event_emitter.emit(&event).await {
68                error!("Failed to notify monitor event {event:?}: {err}");
69            }
70        }
71    }
72
73    /// Registers a new event listener for the provided topic.
74    pub fn register<E>(&self) -> EventListener<MonitorTopic, E>
75    where
76        E: Clone + AsEventValue + AsEventTopic<Topic = MonitorTopic>,
77    {
78        self.event_emitter.register(&E::topic())
79    }
80}
81
/// Topics under which monitor events are published and listened for.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MonitorTopic {
    /// Topic of [`ConnectionEvent`].
    Connection,
    /// Topic of [`PeerPoolEvent`].
    PeerPool,
    /// Topic of [`DiscoveryEvent`].
    Discovery,
}
89
90pub(super) trait ToEventStruct: Sized {
91    type EventStruct: From<Self>
92        + Clone
93        + AsEventTopic<Topic = MonitorTopic>
94        + AsEventValue
95        + std::fmt::Debug;
96    fn to_struct(self) -> Self::EventStruct {
97        self.into()
98    }
99}
100
101impl ToEventStruct for ConnEvent {
102    type EventStruct = ConnectionEvent;
103}
104
105impl ToEventStruct for PPEvent {
106    type EventStruct = PeerPoolEvent;
107}
108
109impl ToEventStruct for DiscvEvent {
110    type EventStruct = DiscoveryEvent;
111}
112
113#[derive(Clone, Debug, EventValue)]
114#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
115pub struct ConnectionEvent {
116    pub event: String,
117    pub date: i64,
118    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
119    pub endpoint: Option<Endpoint>,
120}
121
122#[derive(Clone, Debug, EventValue)]
123#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
124pub struct PeerPoolEvent {
125    pub event: String,
126    pub date: i64,
127    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
128    pub peer_id: Option<PeerID>,
129}
130
131#[derive(Clone, Debug, EventValue)]
132#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
133pub struct DiscoveryEvent {
134    pub event: String,
135    pub date: i64,
136    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
137    pub endpoint: Option<Endpoint>,
138    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
139    pub size: Option<usize>,
140}
141
142impl From<ConnEvent> for ConnectionEvent {
143    fn from(event: ConnEvent) -> Self {
144        let endpoint = event.get_endpoint().cloned();
145        Self {
146            endpoint,
147            event: event.variant_name().to_string(),
148            date: get_current_timestamp(),
149        }
150    }
151}
152
153impl From<PPEvent> for PeerPoolEvent {
154    fn from(event: PPEvent) -> Self {
155        let peer_id = event.get_peer_id().cloned();
156        Self {
157            peer_id,
158            event: event.variant_name().to_string(),
159            date: get_current_timestamp(),
160        }
161    }
162}
163
164impl From<DiscvEvent> for DiscoveryEvent {
165    fn from(event: DiscvEvent) -> Self {
166        let (endpoint, size) = event.get_endpoint_and_size();
167        Self {
168            endpoint: endpoint.cloned(),
169            size,
170            event: event.variant_name().to_string(),
171            date: get_current_timestamp(),
172        }
173    }
174}
175
176impl AsEventTopic for ConnectionEvent {
177    type Topic = MonitorTopic;
178    fn topic() -> Self::Topic {
179        MonitorTopic::Connection
180    }
181}
182
183impl AsEventTopic for PeerPoolEvent {
184    type Topic = MonitorTopic;
185    fn topic() -> Self::Topic {
186        MonitorTopic::PeerPool
187    }
188}
189
190impl AsEventTopic for DiscoveryEvent {
191    type Topic = MonitorTopic;
192    fn topic() -> Self::Topic {
193        MonitorTopic::Discovery
194    }
195}
196
197fn get_current_timestamp() -> i64 {
198    chrono::Utc::now().timestamp()
199}