1mod lookup;
2mod refresh;
3
4use std::sync::Arc;
5
6use log::{error, info};
7use rand::{rngs::OsRng, seq::SliceRandom};
8
9use karyon_core::{
10 async_runtime::Executor,
11 async_util::{Backoff, TaskGroup, TaskResult},
12 crypto::KeyPair,
13};
14
15use karyon_net::Endpoint;
16
17use crate::{
18 config::Config,
19 conn_queue::ConnQueue,
20 connection::ConnDirection,
21 connector::Connector,
22 listener::Listener,
23 monitor::Monitor,
24 routing_table::{
25 RoutingTable, CONNECTED_ENTRY, DISCONNECTED_ENTRY, INCOMPATIBLE_ENTRY, PENDING_ENTRY,
26 UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
27 },
28 slots::ConnectionSlots,
29 ConnRef, Error, PeerID, Result,
30};
31
32use lookup::LookupService;
33use refresh::RefreshService;
34
/// Peer discovery component.
///
/// Holds the routing table of known peers together with the lookup and
/// refresh services that operate on it, and the connector/listener pair whose
/// established connections are handed to `conn_queue`.
///
/// Note: fields are dropped in declaration order, so reordering them changes
/// teardown order.
pub struct Discovery {
    /// Routing table of known peers; the same `Arc` is shared with the lookup
    /// and refresh services when they are created in `new`.
    table: Arc<RoutingTable>,

    /// Lookup service, used by seeding to query a table entry or the
    /// configured bootstrap peers.
    lookup_service: Arc<LookupService>,

    /// Refresh service; receives the resolved listen endpoint and is
    /// started/shut down together with discovery.
    refresh_service: Arc<RefreshService>,

    /// Dials outbound connections; created with a `ConnectionSlots` sized by
    /// `config.outbound_slots`.
    connector: Arc<Connector>,

    /// Accepts inbound connections when `config.listen_endpoint` is set;
    /// created with a `ConnectionSlots` sized by `config.inbound_slots`.
    listener: Arc<Listener>,

    /// Queue that receives connections accepted by the listener (inbound) and
    /// those dialed through `connect` (outbound).
    conn_queue: Arc<ConnQueue>,

    /// Task group that owns the background connect loop; cancelled first in
    /// `shutdown`.
    task_group: TaskGroup,

