1mod bucket;
2mod entry;
3
4use std::net::IpAddr;
5
6use parking_lot::RwLock;
7
8use rand::seq::IndexedRandom;
9
10use karyon_net::Addr;
11
12use crate::bloom::Bloom;
13
14pub use bucket::{
15 Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
16 UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
17};
18pub use entry::{xor_distance, Entry, Key};
19
20use bucket::BUCKET_SIZE;
21use entry::KEY_SIZE;
22
/// Number of buckets created by `RoutingTable::new`.
23const TABLE_SIZE: usize = 32;
25
/// Exclusive upper bound of the offsets that `bucket_indexes` walks on each side of the
/// target bucket.
26const DISTANCE_LIMIT: usize = 32;
28
/// `subnet_restricted` rejects a new entry once this many entries in its target bucket
/// share its subnet.
29const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1;
31
/// `subnet_restricted` rejects a new entry once this many entries across the scanned
/// buckets share its subnet.
32const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6;
34
/// Outcome of `RoutingTable::add_entry`.
///
/// The type is a plain tag, so it is `Copy` and comparable; callers can use `==` or
/// `assert_eq!` instead of `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddEntryResult {
    /// The entry was stored in its bucket (possibly after evicting an incompatible entry).
    Added,
    /// The bucket already holds an entry with the same key.
    Exists,
    /// The entry was dropped: its key maps to no bucket, or the bucket is full and holds no
    /// incompatible entry that could be evicted.
    Ignored,
    /// The entry was rejected by the subnet limits checked in `subnet_restricted`.
    Restricted,
}
47
/// Routing table: a fixed list of `TABLE_SIZE` buckets (see `RoutingTable::new`) selected by
/// the distance between an entry's key and the local key.
48#[derive(Debug)]
51pub struct RoutingTable {
// Local key; `bucket_index` measures every distance from it.
52 key: Key,
// The buckets, guarded by a read/write lock so `&self` methods can mutate them.
53 buckets: RwLock<Vec<Bucket>>,
54}
55
56impl RoutingTable {
57 pub fn new(key: Key) -> Self {
59 let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect();
60 Self {
61 key,
62 buckets: RwLock::new(buckets),
63 }
64 }
65
66 pub fn add_entry(&self, entry: Entry) -> AddEntryResult {
72 let bucket_idx = match self.bucket_index(&entry.key) {
74 Some(i) => i,
75 None => return AddEntryResult::Ignored,
76 };
77
78 let mut buckets = self.buckets.write();
79
80 if buckets[bucket_idx].contains_key(&entry.key) {
82 return AddEntryResult::Exists;
83 }
84
85 if subnet_restricted(&buckets, bucket_idx, &entry) {
87 return AddEntryResult::Restricted;
88 }
89
90 let bucket = &mut buckets[bucket_idx];
91
92 if bucket.len() < BUCKET_SIZE {
94 bucket.add(&entry);
95 return AddEntryResult::Added;
96 }
97
98 let incompatible_entry = bucket.iter().find(|e| e.is_incompatible()).cloned();
100 if let Some(e) = incompatible_entry {
101 bucket.remove(&e.entry.key);
102 bucket.add(&entry);
103 return AddEntryResult::Added;
104 }
105
106 AddEntryResult::Ignored
108 }
109
110 pub fn contains_key(&self, key: &Key) -> bool {
112 let buckets = self.buckets.read();
113 let bucket_idx = match self.bucket_index(key) {
115 Some(bi) => bi,
116 None => return false,
117 };
118
119 let bucket = &buckets[bucket_idx];
120 bucket.contains_key(key)
121 }
122
123 pub fn update_entry(&self, key: &Key, entry_flag: EntryStatusFlag) {
128 let mut buckets = self.buckets.write();
129 let bucket_idx = match self.bucket_index(key) {
131 Some(bi) => bi,
132 None => return,
133 };
134
135 let bucket = &mut buckets[bucket_idx];
136 bucket.update_entry(key, entry_flag);
137 }
138
139 pub fn bucket_indexes(&self, target_key: &Key) -> Vec<usize> {
141 let mut indexes = vec![];
142
143 let bucket_idx = self.bucket_index(target_key).unwrap_or(0);
145
146 indexes.push(bucket_idx);
147
148 for i in 1..DISTANCE_LIMIT {
150 if bucket_idx >= i && bucket_idx - i >= 1 {
151 indexes.push(bucket_idx - i);
152 }
153
154 if bucket_idx + i < (TABLE_SIZE - 1) {
155 indexes.push(bucket_idx + i);
156 }
157 }
158
159 indexes
160 }
161
162 pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> {
164 self.closest_entries_filtered(target_key, max_entries, &Bloom::empty())
165 }
166
167 pub fn closest_entries_filtered(
171 &self,
172 target_key: &Key,
173 max_entries: usize,
174 mine: &Bloom,
175 ) -> Vec<Entry> {
176 let buckets = self.buckets.read();
177 let mut entries: Vec<Entry> = vec![];
178
179 'outer: for idx in self.bucket_indexes(target_key) {
180 let bucket = &buckets[idx];
181 for bucket_entry in bucket.iter() {
182 if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {
183 continue;
184 }
185 if !matches_local(&bucket_entry.entry.protocols, mine) {
186 continue;
187 }
188
189 entries.push(bucket_entry.entry.clone());
190 if entries.len() == max_entries {
191 break 'outer;
192 }
193 }
194 }
195
196 entries.sort_by(|a, b| {
197 xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key))
198 });
199
200 entries
201 }
202
203 pub fn entries_with_item(&self, item: &[u8]) -> Vec<Entry> {
207 let buckets = self.buckets.read();
208 let mut entries = Vec::new();
209 for bucket in buckets.iter() {
210 for be in bucket.iter() {
211 if be.is_unreachable() || be.is_unstable() {
212 continue;
213 }
214 if be.entry.protocols.may_contain(item) {
215 entries.push(be.entry.clone());
216 }
217 }
218 }
219 entries
220 }
221
222 pub fn remove_entry(&self, key: &Key) {
224 let mut buckets = self.buckets.write();
225 let bucket_idx = match self.bucket_index(key) {
227 Some(bi) => bi,
228 None => return,
229 };
230
231 let bucket = &mut buckets[bucket_idx];
232 bucket.remove(key);
233 }
234
235 pub fn has_discovery_ip(&self, ip: &IpAddr) -> bool {
238 let buckets = self.buckets.read();
239 for bucket in buckets.iter() {
240 for be in bucket.iter() {
241 for a in be.entry.discovery_addrs.iter() {
242 if let Addr::Ip(entry_ip) = &a.addr {
243 if entry_ip == ip {
244 return true;
245 }
246 }
247 }
248 }
249 }
250 false
251 }
252
253 pub fn refresh_candidates(&self, per_bucket: usize) -> Vec<BucketEntry> {
256 let buckets = self.buckets.read();
257 let mut entries = Vec::new();
258 for bucket in buckets.iter() {
259 for entry in bucket
260 .iter()
261 .filter(|e| !e.is_connected() && !e.is_incompatible())
262 .take(per_bucket)
263 {
264 entries.push(entry.clone());
265 }
266 }
267 entries
268 }
269
270 pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
272 self.random_entry_filtered(entry_flag, &Bloom::empty())
273 }
274
275 pub fn random_entry_filtered(
278 &self,
279 entry_flag: EntryStatusFlag,
280 mine: &Bloom,
281 ) -> Option<Entry> {
282 let buckets = self.buckets.read();
283 for bucket in buckets.choose_multiple(&mut rand::rng(), buckets.len()) {
284 for entry in bucket.random_iter(bucket.len()) {
285 if entry.status & entry_flag == 0 {
286 continue;
287 }
288 if !matches_local(&entry.entry.protocols, mine) {
289 continue;
290 }
291 return Some(entry.entry.clone());
292 }
293 }
294
295 None
296 }
297
298 fn bucket_index(&self, key: &Key) -> Option<usize> {
300 let distance = xor_distance(&self.key, key);
302
303 for (i, b) in distance.iter().enumerate() {
304 if *b != 0 {
305 let lz = i * 8 + b.leading_zeros() as usize;
306 let bits = KEY_SIZE * 8 - 1;
307 let idx = (bits - lz) / 8;
308 return Some(idx);
309 }
310 }
311 None
312 }
313}
314
315fn matches_local(peer: &Bloom, mine: &Bloom) -> bool {
320 if mine.is_empty() {
321 return true;
322 }
323 if !peer.covers_mandatory(mine) {
324 return false;
325 }
326 if mine.optional == 0 {
327 return true;
328 }
329 peer.intersects_optional(mine)
330}
331
332fn subnet_restricted(buckets: &[Bucket], idx: usize, entry: &Entry) -> bool {
339 let mut bucket_count = 0;
340 let mut table_count = 0;
341
342 for (i, bucket) in buckets.iter().enumerate() {
343 for e in bucket.iter() {
344 let matched = match (e.entry.primary_addr(), entry.primary_addr()) {
345 (Some(a), Some(b)) => subnet_match(a, b),
346 _ => false,
347 };
348 if matched {
349 if i == idx {
350 bucket_count += 1;
351 }
352 table_count += 1;
353 }
354
355 if bucket_count >= MAX_MATCHED_SUBNET_IN_BUCKET {
356 return true;
357 }
358 }
359
360 if table_count >= MAX_MATCHED_SUBNET_IN_TABLE {
361 return true;
362 }
363 }
364
365 false
366}
367
368fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool {
370 if is_loopback_pair(addr, other_addr) {
372 return false;
373 }
374 match (addr, other_addr) {
375 (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => {
377 ip.octets()[0..3] == other_ip.octets()[0..3]
378 }
379 (Addr::Ip(IpAddr::V6(ip)), Addr::Ip(IpAddr::V6(other_ip))) => {
381 ip.segments()[0..4] == other_ip.segments()[0..4]
382 }
383 _ => false,
384 }
385}
386
387fn is_loopback_pair(a: &Addr, b: &Addr) -> bool {
389 match (a, b) {
390 (Addr::Ip(a), Addr::Ip(b)) => a.is_loopback() && b.is_loopback(),
391 _ => false,
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use super::bucket::ALL_ENTRY;
398 use super::*;
399
400 use karyon_net::Addr;
401
402 use crate::{
403 bloom::Bloom,
404 message::{PeerAddr, Protocol},
405 };
406
// Test fixture: a local key plus the peer keys inserted into the table under test.
407 struct Setup {
// Key handed to `RoutingTable::new` by `Setup::table`.
408 local_key: Key,
// Keys of the entries built by `Setup::entries`.
409 keys: Vec<Key>,
410 }
411
412 fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry {
413 Entry {
414 key: *key,
415 addrs: vec![PeerAddr {
416 addr: addr.clone(),
417 port,
418 protocol: Protocol::Tcp,
419 priority: 0,
420 }],
421 discovery_addrs: vec![
422 PeerAddr {
423 addr: addr.clone(),
424 port: discovery_port,
425 protocol: Protocol::Tcp,
426 priority: 0,
427 },
428 PeerAddr {
429 addr: addr.clone(),
430 port: discovery_port,
431 protocol: Protocol::Udp,
432 priority: 1,
433 },
434 ],
435 protocols: Bloom::empty(),
436 }
437 }
438
439 impl Setup {
440 fn new() -> Self {
441 let keys = vec![
442 [
443 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
444 0, 0, 0, 0, 0, 1,
445 ],
446 [
447 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
448 1, 1, 0, 1, 1, 2,
449 ],
450 [
451 0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
452 0, 0, 0, 0, 0, 3,
453 ],
454 [
455 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0,
456 0, 0, 0, 0, 0, 4,
457 ],
458 [
459 223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
460 0, 0, 0, 0, 0, 5,
461 ],
462 [
463 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0,
464 0, 0, 0, 0, 0, 6,
465 ],
466 [
467 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0,
468 0, 0, 0, 0, 0, 0, 7,
469 ],
470 [
471 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0,
472 0, 0, 0, 0, 0, 0, 8,
473 ],
474 [
475 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 50, 1, 18, 0, 0,
476 0, 0, 0, 0, 0, 0, 9,
477 ],
478 ];
479
480 Self {
481 local_key: [
482 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
483 0, 0, 0, 0, 0, 0,
484 ],
485 keys,
486 }
487 }
488
489 fn entries(&self) -> Vec<Entry> {
490 let mut entries = vec![];
491 for (i, key) in self.keys.iter().enumerate() {
492 entries.push(new_entry(
493 key,
494 &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
495 3000,
496 3010,
497 ));
498 }
499 entries
500 }
501
502 fn table(&self) -> RoutingTable {
503 let table = RoutingTable::new(self.local_key);
504
505 for entry in self.entries() {
506 let res = table.add_entry(entry);
507 assert!(matches!(res, AddEntryResult::Added));
508 }
509
510 table
511 }
512 }
513
#[test]
fn test_bucket_index() {
    let setup = Setup::new();
    let table = setup.table();

    // The local key has zero distance to itself and therefore no bucket.
    assert_eq!(table.bucket_index(&setup.local_key), None);

    // Expected bucket for each fixture key, in fixture order.
    let expected = [0, 5, 26, 11, 31, 11, 12, 13, 14];
    assert_eq!(setup.keys.len(), expected.len());

    for (i, (key, bucket)) in setup.keys.iter().zip(expected).enumerate() {
        assert_eq!(table.bucket_index(key), Some(bucket), "fixture key #{i}");
    }
}
530
#[test]
fn test_closest_entries() {
    let setup = Setup::new();
    let table = setup.table();
    let entries = setup.entries();

    // Selects fixture entries by position, in the given order.
    let pick = |order: &[usize]| -> Vec<Entry> {
        order.iter().map(|&i| entries[i].clone()).collect()
    };

    assert_eq!(
        table.closest_entries(&setup.keys[5], 8),
        pick(&[5, 3, 1, 6, 7, 8, 2])
    );

    assert_eq!(table.closest_entries(&setup.keys[4], 2), pick(&[4, 2]));
}
555
#[test]
fn test_random_entry() {
    let setup = Setup::new();
    let table = setup.table();

    // Any status flag matches a populated table; no fixture entry is marked connected.
    assert!(table.random_entry(ALL_ENTRY).is_some());
    assert!(table.random_entry(CONNECTED_ENTRY).is_none());

    // The fixture entries carry exactly the fixture keys, so removing by key empties the table.
    for key in &setup.keys {
        table.remove_entry(key);
    }

    assert!(table.random_entry(ALL_ENTRY).is_none());
}
575
#[test]
fn test_add_entries() {
    let setup = Setup::new();
    let table = setup.table();

    // Builds an entry for `key` at the textual IP `ip` with the usual test ports.
    let entry_at =
        |key: &Key, ip: &str| new_entry(key, &Addr::Ip(ip.parse().unwrap()), 3000, 3010);

    let mut key: Key = [0; 32];
    key[7] = 1;
    key[8] = 3;
    key[9] = 4;
    key[31] = 5;

    // Same key except for one byte, so it shares the first non-zero byte position with `key`.
    let mut key2 = key;
    key2[8] = 2;

    let entry1 = entry_at(&key, "240.120.3.1");
    assert!(matches!(
        table.add_entry(entry1.clone()),
        AddEntryResult::Added
    ));
    assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists));

    // Different key, but an address that differs from `entry1` only in the last octet.
    let entry2 = entry_at(&key2, "240.120.3.2");
    assert!(matches!(
        table.add_entry(entry2),
        AddEntryResult::Restricted
    ));

    // Every key below has a non-zero first byte; insert `BUCKET_SIZE` of them.
    let mut key: Key = [0; 32];
    for i in 0..BUCKET_SIZE {
        key[i] += 1;
        table.add_entry(entry_at(&key, &format!("127.0.{i}.1")));
    }

    key[BUCKET_SIZE] += 1;
    let overflow = entry_at(&key, "125.20.0.1");
    assert!(matches!(table.add_entry(overflow), AddEntryResult::Ignored));
}
622
623 use std::net::{Ipv4Addr, Ipv6Addr};
#[test]
fn check_subnet_match() {
    let v4 = |a, b, c, d| Addr::Ip(IpAddr::V4(Ipv4Addr::new(a, b, c, d)));
    // IPv6 address `2001:<second>::<last>`.
    let v6 = |second, last| {
        Addr::Ip(IpAddr::V6(Ipv6Addr::new(
            0x2001, second, 0, 0, 0, 0, 0, last,
        )))
    };

    // Same first three octets.
    assert!(subnet_match(&v4(192, 168, 1, 1), &v4(192, 168, 1, 2)));
    // Same first four segments.
    assert!(subnet_match(&v6(0xdb8, 1), &v6(0xdb8, 2)));
    // Second segment differs.
    assert!(!subnet_match(&v6(0xdb8, 1), &v6(0xdb7, 2)));
}
637}