karyon_p2p/discovery/
lookup.rs

1use std::{sync::Arc, time::Duration};
2
3use futures_util::stream::{FuturesUnordered, StreamExt};
4use log::{error, trace};
5use parking_lot::RwLock;
6use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
7
8use karyon_core::{async_runtime::Executor, async_util::timeout, crypto::KeyPair, util::decode};
9
10use karyon_net::Endpoint;
11
12use crate::{
13    connector::Connector,
14    listener::Listener,
15    message::{FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, ShutdownMsg},
16    monitor::{ConnEvent, DiscvEvent, Monitor},
17    routing_table::RoutingTable,
18    slots::ConnectionSlots,
19    version::version_match,
20    Config, ConnRef, Error, PeerID, Result,
21};
22
/// Maximum number of peers that can be returned in a PeersMsg.
///
/// Responders pass this value as the limit to `RoutingTable::closest_entries`
/// when answering a `FindPeer` request. The lookup also uses it as the target
/// number of peers to collect per lookup before handing them to the routing
/// table.
pub const MAX_PEERS_IN_PEERSMSG: usize = 10;
25
/// Discovery lookup service.
///
/// Owns a dedicated listener/connector pair for the discovery protocol. It
/// uses them to answer lookup requests from other peers and to query remote
/// peers for entries that are added to the routing table.
// NOTE: field order is also drop order; keep it stable unless that is intended.
pub struct LookupService {
    /// Peer's ID (derived from the key pair's public key in `new`).
    id: PeerID,

    /// Routing Table that lookups populate and that `FindPeer` requests are
    /// answered from.
    table: Arc<RoutingTable>,

    /// Listener accepting inbound discovery connections.
    listener: Arc<Listener>,
    /// Connector dialing outbound discovery connections.
    connector: Arc<Connector>,

    /// Outbound slots, shared with the connector.
    outbound_slots: Arc<ConnectionSlots>,

    /// Resolved listen endpoint; `None` until `set_listen_endpoint` is called.
    listen_endpoint: RwLock<Option<Endpoint>>,

    /// Resolved discovery endpoint; `None` until `set_listen_endpoint` is
    /// called.
    discovery_endpoint: RwLock<Option<Endpoint>>,

    /// Holds the configuration for the P2P network.
    config: Arc<Config>,

