Skip to main content

karyon_p2p/monitor/
mod.rs

1mod event;
2
3use std::sync::Arc;
4
5use log::error;
6
7use karyon_eventemitter::{AsEventValue, EventEmitter, EventListener, EventTopic, EventValue};
8
9use karyon_net::Endpoint;
10
11pub(crate) use event::{ConnectionKind, DiscoveryKind, PoolEvent};
12
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15
16use crate::{Config, PeerID};
17
/// Responsible for network and system monitoring.
///
/// It uses an event emitter to notify the registered listeners about new events.
///
/// # Example
///
/// ```no_run
/// use karyon_core::async_runtime::global_executor;
/// use karyon_p2p::{
///     Config, Node, keypair::{KeyPair, KeyPairType}, monitor::ConnectionEvent,
/// };
///
/// async {
///     let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
///     let node = Node::new(&key_pair, Config::default(), global_executor());
///
///     // Subscribe to a monitor event.
///     let monitor = node.monitor();
///     let listener = monitor.register::<ConnectionEvent>();
///     let new_event = listener.recv().await;
/// };
/// ```
pub struct Monitor {
    // Emitter that delivers events to listeners, keyed by `MonitorTopic`.
    event_emitter: Arc<EventEmitter<MonitorTopic>>,
    // Shared configuration; its `enable_monitor` flag gates `notify`.
    config: Arc<Config>,
}
44
45impl Monitor {
46    /// Creates a new Monitor
47    pub(crate) fn new(config: Arc<Config>) -> Self {
48        Self {
49            event_emitter: EventEmitter::new(),
50            config,
51        }
52    }
53
54    /// Sends a new monitor event to subscribers.
55    pub(crate) async fn notify<E: ToEvent>(&self, event: E) {
56        if !self.config.enable_monitor {
57            return;
58        }
59
60        let topic = E::Event::topic();
61        if !self.event_emitter.has_listeners(&topic) {
62            return;
63        }
64
65        let event = event.to_event();
66        if let Err(err) = self.event_emitter.emit(&event).await {
67            error!("Failed to notify monitor event {event:?}: {err}");
68        }
69    }
70
71    /// Registers a new event listener for the provided topic.
72    pub fn register<E>(&self) -> EventListener<MonitorTopic, E>
73    where
74        E: Clone + AsEventValue + EventTopic<Topic = MonitorTopic>,
75    {
76        self.event_emitter.register(&E::topic())
77    }
78}
79
/// Topics under which monitor events are emitted.
///
/// Each public event type in this module reports exactly one of these
/// topics through its `EventTopic` implementation.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MonitorTopic {
    /// Topic of `ConnectionEvent`.
    Connection,
    /// Topic of `PeerPoolEvent`.
    PeerPool,
    /// Topic of `DiscoveryEvent`.
    Discovery,
}
87
88pub(super) trait ToEvent: Sized {
89    type Event: From<Self>
90        + Clone
91        + EventTopic<Topic = MonitorTopic>
92        + AsEventValue
93        + std::fmt::Debug;
94    fn to_event(self) -> Self::Event {
95        self.into()
96    }
97}
98
/// `ConnectionKind` values are published to listeners as `ConnectionEvent`.
impl ToEvent for ConnectionKind {
    type Event = ConnectionEvent;
}
102
/// `PoolEvent` values are published to listeners as `PeerPoolEvent`.
impl ToEvent for PoolEvent {
    type Event = PeerPoolEvent;
}
106
/// `DiscoveryKind` values are published to listeners as `DiscoveryEvent`.
impl ToEvent for DiscoveryKind {
    type Event = DiscoveryEvent;
}
110
/// Monitor event emitted under `MonitorTopic::Connection`.
///
/// Built from an internal `ConnectionKind` value.
#[derive(Clone, Debug, EventValue)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ConnectionEvent {
    /// Variant name of the `ConnectionKind` this event was built from.
    pub event: String,
    /// Unix timestamp (seconds) taken when the event was built.
    pub date: i64,
    /// Endpoint attached to the event, if any; skipped in serialized output when `None`.
    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
    pub endpoint: Option<Endpoint>,
}
119
/// Monitor event emitted under `MonitorTopic::PeerPool`.
///
/// Built from an internal `PoolEvent` value.
#[derive(Clone, Debug, EventValue)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PeerPoolEvent {
    /// Variant name of the `PoolEvent` this event was built from.
    pub event: String,
    /// Unix timestamp (seconds) taken when the event was built.
    pub date: i64,
    /// Peer id attached to the event, if any; skipped in serialized output when `None`.
    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
    pub peer_id: Option<PeerID>,
    /// Endpoint attached to the event, if any; skipped in serialized output when `None`.
    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
    pub endpoint: Option<Endpoint>,
}
130
/// Monitor event emitted under `MonitorTopic::Discovery`.
///
/// Built from an internal `DiscoveryKind` value.
#[derive(Clone, Debug, EventValue)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct DiscoveryEvent {
    /// Variant name of the `DiscoveryKind` this event was built from.
    pub event: String,
    /// Unix timestamp (seconds) taken when the event was built.
    pub date: i64,
    /// Endpoint attached to the event, if any; skipped in serialized output when `None`.
    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
    pub endpoint: Option<Endpoint>,
    /// Result of `DiscoveryKind::get_size()`, if any; skipped in serialized output when `None`.
    // NOTE(review): what this size measures is defined in `event.rs` — confirm there.
    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
    pub size: Option<usize>,
    /// Peer id attached to the event, if any; skipped in serialized output when `None`.
    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
    pub peer_id: Option<PeerID>,
}
143
144impl From<ConnectionKind> for ConnectionEvent {
145    fn from(event: ConnectionKind) -> Self {
146        let endpoint = event.get_endpoint().cloned();
147        Self {
148            endpoint,
149            event: event.variant_name().to_string(),
150            date: get_current_timestamp(),
151        }
152    }
153}
154
155impl From<PoolEvent> for PeerPoolEvent {
156    fn from(event: PoolEvent) -> Self {
157        let peer_id = event.get_peer_id().cloned();
158        let endpoint = event.get_endpoint().cloned();
159        Self {
160            peer_id,
161            endpoint,
162            event: event.variant_name().to_string(),
163            date: get_current_timestamp(),
164        }
165    }
166}
167
168impl From<DiscoveryKind> for DiscoveryEvent {
169    fn from(event: DiscoveryKind) -> Self {
170        Self {
171            endpoint: event.get_endpoint().cloned(),
172            size: event.get_size(),
173            peer_id: event.get_peer_id().cloned(),
174            event: event.variant_name().to_string(),
175            date: get_current_timestamp(),
176        }
177    }
178}
179
180impl EventTopic for ConnectionEvent {
181    type Topic = MonitorTopic;
182    fn topic() -> Self::Topic {
183        MonitorTopic::Connection
184    }
185}
186
187impl EventTopic for PeerPoolEvent {
188    type Topic = MonitorTopic;
189    fn topic() -> Self::Topic {
190        MonitorTopic::PeerPool
191    }
192}
193
194impl EventTopic for DiscoveryEvent {
195    type Topic = MonitorTopic;
196    fn topic() -> Self::Topic {
197        MonitorTopic::Discovery
198    }
199}
200
201fn get_current_timestamp() -> i64 {
202    chrono::Utc::now().timestamp()
203}