karyon_p2p/discovery/kademlia/
mod.rs1mod lookup;
2mod messages;
3mod refresh;
4pub mod routing_table;
5
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use log::error;
10use rand::seq::IndexedRandom;
11
12use karyon_core::{
13 async_runtime::Executor,
14 async_util::{AsyncQueue, Backoff, TaskGroup, TaskResult},
15 crypto::KeyPair,
16};
17
18use karyon_net::Endpoint;
19
20use crate::{
21 bloom::BloomRef,
22 config::Config,
23 discovery::{DiscoveredPeer, Discovery, PeerConnectionEvent},
24 message::{pick_endpoint, PeerAddr, Protocol},
25 monitor::Monitor,
26 PeerID, Result,
27};
28
29use lookup::{LookupEndpoints, LookupService};
30use refresh::RefreshService;
31use routing_table::{
32 RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY, UNREACHABLE_ENTRY,
33 UNSTABLE_ENTRY,
34};
35
36const DISCOVERED_PEER_QUEUE_SIZE: usize = 128;
38
39#[cfg(feature = "quic")]
42pub(crate) const SUPPORTED_LOOKUP_PROTOCOLS: &[Protocol] = &[Protocol::Tcp, Protocol::Quic];
43#[cfg(not(feature = "quic"))]
44pub(crate) const SUPPORTED_LOOKUP_PROTOCOLS: &[Protocol] = &[Protocol::Tcp];
45
/// Kademlia-based peer discovery.
///
/// Ties together a shared routing table, a lookup service and a refresh
/// service, and hands discovered peers to the caller through a queue created
/// with a fixed capacity.
pub struct KademliaDiscovery {
    /// Routing table, shared (via `Arc`) with the lookup and refresh services.
    table: Arc<RoutingTable>,

    /// Service used to run lookups against other peers' discovery endpoints.
    lookup_service: Arc<LookupService>,

    /// Service constructed with the first UDP discovery endpoint (if any);
    /// presumably keeps routing-table entries fresh — TODO confirm.
    refresh_service: Arc<RefreshService>,

    /// Discovered peers waiting to be returned from `recv`.
    peer_queue: Arc<AsyncQueue<DiscoveredPeer>>,

    /// Owns the background connect loop spawned by `start`; cancelled on
    /// `shutdown`.
    task_group: TaskGroup,

    /// Shared P2P configuration.
    config: Arc<Config>,

