1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use log::{debug, info};
7use parking_lot::RwLock as SyncRwLock;
8
9use karyon_core::{
10 async_runtime::Executor,
11 async_util::{TaskGroup, TaskResult},
12 crypto::KeyPair,
13};
14use karyon_eventemitter::EventListener;
15use karyon_net::Endpoint;
16
17use crate::{
18 bloom::{Bloom, BloomRef},
19 codec::PeerNetMsgCodec,
20 config::Config,
21 conn_queue::ConnQueue,
22 connector::Connector,
23 discovery::{kademlia::KademliaDiscovery, DiscoveredPeer, Discovery, PeerConnectionEvent},
24 listener::Listener,
25 message::{pick_endpoint, Protocol},
26 monitor::{Monitor, PoolEvent},
27 peer_pool::{PeerEvent, PeerEventTopic, PeerPool},
28 protocol::{PeerConn, Protocol as ProtocolTrait, ProtocolID, ProtocolKind},
29 protocols::PingProtocol,
30 slots::ConnectionSlots,
31 PeerID, Result,
32};
33
34pub struct Node {
59 config: Arc<Config>,
61
62 key_pair: KeyPair,
64
65 peer_id: PeerID,
67
68 monitor: Arc<Monitor>,
70
71 discovery: Arc<dyn Discovery>,
73
74 peer_pool: Arc<PeerPool>,
76
77 connector: Arc<Connector<PeerNetMsgCodec>>,
79
80 listener: Arc<Listener<PeerNetMsgCodec>>,
82
83 task_group: TaskGroup,
85
86 bloom: BloomRef,
91}
92
93impl Node {
94 pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> Arc<Node> {
96 let config = Arc::new(config);
97 let monitor = Arc::new(Monitor::new(config.clone()));
98 let peer_id = PeerID::try_from(key_pair.public())
99 .expect("Derive a peer id from the provided key pair.");
100 info!("PeerID: {peer_id}");
101
102 let conn_queue = ConnQueue::new();
103 let peer_pool = PeerPool::new(
104 &peer_id,
105 conn_queue.clone(),
106 config.clone(),
107 monitor.clone(),
108 ex.clone(),
109 );
110
111 let bloom: BloomRef = Arc::new(SyncRwLock::new(Bloom::empty()));
112
113 let discovery: Arc<dyn Discovery> = KademliaDiscovery::new(
114 key_pair,
115 &peer_id,
116 config.clone(),
117 monitor.clone(),
118 bloom.clone(),
119 ex.clone(),
120 );
121
122 let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
123 let connector = Connector::new_with_queue(
124 key_pair,
125 config.max_connect_retries,
126 outbound_slots,
127 conn_queue.clone(),
128 monitor.clone(),
129 ex.clone(),
130 );
131
132 let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
133 let listener = Listener::new_with_queue(
134 key_pair,
135 inbound_slots,
136 conn_queue,
137 monitor.clone(),
138 ex.clone(),
139 );
140
141 let task_group = TaskGroup::with_executor(ex);
142
143 Arc::new(Self {
144 key_pair: key_pair.clone(),
145 peer_id,
146 monitor,
147 discovery,
148 config,
149 peer_pool,
150 connector,
151 listener,
152 task_group,
153 bloom,
154 })
155 }
156
157 pub fn with_discovery(
162 key_pair: &KeyPair,
163 config: Config,
164 discovery: Arc<dyn Discovery>,
165 ex: Executor,
166 ) -> Arc<Node> {
167 let config = Arc::new(config);
168 let monitor = Arc::new(Monitor::new(config.clone()));
169 let peer_id = PeerID::try_from(key_pair.public())
170 .expect("Derive a peer id from the provided key pair.");
171 info!("PeerID: {peer_id}");
172
173 let conn_queue = ConnQueue::new();
174 let peer_pool = PeerPool::new(
175 &peer_id,
176 conn_queue.clone(),
177 config.clone(),
178 monitor.clone(),
179 ex.clone(),
180 );
181
182 let bloom: BloomRef = Arc::new(SyncRwLock::new(Bloom::empty()));
183
184 let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
185 let connector = Connector::new_with_queue(
186 key_pair,
187 config.max_connect_retries,
188 outbound_slots,
189 conn_queue.clone(),
190 monitor.clone(),
191 ex.clone(),
192 );
193
194 let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
195 let listener = Listener::new_with_queue(
196 key_pair,
197 inbound_slots,
198 conn_queue,
199 monitor.clone(),
200 ex.clone(),
201 );
202
203 let task_group = TaskGroup::with_executor(ex);
204
205 Arc::new(Self {
206 key_pair: key_pair.clone(),
207 peer_id,
208 monitor,
209 discovery,
210 config,
211 peer_pool,
212 connector,
213 listener,
214 task_group,
215 bloom,
216 })
217 }
218
219 pub async fn run(self: &Arc<Self>) -> Result<()> {
221 self.attach_core_protocols().await?;
224
225 self.peer_pool.start().await?;
226
227 for endpoint in &self.config.listen_endpoints {
229 let resolved = self.listener.start(endpoint.clone()).await?;
230 info!("Listening on {resolved}");
231 }
232
233 self.discovery.clone().start().await?;
235
236 self.task_group.spawn(
238 {
239 let this = self.clone();
240 async move { this.forward_peer_events().await }
241 },
242 |res: TaskResult<()>| async move {
243 debug!("forward_peer_events task ended: {res}");
244 },
245 );
246
247 self.task_group.spawn(
249 {
250 let this = self.clone();
251 async move { this.connect_discovered_peers().await }
252 },
253 |res: TaskResult<()>| async move {
254 debug!("connect_discovered_peers task ended: {res}");
255 },
256 );
257
258 Ok(())
259 }
260
261 async fn forward_peer_events(self: Arc<Self>) {
266 let listener = self.peer_pool.register_peer_events();
267 while let Ok(event) = listener.recv().await {
268 let mapped = match event {
269 PeerEvent::Added(pid) => PeerConnectionEvent::Connected(pid),
270 PeerEvent::Removed(pid) => PeerConnectionEvent::Disconnected(pid),
271 PeerEvent::HandshakeFailed(pid) => PeerConnectionEvent::ConnectFailed(pid),
272 };
273 self.discovery.on_event(mapped);
274 }
275 }
276
277 async fn connect_discovered_peers(self: Arc<Self>) {
280 let supported = [Protocol::Tcp, Protocol::Tls, Protocol::Quic];
281
282 loop {
283 let discovered = self.discovery.recv().await;
284
285 let endpoint = match pick_endpoint(&discovered.addrs, &supported) {
286 Some(ep) => ep,
287 None => continue,
288 };
289
290 let peer_id = discovered.peer_id.clone();
291
292 if self
293 .connector
294 .connect_and_queue(&endpoint, &peer_id)
295 .await
296 .is_err()
297 {
298 self.monitor
299 .notify(PoolEvent::ConnectFailed(peer_id.clone(), endpoint))
300 .await;
301 self.discovery
302 .on_event(PeerConnectionEvent::ConnectFailed(peer_id));
303 }
304 }
305 }
306
307 pub async fn attach_protocol<P: ProtocolTrait>(
312 &self,
313 c: impl Fn(PeerConn) -> Result<Arc<dyn ProtocolTrait>> + Send + Sync + 'static,
314 ) -> Result<()> {
315 self.peer_pool.attach_protocol::<P>(Box::new(c)).await?;
316 let id = P::id();
317 match P::kind() {
318 ProtocolKind::Mandatory => self.bloom_add_mandatory(&id),
319 ProtocolKind::Optional => self.bloom_add_optional(&id),
320 }
321 Ok(())
322 }
323
324 async fn attach_core_protocols(self: &Arc<Self>) -> Result<()> {
326 self.attach_protocol::<PingProtocol>(|conn| {
327 Ok(PingProtocol::new(conn) as Arc<dyn ProtocolTrait>)
328 })
329 .await
330 }
331
332 pub fn bloom_add_mandatory(&self, item: impl AsRef<[u8]>) {
335 self.bloom.write().add_mandatory(item);
336 }
337
338 pub fn bloom_add_optional(&self, item: impl AsRef<[u8]>) {
342 self.bloom.write().add_optional(item);
343 }
344
345 pub fn bloom_snapshot(&self) -> Bloom {
347 *self.bloom.read()
348 }
349
350 pub fn find_peers_with(&self, item: impl AsRef<[u8]>) -> Vec<DiscoveredPeer> {
354 self.discovery.find_peers_with(item.as_ref())
355 }
356
357 pub async fn peers(&self) -> usize {
359 self.peer_pool.peers_len().await
360 }
361
362 pub fn config(&self) -> Arc<Config> {
364 self.config.clone()
365 }
366
367 pub fn peer_id(&self) -> &PeerID {
369 &self.peer_id
370 }
371
372 pub fn key_pair(&self) -> &KeyPair {
374 &self.key_pair
375 }
376
377 pub async fn inbound_peers(&self) -> HashMap<PeerID, Endpoint> {
379 self.peer_pool.inbound_peers().await
380 }
381
382 pub async fn outbound_peers(&self) -> HashMap<PeerID, Endpoint> {
384 self.peer_pool.outbound_peers().await
385 }
386
387 pub fn monitor(&self) -> Arc<Monitor> {
389 self.monitor.clone()
390 }
391
392 pub fn register_peer_events(&self) -> EventListener<PeerEventTopic, PeerEvent> {
395 self.peer_pool.register_peer_events()
396 }
397
398 pub async fn broadcast_to(
401 &self,
402 proto_id: &ProtocolID,
403 msg: Vec<u8>,
404 targets: &HashSet<PeerID>,
405 ) {
406 self.peer_pool.broadcast_to(proto_id, msg, targets).await;
407 }
408
409 pub async fn send_to(
412 &self,
413 peer_id: &PeerID,
414 proto_id: &ProtocolID,
415 msg: Vec<u8>,
416 ) -> Result<()> {
417 self.peer_pool.send_to(peer_id, proto_id, msg).await
418 }
419
420 pub async fn peer_protocol_set(&self, pid: &PeerID) -> Option<HashSet<ProtocolID>> {
423 self.peer_pool.peer_protocol_set(pid).await
424 }
425
426 pub async fn shutdown(&self) {
428 self.discovery.shutdown().await;
429 self.peer_pool.shutdown().await;
430 self.connector.shutdown().await;
431 self.listener.shutdown().await;
432 self.task_group.cancel().await;
433 }
434}