1use std::{sync::Arc, time::Duration};
2
3use futures_util::stream::{FuturesUnordered, StreamExt};
4use log::{error, trace};
5use parking_lot::RwLock;
6use rand::{rngs::OsRng, seq::SliceRandom, RngCore};
7
8use karyon_core::{async_runtime::Executor, async_util::timeout, crypto::KeyPair, util::decode};
9
10use karyon_net::Endpoint;
11
12use crate::{
13 connector::Connector,
14 listener::Listener,
15 message::{FindPeerMsg, NetMsg, NetMsgCmd, PeerMsg, PeersMsg, PingMsg, PongMsg, ShutdownMsg},
16 monitor::{ConnEvent, DiscvEvent, Monitor},
17 routing_table::RoutingTable,
18 slots::ConnectionSlots,
19 version::version_match,
20 Config, ConnRef, Error, PeerID, Result,
21};
22
/// Peer-count limit for a `PeersMsg`.
///
/// `send_peers_msg` passes this value as the entry limit when querying the
/// routing table, and `handle_outbound` checks the length of a received peer
/// list against it.
pub const MAX_PEERS_IN_PEERSMSG: usize = 10;
25
/// Service that discovers peers by exchanging lookup messages
/// (Ping/Pong, FindPeer/Peers, Peer, Shutdown) over short-lived connections
/// and feeds the results into the routing table.
pub struct LookupService {
    /// ID of the local peer, derived from the key pair's public key in `new`.
    id: PeerID,

    /// Routing table that receives the peers found by lookups.
    table: Arc<RoutingTable>,

    /// Listener used to accept inbound lookup connections.
    listener: Arc<Listener>,
    /// Connector used to open outbound lookup connections.
    connector: Arc<Connector>,

    /// Slots for outbound connections; one is released after each outbound
    /// exchange in `connect`.
    outbound_slots: Arc<ConnectionSlots>,

    /// Resolved listen endpoint, set through `set_listen_endpoint`.
    listen_endpoint: RwLock<Option<Endpoint>>,

    /// Discovery endpoint (listen address combined with
    /// `config.discovery_port`), set through `set_listen_endpoint`.
    discovery_endpoint: RwLock<Option<Endpoint>>,

    /// Shared configuration.
    config: Arc<Config>,

