// karyon_p2p/node.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use log::{debug, info};
7use parking_lot::RwLock as SyncRwLock;
8
9use karyon_core::{
10    async_runtime::Executor,
11    async_util::{TaskGroup, TaskResult},
12    crypto::KeyPair,
13};
14use karyon_eventemitter::EventListener;
15use karyon_net::Endpoint;
16
17use crate::{
18    bloom::{Bloom, BloomRef},
19    codec::PeerNetMsgCodec,
20    config::Config,
21    conn_queue::ConnQueue,
22    connector::Connector,
23    discovery::{kademlia::KademliaDiscovery, DiscoveredPeer, Discovery, PeerConnectionEvent},
24    listener::Listener,
25    message::{pick_endpoint, Protocol},
26    monitor::{Monitor, PoolEvent},
27    peer_pool::{PeerEvent, PeerEventTopic, PeerPool},
28    protocol::{PeerConn, Protocol as ProtocolTrait, ProtocolID, ProtocolKind},
29    protocols::PingProtocol,
30    slots::ConnectionSlots,
31    PeerID, Result,
32};
33
/// Central entry point for the p2p network.
///
/// Manages peer connections, discovery, and protocol registration.
/// Instances are created through [`Node::new`] or
/// [`Node::with_discovery`], both of which return an `Arc<Node>`.
///
/// # Example
///
/// ```no_run
/// use karyon_core::async_runtime::global_executor;
/// use karyon_p2p::{Node, Config, keypair::{KeyPair, KeyPairType}};
///
/// let key_pair = KeyPair::generate(&KeyPairType::Ed25519);
/// let config = Config {
///     listen_endpoints: vec![
///         "tcp://0.0.0.0:8000".parse().unwrap(),
///     ],
///     ..Config::default()
/// };
///
/// let node = Node::new(&key_pair, config, global_executor());
///
/// // node.run().await.unwrap();
/// // The constructor closure receives a `PeerConn` and must return
/// // `Result<Arc<dyn Protocol>>`:
/// // node.attach_protocol::<MyProto>(|conn| Ok(MyProto::new(conn))).await.unwrap();
/// // node.shutdown().await;
/// ```
pub struct Node {
    /// The configuration for the P2P network, shared with the monitor,
    /// the peer pool and (for the default setup) the discovery service.
    config: Arc<Config>,

    /// Identity key pair (a clone of the one passed to the constructor).
    key_pair: KeyPair,

    /// Peer ID derived from the public half of `key_pair`.
    peer_id: PeerID,

    /// Responsible for network and system monitoring.
    monitor: Arc<Monitor>,

    /// Discovery instance; Kademlia by default, or a caller-supplied
    /// implementation when built via `with_discovery`.
    discovery: Arc<dyn Discovery>,

    /// PeerPool instance.
    peer_pool: Arc<PeerPool>,

    /// Connector for outbound connections.
    connector: Arc<Connector<PeerNetMsgCodec>>,

    /// Listener for inbound connections.
    listener: Arc<Listener<PeerNetMsgCodec>>,

    /// Managing spawned tasks (the event-forwarding and peer-connecting
    /// loops started by `run`).
    task_group: TaskGroup,

