karyon_p2p/discovery/
mod.rs

1mod lookup;
2mod refresh;
3
4use std::sync::Arc;
5
6use log::{error, info};
7use rand::{rngs::OsRng, seq::SliceRandom};
8
9use karyon_core::{
10    async_runtime::Executor,
11    async_util::{Backoff, TaskGroup, TaskResult},
12    crypto::KeyPair,
13};
14
15use karyon_net::Endpoint;
16
17use crate::{
18    config::Config,
19    conn_queue::ConnQueue,
20    connection::ConnDirection,
21    connector::Connector,
22    listener::Listener,
23    monitor::Monitor,
24    routing_table::{
25        RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY,
26        UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
27    },
28    slots::ConnectionSlots,
29    ConnRef, Error, PeerID, Result,
30};
31
32use lookup::LookupService;
33use refresh::RefreshService;
34
/// Peer discovery component of the P2P stack.
///
/// Bundles the routing table together with the lookup and refresh services,
/// the outbound connector, the inbound listener and the shared connection
/// queue that every established connection is handed to.
pub struct Discovery {
    /// Routing table, created in `new` from the local peer id and shared
    /// with the lookup and refresh services.
    table: Arc<RoutingTable>,

    /// Lookup service, used by the seeding process to run lookups against
    /// routing table entries or bootstrap peers.
    lookup_service: Arc<LookupService>,

    /// Refresh service, started and shut down alongside the lookup service.
    refresh_service: Arc<RefreshService>,

    /// Connector used for outbound connections.
    connector: Arc<Connector>,

    /// Listener used for inbound connections; only started when a listen
    /// endpoint is configured.
    listener: Arc<Listener>,

    /// Connection queue that receives both inbound and outbound connections.
    conn_queue: Arc<ConnQueue>,

    /// Managing spawned tasks (currently the connect loop).
    task_group: TaskGroup,