    /// Shared node configuration.
    config: Arc<Config>,
}
60
61impl Discovery {
62 pub fn new(
64 key_pair: &KeyPair,
65 peer_id: &PeerID,
66 conn_queue: Arc<ConnQueue>,
67 config: Arc<Config>,
68 monitor: Arc<Monitor>,
69 ex: Executor,
70 ) -> Arc<Discovery> {
71 let table = Arc::new(RoutingTable::new(peer_id.0));
72
73 let refresh_service = Arc::new(RefreshService::new(
74 config.clone(),
75 table.clone(),
76 monitor.clone(),
77 ex.clone(),
78 ));
79
80 let lookup_service = Arc::new(LookupService::new(
81 key_pair,
82 table.clone(),
83 config.clone(),
84 monitor.clone(),
85 ex.clone(),
86 ));
87
88 let outbound_slots = Arc::new(ConnectionSlots::new(config.outbound_slots));
89 let connector = Connector::new(
90 key_pair,
91 config.max_connect_retries,
92 outbound_slots.clone(),
93 config.enable_tls,
94 monitor.clone(),
95 ex.clone(),
96 );
97
98 let inbound_slots = Arc::new(ConnectionSlots::new(config.inbound_slots));
99 let listener = Listener::new(
100 key_pair,
101 inbound_slots.clone(),
102 config.enable_tls,
103 monitor.clone(),
104 ex.clone(),
105 );
106
107 let task_group = TaskGroup::with_executor(ex);
108
109 Arc::new(Self {
110 refresh_service,
111 lookup_service,
112 conn_queue,
113 table,
114 connector,
115 listener,
116 task_group,
117 config,
118 })
119 }
120
121 pub async fn start(self: &Arc<Self>) -> Result<()> {
123 if let Some(endpoint) = &self.config.listen_endpoint {
125 if self.config.discovery_port == 0 {
127 return Err(Error::Config(
128 "Please add a valid discovery port".to_string(),
129 ));
130 }
131
132 let resolved_endpoint = self.start_listener(endpoint).await?;
133
134 info!("Resolved listen endpoint: {resolved_endpoint}");
135 self.lookup_service
136 .set_listen_endpoint(&resolved_endpoint)?;
137 self.refresh_service
138 .set_listen_endpoint(&resolved_endpoint)?;
139 }
140
141 self.lookup_service.start().await?;
143 self.refresh_service.start().await?;
145
146 for endpoint in self.config.peer_endpoints.iter() {
148 let _ = self.connect(endpoint, None).await;
149 }
150
151 self.task_group.spawn(
153 {
154 let this = self.clone();
155 async move { this.connect_loop().await }
156 },
157 |res| async move {
158 if let TaskResult::Completed(Err(err)) = res {
159 error!("Connect loop stopped: {err}");
160 }
161 },
162 );
163
164 Ok(())
165 }
166
167 pub async fn shutdown(&self) {
169 self.task_group.cancel().await;
170 self.connector.shutdown().await;
171 self.listener.shutdown().await;
172
173 self.refresh_service.shutdown().await;
174 self.lookup_service.shutdown().await;
175 }
176
177 async fn start_listener(self: &Arc<Self>, endpoint: &Endpoint) -> Result<Endpoint> {
179 let callback = {
180 let this = self.clone();
181 |c: ConnRef| async move {
182 this.conn_queue.handle(c, ConnDirection::Inbound).await?;
183 Ok(())
184 }
185 };
186
187 let resolved_endpoint = self.listener.start(endpoint.clone(), callback).await?;
188 Ok(resolved_endpoint)
189 }
190
191 async fn connect_loop(self: Arc<Self>) -> Result<()> {
198 let backoff = Backoff::new(500, self.config.seeding_interval * 1000);
199 loop {
200 match self.table.random_entry(PENDING_ENTRY) {
201 Some(entry) => {
202 backoff.reset();
203 let endpoint = Endpoint::Tcp(entry.addr, entry.port);
204 self.connect(&endpoint, Some(entry.key.into())).await;
205 }
206 None => {
207 backoff.sleep().await;
208 self.start_seeding().await;
209 }
210 }
211 }
212 }
213
214 async fn connect(self: &Arc<Self>, endpoint: &Endpoint, pid: Option<PeerID>) {
216 let cback = {
217 let this = self.clone();
218 let endpoint = endpoint.clone();
219 let pid = pid.clone();
220 |conn: ConnRef| async move {
221 let result = this.conn_queue.handle(conn, ConnDirection::Outbound).await;
222
223 let pid = match pid {
225 Some(p) => p,
226 None => return Ok(()),
227 };
228
229 match result {
230 Err(Error::IncompatiblePeer) => {
231 error!("Failed to do handshake: {endpoint} incompatible peer");
232 this.table.update_entry(&pid.0, INCOMPATIBLE_ENTRY);
233 }
234 Err(Error::PeerAlreadyConnected) => {
235 this.table.update_entry(&pid.0, CONNECTED_ENTRY)
236 }
237 Err(_) => this.table.update_entry(&pid.0, UNSTABLE_ENTRY),
238 Ok(_) => this.table.update_entry(&pid.0, DISCONNECTED_ENTRY),
239 }
240
241 Ok(())
242 }
243 };
244
245 let result = self
246 .connector
247 .connect_with_cback(endpoint, &pid, cback)
248 .await;
249
250 if let Some(pid) = &pid {
251 match result {
252 Ok(_) => self.table.update_entry(&pid.0, CONNECTED_ENTRY),
253 Err(_) => self.table.update_entry(&pid.0, UNREACHABLE_ENTRY),
254 }
255 }
256 }
257
258 async fn start_seeding(&self) {
265 match self.table.random_entry(PENDING_ENTRY | CONNECTED_ENTRY) {
266 Some(entry) => {
267 let endpoint = Endpoint::Tcp(entry.addr, entry.discovery_port);
268 let peer_id = Some(entry.key.into());
269 if let Err(err) = self.lookup_service.start_lookup(&endpoint, peer_id).await {
270 self.table.update_entry(&entry.key, UNSTABLE_ENTRY);
271 error!("Failed to do lookup: {endpoint}: {err}");
272 }
273 }
274 None => {
275 let peers = &self.config.bootstrap_peers;
276 for endpoint in peers.choose_multiple(&mut OsRng, peers.len()) {
277 if let Err(err) = self.lookup_service.start_lookup(endpoint, None).await {
278 error!("Failed to do lookup: {endpoint}: {err}");
279 }
280 }
281 }
282 }
283 }
284}