karyon_p2p/monitor/
mod.rs1mod event;
2
3use std::sync::Arc;
4
5use log::error;
6
7use karyon_eventemitter::{AsEventTopic, AsEventValue, EventEmitter, EventListener, EventValue};
8
9use karyon_net::Endpoint;
10
11pub(crate) use event::{ConnEvent, DiscvEvent, PPEvent};
12
13#[cfg(feature = "serde")]
14use serde::{Deserialize, Serialize};
15
16use crate::{Config, PeerID};
17
18pub struct Monitor {
50 event_emitter: Arc<EventEmitter<MonitorTopic>>,
51 config: Arc<Config>,
52}
53
54impl Monitor {
55 pub(crate) fn new(config: Arc<Config>) -> Self {
57 Self {
58 event_emitter: EventEmitter::new(),
59 config,
60 }
61 }
62
63 pub(crate) async fn notify<E: ToEventStruct>(&self, event: E) {
65 if self.config.enable_monitor {
66 let event = event.to_struct();
67 if let Err(err) = self.event_emitter.emit(&event).await {
68 error!("Failed to notify monitor event {event:?}: {err}");
69 }
70 }
71 }
72
73 pub fn register<E>(&self) -> EventListener<MonitorTopic, E>
75 where
76 E: Clone + AsEventValue + AsEventTopic<Topic = MonitorTopic>,
77 {
78 self.event_emitter.register(&E::topic())
79 }
80}
81
/// Topics under which monitor events are published.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum MonitorTopic {
    /// Topic of `ConnectionEvent`.
    Connection,
    /// Topic of `PeerPoolEvent`.
    PeerPool,
    /// Topic of `DiscoveryEvent`.
    Discovery,
}
89
90pub(super) trait ToEventStruct: Sized {
91 type EventStruct: From<Self>
92 + Clone
93 + AsEventTopic<Topic = MonitorTopic>
94 + AsEventValue
95 + std::fmt::Debug;
96 fn to_struct(self) -> Self::EventStruct {
97 self.into()
98 }
99}
100
101impl ToEventStruct for ConnEvent {
102 type EventStruct = ConnectionEvent;
103}
104
105impl ToEventStruct for PPEvent {
106 type EventStruct = PeerPoolEvent;
107}
108
109impl ToEventStruct for DiscvEvent {
110 type EventStruct = DiscoveryEvent;
111}
112
113#[derive(Clone, Debug, EventValue)]
114#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
115pub struct ConnectionEvent {
116 pub event: String,
117 pub date: i64,
118 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
119 pub endpoint: Option<Endpoint>,
120}
121
122#[derive(Clone, Debug, EventValue)]
123#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
124pub struct PeerPoolEvent {
125 pub event: String,
126 pub date: i64,
127 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
128 pub peer_id: Option<PeerID>,
129}
130
131#[derive(Clone, Debug, EventValue)]
132#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
133pub struct DiscoveryEvent {
134 pub event: String,
135 pub date: i64,
136 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
137 pub endpoint: Option<Endpoint>,
138 #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
139 pub size: Option<usize>,
140}
141
142impl From<ConnEvent> for ConnectionEvent {
143 fn from(event: ConnEvent) -> Self {
144 let endpoint = event.get_endpoint().cloned();
145 Self {
146 endpoint,
147 event: event.variant_name().to_string(),
148 date: get_current_timestamp(),
149 }
150 }
151}
152
153impl From<PPEvent> for PeerPoolEvent {
154 fn from(event: PPEvent) -> Self {
155 let peer_id = event.get_peer_id().cloned();
156 Self {
157 peer_id,
158 event: event.variant_name().to_string(),
159 date: get_current_timestamp(),
160 }
161 }
162}
163
164impl From<DiscvEvent> for DiscoveryEvent {
165 fn from(event: DiscvEvent) -> Self {
166 let (endpoint, size) = event.get_endpoint_and_size();
167 Self {
168 endpoint: endpoint.cloned(),
169 size,
170 event: event.variant_name().to_string(),
171 date: get_current_timestamp(),
172 }
173 }
174}
175
176impl AsEventTopic for ConnectionEvent {
177 type Topic = MonitorTopic;
178 fn topic() -> Self::Topic {
179 MonitorTopic::Connection
180 }
181}
182
183impl AsEventTopic for PeerPoolEvent {
184 type Topic = MonitorTopic;
185 fn topic() -> Self::Topic {
186 MonitorTopic::PeerPool
187 }
188}
189
190impl AsEventTopic for DiscoveryEvent {
191 type Topic = MonitorTopic;
192 fn topic() -> Self::Topic {
193 MonitorTopic::Discovery
194 }
195}
196
197fn get_current_timestamp() -> i64 {
198 chrono::Utc::now().timestamp()
199}