// karyon_p2p/monitor/mod.rs
mod event;
2
3use std::sync::Arc;
4
5use log::error;
6
7use karyon_eventemitter::{AsEventValue, EventEmitter, EventListener, EventTopic, EventValue};
8
9use karyon_net::Endpoint;
10
11pub(crate) use event::{ConnectionKind, DiscoveryKind, PoolEvent};
12
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15
16use crate::{Config, PeerID};
17
18pub struct Monitor {
41 event_emitter: Arc<EventEmitter<MonitorTopic>>,
42 config: Arc<Config>,
43}
44
45impl Monitor {
46 pub(crate) fn new(config: Arc<Config>) -> Self {
48 Self {
49 event_emitter: EventEmitter::new(),
50 config,
51 }
52 }
53
54 pub(crate) async fn notify<E: ToEvent>(&self, event: E) {
56 if !self.config.enable_monitor {
57 return;
58 }
59
60 let topic = E::Event::topic();
61 if !self.event_emitter.has_listeners(&topic) {
62 return;
63 }
64
65 let event = event.to_event();
66 if let Err(err) = self.event_emitter.emit(&event).await {
67 error!("Failed to notify monitor event {event:?}: {err}");
68 }
69 }
70
71 pub fn register<E>(&self) -> EventListener<MonitorTopic, E>
73 where
74 E: Clone + AsEventValue + EventTopic<Topic = MonitorTopic>,
75 {
76 self.event_emitter.register(&E::topic())
77 }
78}
79
/// Topics under which monitor events are emitted and listened for.
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MonitorTopic {
    /// Topic of [`ConnectionEvent`].
    Connection,
    /// Topic of [`PeerPoolEvent`].
    PeerPool,
    /// Topic of [`DiscoveryEvent`].
    Discovery,
}
87
88pub(super) trait ToEvent: Sized {
89 type Event: From<Self>
90 + Clone
91 + EventTopic<Topic = MonitorTopic>
92 + AsEventValue
93 + std::fmt::Debug;
94 fn to_event(self) -> Self::Event {
95 self.into()
96 }
97}
98
99impl ToEvent for ConnectionKind {
100 type Event = ConnectionEvent;
101}
102
103impl ToEvent for PoolEvent {
104 type Event = PeerPoolEvent;
105}
106
107impl ToEvent for DiscoveryKind {
108 type Event = DiscoveryEvent;
109}
110
111#[derive(Clone, Debug, EventValue)]
112#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
113pub struct ConnectionEvent {
114 pub event: String,
115 pub date: i64,
116 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
117 pub endpoint: Option<Endpoint>,
118}
119
120#[derive(Clone, Debug, EventValue)]
121#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
122pub struct PeerPoolEvent {
123 pub event: String,
124 pub date: i64,
125 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
126 pub peer_id: Option<PeerID>,
127 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
128 pub endpoint: Option<Endpoint>,
129}
130
131#[derive(Clone, Debug, EventValue)]
132#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
133pub struct DiscoveryEvent {
134 pub event: String,
135 pub date: i64,
136 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
137 pub endpoint: Option<Endpoint>,
138 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
139 pub size: Option<usize>,
140 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
141 pub peer_id: Option<PeerID>,
142}
143
144impl From<ConnectionKind> for ConnectionEvent {
145 fn from(event: ConnectionKind) -> Self {
146 let endpoint = event.get_endpoint().cloned();
147 Self {
148 endpoint,
149 event: event.variant_name().to_string(),
150 date: get_current_timestamp(),
151 }
152 }
153}
154
155impl From<PoolEvent> for PeerPoolEvent {
156 fn from(event: PoolEvent) -> Self {
157 let peer_id = event.get_peer_id().cloned();
158 let endpoint = event.get_endpoint().cloned();
159 Self {
160 peer_id,
161 endpoint,
162 event: event.variant_name().to_string(),
163 date: get_current_timestamp(),
164 }
165 }
166}
167
168impl From<DiscoveryKind> for DiscoveryEvent {
169 fn from(event: DiscoveryKind) -> Self {
170 Self {
171 endpoint: event.get_endpoint().cloned(),
172 size: event.get_size(),
173 peer_id: event.get_peer_id().cloned(),
174 event: event.variant_name().to_string(),
175 date: get_current_timestamp(),
176 }
177 }
178}
179
180impl EventTopic for ConnectionEvent {
181 type Topic = MonitorTopic;
182 fn topic() -> Self::Topic {
183 MonitorTopic::Connection
184 }
185}
186
187impl EventTopic for PeerPoolEvent {
188 type Topic = MonitorTopic;
189 fn topic() -> Self::Topic {
190 MonitorTopic::PeerPool
191 }
192}
193
194impl EventTopic for DiscoveryEvent {
195 type Topic = MonitorTopic;
196 fn topic() -> Self::Topic {
197 MonitorTopic::Discovery
198 }
199}
200
201fn get_current_timestamp() -> i64 {
202 chrono::Utc::now().timestamp()
203}