    /// Bloom filter read by the connect loop to filter pending entries.
    bloom: BloomRef,
}
72
73impl KademliaDiscovery {
74 pub fn new(
76 key_pair: &KeyPair,
77 peer_id: &PeerID,
78 config: Arc<Config>,
79 monitor: Arc<Monitor>,
80 bloom: BloomRef,
81 ex: Executor,
82 ) -> Arc<Self> {
83 let table = Arc::new(RoutingTable::new(peer_id.0));
84
85 let lookup_ep = config
88 .discovery_endpoints
89 .iter()
90 .find(|e| is_lookup_proto(e))
91 .cloned();
92 let refresh_ep = config
93 .discovery_endpoints
94 .iter()
95 .find(|e| e.is_udp())
96 .cloned();
97
98 let refresh_service = Arc::new(RefreshService::new(
99 config.clone(),
100 table.clone(),
101 monitor.clone(),
102 refresh_ep.clone(),
103 ex.clone(),
104 ));
105
106 let lookup_endpoints = LookupEndpoints {
107 listen: config.listen_endpoints.clone(),
108 lookup: lookup_ep,
109 refresh: refresh_ep,
110 };
111 let lookup_service = Arc::new(LookupService::new(
112 key_pair,
113 table.clone(),
114 config.clone(),
115 monitor.clone(),
116 bloom.clone(),
117 lookup_endpoints,
118 ex.clone(),
119 ));
120
121 let task_group = TaskGroup::with_executor(ex);
122
123 let peer_queue = AsyncQueue::new(DISCOVERED_PEER_QUEUE_SIZE);
124
125 Arc::new(Self {
126 refresh_service,
127 lookup_service,
128 table,
129 peer_queue,
130 task_group,
131 config,
132 bloom,
133 })
134 }
135
136 async fn connect_loop(self: Arc<Self>) -> Result<()> {
140 let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
141 loop {
142 let required = *self.bloom.read();
143 match self.table.random_entry_filtered(PENDING_ENTRY, &required) {
144 Some(entry) => {
145 backoff.reset();
146 let key = entry.key;
147 let peer = DiscoveredPeer {
148 peer_id: Some(key.into()),
149 addrs: entry.addrs.clone(),
150 discovery_addrs: entry.discovery_addrs.clone(),
151 };
152 self.table.update_entry(&key, CONNECTED_ENTRY);
155 self.peer_queue.push(peer).await;
156 }
157 None => {
158 backoff.sleep().await;
159 self.start_seeding().await;
160 }
161 }
162 }
163 }
164
165 async fn start_seeding(&self) {
172 match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
173 Some(entry) => {
174 let endpoint =
175 match pick_endpoint(&entry.discovery_addrs, SUPPORTED_LOOKUP_PROTOCOLS) {
176 Some(ep) => ep,
177 None => return,
178 };
179 let peer_id = Some(entry.key.into());
180 if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
181 self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
182 error!("Failed to do lookup: {endpoint}: {err}");
183 }
184 }
185 None => {
186 let peers = &self.config.bootstrap_peers;
187 let shuffled: Vec<_> = peers
188 .choose_multiple(&mut rand::rng(), peers.len())
189 .collect();
190 for endpoint in shuffled {
191 if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
192 error!("Failed to do lookup: {endpoint}: {err}");
193 }
194 }
195 }
196 }
197 }
198}
199
200#[async_trait]
201impl Discovery for KademliaDiscovery {
202 async fn start(self: Arc<Self>) -> Result<()> {
203 self.lookup_service.start().await?;
205 self.refresh_service.start().await?;
207
208 for endpoint in self.config.peer_endpoints.iter() {
210 if let Some(addr) = PeerAddr::from_endpoint(endpoint, 0) {
211 let peer = DiscoveredPeer {
212 peer_id: None,
213 addrs: vec![addr],
214 discovery_addrs: vec![],
215 };
216 self.peer_queue.push(peer).await;
217 }
218 }
219
220 self.task_group.spawn(
222 {
223 let this = self.clone();
224 async move { this.connect_loop().await }
225 },
226 |res| async move {
227 if let TaskResult::Completed(Err(err)) = res {
228 error!("Connect loop stopped: {err}");
229 }
230 },
231 );
232
233 Ok(())
234 }
235
236 async fn shutdown(&self) {
237 self.task_group.cancel().await;
238 self.refresh_service.shutdown().await;
239 self.lookup_service.shutdown().await;
240 }
241
242 async fn recv(&self) -> DiscoveredPeer {
243 self.peer_queue.recv().await
244 }
245
246 fn on_event(&self, event: PeerConnectionEvent) {
247 match event {
248 PeerConnectionEvent::Connected(pid) => {
249 self.table.update_entry(&pid.0, CONNECTED_ENTRY);
250 }
251 PeerConnectionEvent::Disconnected(pid) => {
252 self.table.update_entry(&pid.0, DISCONNECTED_ENTRY);
253 }
254 PeerConnectionEvent::ConnectFailed(Some(pid)) => {
255 self.table.update_entry(&pid.0, UNREACHABLE_ENTRY);
256 }
257 PeerConnectionEvent::ConnectFailed(None) => {}
258 }
259 }
260
261 fn find_peers_with(&self, item: &[u8]) -> Vec<DiscoveredPeer> {
262 self.table
263 .entries_with_item(item)
264 .into_iter()
265 .map(|e| DiscoveredPeer {
266 peer_id: Some(e.key.into()),
267 addrs: e.addrs,
268 discovery_addrs: e.discovery_addrs,
269 })
270 .collect()
271 }
272}
273
274fn is_lookup_proto(e: &Endpoint) -> bool {
276 if e.is_tcp() {
277 return true;
278 }
279 #[cfg(feature = "quic")]
280 if e.is_quic() {
281 return true;
282 }
283 false
284}