Skip to main content

karyon_p2p/discovery/kademlia/
lookup.rs

1use std::{sync::Arc, time::Duration};
2
3use futures_util::stream::{FuturesUnordered, StreamExt};
4use log::{error, trace};
5use rand::{rngs::OsRng, seq::IndexedRandom, TryRngCore};
6
7use karyon_core::{async_runtime::Executor, async_util::timeout, crypto::KeyPair};
8
9use karyon_net::Endpoint;
10
11use crate::{
12    bloom::BloomRef,
13    connector::Connector,
14    discovery::kademlia::{
15        messages::{
16            FindPeerMsg, KadNetCmd, KadNetMsg, KadNetMsgCodec, PeerMsg, PeersMsg, PingMsg, PongMsg,
17        },
18        routing_table::RoutingTable,
19        SUPPORTED_LOOKUP_PROTOCOLS,
20    },
21    listener::Listener,
22    message::{pick_endpoint, PeerAddr, Protocol, ShutdownMsg},
23    monitor::{ConnectionKind, DiscoveryKind, Monitor},
24    slots::ConnectionSlots,
25    util::decode,
26    version::version_match,
27    Config, Error, PeerID, Result,
28};
29
30/// Framed lookup-plane connection.
31type KadConnRef = karyon_net::FramedConn<KadNetMsgCodec>;
32
/// Upper bound on how many peers a single `PeersMsg` may carry.
pub const MAX_PEERS_IN_PEERSMSG: usize = 10;

/// Upper bound on the data-plane addresses a peer may advertise for itself.
/// A legitimate peer needs at most one per transport (tcp/tls/quic).
pub const MAX_ADDRS_PER_PEER: usize = 4;

