karyon_p2p/
backend.rs

1use std::{collections::HashMap, sync::Arc};
2
3use log::info;
4
5use karyon_core::{async_runtime::Executor, crypto::KeyPair};
6use karyon_net::Endpoint;
7
8use crate::{
9    config::Config, conn_queue::ConnQueue, discovery::Discovery, monitor::Monitor, peer::Peer,
10    peer_pool::PeerPool, protocol::Protocol, PeerID, Result,
11};
12
13/// Backend serves as the central entry point for initiating and managing
14/// the P2P network.
15pub struct Backend {
16    /// The Configuration for the P2P network.
17    config: Arc<Config>,
18
19    /// Identity Key pair
20    key_pair: KeyPair,
21
22    /// Peer ID
23    peer_id: PeerID,
24
25    /// Responsible for network and system monitoring.
26    monitor: Arc<Monitor>,
27
28    /// Discovery instance.
29    discovery: Arc<Discovery>,
30
31    /// PeerPool instance.
32    peer_pool: Arc<PeerPool>,
33}
34
35impl Backend {
36    /// Creates a new Backend.
37    pub fn new(key_pair: &KeyPair, config: Config, ex: Executor) -> Arc<Backend> {
38        let config = Arc::new(config);
39        let monitor = Arc::new(Monitor::new(config.clone()));
40        let conn_queue = ConnQueue::new();
41
42        let peer_id = PeerID::try_from(key_pair.public())
43            .expect("Derive a peer id from the provided key pair.");
44        info!("PeerID: {}", peer_id);
45
46        let peer_pool = PeerPool::new(
47            &peer_id,
48            conn_queue.clone(),
49            config.clone(),
50            monitor.clone(),
51            ex.clone(),
52        );
53
54        let discovery = Discovery::new(
55            key_pair,
56            &peer_id,
57            conn_queue,
58            config.clone(),
59            monitor.clone(),
60            ex,
61        );
62
63        Arc::new(Self {
64            key_pair: key_pair.clone(),
65            peer_id,
66            monitor,
67            discovery,
68            config,
69            peer_pool,
70        })
71    }
72
73    /// Run the Backend, starting the PeerPool and Discovery instances.
74    pub async fn run(self: &Arc<Self>) -> Result<()> {
75        self.peer_pool.start().await?;
76        self.discovery.start().await?;
77        Ok(())
78    }
79
80    /// Attach a custom protocol to the network
81    pub async fn attach_protocol<P: Protocol>(
82        &self,
83        c: impl Fn(Arc<Peer>) -> Arc<dyn Protocol> + Send + Sync + 'static,
84    ) -> Result<()> {
85        self.peer_pool.attach_protocol::<P>(Box::new(c)).await
86    }
87
88    /// Returns the number of currently connected peers.
89    pub async fn peers(&self) -> usize {
90        self.peer_pool.peers_len().await
91    }
92
93    /// Returns the `Config`.
94    pub fn config(&self) -> Arc<Config> {
95        self.config.clone()
96    }
97
98    /// Returns the `PeerID`.
99    pub fn peer_id(&self) -> &PeerID {
100        &self.peer_id
101    }
102
103    /// Returns the `KeyPair`.
104    pub fn key_pair(&self) -> &KeyPair {
105        &self.key_pair
106    }
107
108    /// Returns a map of inbound connected peers with their endpoints.
109    pub async fn inbound_peers(&self) -> HashMap<PeerID, Endpoint> {
110        self.peer_pool.inbound_peers().await
111    }
112
113    /// Returns a map of outbound connected peers with their endpoints.
114    pub async fn outbound_peers(&self) -> HashMap<PeerID, Endpoint> {
115        self.peer_pool.outbound_peers().await
116    }
117
118    /// Returns the monitor to receive system events.
119    pub fn monitor(&self) -> Arc<Monitor> {
120        self.monitor.clone()
121    }
122
123    /// Shuts down the Backend.
124    pub async fn shutdown(&self) {
125        self.discovery.shutdown().await;
126        self.peer_pool.shutdown().await;
127    }
128}