karyon_p2p/monitor/
mod.rs

1mod event;
2
3use std::sync::Arc;
4
5use log::error;
6
7use karyon_core::event::{EventEmitter, EventListener, EventValue, EventValueTopic};
8
9use karyon_net::Endpoint;
10
11pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent};
12
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15
16use crate::{Config, PeerID};
17
18/// Responsible for network and system monitoring.
19///
20/// It use event emitter to notify the registerd listeners about new events.
21///
22/// # Example
23///
24/// ```
25/// use std::sync::Arc;
26///
27/// use smol::Executor;
28///
29/// use karyon_p2p::{
30///     Config, Backend, PeerID, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent,
31/// };
32///
33/// async {
34///     
35///     // Create a new Executor
36///     let ex = Arc::new(Executor::new());
37///
38///     let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
39///     let backend = Backend::new(&key_pair, Config::default(), ex.into());
40///
41///     // Create a new Subscription
42///     let monitor =  backend.monitor();
43///     
44///     let listener = monitor.register::<ConnectionEvent>().await;
45///     
46///     let new_event = listener.recv().await;
47/// };
48/// ```
49pub struct Monitor {
50    event_emitter: Arc<EventEmitter<MonitorTopic>>,
51    config: Arc<Config>,
52}
53
54impl Monitor {
55    /// Creates a new Monitor
56    pub(crate) fn new(config: Arc<Config>) -> Self {
57        Self {
58            event_emitter: EventEmitter::new(),
59            config,
60        }
61    }
62
63    /// Sends a new monitor event to subscribers.
64    pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) {
65        if self.config.enable_monitor {
66            let event = event.to_struct();
67            if let Err(err) = self.event_emitter.emit(&event).await {
68                error!("Failed to notify monitor event {:?}: {err}", event);
69            }
70        }
71    }
72
73    /// Registers a new event listener for the provided topic.
74    pub async fn register<E>(&self) -> EventListener<MonitorTopic, E>
75    where
76        E: Clone + EventValue + EventValueTopic<Topic = MonitorTopic>,
77    {
78        self.event_emitter.register(&E::topic()).await
79    }
80}
81
/// Topics under which monitor events are published.
///
/// Each public event struct in this module maps to exactly one topic through
/// its `EventValueTopic` implementation.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MonitorTopic {
    /// Topic of [`ConnectionEvent`].
    Connection,
    /// Topic of [`PeerPoolEvent`].
    PeerPool,
    /// Topic of [`DiscoveryEvent`].
    Discovery,
}
89
90pub(super) trait ToEventStruct: Sized {
91    type EventStruct: From<Self> + Clone + EventValueTopic<Topic = MonitorTopic> + EventValue;
92    fn to_struct(self) -> Self::EventStruct {
93        self.into()
94    }
95}
96
97impl ToEventStruct for ConnEvent {
98    type EventStruct = ConnectionEvent;
99}
100
101impl ToEventStruct for PPEvent {
102    type EventStruct = PeerPoolEvent;
103}
104
105impl ToEventStruct for DiscvEvent {
106    type EventStruct = DiscoveryEvent;
107}
108
109#[derive(Clone, Debug)]
110#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
111pub struct ConnectionEvent {
112    pub event: String,
113    pub date: i64,
114    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
115    pub endpoint: Option<Endpoint>,
116}
117
118#[derive(Clone, Debug)]
119#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
120pub struct PeerPoolEvent {
121    pub event: String,
122    pub date: i64,
123    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
124    pub peer_id: Option<PeerID>,
125}
126
127#[derive(Clone, Debug)]
128#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
129pub struct DiscoveryEvent {
130    pub event: String,
131    pub date: i64,
132    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
133    pub endpoint: Option<Endpoint>,
134    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
135    pub size: Option<usize>,
136}
137
138impl From<ConnEvent> for ConnectionEvent {
139    fn from(event: ConnEvent) -> Self {
140        let endpoint = event.get_endpoint().cloned();
141        Self {
142            endpoint,
143            event: event.variant_name().to_string(),
144            date: get_current_timestamp(),
145        }
146    }
147}
148
149impl From<PPEvent> for PeerPoolEvent {
150    fn from(event: PPEvent) -> Self {
151        let peer_id = event.get_peer_id().cloned();
152        Self {
153            peer_id,
154            event: event.variant_name().to_string(),
155            date: get_current_timestamp(),
156        }
157    }
158}
159
160impl From<DiscvEvent> for DiscoveryEvent {
161    fn from(event: DiscvEvent) -> Self {
162        let (endpoint, size) = event.get_endpoint_and_size();
163        Self {
164            endpoint: endpoint.cloned(),
165            size,
166            event: event.variant_name().to_string(),
167            date: get_current_timestamp(),
168        }
169    }
170}
171
172impl EventValue for ConnectionEvent {
173    fn id() -> &'static str {
174        "ConnectionEvent"
175    }
176}
177
178impl EventValue for PeerPoolEvent {
179    fn id() -> &'static str {
180        "PeerPoolEvent"
181    }
182}
183
184impl EventValue for DiscoveryEvent {
185    fn id() -> &'static str {
186        "DiscoveryEvent"
187    }
188}
189
190impl EventValueTopic for ConnectionEvent {
191    type Topic = MonitorTopic;
192    fn topic() -> Self::Topic {
193        MonitorTopic::Connection
194    }
195}
196
197impl EventValueTopic for PeerPoolEvent {
198    type Topic = MonitorTopic;
199    fn topic() -> Self::Topic {
200        MonitorTopic::PeerPool
201    }
202}
203
204impl EventValueTopic for DiscoveryEvent {
205    type Topic = MonitorTopic;
206    fn topic() -> Self::Topic {
207        MonitorTopic::Discovery
208    }
209}
210
211fn get_current_timestamp() -> i64 {
212    chrono::Utc::now().timestamp()
213}