karyon_p2p/discovery/kademlia/
lookup.rs1use std::{sync::Arc, time::Duration};
2
3use futures_util::stream::{FuturesUnordered, StreamExt};
4use log::{error, trace};
5use rand::{rngs::OsRng, seq::IndexedRandom, TryRngCore};
6
7use karyon_core::{async_runtime::Executor, async_util::timeout, crypto::KeyPair};
8
9use karyon_net::Endpoint;
10
11use crate::{
12 bloom::BloomRef,
13 connector::Connector,
14 discovery::kademlia::{
15 messages::{
16 FindPeerMsg, KadNetCmd, KadNetMsg, KadNetMsgCodec, PeerMsg, PeersMsg, PingMsg, PongMsg,
17 },
18 routing_table::RoutingTable,
19 SUPPORTED_LOOKUP_PROTOCOLS,
20 },
21 listener::Listener,
22 message::{pick_endpoint, PeerAddr, Protocol, ShutdownMsg},
23 monitor::{ConnectionKind, DiscoveryKind, Monitor},
24 slots::ConnectionSlots,
25 util::decode,
26 version::version_match,
27 Config, Error, PeerID, Result,
28};
29
/// Framed connection that carries Kademlia lookup messages encoded with
/// [`KadNetMsgCodec`].
///
/// Despite the `Ref` suffix, this alias is not itself a reference type; in this
/// file it is passed by value (`handle_outbound`, `handle_inbound`) or as
/// `&mut` (the `send_*_msg` helpers).
type KadConnRef = karyon_net::FramedConn<KadNetMsgCodec>;
32
/// Maximum number of peers carried by a single `PeersMsg`.
///
/// Inbound `FindPeer` requests are answered with at most this many closest
/// entries, outbound lookups reject any `PeersMsg` that exceeds it, and a
/// lookup tops its result set up to this size with random-lookup peers.
pub const MAX_PEERS_IN_PEERSMSG: usize = 10;

/// Maximum number of entries allowed in a received `PeerMsg.addrs`.
///
/// A `PeerMsg` with more entries is rejected by `validate_peer_msg`.
pub const MAX_ADDRS_PER_PEER: usize = 4;

/// Maximum number of entries allowed in a received `PeerMsg.discovery_addrs`.
///
/// The limit is checked by `validate_peer_msg` after TLS entries have been
/// removed.
pub const MAX_DISCOVERY_ADDRS_PER_PEER: usize = 3;
43
44pub struct LookupEndpoints {
46 pub listen: Vec<Endpoint>,
48 pub lookup: Option<Endpoint>,
50 pub refresh: Option<Endpoint>,
52}
53
/// Kademlia lookup service.
///
/// Serves inbound lookup requests (Ping, FindPeer, Peer, Shutdown) and performs
/// outbound lookups whose results are added to the routing table.
pub struct LookupService {
    /// This node's peer ID, derived from the key pair passed to `new`.
    id: PeerID,

    /// Routing table that lookup results are added to and that inbound
    /// `FindPeer` requests are answered from.
    table: Arc<RoutingTable>,

    /// Accepts inbound lookup connections on `endpoints.lookup`.
    listener: Arc<Listener<KadNetMsgCodec>>,
    /// Dials outbound lookup connections.
    connector: Arc<Connector<KadNetMsgCodec>>,

    /// Outbound connection slots, shared with `connector`. `connect` releases
    /// one slot after each outbound exchange.
    outbound_slots: Arc<ConnectionSlots>,

    /// Local endpoints, bound by the listener and/or advertised in `PeerMsg`.
    endpoints: LookupEndpoints,

    /// Configuration: slot counts, connect retries, timeouts and version.
    config: Arc<Config>,

    /// Receives discovery and connection events.
    monitor: Arc<Monitor>,

