Skip to main content

karyon_p2p/discovery/kademlia/
mod.rs

1mod lookup;
2mod messages;
3mod refresh;
4pub mod routing_table;
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use log::error;
10use rand::seq::IndexedRandom;
11
12use karyon_core::{
13    async_runtime::Executor,
14    async_util::{AsyncQueue, Backoff, TaskGroup, TaskResult},
15    crypto::KeyPair,
16};
17
18use karyon_net::Endpoint;
19
20use crate::{
21    bloom::BloomRef,
22    config::Config,
23    discovery::{DiscoveredPeer, Discovery, PeerConnectionEvent},
24    message::{pick_endpoint, PeerAddr, Protocol},
25    monitor::Monitor,
26    PeerID, Result,
27};
28
29use lookup::{LookupEndpoints, LookupService};
30use refresh::RefreshService;
31use routing_table::{
32    RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY,
33    UNSTABLE_ENTRY,
34};
35
/// Capacity of the bounded discovered-peer queue between Kademlia and
/// the Node. Passed to `AsyncQueue::new` in `KademliaDiscovery::new`.
const DISCOVERED_PEER_QUEUE_SIZE: usize = 128;

/// Transports the lookup service can dial. The `quic` feature gate is
/// resolved once here so that callers (e.g. `pick_endpoint` call sites)
/// can use the slice without repeating the cfg.
///
/// With the `quic` feature enabled: TCP and QUIC.
#[cfg(feature = "quic")]
pub(crate) const SUPPORTED_LOOKUP_PROTOCOLS: &[Protocol] = &[Protocol::Tcp, Protocol::Quic];
/// Without the `quic` feature: TCP only.
#[cfg(not(feature = "quic"))]
pub(crate) const SUPPORTED_LOOKUP_PROTOCOLS: &[Protocol] = &[Protocol::Tcp];
45
/// Kademlia-based implementation of the `Discovery` trait.
///
/// Bundles a routing table with the lookup and refresh services and
/// feeds the peers it finds to the Node through a bounded queue.
pub struct KademliaDiscovery {
    /// Routing table. Shared with both the lookup and the refresh
    /// service; entry statuses are updated from the connect loop and
    /// from `on_event`.
    table: Arc<RoutingTable>,

    /// Lookup service. Started in `start`, and used by the seeding
    /// step to run lookups against table entries or bootstrap peers.
    lookup_service: Arc<LookupService>,

    /// Refresh service. Started in `start` and stopped in `shutdown`.
    refresh_service: Arc<RefreshService>,

    /// Discovered peers queued for the Node to dial. Producers
    /// (connect_loop, manual peer_endpoints) push; the Node's
    /// connect_discovered_peers task drains via `recv()`.
    peer_queue: Arc<AsyncQueue<DiscoveredPeer>>,

    /// Task group that runs the connect loop; cancelled in `shutdown`.
    task_group: TaskGroup,

    /// Holds the configuration for the P2P network.
    config: Arc<Config>,