    /// Local bloom advertising what items (protocols, swarm keys, ...)
    /// this node supports. The mandatory side is filtered with `covers`,
    /// the optional side with `intersects`. Updated by `attach_protocol`
    /// (via `Protocol::kind()`) and by application layers like Swarm.
    bloom: BloomRef,
}
92
93impl Node {
94    /// Creates a new Node with the default Kademlia discovery.
95    pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> Arc<Node> {
96        let config = Arc::new(config);
97        let monitor = Arc::new(Monitor::new(config.clone()));
98        let peer_id = PeerID::try_from(key_pair.public())
99            .expect("Derive a peer id from the provided key pair.");
100        info!("PeerID: {peer_id}");
101
102        let conn_queue = ConnQueue::new();
103        let peer_pool = PeerPool::new(
104            &peer_id,
105            conn_queue.clone(),
106            config.clone(),
107            monitor.clone(),
108            ex.clone(),
109        );
110
111        let bloom: BloomRef = Arc::new(SyncRwLock::new(Bloom::empty()));
112
113        let discovery: Arc<dyn Discovery> = KademliaDiscovery::new(
114            key_pair,
115            &peer_id,
116            config.clone(),
117            monitor.clone(),
118            bloom.clone(),
119            ex.clone(),
120        );
121
122        let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
123        let connector = Connector::new_with_queue(
124            key_pair,
125            config.max_connect_retries,
126            outbound_slots,
127            conn_queue.clone(),
128            monitor.clone(),
129            ex.clone(),
130        );
131
132        let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
133        let listener = Listener::new_with_queue(
134            key_pair,
135            inbound_slots,
136            conn_queue,
137            monitor.clone(),
138            ex.clone(),
139        );
140
141        let task_group = TaskGroup::with_executor(ex);
142
143        Arc::new(Self {
144            key_pair: key_pair.clone(),
145            peer_id,
146            monitor,
147            discovery,
148            config,
149            peer_pool,
150            connector,
151            listener,
152            task_group,
153            bloom,
154        })
155    }
156
157    /// Creates a new Node with a custom discovery implementation.
158    /// The caller is responsible for wiring its own bloom_provider into
159    /// the discovery; Node's `bloom_add_*` methods will not affect
160    /// it unless the discovery reads from the same source.
161    pub fn with_discovery(
162        key_pair: &KeyPair,
163        config: Config,
164        discovery: Arc<dyn Discovery>,
165        ex: Executor,
166    ) -> Arc<Node> {
167        let config = Arc::new(config);
168        let monitor = Arc::new(Monitor::new(config.clone()));
169        let peer_id = PeerID::try_from(key_pair.public())
170            .expect("Derive a peer id from the provided key pair.");
171        info!("PeerID: {peer_id}");
172
173        let conn_queue = ConnQueue::new();
174        let peer_pool = PeerPool::new(
175            &peer_id,
176            conn_queue.clone(),
177            config.clone(),
178            monitor.clone(),
179            ex.clone(),
180        );
181
182        let bloom: BloomRef = Arc::new(SyncRwLock::new(Bloom::empty()));
183
184        let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
185        let connector = Connector::new_with_queue(
186            key_pair,
187            config.max_connect_retries,
188            outbound_slots,
189            conn_queue.clone(),
190            monitor.clone(),
191            ex.clone(),
192        );
193
194        let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
195        let listener = Listener::new_with_queue(
196            key_pair,
197            inbound_slots,
198            conn_queue,
199            monitor.clone(),
200            ex.clone(),
201        );
202
203        let task_group = TaskGroup::with_executor(ex);
204
205        Arc::new(Self {
206            key_pair: key_pair.clone(),
207            peer_id,
208            monitor,
209            discovery,
210            config,
211            peer_pool,
212            connector,
213            listener,
214            task_group,
215            bloom,
216        })
217    }
218
219    /// Run the Node, starting listeners, PeerPool, and Discovery.
220    pub async fn run(self: &Arc<Self>) -> Result<()> {
221        // Core protocols (PING) are attached before the pool starts so
222        // they're advertised on every handshake.
223        self.attach_core_protocols().await?;
224
225        self.peer_pool.start().await?;
226
227        // Start data listeners.
228        for endpoint in &self.config.listen_endpoints {
229            let resolved = self.listener.start(endpoint.clone()).await?;
230            info!("Listening on {resolved}");
231        }
232
233        // Start discovery.
234        self.discovery.clone().start().await?;
235
236        // Forward peer lifecycle events to discovery.
237        self.task_group.spawn(
238            {
239                let this = self.clone();
240                async move { this.forward_peer_events().await }
241            },
242            |res: TaskResult<()>| async move {
243                debug!("forward_peer_events task ended: {res}");
244            },
245        );
246
247        // Spawn task to connect discovered peers.
248        self.task_group.spawn(
249            {
250                let this = self.clone();
251                async move { this.connect_discovered_peers().await }
252            },
253            |res: TaskResult<()>| async move {
254                debug!("connect_discovered_peers task ended: {res}");
255            },
256        );
257
258        Ok(())
259    }
260
261    /// Forward peer-pool lifecycle events to discovery so it can
262    /// update its routing state. Each registered listener (Node's
263    /// here, plus any external Swarm subscriber) gets every event
264    /// independently - no MPMC stealing.
265    async fn forward_peer_events(self: Arc<Self>) {
266        let listener = self.peer_pool.register_peer_events();
267        while let Ok(event) = listener.recv().await {
268            let mapped = match event {
269                PeerEvent::Added(pid) => PeerConnectionEvent::Connected(pid),
270                PeerEvent::Removed(pid) => PeerConnectionEvent::Disconnected(pid),
271                PeerEvent::HandshakeFailed(pid) => PeerConnectionEvent::ConnectFailed(pid),
272            };
273            self.discovery.on_event(mapped);
274        }
275    }
276
277    /// Consume discovered peers from the discovery service and connect to them.
278    /// Runs forever; the task_group cancels it on Node::shutdown.
279    async fn connect_discovered_peers(self: Arc<Self>) {
280        let supported = [Protocol::Tcp, Protocol::Tls, Protocol::Quic];
281
282        loop {
283            let discovered = self.discovery.recv().await;
284
285            let endpoint = match pick_endpoint(&discovered.addrs, &supported) {
286                Some(ep) => ep,
287                None => continue,
288            };
289
290            let peer_id = discovered.peer_id.clone();
291
292            if self
293                .connector
294                .connect_and_queue(&endpoint, &peer_id)
295                .await
296                .is_err()
297            {
298                self.monitor
299                    .notify(PoolEvent::ConnectFailed(peer_id.clone(), endpoint))
300                    .await;
301                self.discovery
302                    .on_event(PeerConnectionEvent::ConnectFailed(peer_id));
303            }
304        }
305    }
306
307    /// Attach a custom protocol. karyon runs the constructor closure
308    /// once per connected peer with a typed `PeerConn` scoped to this
309    /// protocol. Bloom advertises the protocol id according to
310    /// `P::kind()`.
311    pub async fn attach_protocol<P: ProtocolTrait>(
312        &self,
313        c: impl Fn(PeerConn) -> Result<Arc<dyn ProtocolTrait>> + Send + Sync + 'static,
314    ) -> Result<()> {
315        self.peer_pool.attach_protocol::<P>(Box::new(c)).await?;
316        let id = P::id();
317        match P::kind() {
318            ProtocolKind::Mandatory => self.bloom_add_mandatory(&id),
319            ProtocolKind::Optional => self.bloom_add_optional(&id),
320        }
321        Ok(())
322    }
323
324    /// Attach the core protocols (PING). Called once during `run`.
325    async fn attach_core_protocols(self: &Arc<Self>) -> Result<()> {
326        self.attach_protocol::<PingProtocol>(|conn| {
327            Ok(PingProtocol::new(conn) as Arc<dyn ProtocolTrait>)
328        })
329        .await
330    }
331
332    /// Add an item the local node REQUIRES peers to also support.
333    /// Reflected in the next bloom snapshot advertised in PeerMsg.
334    pub fn bloom_add_mandatory(&self, item: impl AsRef<[u8]>) {
335        self.bloom.write().add_mandatory(item);
336    }
337
338    /// Add an item the local node would LIKE peers to also support but
339    /// doesn't require. Used by Swarm and other layers for fuzzy
340    /// protocol-aware discovery without rejecting non-matches.
341    pub fn bloom_add_optional(&self, item: impl AsRef<[u8]>) {
342        self.bloom.write().add_optional(item);
343    }
344
345    /// Snapshot of the local bloom (mandatory + optional sides).
346    pub fn bloom_snapshot(&self) -> Bloom {
347        *self.bloom.read()
348    }
349
350    /// Find peers in the routing table whose advertised bloom may
351    /// contain `item`. Useful for swarm-targeted lookups (e.g.
352    /// "peers in this room") without changing handshake semantics.
353    pub fn find_peers_with(&self, item: impl AsRef<[u8]>) -> Vec<DiscoveredPeer> {
354        self.discovery.find_peers_with(item.as_ref())
355    }
356
    /// Returns the number of currently connected peers, as reported by
    /// the peer pool's `peers_len`.
    pub async fn peers(&self) -> usize {
        self.peer_pool.peers_len().await
    }
361
362    /// Returns the `Config`.
363    pub fn config(&self) -> Arc<Config> {
364        self.config.clone()
365    }
366
    /// Returns the node's `PeerID`, which was derived from the public
    /// key of its key pair at construction time.
    pub fn peer_id(&self) -> &PeerID {
        &self.peer_id
    }
371
    /// Returns the node's identity `KeyPair`.
    pub fn key_pair(&self) -> &KeyPair {
        &self.key_pair
    }
376
    /// Returns a map of inbound connected peers with their endpoints,
    /// as reported by the peer pool.
    pub async fn inbound_peers(&self) -> HashMap<PeerID, Endpoint> {
        self.peer_pool.inbound_peers().await
    }
381
    /// Returns a map of outbound connected peers with their endpoints,
    /// as reported by the peer pool.
    pub async fn outbound_peers(&self) -> HashMap<PeerID, Endpoint> {
        self.peer_pool.outbound_peers().await
    }
386
387    /// Returns the monitor to receive system events.
388    pub fn monitor(&self) -> Arc<Monitor> {
389        self.monitor.clone()
390    }
391
    /// Register a listener for peer lifecycle events. Each call returns
    /// a fresh listener that receives every event (true broadcast).
    /// Registration is delegated to the peer pool.
    pub fn register_peer_events(&self) -> EventListener<PeerEventTopic, PeerEvent> {
        self.peer_pool.register_peer_events()
    }
397
    /// Broadcast a message to a specific set of peers on a given protocol.
    /// Used by Swarm and other layers to scope broadcasts.
    ///
    /// The call is forwarded to the peer pool and returns no status, so
    /// per-peer delivery failures are not reported to the caller here.
    pub async fn broadcast_to(
        &self,
        proto_id: &ProtocolID,
        msg: Vec<u8>,
        targets: &HashSet<PeerID>,
    ) {
        self.peer_pool.broadcast_to(proto_id, msg, targets).await;
    }
408
    /// Send a message to a specific peer on the given protocol.
    /// Returns `PeerNotFound` if the peer is not currently connected.
    ///
    /// The call is forwarded to the peer pool and its result is returned
    /// unchanged.
    pub async fn send_to(
        &self,
        peer_id: &PeerID,
        proto_id: &ProtocolID,
        msg: Vec<u8>,
    ) -> Result<()> {
        self.peer_pool.send_to(peer_id, proto_id, msg).await
    }
419
    /// Returns the negotiated protocol set for a connected peer, or
    /// `None` if no peer with that id is currently in the pool.
    /// The lookup is delegated to the peer pool.
    pub async fn peer_protocol_set(&self, pid: &PeerID) -> Option<HashSet<ProtocolID>> {
        self.peer_pool.peer_protocol_set(pid).await
    }
425
    /// Shuts down the Node.
    ///
    /// Components are stopped sequentially: discovery, peer pool,
    /// connector, listener, and finally the task group, whose
    /// cancellation ends the background tasks spawned by `run`.
    pub async fn shutdown(&self) {
        self.discovery.shutdown().await;
        self.peer_pool.shutdown().await;
        self.connector.shutdown().await;
        self.listener.shutdown().await;
        // Cancelled last: the forwarding and dialing loops keep running
        // until this point.
        self.task_group.cancel().await;
    }
434}