    /// Bloom filter whose current value is sent as `PeerMsg.protocols`.
    bloom: BloomRef,
}
81
82impl LookupService {
83 pub fn new(
85 key_pair: &KeyPair,
86 table: Arc<RoutingTable>,
87 config: Arc<Config>,
88 monitor: Arc<Monitor>,
89 bloom: BloomRef,
90 endpoints: LookupEndpoints,
91 ex: Executor,
92 ) -> Self {
93 let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
94 let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
95
96 let listener = Listener::new(key_pair, inbound_slots.clone(), monitor.clone(), ex.clone());
97
98 let connector = Connector::new(
99 key_pair,
100 config.lookup_connect_retries,
101 outbound_slots.clone(),
102 monitor.clone(),
103 ex,
104 );
105
106 let id = key_pair
107 .public()
108 .try_into()
109 .expect("Get PeerID from KeyPair");
110 Self {
111 id,
112 table,
113 listener,
114 connector,
115 outbound_slots,
116 endpoints,
117 config,
118 monitor,
119 bloom,
120 }
121 }
122
123 pub async fn start(self: &Arc<Self>) -> Result<()> {
125 self.start_listener().await?;
126 Ok(())
127 }
128
129 pub fn lookup_endpoint(&self) -> Option<&Endpoint> {
130 self.endpoints.lookup.as_ref()
131 }
132
133 pub async fn shutdown(&self) {
135 self.connector.shutdown().await;
136 self.listener.shutdown().await;
137 }
138
139 pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
147 trace!("Lookup started {endpoint}");
148 self.monitor
149 .notify(DiscoveryKind::LookupStarted(endpoint.clone()))
150 .await;
151
152 let mut random_peers = vec![];
153 if let Err(err) = self
154 .random_lookup(endpoint, peer_id, &mut random_peers)
155 .await
156 {
157 self.monitor
158 .notify(DiscoveryKind::LookupFailed(endpoint.clone()))
159 .await;
160 return Err(err);
161 };
162
163 let mut peer_buffer = vec![];
164 if let Err(err) = self.self_lookup(&random_peers, &mut peer_buffer).await {
165 self.monitor
166 .notify(DiscoveryKind::LookupFailed(endpoint.clone()))
167 .await;
168 return Err(err);
169 }
170
171 while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
172 match random_peers.pop() {
173 Some(p) => peer_buffer.push(p),
174 None => break,
175 }
176 }
177
178 for peer in peer_buffer.iter() {
179 let result = self.table.add_entry(peer.clone().into());
180 trace!("Add entry {result:?}");
181 }
182
183 self.monitor
184 .notify(DiscoveryKind::LookupSucceeded(
185 endpoint.clone(),
186 peer_buffer.len(),
187 ))
188 .await;
189
190 Ok(())
191 }
192
193 async fn random_lookup(
197 &self,
198 endpoint: &Endpoint,
199 peer_id: Option<PeerID>,
200 random_peers: &mut Vec<PeerMsg>,
201 ) -> Result<()> {
202 for _ in 0..2 {
203 let random_peer_id = PeerID::random()?;
204 let peers = self
205 .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
206 .await?;
207
208 for peer in peers {
209 if random_peers.contains(&peer)
210 || peer.peer_id == self.id
211 || self.table.contains_key(&peer.peer_id.0)
212 {
213 continue;
214 }
215
216 random_peers.push(peer);
217 }
218 }
219
220 Ok(())
221 }
222
223 async fn self_lookup(
225 &self,
226 random_peers: &[PeerMsg],
227 peer_buffer: &mut Vec<PeerMsg>,
228 ) -> Result<()> {
229 let mut results = FuturesUnordered::new();
230 for peer in random_peers.choose_multiple(&mut rand::rng(), random_peers.len()) {
231 let endpoint = match pick_endpoint(&peer.discovery_addrs, SUPPORTED_LOOKUP_PROTOCOLS) {
232 Some(ep) => ep,
233 None => continue,
234 };
235 results.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
236 }
237
238 while let Some(result) = results.next().await {
239 match result {
240 Ok(peers) => peer_buffer.extend(peers),
241 Err(err) => {
242 error!("Failed to do self lookup: {err}");
243 }
244 }
245 }
246
247 Ok(())
248 }
249
250 async fn connect(
253 &self,
254 endpoint: Endpoint,
255 peer_id: Option<PeerID>,
256 target_peer_id: &PeerID,
257 ) -> Result<Vec<PeerMsg>> {
258 let conn = self.connector.connect(&endpoint, &peer_id).await?;
259 let result = self.handle_outbound(conn, target_peer_id).await;
260
261 self.monitor
262 .notify(ConnectionKind::Disconnected(endpoint))
263 .await;
264 self.outbound_slots.remove().await;
265
266 result
267 }
268
269 async fn handle_outbound(
271 &self,
272 mut conn: KadConnRef,
273 target_peer_id: &PeerID,
274 ) -> Result<Vec<PeerMsg>> {
275 trace!("Send Ping msg");
276 let mut peers;
277
278 let ping_msg = self.send_ping_msg(&mut conn).await?;
279
280 loop {
281 let t = Duration::from_secs(self.config.lookup_response_timeout);
282 let msg: KadNetMsg = timeout(t, conn.recv_msg()).await??;
283 match msg.header.command {
284 KadNetCmd::Pong => {
285 let (pong_msg, _) = decode::<PongMsg>(&msg.payload)?;
286 if ping_msg.nonce != pong_msg.0 {
287 return Err(Error::InvalidPongMsg);
288 }
289 trace!("Send FindPeer msg");
290 self.send_findpeer_msg(&mut conn, target_peer_id).await?;
291 }
292 KadNetCmd::Peers => {
293 peers = decode::<PeersMsg>(&msg.payload)?.0.peers;
294 if peers.len() > MAX_PEERS_IN_PEERSMSG {
295 return Err(Error::Lookup(
296 "Received too many peers in PeersMsg".to_string(),
297 ));
298 }
299 for p in &mut peers {
300 validate_peer_msg(p)?;
301 }
302 break;
303 }
304 c => return Err(Error::InvalidMsg(format!("Unexpected msg: {c:?}"))),
305 };
306 }
307
308 trace!("Send Peer msg");
309 self.send_peer_msg(&mut conn).await?;
310
311 trace!("Send Shutdown msg");
312 self.send_shutdown_msg(&mut conn).await?;
313
314 Ok(peers)
315 }
316
317 async fn start_listener(self: &Arc<Self>) -> Result<()> {
319 let endpoint = match self.lookup_endpoint() {
320 Some(e) => e.clone(),
321 None => return Ok(()),
322 };
323
324 if !endpoint.is_tcp() {
325 return Err(Error::Config(format!(
326 "lookup endpoint must be tcp://..., got {endpoint}"
327 )));
328 }
329
330 let callback = {
331 let this = self.clone();
332 |conn: KadConnRef| async move {
333 let t = Duration::from_secs(this.config.lookup_connection_lifespan);
334 timeout(t, this.handle_inbound(conn)).await??;
335 Ok(())
336 }
337 };
338
339 self.listener
340 .start_with_callback(endpoint, callback)
341 .await?;
342 Ok(())
343 }
344
345 async fn handle_inbound(self: &Arc<Self>, mut conn: KadConnRef) -> Result<()> {
347 loop {
348 let msg: KadNetMsg = conn.recv_msg().await?;
349 trace!("Receive msg {:?}", msg.header.command);
350
351 if let KadNetCmd::Shutdown = msg.header.command {
352 return Ok(());
353 }
354
355 match &msg.header.command {
356 KadNetCmd::Ping => {
357 let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?;
358 if !version_match(&self.config.version.req, &ping_msg.version) {
359 return Err(Error::IncompatibleVersion("system: {}".into()));
360 }
361 self.send_pong_msg(ping_msg.nonce, &mut conn).await?;
362 }
363 KadNetCmd::FindPeer => {
364 let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
365 let peer_id = findpeer_msg.0;
366 self.send_peers_msg(&peer_id, &mut conn).await?;
367 }
368 KadNetCmd::Peer => {
369 let (mut peer, _) = decode::<PeerMsg>(&msg.payload)?;
370 validate_peer_msg(&mut peer)?;
371 let result = self.table.add_entry(peer.into());
372 trace!("Add entry result: {result:?}");
373 }
374 c => return Err(Error::InvalidMsg(format!("Unexpected msg: {c:?}"))),
375 }
376 }
377 }
378
379 async fn send_ping_msg(&self, conn: &mut KadConnRef) -> Result<PingMsg> {
381 trace!("Send Pong msg");
382 let mut nonce: [u8; 32] = [0; 32];
383 OsRng.try_fill_bytes(&mut nonce)?;
384
385 let ping_msg = PingMsg {
386 version: self.config.version.v.clone(),
387 nonce,
388 };
389 conn.send_msg(KadNetMsg::new(KadNetCmd::Ping, &ping_msg)?)
390 .await?;
391 Ok(ping_msg)
392 }
393
394 async fn send_pong_msg(&self, nonce: [u8; 32], conn: &mut KadConnRef) -> Result<()> {
396 trace!("Send Pong msg");
397 conn.send_msg(KadNetMsg::new(KadNetCmd::Pong, PongMsg(nonce))?)
398 .await?;
399 Ok(())
400 }
401
402 async fn send_findpeer_msg(&self, conn: &mut KadConnRef, peer_id: &PeerID) -> Result<()> {
404 trace!("Send FindPeer msg");
405 conn.send_msg(KadNetMsg::new(
406 KadNetCmd::FindPeer,
407 FindPeerMsg(peer_id.clone()),
408 )?)
409 .await?;
410 Ok(())
411 }
412
413 async fn send_peers_msg(&self, peer_id: &PeerID, conn: &mut KadConnRef) -> Result<()> {
415 trace!("Send Peers msg");
416 let entries = self
417 .table
418 .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
419
420 let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
421 conn.send_msg(KadNetMsg::new(KadNetCmd::Peers, PeersMsg { peers })?)
422 .await?;
423 Ok(())
424 }
425
426 async fn send_peer_msg(&self, conn: &mut KadConnRef) -> Result<()> {
430 trace!("Send Peer msg");
431
432 let mut addrs = Vec::new();
433 for ep in &self.endpoints.listen {
434 if let Some(pa) = PeerAddr::from_endpoint(ep, 0) {
435 addrs.push(pa);
436 }
437 }
438
439 let mut discovery_addrs = Vec::new();
440 if let Some(ep) = self.endpoints.lookup.as_ref() {
441 if let Some(pa) = PeerAddr::from_endpoint(ep, 0) {
442 discovery_addrs.push(pa);
443 }
444 }
445 if let Some(ep) = self.endpoints.refresh.as_ref() {
446 if let Some(pa) = PeerAddr::from_endpoint(ep, 0) {
447 discovery_addrs.push(pa);
448 }
449 }
450
451 let peer_msg = PeerMsg {
452 peer_id: self.id.clone(),
453 addrs,
454 discovery_addrs,
455 protocols: *self.bloom.read(),
456 };
457 conn.send_msg(KadNetMsg::new(KadNetCmd::Peer, &peer_msg)?)
458 .await?;
459 Ok(())
460 }
461
462 async fn send_shutdown_msg(&self, conn: &mut KadConnRef) -> Result<()> {
464 trace!("Send Shutdown msg");
465 conn.send_msg(KadNetMsg::new(KadNetCmd::Shutdown, ShutdownMsg(0))?)
466 .await?;
467 Ok(())
468 }
469}
470
471fn validate_peer_msg(p: &mut PeerMsg) -> Result<()> {
474 if p.addrs.len() > MAX_ADDRS_PER_PEER {
475 return Err(Error::InvalidMsg(format!(
476 "PeerMsg.addrs has {} entries, max {MAX_ADDRS_PER_PEER}",
477 p.addrs.len()
478 )));
479 }
480 p.discovery_addrs
481 .retain(|a| !matches!(a.protocol, Protocol::Tls));
482 if p.discovery_addrs.len() > MAX_DISCOVERY_ADDRS_PER_PEER {
483 return Err(Error::InvalidMsg(format!(
484 "PeerMsg.discovery_addrs has {} entries, max {MAX_DISCOVERY_ADDRS_PER_PEER}",
485 p.discovery_addrs.len()
486 )));
487 }
488 Ok(())
489}