/// Upper bound on the discovery addresses a peer may advertise for itself.
/// A legitimate peer only needs its lookup and refresh endpoints.
pub const MAX_DISCOVERY_ADDRS_PER_PEER: usize = 3;
43
44/// Endpoints the lookup service advertises and binds to.
45pub struct LookupEndpoints {
46    /// Data-plane listen addrs (advertised in PeerMsg.addrs).
47    pub listen: Vec<Endpoint>,
48    /// Local bind for the lookup listener (also advertised).
49    pub lookup: Option<Endpoint>,
50    /// UDP refresh addr to advertise (if any).
51    pub refresh: Option<Endpoint>,
52}
53
54pub struct LookupService {
55    /// Peer's ID
56    id: PeerID,
57
58    /// Routing Table
59    table: Arc<RoutingTable>,
60
61    /// Listener
62    listener: Arc<Listener<KadNetMsgCodec>>,
63    /// Connector
64    connector: Arc<Connector<KadNetMsgCodec>>,
65
66    /// Outbound slots.
67    outbound_slots: Arc<ConnectionSlots>,
68
69    /// Endpoints this service advertises and binds to.
70    endpoints: LookupEndpoints,
71
72    /// Holds the configuration for the P2P network.
73    config: Arc<Config>,
74
75    /// Responsible for network and system monitoring.
76    monitor: Arc<Monitor>,
77
78    /// Shared local bloom. Snapshotted on every outgoing PeerMsg.
79    bloom: BloomRef,
80}
81
82impl LookupService {
83    /// Creates a new lookup service.
84    pub fn new(
85        key_pair: &KeyPair,
86        table: Arc<RoutingTable>,
87        config: Arc<Config>,
88        monitor: Arc<Monitor>,
89        bloom: BloomRef,
90        endpoints: LookupEndpoints,
91        ex: Executor,
92    ) -> Self {
93        let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
94        let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
95
96        let listener = Listener::new(key_pair, inbound_slots.clone(), monitor.clone(), ex.clone());
97
98        let connector = Connector::new(
99            key_pair,
100            config.lookup_connect_retries,
101            outbound_slots.clone(),
102            monitor.clone(),
103            ex,
104        );
105
106        let id = key_pair
107            .public()
108            .try_into()
109            .expect("Get PeerID from KeyPair");
110        Self {
111            id,
112            table,
113            listener,
114            connector,
115            outbound_slots,
116            endpoints,
117            config,
118            monitor,
119            bloom,
120        }
121    }
122
123    /// Start the lookup service.
124    pub async fn start(self: &Arc<Self>) -> Result<()> {
125        self.start_listener().await?;
126        Ok(())
127    }
128
129    pub fn lookup_endpoint(&self) -> Option<&Endpoint> {
130        self.endpoints.lookup.as_ref()
131    }
132
133    /// Shuts down the lookup service.
134    pub async fn shutdown(&self) {
135        self.connector.shutdown().await;
136        self.listener.shutdown().await;
137    }
138
139    /// Starts iterative lookup and populate the routing table.
140    ///
141    /// This method begins by generating a random peer ID and connecting to the
142    /// provided endpoint. It then sends a FindPeer message containing the
143    /// randomly generated peer ID. Upon receiving peers from the initial lookup,
144    /// it starts connecting to these received peers and sends them a FindPeer
145    /// message that contains our own peer ID.
146    pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
147        trace!("Lookup started {endpoint}");
148        self.monitor
149            .notify(DiscoveryKind::LookupStarted(endpoint.clone()))
150            .await;
151
152        let mut random_peers = vec![];
153        if let Err(err) = self
154            .random_lookup(endpoint, peer_id, &mut random_peers)
155            .await
156        {
157            self.monitor
158                .notify(DiscoveryKind::LookupFailed(endpoint.clone()))
159                .await;
160            return Err(err);
161        };
162
163        let mut peer_buffer = vec![];
164        if let Err(err) = self.self_lookup(&random_peers, &mut peer_buffer).await {
165            self.monitor
166                .notify(DiscoveryKind::LookupFailed(endpoint.clone()))
167                .await;
168            return Err(err);
169        }
170
171        while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
172            match random_peers.pop() {
173                Some(p) => peer_buffer.push(p),
174                None => break,
175            }
176        }
177
178        for peer in peer_buffer.iter() {
179            let result = self.table.add_entry(peer.clone().into());
180            trace!("Add entry {result:?}");
181        }
182
183        self.monitor
184            .notify(DiscoveryKind::LookupSucceeded(
185                endpoint.clone(),
186                peer_buffer.len(),
187            ))
188            .await;
189
190        Ok(())
191    }
192
193    /// Starts a random lookup
194    ///
195    /// This will perfom lookup on a random generated PeerID
196    async fn random_lookup(
197        &self,
198        endpoint: &Endpoint,
199        peer_id: Option<PeerID>,
200        random_peers: &mut Vec<PeerMsg>,
201    ) -> Result<()> {
202        for _ in 0..2 {
203            let random_peer_id = PeerID::random()?;
204            let peers = self
205                .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
206                .await?;
207
208            for peer in peers {
209                if random_peers.contains(&peer)
210                    || peer.peer_id == self.id
211                    || self.table.contains_key(&peer.peer_id.0)
212                {
213                    continue;
214                }
215
216                random_peers.push(peer);
217            }
218        }
219
220        Ok(())
221    }
222
223    /// Starts a self lookup
224    async fn self_lookup(
225        &self,
226        random_peers: &[PeerMsg],
227        peer_buffer: &mut Vec<PeerMsg>,
228    ) -> Result<()> {
229        let mut results = FuturesUnordered::new();
230        for peer in random_peers.choose_multiple(&mut rand::rng(), random_peers.len()) {
231            let endpoint = match pick_endpoint(&peer.discovery_addrs, SUPPORTED_LOOKUP_PROTOCOLS) {
232                Some(ep) => ep,
233                None => continue,
234            };
235            results.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
236        }
237
238        while let Some(result) = results.next().await {
239            match result {
240                Ok(peers) => peer_buffer.extend(peers),
241                Err(err) => {
242                    error!("Failed to do self lookup: {err}");
243                }
244            }
245        }
246
247        Ok(())
248    }
249
250    /// Connects to the given endpoint and initiates a lookup process for the
251    /// provided peer ID.
252    async fn connect(
253        &self,
254        endpoint: Endpoint,
255        peer_id: Option<PeerID>,
256        target_peer_id: &PeerID,
257    ) -> Result<Vec<PeerMsg>> {
258        let conn = self.connector.connect(&endpoint, &peer_id).await?;
259        let result = self.handle_outbound(conn, target_peer_id).await;
260
261        self.monitor
262            .notify(ConnectionKind::Disconnected(endpoint))
263            .await;
264        self.outbound_slots.remove().await;
265
266        result
267    }
268
269    /// Handles outbound connection
270    async fn handle_outbound(
271        &self,
272        mut conn: KadConnRef,
273        target_peer_id: &PeerID,
274    ) -> Result<Vec<PeerMsg>> {
275        trace!("Send Ping msg");
276        let mut peers;
277
278        let ping_msg = self.send_ping_msg(&mut conn).await?;
279
280        loop {
281            let t = Duration::from_secs(self.config.lookup_response_timeout);
282            let msg: KadNetMsg = timeout(t, conn.recv_msg()).await??;
283            match msg.header.command {
284                KadNetCmd::Pong => {
285                    let (pong_msg, _) = decode::<PongMsg>(&msg.payload)?;
286                    if ping_msg.nonce != pong_msg.0 {
287                        return Err(Error::InvalidPongMsg);
288                    }
289                    trace!("Send FindPeer msg");
290                    self.send_findpeer_msg(&mut conn, target_peer_id).await?;
291                }
292                KadNetCmd::Peers => {
293                    peers = decode::<PeersMsg>(&msg.payload)?.0.peers;
294                    if peers.len() > MAX_PEERS_IN_PEERSMSG {
295                        return Err(Error::Lookup(
296                            "Received too many peers in PeersMsg".to_string(),
297                        ));
298                    }
299                    for p in &mut peers {
300                        validate_peer_msg(p)?;
301                    }
302                    break;
303                }
304                c => return Err(Error::InvalidMsg(format!("Unexpected msg: {c:?}"))),
305            };
306        }
307
308        trace!("Send Peer msg");
309        self.send_peer_msg(&mut conn).await?;
310
311        trace!("Send Shutdown msg");
312        self.send_shutdown_msg(&mut conn).await?;
313
314        Ok(peers)
315    }
316
317    /// Start a listener.
318    async fn start_listener(self: &Arc<Self>) -> Result<()> {
319        let endpoint = match self.lookup_endpoint() {
320            Some(e) => e.clone(),
321            None => return Ok(()),
322        };
323
324        if !endpoint.is_tcp() {
325            return Err(Error::Config(format!(
326                "lookup endpoint must be tcp://..., got {endpoint}"
327            )));
328        }
329
330        let callback = {
331            let this = self.clone();
332            |conn: KadConnRef| async move {
333                let t = Duration::from_secs(this.config.lookup_connection_lifespan);
334                timeout(t, this.handle_inbound(conn)).await??;
335                Ok(())
336            }
337        };
338
339        self.listener
340            .start_with_callback(endpoint, callback)
341            .await?;
342        Ok(())
343    }
344
345    /// Handles inbound connection
346    async fn handle_inbound(self: &Arc<Self>, mut conn: KadConnRef) -> Result<()> {
347        loop {
348            let msg: KadNetMsg = conn.recv_msg().await?;
349            trace!("Receive msg {:?}", msg.header.command);
350
351            if let KadNetCmd::Shutdown = msg.header.command {
352                return Ok(());
353            }
354
355            match &msg.header.command {
356                KadNetCmd::Ping => {
357                    let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?;
358                    if !version_match(&self.config.version.req, &ping_msg.version) {
359                        return Err(Error::IncompatibleVersion("system: {}".into()));
360                    }
361                    self.send_pong_msg(ping_msg.nonce, &mut conn).await?;
362                }
363                KadNetCmd::FindPeer => {
364                    let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
365                    let peer_id = findpeer_msg.0;
366                    self.send_peers_msg(&peer_id, &mut conn).await?;
367                }
368                KadNetCmd::Peer => {
369                    let (mut peer, _) = decode::<PeerMsg>(&msg.payload)?;
370                    validate_peer_msg(&mut peer)?;
371                    let result = self.table.add_entry(peer.into());
372                    trace!("Add entry result: {result:?}");
373                }
374                c => return Err(Error::InvalidMsg(format!("Unexpected msg: {c:?}"))),
375            }
376        }
377    }
378
379    /// Sends a Ping msg.
380    async fn send_ping_msg(&self, conn: &mut KadConnRef) -> Result<PingMsg> {
381        trace!("Send Pong msg");
382        let mut nonce: [u8; 32] = [0; 32];
383        OsRng.try_fill_bytes(&mut nonce)?;
384
385        let ping_msg = PingMsg {
386            version: self.config.version.v.clone(),
387            nonce,
388        };
389        conn.send_msg(KadNetMsg::new(KadNetCmd::Ping, &ping_msg)?)
390            .await?;
391        Ok(ping_msg)
392    }
393
394    /// Sends a Pong msg
395    async fn send_pong_msg(&self, nonce: [u8; 32], conn: &mut KadConnRef) -> Result<()> {
396        trace!("Send Pong msg");
397        conn.send_msg(KadNetMsg::new(KadNetCmd::Pong, PongMsg(nonce))?)
398            .await?;
399        Ok(())
400    }
401
402    /// Sends a FindPeer msg
403    async fn send_findpeer_msg(&self, conn: &mut KadConnRef, peer_id: &PeerID) -> Result<()> {
404        trace!("Send FindPeer msg");
405        conn.send_msg(KadNetMsg::new(
406            KadNetCmd::FindPeer,
407            FindPeerMsg(peer_id.clone()),
408        )?)
409        .await?;
410        Ok(())
411    }
412
413    /// Sends a Peers msg.
414    async fn send_peers_msg(&self, peer_id: &PeerID, conn: &mut KadConnRef) -> Result<()> {
415        trace!("Send Peers msg");
416        let entries = self
417            .table
418            .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
419
420        let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
421        conn.send_msg(KadNetMsg::new(KadNetCmd::Peers, PeersMsg { peers })?)
422            .await?;
423        Ok(())
424    }
425
426    /// Sends a Peer msg advertising our listen and discovery addresses.
427    /// `addrs` carries every data-plane listen endpoint; `discovery_addrs`
428    /// carries the lookup endpoint and (when set) the udp refresh endpoint.
429    async fn send_peer_msg(&self, conn: &mut KadConnRef) -> Result<()> {
430        trace!("Send Peer msg");
431
432        let mut addrs = Vec::new();
433        for ep in &self.endpoints.listen {
434            if let Some(pa) = PeerAddr::from_endpoint(ep, 0) {
435                addrs.push(pa);
436            }
437        }
438
439        let mut discovery_addrs = Vec::new();
440        if let Some(ep) = self.endpoints.lookup.as_ref() {
441            if let Some(pa) = PeerAddr::from_endpoint(ep, 0) {
442                discovery_addrs.push(pa);
443            }
444        }
445        if let Some(ep) = self.endpoints.refresh.as_ref() {
446            if let Some(pa) = PeerAddr::from_endpoint(ep, 0) {
447                discovery_addrs.push(pa);
448            }
449        }
450
451        let peer_msg = PeerMsg {
452            peer_id: self.id.clone(),
453            addrs,
454            discovery_addrs,
455            protocols: *self.bloom.read(),
456        };
457        conn.send_msg(KadNetMsg::new(KadNetCmd::Peer, &peer_msg)?)
458            .await?;
459        Ok(())
460    }
461
462    /// Sends a Shutdown msg.
463    async fn send_shutdown_msg(&self, conn: &mut KadConnRef) -> Result<()> {
464        trace!("Send Shutdown msg");
465        conn.send_msg(KadNetMsg::new(KadNetCmd::Shutdown, ShutdownMsg(0))?)
466            .await?;
467        Ok(())
468    }
469}
470
471/// Reject PeerMsgs that advertise more addresses than a legitimate
472/// peer would. Caps memory blow-up from malicious or buggy peers.
473fn validate_peer_msg(p: &mut PeerMsg) -> Result<()> {
474    if p.addrs.len() > MAX_ADDRS_PER_PEER {
475        return Err(Error::InvalidMsg(format!(
476            "PeerMsg.addrs has {} entries, max {MAX_ADDRS_PER_PEER}",
477            p.addrs.len()
478        )));
479    }
480    p.discovery_addrs
481        .retain(|a| !matches!(a.protocol, Protocol::Tls));
482    if p.discovery_addrs.len() > MAX_DISCOVERY_ADDRS_PER_PEER {
483        return Err(Error::InvalidMsg(format!(
484            "PeerMsg.discovery_addrs has {} entries, max {MAX_DISCOVERY_ADDRS_PER_PEER}",
485            p.discovery_addrs.len()
486        )));
487    }
488    Ok(())
489}