    /// Monitor that receives discovery and connection events.
    monitor: Arc<Monitor>,
}
53
54impl LookupService {
55 pub fn new(
57 key_pair: &KeyPair,
58 table: Arc<RoutingTable>,
59 config: Arc<Config>,
60 monitor: Arc<Monitor>,
61 ex: Executor,
62 ) -> Self {
63 let inbound_slots = Arc::new(ConnectionSlots::new(config.lookup_inbound_slots));
64 let outbound_slots = Arc::new(ConnectionSlots::new(config.lookup_outbound_slots));
65
66 let listener = Listener::new(
67 key_pair,
68 inbound_slots.clone(),
69 config.enable_tls,
70 monitor.clone(),
71 ex.clone(),
72 );
73
74 let connector = Connector::new(
75 key_pair,
76 config.lookup_connect_retries,
77 outbound_slots.clone(),
78 config.enable_tls,
79 monitor.clone(),
80 ex,
81 );
82
83 let id = key_pair
84 .public()
85 .try_into()
86 .expect("Get PeerID from KeyPair");
87 Self {
88 id,
89 table,
90 listener,
91 connector,
92 outbound_slots,
93 listen_endpoint: RwLock::new(None),
94 discovery_endpoint: RwLock::new(None),
95 config,
96 monitor,
97 }
98 }
99
100 pub async fn start(self: &Arc<Self>) -> Result<()> {
102 self.start_listener().await?;
103 Ok(())
104 }
105
106 pub fn set_listen_endpoint(&self, resolved_endpoint: &Endpoint) -> Result<()> {
108 let discovery_endpoint = Endpoint::Tcp(
109 resolved_endpoint.addr()?.clone(),
110 self.config.discovery_port,
111 );
112 *self.listen_endpoint.write() = Some(resolved_endpoint.clone());
113 *self.discovery_endpoint.write() = Some(discovery_endpoint.clone());
114 Ok(())
115 }
116
117 pub fn listen_endpoint(&self) -> Option<Endpoint> {
118 self.listen_endpoint.read().clone()
119 }
120
121 pub fn discovery_endpoint(&self) -> Option<Endpoint> {
122 self.discovery_endpoint.read().clone()
123 }
124
    /// Shuts down the lookup service: the connector first, then the listener.
    pub async fn shutdown(&self) {
        self.connector.shutdown().await;
        self.listener.shutdown().await;
    }
130
131 pub async fn start_lookup(&self, endpoint: &Endpoint, peer_id: Option<PeerID>) -> Result<()> {
139 trace!("Lookup started {endpoint}");
140 self.monitor
141 .notify(DiscvEvent::LookupStarted(endpoint.clone()))
142 .await;
143
144 let mut random_peers = vec![];
145 if let Err(err) = self
146 .random_lookup(endpoint, peer_id, &mut random_peers)
147 .await
148 {
149 self.monitor
150 .notify(DiscvEvent::LookupFailed(endpoint.clone()))
151 .await;
152 return Err(err);
153 };
154
155 let mut peer_buffer = vec![];
156 if let Err(err) = self.self_lookup(&random_peers, &mut peer_buffer).await {
157 self.monitor
158 .notify(DiscvEvent::LookupFailed(endpoint.clone()))
159 .await;
160 return Err(err);
161 }
162
163 while peer_buffer.len() < MAX_PEERS_IN_PEERSMSG {
164 match random_peers.pop() {
165 Some(p) => peer_buffer.push(p),
166 None => break,
167 }
168 }
169
170 for peer in peer_buffer.iter() {
171 let result = self.table.add_entry(peer.clone().into());
172 trace!("Add entry {:?}", result);
173 }
174
175 self.monitor
176 .notify(DiscvEvent::LookupSucceeded(
177 endpoint.clone(),
178 peer_buffer.len(),
179 ))
180 .await;
181
182 Ok(())
183 }
184
185 async fn random_lookup(
189 &self,
190 endpoint: &Endpoint,
191 peer_id: Option<PeerID>,
192 random_peers: &mut Vec<PeerMsg>,
193 ) -> Result<()> {
194 for _ in 0..2 {
195 let random_peer_id = PeerID::random();
196 let peers = self
197 .connect(endpoint.clone(), peer_id.clone(), &random_peer_id)
198 .await?;
199
200 for peer in peers {
201 if random_peers.contains(&peer)
202 || peer.peer_id == self.id
203 || self.table.contains_key(&peer.peer_id.0)
204 {
205 continue;
206 }
207
208 random_peers.push(peer);
209 }
210 }
211
212 Ok(())
213 }
214
215 async fn self_lookup(
217 &self,
218 random_peers: &[PeerMsg],
219 peer_buffer: &mut Vec<PeerMsg>,
220 ) -> Result<()> {
221 let mut results = FuturesUnordered::new();
222 for peer in random_peers.choose_multiple(&mut OsRng, random_peers.len()) {
223 let endpoint = Endpoint::Tcp(peer.addr.clone(), peer.discovery_port);
224 results.push(self.connect(endpoint, Some(peer.peer_id.clone()), &self.id))
225 }
226
227 while let Some(result) = results.next().await {
228 match result {
229 Ok(peers) => peer_buffer.extend(peers),
230 Err(err) => {
231 error!("Failed to do self lookup: {err}");
232 }
233 }
234 }
235
236 Ok(())
237 }
238
239 async fn connect(
242 &self,
243 endpoint: Endpoint,
244 peer_id: Option<PeerID>,
245 target_peer_id: &PeerID,
246 ) -> Result<Vec<PeerMsg>> {
247 let conn = self.connector.connect(&endpoint, &peer_id).await?;
248 let result = self.handle_outbound(conn, target_peer_id).await;
249
250 self.monitor.notify(ConnEvent::Disconnected(endpoint)).await;
251 self.outbound_slots.remove().await;
252
253 result
254 }
255
256 async fn handle_outbound(
258 &self,
259 conn: ConnRef,
260 target_peer_id: &PeerID,
261 ) -> Result<Vec<PeerMsg>> {
262 trace!("Send Ping msg");
263 let peers;
264
265 let ping_msg = self.send_ping_msg(&conn).await?;
266
267 loop {
268 let t = Duration::from_secs(self.config.lookup_response_timeout);
269 let msg: NetMsg = timeout(t, conn.recv()).await??;
270 match msg.header.command {
271 NetMsgCmd::Pong => {
272 let (pong_msg, _) = decode::<PongMsg>(&msg.payload)?;
273 if ping_msg.nonce != pong_msg.0 {
274 return Err(Error::InvalidPongMsg);
275 }
276 trace!("Send FindPeer msg");
277 self.send_findpeer_msg(&conn, target_peer_id).await?;
278 }
279 NetMsgCmd::Peers => {
280 peers = decode::<PeersMsg>(&msg.payload)?.0.peers;
281 if peers.len() >= MAX_PEERS_IN_PEERSMSG {
282 return Err(Error::Lookup(
283 "Received too many peers in PeersMsg".to_string(),
284 ));
285 }
286 break;
287 }
288 c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
289 };
290 }
291
292 trace!("Send Peer msg");
293 if let Some(endpoint) = self.listen_endpoint() {
294 self.send_peer_msg(&conn, endpoint).await?;
295 }
296
297 trace!("Send Shutdown msg");
298 self.send_shutdown_msg(&conn).await?;
299
300 Ok(peers)
301 }
302
303 async fn start_listener(self: &Arc<Self>) -> Result<()> {
305 let endpoint: Endpoint = match self.discovery_endpoint() {
306 Some(e) => e,
307 None => return Ok(()),
308 };
309
310 let callback = {
311 let this = self.clone();
312 |conn: ConnRef| async move {
313 let t = Duration::from_secs(this.config.lookup_connection_lifespan);
314 timeout(t, this.handle_inbound(conn)).await??;
315 Ok(())
316 }
317 };
318
319 self.listener.start(endpoint, callback).await?;
320 Ok(())
321 }
322
323 async fn handle_inbound(self: &Arc<Self>, conn: ConnRef) -> Result<()> {
325 loop {
326 let msg: NetMsg = conn.recv().await?;
327 trace!("Receive msg {:?}", msg.header.command);
328
329 if let NetMsgCmd::Shutdown = msg.header.command {
330 return Ok(());
331 }
332
333 match &msg.header.command {
334 NetMsgCmd::Ping => {
335 let (ping_msg, _) = decode::<PingMsg>(&msg.payload)?;
336 if !version_match(&self.config.version.req, &ping_msg.version) {
337 return Err(Error::IncompatibleVersion("system: {}".into()));
338 }
339 self.send_pong_msg(ping_msg.nonce, &conn).await?;
340 }
341 NetMsgCmd::FindPeer => {
342 let (findpeer_msg, _) = decode::<FindPeerMsg>(&msg.payload)?;
343 let peer_id = findpeer_msg.0;
344 self.send_peers_msg(&peer_id, &conn).await?;
345 }
346 NetMsgCmd::Peer => {
347 let (peer, _) = decode::<PeerMsg>(&msg.payload)?;
348 let result = self.table.add_entry(peer.clone().into());
349 trace!("Add entry result: {:?}", result);
350 }
351 c => return Err(Error::InvalidMsg(format!("Unexpected msg: {:?}", c))),
352 }
353 }
354 }
355
356 async fn send_ping_msg(&self, conn: &ConnRef) -> Result<PingMsg> {
358 trace!("Send Pong msg");
359 let mut nonce: [u8; 32] = [0; 32];
360 RngCore::fill_bytes(&mut OsRng, &mut nonce);
361
362 let ping_msg = PingMsg {
363 version: self.config.version.v.clone(),
364 nonce,
365 };
366 conn.send(NetMsg::new(NetMsgCmd::Ping, &ping_msg)?).await?;
367 Ok(ping_msg)
368 }
369
370 async fn send_pong_msg(&self, nonce: [u8; 32], conn: &ConnRef) -> Result<()> {
372 trace!("Send Pong msg");
373 conn.send(NetMsg::new(NetMsgCmd::Pong, PongMsg(nonce))?)
374 .await?;
375 Ok(())
376 }
377
378 async fn send_findpeer_msg(&self, conn: &ConnRef, peer_id: &PeerID) -> Result<()> {
380 trace!("Send FindPeer msg");
381 conn.send(NetMsg::new(
382 NetMsgCmd::FindPeer,
383 FindPeerMsg(peer_id.clone()),
384 )?)
385 .await?;
386 Ok(())
387 }
388
389 async fn send_peers_msg(&self, peer_id: &PeerID, conn: &ConnRef) -> Result<()> {
391 trace!("Send Peers msg");
392 let entries = self
393 .table
394 .closest_entries(&peer_id.0, MAX_PEERS_IN_PEERSMSG);
395
396 let peers: Vec<PeerMsg> = entries.into_iter().map(|e| e.into()).collect();
397 conn.send(NetMsg::new(NetMsgCmd::Peers, PeersMsg { peers })?)
398 .await?;
399 Ok(())
400 }
401
402 async fn send_peer_msg(&self, conn: &ConnRef, endpoint: Endpoint) -> Result<()> {
404 trace!("Send Peer msg");
405 let peer_msg = PeerMsg {
406 addr: endpoint.addr()?.clone(),
407 port: *endpoint.port()?,
408 discovery_port: self.config.discovery_port,
409 peer_id: self.id.clone(),
410 };
411 conn.send(NetMsg::new(NetMsgCmd::Peer, &peer_msg)?).await?;
412 Ok(())
413 }
414
415 async fn send_shutdown_msg(&self, conn: &ConnRef) -> Result<()> {
417 trace!("Send Shutdown msg");
418 conn.send(NetMsg::new(NetMsgCmd::Shutdown, ShutdownMsg(0))?)
419 .await?;
420 Ok(())
421 }
422}