    /// Shared local bloom. Snapshotted on every connect-loop iteration
    /// to filter routing-table entries (and stamped on outgoing PeerMsgs
    /// via the lookup service).
    bloom: BloomRef,
}
72
73impl KademliaDiscovery {
74    /// Creates a new KademliaDiscovery
75    pub fn new(
76        key_pair: &KeyPair,
77        peer_id: &PeerID,
78        config: Arc<Config>,
79        monitor: Arc<Monitor>,
80        bloom: BloomRef,
81        ex: Executor,
82    ) -> Arc<Self> {
83        let table = Arc::new(RoutingTable::new(peer_id.0));
84
85        // Pick the lookup endpoint (tcp/tls/quic) and the refresh
86        // endpoint (udp) from `discovery_endpoints` (any order).
87        let lookup_ep = config
88            .discovery_endpoints
89            .iter()
90            .find(|e| is_lookup_proto(e))
91            .cloned();
92        let refresh_ep = config
93            .discovery_endpoints
94            .iter()
95            .find(|e| e.is_udp())
96            .cloned();
97
98        let refresh_service = Arc::new(RefreshService::new(
99            config.clone(),
100            table.clone(),
101            monitor.clone(),
102            refresh_ep.clone(),
103            ex.clone(),
104        ));
105
106        let lookup_endpoints = LookupEndpoints {
107            listen: config.listen_endpoints.clone(),
108            lookup: lookup_ep,
109            refresh: refresh_ep,
110        };
111        let lookup_service = Arc::new(LookupService::new(
112            key_pair,
113            table.clone(),
114            config.clone(),
115            monitor.clone(),
116            bloom.clone(),
117            lookup_endpoints,
118            ex.clone(),
119        ));
120
121        let task_group = TaskGroup::with_executor(ex);
122
123        let peer_queue = AsyncQueue::new(DISCOVERED_PEER_QUEUE_SIZE);
124
125        Arc::new(Self {
126            refresh_service,
127            lookup_service,
128            table,
129            peer_queue,
130            task_group,
131            config,
132            bloom,
133        })
134    }
135
136    /// This method will attempt to find a peer in the routing table and
137    /// send it as a DiscoveredPeer for the Node to connect to.
138    /// If the routing table is empty, it will start the seeding process.
139    async fn connect_loop(self: Arc<Self>) -> Result<()> {
140        let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
141        loop {
142            let required = *self.bloom.read();
143            match self.table.random_entry_filtered(PENDING_ENTRY, &required) {
144                Some(entry) => {
145                    backoff.reset();
146                    let key = entry.key;
147                    let peer = DiscoveredPeer {
148                        peer_id: Some(key.into()),
149                        addrs: entry.addrs.clone(),
150                        discovery_addrs: entry.discovery_addrs.clone(),
151                    };
152                    // Mark as connected to prevent re-picking while
153                    // the Node is still connecting.
154                    self.table.update_entry(&key, CONNECTED_ENTRY);
155                    self.peer_queue.push(peer).await;
156                }
157                None => {
158                    backoff.sleep().await;
159                    self.start_seeding().await;
160                }
161            }
162        }
163    }
164
165    /// Starts seeding process.
166    ///
167    /// This method randomly selects a peer from the routing table and
168    /// attempts to connect to that peer for the initial lookup. If the routing
169    /// table doesn't have an available entry, it will connect to one of the
170    /// provided bootstrap endpoints in the `Config` and initiate the lookup.
171    async fn start_seeding(&self) {
172        match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
173            Some(entry) => {
174                let endpoint =
175                    match pick_endpoint(&entry.discovery_addrs, SUPPORTED_LOOKUP_PROTOCOLS) {
176                        Some(ep) => ep,
177                        None => return,
178                    };
179                let peer_id = Some(entry.key.into());
180                if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
181                    self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
182                    error!("Failed to do lookup: {endpoint}: {err}");
183                }
184            }
185            None => {
186                let peers = &self.config.bootstrap_peers;
187                let shuffled: Vec<_> = peers
188                    .choose_multiple(&mut rand::rng(), peers.len())
189                    .collect();
190                for endpoint in shuffled {
191                    if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
192                        error!("Failed to do lookup: {endpoint}: {err}");
193                    }
194                }
195            }
196        }
197    }
198}
199
200#[async_trait]
201impl Discovery for KademliaDiscovery {
202    async fn start(self: Arc<Self>) -> Result<()> {
203        // Start the lookup service
204        self.lookup_service.start().await?;
205        // Start the refresh service
206        self.refresh_service.start().await?;
207
208        // Send manual peer endpoints as DiscoveredPeer
209        for endpoint in self.config.peer_endpoints.iter() {
210            if let Some(addr) = PeerAddr::from_endpoint(endpoint, 0) {
211                let peer = DiscoveredPeer {
212                    peer_id: None,
213                    addrs: vec![addr],
214                    discovery_addrs: vec![],
215                };
216                self.peer_queue.push(peer).await;
217            }
218        }
219
220        // Start connect loop
221        self.task_group.spawn(
222            {
223                let this = self.clone();
224                async move { this.connect_loop().await }
225            },
226            |res| async move {
227                if let TaskResult::Completed(Err(err)) = res {
228                    error!("Connect loop stopped: {err}");
229                }
230            },
231        );
232
233        Ok(())
234    }
235
236    async fn shutdown(&self) {
237        self.task_group.cancel().await;
238        self.refresh_service.shutdown().await;
239        self.lookup_service.shutdown().await;
240    }
241
242    async fn recv(&self) -> DiscoveredPeer {
243        self.peer_queue.recv().await
244    }
245
246    fn on_event(&self, event: PeerConnectionEvent) {
247        match event {
248            PeerConnectionEvent::Connected(pid) => {
249                self.table.update_entry(&pid.0, CONNECTED_ENTRY);
250            }
251            PeerConnectionEvent::Disconnected(pid) => {
252                self.table.update_entry(&pid.0, DISCONNECTED_ENTRY);
253            }
254            PeerConnectionEvent::ConnectFailed(Some(pid)) => {
255                self.table.update_entry(&pid.0, UNREACHABLE_ENTRY);
256            }
257            PeerConnectionEvent::ConnectFailed(None) => {}
258        }
259    }
260
261    fn find_peers_with(&self, item: &[u8]) -> Vec<DiscoveredPeer> {
262        self.table
263            .entries_with_item(item)
264            .into_iter()
265            .map(|e| DiscoveredPeer {
266                peer_id: Some(e.key.into()),
267                addrs: e.addrs,
268                discovery_addrs: e.discovery_addrs,
269            })
270            .collect()
271    }
272}
273
274/// Returns true if the endpoint can serve the Kademlia lookup service.
275fn is_lookup_proto(e: &Endpoint) -> bool {
276    if e.is_tcp() {
277        return true;
278    }
279    #[cfg(feature = "quic")]
280    if e.is_quic() {
281        return true;
282    }
283    false
284}