karyon_p2p/monitor/
mod.rs1mod event;
2
3use std::sync::Arc;
4
5use log::error;
6
7use karyon_core::event::{EventEmitter, EventListener, EventValue, EventValueTopic};
8
9use karyon_net::Endpoint;
10
11pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent};
12
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15
16use crate::{Config, PeerID};
17
18pub struct Monitor {
50 event_emitter: Arc<EventEmitter<MonitorTopic>>,
51 config: Arc<Config>,
52}
53
54impl Monitor {
55 pub(crate) fn new(config: Arc<Config>) -> Self {
57 Self {
58 event_emitter: EventEmitter::new(),
59 config,
60 }
61 }
62
63 pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) {
65 if self.config.enable_monitor {
66 let event = event.to_struct();
67 if let Err(err) = self.event_emitter.emit(&event).await {
68 error!("Failed to notify monitor event {:?}: {err}", event);
69 }
70 }
71 }
72
73 pub async fn register<E>(&self) -> EventListener<MonitorTopic, E>
75 where
76 E: Clone + EventValue + EventValueTopic<Topic = MonitorTopic>,
77 {
78 self.event_emitter.register(&E::topic()).await
79 }
80}
81
/// Topics on which the monitor emits events.
///
/// Each public event struct in this module maps to exactly one topic through
/// its `EventValueTopic` implementation.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MonitorTopic {
    /// Topic of [`ConnectionEvent`].
    Connection,
    /// Topic of [`PeerPoolEvent`].
    PeerPool,
    /// Topic of [`DiscoveryEvent`].
    Discovery,
}
89
90pub(super) trait ToEventStruct: Sized {
91 type EventStruct: From<Self> + Clone + EventValueTopic<Topic = MonitorTopic> + EventValue;
92 fn to_struct(self) -> Self::EventStruct {
93 self.into()
94 }
95}
96
97impl ToEventStruct for ConnEvent {
98 type EventStruct = ConnectionEvent;
99}
100
101impl ToEventStruct for PPEvent {
102 type EventStruct = PeerPoolEvent;
103}
104
105impl ToEventStruct for DiscvEvent {
106 type EventStruct = DiscoveryEvent;
107}
108
109#[derive(Clone, Debug)]
110#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
111pub struct ConnectionEvent {
112 pub event: String,
113 pub date: i64,
114 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
115 pub endpoint: Option<Endpoint>,
116}
117
118#[derive(Clone, Debug)]
119#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
120pub struct PeerPoolEvent {
121 pub event: String,
122 pub date: i64,
123 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
124 pub peer_id: Option<PeerID>,
125}
126
127#[derive(Clone, Debug)]
128#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
129pub struct DiscoveryEvent {
130 pub event: String,
131 pub date: i64,
132 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
133 pub endpoint: Option<Endpoint>,
134 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
135 pub size: Option<usize>,
136}
137
138impl From<ConnEvent> for ConnectionEvent {
139 fn from(event: ConnEvent) -> Self {
140 let endpoint = event.get_endpoint().cloned();
141 Self {
142 endpoint,
143 event: event.variant_name().to_string(),
144 date: get_current_timestamp(),
145 }
146 }
147}
148
149impl From<PPEvent> for PeerPoolEvent {
150 fn from(event: PPEvent) -> Self {
151 let peer_id = event.get_peer_id().cloned();
152 Self {
153 peer_id,
154 event: event.variant_name().to_string(),
155 date: get_current_timestamp(),
156 }
157 }
158}
159
160impl From<DiscvEvent> for DiscoveryEvent {
161 fn from(event: DiscvEvent) -> Self {
162 let (endpoint, size) = event.get_endpoint_and_size();
163 Self {
164 endpoint: endpoint.cloned(),
165 size,
166 event: event.variant_name().to_string(),
167 date: get_current_timestamp(),
168 }
169 }
170}
171
172impl EventValue for ConnectionEvent {
173 fn id() -> &'static str {
174 "ConnectionEvent"
175 }
176}
177
178impl EventValue for PeerPoolEvent {
179 fn id() -> &'static str {
180 "PeerPoolEvent"
181 }
182}
183
184impl EventValue for DiscoveryEvent {
185 fn id() -> &'static str {
186 "DiscoveryEvent"
187 }
188}
189
190impl EventValueTopic for ConnectionEvent {
191 type Topic = MonitorTopic;
192 fn topic() -> Self::Topic {
193 MonitorTopic::Connection
194 }
195}
196
197impl EventValueTopic for PeerPoolEvent {
198 type Topic = MonitorTopic;
199 fn topic() -> Self::Topic {
200 MonitorTopic::PeerPool
201 }
202}
203
204impl EventValueTopic for DiscoveryEvent {
205 type Topic = MonitorTopic;
206 fn topic() -> Self::Topic {
207 MonitorTopic::Discovery
208 }
209}
210
211fn get_current_timestamp() -> i64 {
212 chrono::Utc::now().timestamp()
213}