    /// Holds the configuration for the P2P network.
    config: Arc<Config>,
}
60
61impl Discovery {
62    /// Creates a new Discovery
63    pub fn new(
64        key_pair: &KeyPair,
65        peer_id: &PeerID,
66        conn_queue: Arc<ConnQueue>,
67        config: Arc<Config>,
68        monitor: Arc<Monitor>,
69        ex: Executor,
70    ) -> Arc<Discovery> {
71        let table = Arc::new(RoutingTable::new(peer_id.0));
72
73        let refresh_service = Arc::new(RefreshService::new(
74            config.clone(),
75            table.clone(),
76            monitor.clone(),
77            ex.clone(),
78        ));
79
80        let lookup_service = Arc::new(LookupService::new(
81            key_pair,
82            table.clone(),
83            config.clone(),
84            monitor.clone(),
85            ex.clone(),
86        ));
87
88        let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
89        let connector = Connector::new(
90            key_pair,
91            config.max_connect_retries,
92            outbound_slots.clone(),
93            config.enable_tls,
94            monitor.clone(),
95            ex.clone(),
96        );
97
98        let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
99        let listener = Listener::new(
100            key_pair,
101            inbound_slots.clone(),
102            config.enable_tls,
103            monitor.clone(),
104            ex.clone(),
105        );
106
107        let task_group = TaskGroup::with_executor(ex);
108
109        Arc::new(Self {
110            refresh_service,
111            lookup_service,
112            conn_queue,
113            table,
114            connector,
115            listener,
116            task_group,
117            config,
118        })
119    }
120
121    /// Start the Discovery
122    pub async fn start(self: &Arc<Self>) -> Result<()> {
123        // Check if the listen_endpoint is provided, and if so, start a listener.
124        if let Some(endpoint) = &self.config.listen_endpoint {
125            // Return an error if the discovery port is set to 0.
126            if self.config.discovery_port == 0 {
127                return Err(Error::Config(
128                    "Please add a valid discovery port".to_string(),
129                ));
130            }
131
132            let resolved_endpoint = self.start_listener(endpoint).await?;
133
134            info!("Resolved listen endpoint: {resolved_endpoint}");
135            self.lookup_service
136                .set_listen_endpoint(&resolved_endpoint)?;
137            self.refresh_service
138                .set_listen_endpoint(&resolved_endpoint)?;
139        }
140
141        // Start the lookup service
142        self.lookup_service.start().await?;
143        // Start the refresh service
144        self.refresh_service.start().await?;
145
146        // Attempt to manually connect to peer endpoints provided in the Config.
147        for endpoint in self.config.peer_endpoints.iter() {
148            let _ = self.connect(endpoint, None).await;
149        }
150
151        // Start connect loop
152        self.task_group.spawn(
153            {
154                let this = self.clone();
155                async move { this.connect_loop().await }
156            },
157            |res| async move {
158                if let TaskResult::Completed(Err(err)) = res {
159                    error!("Connect loop stopped: {err}");
160                }
161            },
162        );
163
164        Ok(())
165    }
166
167    /// Shuts down the discovery
168    pub async fn shutdown(&self) {
169        self.task_group.cancel().await;
170        self.connector.shutdown().await;
171        self.listener.shutdown().await;
172
173        self.refresh_service.shutdown().await;
174        self.lookup_service.shutdown().await;
175    }
176
177    /// Start a listener and on success, return the resolved endpoint.
178    async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
179        let callback = {
180            let this = self.clone();
181            |c: ConnRef| async move {
182                this.conn_queue.handle(c, ConnDirection::Inbound).await?;
183                Ok(())
184            }
185        };
186
187        let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
188        Ok(resolved_endpoint)
189    }
190
191    /// This method will attempt to connect to a peer in the routing table.
192    /// If the routing table is empty, it will start the seeding process for
193    /// finding new peers.
194    ///
195    /// This will perform a backoff to prevent getting stuck in the loop
196    /// if the seeding process couldn't find any peers.
197    async fn connect_loop(self: Arc<Self>) -> Result<()> {
198        let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
199        loop {
200            match self.table.random_entry(PENDING_ENTRY) {
201                Some(entry) => {
202                    backoff.reset();
203                    let endpoint = Endpoint::Tcp(entry.addr, entry.port);
204                    self.connect(&endpoint, Some(entry.key.into())).await;
205                }
206                None => {
207                    backoff.sleep().await;
208                    self.start_seeding().await;
209                }
210            }
211        }
212    }
213
214    /// Connect to the given endpoint using the connector
215    async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
216        let cback = {
217            let this = self.clone();
218            let endpoint = endpoint.clone();
219            let pid = pid.clone();
220            |conn: ConnRef| async move {
221                let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await;
222
223                // If the entry is not in the routing table, ignore the result
224                let pid = match pid {
225                    Some(p) => p,
226                    None => return Ok(()),
227                };
228
229                match result {
230                    Err(Error::IncompatiblePeer) => {
231                        error!("Failed to do handshake: {endpoint} incompatible peer");
232                        this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
233                    }
234                    Err(Error::PeerAlreadyConnected) => {
235                        this.table.update_entry(&pid.0, CONNECTED_ENTRY)
236                    }
237                    Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY),
238                    Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
239                }
240
241                Ok(())
242            }
243        };
244
245        let result = self
246            .connector
247            .connect_with_cback(endpoint, &pid, cback)
248            .await;
249
250        if let Some(pid) = &pid {
251            match result {
252                Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY),
253                Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),
254            }
255        }
256    }
257
258    /// Starts seeding process.
259    ///
260    /// This method randomly selects a peer from the routing table and
261    /// attempts to connect to that peer for the initial lookup. If the routing
262    /// table doesn't have an available entry, it will connect to one of the
263    /// provided bootstrap endpoints in the `Config` and initiate the lookup.
264    async fn start_seeding(&self) {
265        match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
266            Some(entry) => {
267                let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
268                let peer_id = Some(entry.key.into());
269                if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
270                    self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
271                    error!("Failed to do lookup: {endpoint}: {err}");
272                }
273            }
274            None => {
275                let peers = &self.config.bootstrap_peers;
276                for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
277                    if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
278                        error!("Failed to do lookup: {endpoint}: {err}");
279                    }
280                }
281            }
282        }
283    }
284}