    /// Responsible for network and system monitoring; receives lookup and
    /// connection events.
    monitor: Arc<Monitor>,
}
53
54impl LookupService {
55    /// Creates a new lookup service
56    pub fn new(
57        key_pair: &KeyPair,
58        table: Arc<RoutingTable>,
59        config: Arc<Config>,
60        monitor: Arc<Monitor>,
61        ex: Executor,
62    ) -> Self {
63        let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
64        let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
65
66        let listener = Listener::new(
67            key_pair,
68            inbound_slots.clone(),
69            config.enable_tls,
70            monitor.clone(),
71            ex.clone(),
72        );
73
74        let connector = Connector::new(
75            key_pair,
76            config.lookup_connect_retries,
77            outbound_slots.clone(),
78            config.enable_tls,
79            monitor.clone(),
80            ex,
81        );
82
83        let id = key_pair
84            .public()
85            .try_into()
86            .expect("Get PeerID from KeyPair");
87        Self {
88            id,
89            table,
90            listener,
91            connector,
92            outbound_slots,
93            listen_endpoint: RwLock::new(None),
94            discovery_endpoint: RwLock::new(None),
95            config,
96            monitor,
97        }
98    }
99
100    /// Start the lookup service.
101    pub async fn start(self: &Arc<Self>) -> Result<()> {
102        self.start_listener().await?;
103        Ok(())
104    }
105
106    /// Set the resolved listen endpoint.
107    pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
108        let discovery_endpoint = Endpoint::Tcp(
109            resolved_endpoint.addr()?.clone(),
110            self.config.discovery_port,
111        );
112        *self.listen_endpoint.write() = Some(resolved_endpoint.clone());
113        *self.discovery_endpoint.write() = Some(discovery_endpoint.clone());
114        Ok(())
115    }
116
117    pub fn listen_endpoint(&self) -> Option<Endpoint> {
118        self.listen_endpoint.read().clone()
119    }
120
121    pub fn discovery_endpoint(&self) -> Option<Endpoint> {
122        self.discovery_endpoint.read().clone()
123    }
124
125    /// Shuts down the lookup service.
126    pub async fn shutdown(&self) {
127        self.connector.shutdown().await;
128        self.listener.shutdown().await;
129    }
130
131    /// Starts iterative lookup and populate the routing table.
132    ///
133    /// This method begins by generating a random peer ID and connecting to the
134    /// provided endpoint. It then sends a FindPeer message containing the
135    /// randomly generated peer ID. Upon receiving peers from the initial lookup,
136    /// it starts connecting to these received peers and sends them a FindPeer
137    /// message that contains our own peer ID.
138    pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
139        trace!("Lookup started {endpoint}");
140        self.monitor
141            .notify(DiscvEvent::LookupStarted(endpoint.clone()))
142            .await;
143
144        let mut random_peers = vec![];
145        if let Err(err) = self
146            .random_lookup(endpoint, peer_id, &mut random_peers)
147            .await
148        {
149            self.monitor
150                .notify(DiscvEvent::LookupFailed(endpoint.clone()))
151                .await;
152            return Err(err);
153        };
154
155        let mut peer_buffer = vec![];
156        if let Err(err) = self.self_lookup(&random_peers, &mut peer_buffer).await {
157            self.monitor
158                .notify(DiscvEvent::LookupFailed(endpoint.clone()))
159                .await;
160            return Err(err);
161        }
162
163        while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
164            match random_peers.pop() {
165                Some(p) => peer_buffer.push(p),
166                None => break,
167            }
168        }
169
170        for peer in peer_buffer.iter() {
171            let result = self.table.add_entry(peer.clone().into());
172            trace!("Add entry {:?}", result);
173        }
174
175        self.monitor
176            .notify(DiscvEvent::LookupSucceeded(
177                endpoint.clone(),
178                peer_buffer.len(),
179            ))
180            .await;
181
182        Ok(())
183    }
184
185    /// Starts a random lookup
186    ///
187    /// This will perfom lookup on a random generated PeerID
188    async fn random_lookup(
189        &self,
190        endpoint: &Endpoint,
191        peer_id: Option<PeerID>,
192        random_peers: &mut Vec<PeerMsg>,
193    ) -> Result<()> {
194        for _ in 0..2 {
195            let random_peer_id = PeerID::random();
196            let peers = self
197                .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
198                .await?;
199
200            for peer in peers {
201                if random_peers.contains(&peer)
202                    || peer.peer_id == self.id
203                    || self.table.contains_key(&peer.peer_id.0)
204                {
205                    continue;
206                }
207
208                random_peers.push(peer);
209            }
210        }
211
212        Ok(())
213    }
214
215    /// Starts a self lookup
216    async fn self_lookup(
217        &self,
218        random_peers: &[PeerMsg],
219        peer_buffer: &mut Vec<PeerMsg>,
220    ) -> Result<()> {
221        let mut results = FuturesUnordered::new();
222        for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
223            let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
224            results.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
225        }
226
227        while let Some(result) = results.next().await {
228            match result {
229                Ok(peers) => peer_buffer.extend(peers),
230                Err(err) => {
231                    error!("Failed to do self lookup: {err}");
232                }
233            }
234        }
235
236        Ok(())
237    }
238
239    /// Connects to the given endpoint and initiates a lookup process for the
240    /// provided peer ID.
241    async fn connect(
242        &self,
243        endpoint: Endpoint,
244        peer_id: Option<PeerID>,
245        target_peer_id: &PeerID,
246    ) -> Result<Vec<PeerMsg>> {
247        let conn = self.connector.connect(&endpoint, &peer_id).await?;
248        let result = self.handle_outbound(conn, target_peer_id).await;
249
250        self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
251        self.outbound_slots.remove().await;
252
253        result
254    }
255
256    /// Handles outbound connection
257    async fn handle_outbound(
258        &self,
259        conn: ConnRef,
260        target_peer_id: &PeerID,
261    ) -> Result<Vec<PeerMsg>> {
262        trace!("Send Ping msg");
263        let peers;
264
265        let ping_msg = self.send_ping_msg(&conn).await?;
266
267        loop {
268            let t = Duration::from_secs(self.config.lookup_response_timeout);
269            let msg: NetMsg = timeout(t, conn.recv()).await??;
270            match msg.header.command {
271                NetMsgCmd::Pong => {
272                    let (pong_msg, _) = decode::<PongMsg>(&msg.payload)?;
273                    if ping_msg.nonce != pong_msg.0 {
274                        return Err(Error::InvalidPongMsg);
275                    }
276                    trace!("Send FindPeer msg");
277                    self.send_findpeer_msg(&conn, target_peer_id).await?;
278                }
279                NetMsgCmd::Peers => {
280                    peers = decode::<PeersMsg>(&msg.payload)?.0.peers;
281                    if peers.len() >= MAX_PEERS_IN_PEERSMSG {
282                        return Err(Error::Lookup(
283                            "Received too many peers in PeersMsg".to_string(),
284                        ));
285                    }
286                    break;
287                }
288                c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
289            };
290        }
291
292        trace!("Send Peer msg");
293        if let Some(endpoint) = self.listen_endpoint() {
294            self.send_peer_msg(&conn, endpoint).await?;
295        }
296
297        trace!("Send Shutdown msg");
298        self.send_shutdown_msg(&conn).await?;
299
300        Ok(peers)
301    }
302
303    /// Start a listener.
304    async fn start_listener(self: &Arc<Self>) -> Result<()> {
305        let endpoint: Endpoint = match self.discovery_endpoint() {
306            Some(e) => e,
307            None => return Ok(()),
308        };
309
310        let callback = {
311            let this = self.clone();
312            |conn: ConnRef| async move {
313                let t = Duration::from_secs(this.config.lookup_connection_lifespan);
314                timeout(t, this.handle_inbound(conn)).await??;
315                Ok(())
316            }
317        };
318
319        self.listener.start(endpoint, callback).await?;
320        Ok(())
321    }
322
323    /// Handles inbound connection
324    async fn handle_inbound(self: &Arc<Self>, conn: ConnRef) -> Result<()> {
325        loop {
326            let msg: NetMsg = conn.recv().await?;
327            trace!("Receive msg {:?}", msg.header.command);
328
329            if let NetMsgCmd::Shutdown = msg.header.command {
330                return Ok(());
331            }
332
333            match &msg.header.command {
334                NetMsgCmd::Ping => {
335                    let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?;
336                    if !version_match(&self.config.version.req, &ping_msg.version) {
337                        return Err(Error::IncompatibleVersion("system: {}".into()));
338                    }
339                    self.send_pong_msg(ping_msg.nonce, &conn).await?;
340                }
341                NetMsgCmd::FindPeer => {
342                    let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
343                    let peer_id = findpeer_msg.0;
344                    self.send_peers_msg(&peer_id, &conn).await?;
345                }
346                NetMsgCmd::Peer => {
347                    let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
348                    let result = self.table.add_entry(peer.clone().into());
349                    trace!("Add entry result: {:?}", result);
350                }
351                c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
352            }
353        }
354    }
355
356    /// Sends a Ping msg.
357    async fn send_ping_msg(&self, conn: &ConnRef) -> Result<PingMsg> {
358        trace!("Send Pong msg");
359        let mut nonce: [u8; 32] = [0; 32];
360        RngCore::fill_bytes(&mut OsRng, &mut nonce);
361
362        let ping_msg = PingMsg {
363            version: self.config.version.v.clone(),
364            nonce,
365        };
366        conn.send(NetMsg::new(NetMsgCmd::Ping, &ping_msg)?).await?;
367        Ok(ping_msg)
368    }
369
370    /// Sends a Pong msg
371    async fn send_pong_msg(&self, nonce: [u8; 32], conn: &ConnRef) -> Result<()> {
372        trace!("Send Pong msg");
373        conn.send(NetMsg::new(NetMsgCmd::Pong, PongMsg(nonce))?)
374            .await?;
375        Ok(())
376    }
377
378    /// Sends a FindPeer msg
379    async fn send_findpeer_msg(&self, conn: &ConnRef, peer_id: &PeerID) -> Result<()> {
380        trace!("Send FindPeer msg");
381        conn.send(NetMsg::new(
382            NetMsgCmd::FindPeer,
383            FindPeerMsg(peer_id.clone()),
384        )?)
385        .await?;
386        Ok(())
387    }
388
389    /// Sends a Peers msg.
390    async fn send_peers_msg(&self, peer_id: &PeerID, conn: &ConnRef) -> Result<()> {
391        trace!("Send Peers msg");
392        let entries = self
393            .table
394            .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
395
396        let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
397        conn.send(NetMsg::new(NetMsgCmd::Peers, PeersMsg { peers })?)
398            .await?;
399        Ok(())
400    }
401
402    /// Sends a Peer msg.
403    async fn send_peer_msg(&self, conn: &ConnRef, endpoint: Endpoint) -> Result<()> {
404        trace!("Send Peer msg");
405        let peer_msg = PeerMsg {
406            addr: endpoint.addr()?.clone(),
407            port: *endpoint.port()?,
408            discovery_port: self.config.discovery_port,
409            peer_id: self.id.clone(),
410        };
411        conn.send(NetMsg::new(NetMsgCmd::Peer, &peer_msg)?).await?;
412        Ok(())
413    }
414
415    /// Sends a Shutdown msg.
416    async fn send_shutdown_msg(&self, conn: &ConnRef) -> Result<()> {
417        trace!("Send Shutdown msg");
418        conn.send(NetMsg::new(NetMsgCmd::Shutdown, ShutdownMsg(0))?)
419            .await?;
420        Ok(())
421    }
422}