Skip to main content

karyon_p2p/discovery/kademlia/routing_table/
mod.rs

1mod bucket;
2mod entry;
3
4use std::net::IpAddr;
5
6use parking_lot::RwLock;
7
8use rand::seq::IndexedRandom;
9
10use karyon_net::Addr;
11
12use crate::bloom::Bloom;
13
14pub use bucket::{
15    Bucket, BucketEntry, EntryStatusFlag, CONNECTED_ENTRY, DISCONNECTED_ENTRY, PENDING_ENTRY,
16    UNREACHABLE_ENTRY, UNSTABLE_ENTRY,
17};
18pub use entry::{xor_distance, Entry, Key};
19
20use bucket::BUCKET_SIZE;
21use entry::KEY_SIZE;
22
/// The total number of buckets in the routing table.
const TABLE_SIZE: usize = 32;

/// The distance limit for the closest buckets.
///
/// `bucket_indexes` walks the offsets `1..DISTANCE_LIMIT` on both sides of
/// the primary bucket.
const DISTANCE_LIMIT: usize = 32;

/// The maximum number of matched subnets allowed within a single bucket.
///
/// `subnet_restricted` refuses a new entry once this many entries already
/// stored in the target bucket share its subnet.
const MAX_MATCHED_SUBNET_IN_BUCKET: usize = 1;

/// The maximum number of matched subnets across the entire routing table.
///
/// `subnet_restricted` refuses a new entry once this many entries already
/// stored anywhere in the table share its subnet.
const MAX_MATCHED_SUBNET_IN_TABLE: usize = 6;
34
/// Represents the possible result when adding a new entry.
///
/// The type is a plain tag, so it is cheap to copy and can be compared
/// directly (e.g. with `assert_eq!`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AddEntryResult {
    /// The entry was added, either into free space or by replacing an
    /// incompatible entry in a full bucket.
    Added,
    /// An entry with the same key already exists.
    Exists,
    /// The entry was ignored: its key maps to no bucket, or the bucket is
    /// full and holds no incompatible entry that could be replaced.
    Ignored,
    /// The entry is restricted by the subnet limits and is not allowed.
    Restricted,
}
47
/// This is a modified version of the Kademlia Distributed Hash Table (DHT).
/// <https://en.wikipedia.org/wiki/Kademlia>
///
/// Entries are grouped into `TABLE_SIZE` buckets according to the XOR
/// distance between their key and the table's own key.
#[derive(Debug)]
pub struct RoutingTable {
    /// The local key that every distance is measured against.
    key: Key,
    /// The buckets, all guarded by one read-write lock.
    buckets: RwLock<Vec<Bucket>>,
}
55
56impl RoutingTable {
57    /// Creates a new RoutingTable
58    pub fn new(key: Key) -> Self {
59        let buckets: Vec<Bucket> = (0..TABLE_SIZE).map(|_| Bucket::new()).collect();
60        Self {
61            key,
62            buckets: RwLock::new(buckets),
63        }
64    }
65
66    /// Adds a new entry to the table and returns a result indicating success,
67    /// failure, or restrictions.
68    ///
69    /// Takes the write lock once and performs all checks inside it to
70    /// avoid TOCTOU between contains_key / subnet_restricted / add.
71    pub fn add_entry(&self, entry: Entry) -> AddEntryResult {
72        // Determine the index of the bucket where the entry should be placed.
73        let bucket_idx = match self.bucket_index(&entry.key) {
74            Some(i) => i,
75            None => return AddEntryResult::Ignored,
76        };
77
78        let mut buckets = self.buckets.write();
79
80        // Check if the entry already exists in the bucket.
81        if buckets[bucket_idx].contains_key(&entry.key) {
82            return AddEntryResult::Exists;
83        }
84
85        // Check if the entry is restricted.
86        if subnet_restricted(&buckets, bucket_idx, &entry) {
87            return AddEntryResult::Restricted;
88        }
89
90        let bucket = &mut buckets[bucket_idx];
91
92        // If the bucket has free space, add the entry and return success.
93        if bucket.len() < BUCKET_SIZE {
94            bucket.add(&entry);
95            return AddEntryResult::Added;
96        }
97
98        // Replace it with an incompatible entry if one exists.
99        let incompatible_entry = bucket.iter().find(|e| e.is_incompatible()).cloned();
100        if let Some(e) = incompatible_entry {
101            bucket.remove(&e.entry.key);
102            bucket.add(&entry);
103            return AddEntryResult::Added;
104        }
105
106        // If the bucket is full, the entry is ignored.
107        AddEntryResult::Ignored
108    }
109
110    /// Check if the table contains the given key.
111    pub fn contains_key(&self, key: &Key) -> bool {
112        let buckets = self.buckets.read();
113        // Determine the bucket index for the given key.
114        let bucket_idx = match self.bucket_index(key) {
115            Some(bi) => bi,
116            None => return false,
117        };
118
119        let bucket = &buckets[bucket_idx];
120        bucket.contains_key(key)
121    }
122
123    /// Updates the status of an entry in the routing table identified
124    /// by the given key.
125    ///
126    /// If the key is not found, no action is taken.
127    pub fn update_entry(&self, key: &Key, entry_flag: EntryStatusFlag) {
128        let mut buckets = self.buckets.write();
129        // Determine the bucket index for the given key.
130        let bucket_idx = match self.bucket_index(key) {
131            Some(bi) => bi,
132            None => return,
133        };
134
135        let bucket = &mut buckets[bucket_idx];
136        bucket.update_entry(key, entry_flag);
137    }
138
139    /// Returns a list of bucket indexes that are closest to the given target key.
140    pub fn bucket_indexes(&self, target_key: &Key) -> Vec<usize> {
141        let mut indexes = vec![];
142
143        // Determine the primary bucket index for the target key.
144        let bucket_idx = self.bucket_index(target_key).unwrap_or(0);
145
146        indexes.push(bucket_idx);
147
148        // Add additional bucket indexes within a certain distance limit.
149        for i in 1..DISTANCE_LIMIT {
150            if bucket_idx >= i && bucket_idx - i >= 1 {
151                indexes.push(bucket_idx - i);
152            }
153
154            if bucket_idx + i < (TABLE_SIZE - 1) {
155                indexes.push(bucket_idx + i);
156            }
157        }
158
159        indexes
160    }
161
162    /// Returns a list of the closest entries to the given target key, limited by max_entries.
163    pub fn closest_entries(&self, target_key: &Key, max_entries: usize) -> Vec<Entry> {
164        self.closest_entries_filtered(target_key, max_entries, &Bloom::empty())
165    }
166
167    /// Same as `closest_entries`, but skips entries whose bloom does
168    /// not satisfy `mine`. The peer's bloom must cover `mine.mandatory`
169    /// and (when non-empty) intersect `mine.optional`.
170    pub fn closest_entries_filtered(
171        &self,
172        target_key: &Key,
173        max_entries: usize,
174        mine: &Bloom,
175    ) -> Vec<Entry> {
176        let buckets = self.buckets.read();
177        let mut entries: Vec<Entry> = vec![];
178
179        'outer: for idx in self.bucket_indexes(target_key) {
180            let bucket = &buckets[idx];
181            for bucket_entry in bucket.iter() {
182                if bucket_entry.is_unreachable() || bucket_entry.is_unstable() {
183                    continue;
184                }
185                if !matches_local(&bucket_entry.entry.protocols, mine) {
186                    continue;
187                }
188
189                entries.push(bucket_entry.entry.clone());
190                if entries.len() == max_entries {
191                    break 'outer;
192                }
193            }
194        }
195
196        entries.sort_by(|a, b| {
197            xor_distance(target_key, &a.key).cmp(&xor_distance(target_key, &b.key))
198        });
199
200        entries
201    }
202
203    /// Returns all entries whose advertised bloom may contain `item`.
204    /// Skips unreachable / unstable entries. False positives are
205    /// possible per bloom semantics.
206    pub fn entries_with_item(&self, item: &[u8]) -> Vec<Entry> {
207        let buckets = self.buckets.read();
208        let mut entries = Vec::new();
209        for bucket in buckets.iter() {
210            for be in bucket.iter() {
211                if be.is_unreachable() || be.is_unstable() {
212                    continue;
213                }
214                if be.entry.protocols.may_contain(item) {
215                    entries.push(be.entry.clone());
216                }
217            }
218        }
219        entries
220    }
221
222    /// Removes an entry with the given key from the routing table, if it exists.
223    pub fn remove_entry(&self, key: &Key) {
224        let mut buckets = self.buckets.write();
225        // Determine the bucket index for the given key.
226        let bucket_idx = match self.bucket_index(key) {
227            Some(bi) => bi,
228            None => return,
229        };
230
231        let bucket = &mut buckets[bucket_idx];
232        bucket.remove(key);
233    }
234
235    /// Returns true if any entry has a discovery address with the given IP.
236    /// Used by the refresh listen loop to drop pings from unknown sources.
237    pub fn has_discovery_ip(&self, ip: &IpAddr) -> bool {
238        let buckets = self.buckets.read();
239        for bucket in buckets.iter() {
240            for be in bucket.iter() {
241                for a in be.entry.discovery_addrs.iter() {
242                    if let Addr::Ip(entry_ip) = &a.addr {
243                        if entry_ip == ip {
244                            return true;
245                        }
246                    }
247                }
248            }
249        }
250        false
251    }
252
253    /// Returns up to `per_bucket` entries from each bucket, skipping
254    /// connected and incompatible ones. Used by the refresh service.
255    pub fn refresh_candidates(&self, per_bucket: usize) -> Vec<BucketEntry> {
256        let buckets = self.buckets.read();
257        let mut entries = Vec::new();
258        for bucket in buckets.iter() {
259            for entry in bucket
260                .iter()
261                .filter(|e| !e.is_connected() && !e.is_incompatible())
262                .take(per_bucket)
263            {
264                entries.push(entry.clone());
265            }
266        }
267        entries
268    }
269
270    /// Returns a random entry from the routing table.
271    pub fn random_entry(&self, entry_flag: EntryStatusFlag) -> Option<Entry> {
272        self.random_entry_filtered(entry_flag, &Bloom::empty())
273    }
274
275    /// Same as `random_entry`, but only returns entries whose bloom
276    /// satisfies `mine` (covers mandatory, intersects optional when set).
277    pub fn random_entry_filtered(
278        &self,
279        entry_flag: EntryStatusFlag,
280        mine: &Bloom,
281    ) -> Option<Entry> {
282        let buckets = self.buckets.read();
283        for bucket in buckets.choose_multiple(&mut rand::rng(), buckets.len()) {
284            for entry in bucket.random_iter(bucket.len()) {
285                if entry.status & entry_flag == 0 {
286                    continue;
287                }
288                if !matches_local(&entry.entry.protocols, mine) {
289                    continue;
290                }
291                return Some(entry.entry.clone());
292            }
293        }
294
295        None
296    }
297
298    // Returns the bucket index for a given key in the table.
299    fn bucket_index(&self, key: &Key) -> Option<usize> {
300        // Calculate the XOR distance between the self key and the provided key.
301        let distance = xor_distance(&self.key, key);
302
303        for (i, b) in distance.iter().enumerate() {
304            if *b != 0 {
305                let lz = i * 8 + b.leading_zeros() as usize;
306                let bits = KEY_SIZE * 8 - 1;
307                let idx = (bits - lz) / 8;
308                return Some(idx);
309            }
310        }
311        None
312    }
313}
314
315/// True if `peer`'s advertised bloom is acceptable to a local node
316/// whose bloom is `mine`. Peer must cover every mandatory bit and,
317/// when optional is non-empty, share at least one optional bit.
318/// An empty `mine` matches everything.
319fn matches_local(peer: &Bloom, mine: &Bloom) -> bool {
320    if mine.is_empty() {
321        return true;
322    }
323    if !peer.covers_mandatory(mine) {
324        return false;
325    }
326    if mine.optional == 0 {
327        return true;
328    }
329    peer.intersects_optional(mine)
330}
331
332/// Iterate the buckets and count entries in the same subnet as `entry`.
333/// Restricted if same-bucket matches >= MAX_MATCHED_SUBNET_IN_BUCKET or
334/// table-wide matches >= MAX_MATCHED_SUBNET_IN_TABLE.
335///
336/// Takes a borrow of buckets so the caller (add_entry) holds a single
337/// write lock across the check and the insert.
338fn subnet_restricted(buckets: &[Bucket], idx: usize, entry: &Entry) -> bool {
339    let mut bucket_count = 0;
340    let mut table_count = 0;
341
342    for (i, bucket) in buckets.iter().enumerate() {
343        for e in bucket.iter() {
344            let matched = match (e.entry.primary_addr(), entry.primary_addr()) {
345                (Some(a), Some(b)) => subnet_match(a, b),
346                _ => false,
347            };
348            if matched {
349                if i == idx {
350                    bucket_count += 1;
351                }
352                table_count += 1;
353            }
354
355            if bucket_count >= MAX_MATCHED_SUBNET_IN_BUCKET {
356                return true;
357            }
358        }
359
360        if table_count >= MAX_MATCHED_SUBNET_IN_TABLE {
361            return true;
362        }
363    }
364
365    false
366}
367
368/// Check if two addresses belong to the same subnet.
369fn subnet_match(addr: &Addr, other_addr: &Addr) -> bool {
370    // Multiple loopback peers are allowed (e.g. local testing).
371    if is_loopback_pair(addr, other_addr) {
372        return false;
373    }
374    match (addr, other_addr) {
375        // Compare the /24 prefix.
376        (Addr::Ip(IpAddr::V4(ip)), Addr::Ip(IpAddr::V4(other_ip))) => {
377            ip.octets()[0..3] == other_ip.octets()[0..3]
378        }
379        // Compare the /64 prefix (first 4 16-bit segments).
380        (Addr::Ip(IpAddr::V6(ip)), Addr::Ip(IpAddr::V6(other_ip))) => {
381            ip.segments()[0..4] == other_ip.segments()[0..4]
382        }
383        _ => false,
384    }
385}
386
387/// True if both addresses are IPv4 or IPv6 loopback.
388fn is_loopback_pair(a: &Addr, b: &Addr) -> bool {
389    match (a, b) {
390        (Addr::Ip(a), Addr::Ip(b)) => a.is_loopback() && b.is_loopback(),
391        _ => false,
392    }
393}
394
395#[cfg(test)]
396mod tests {
397    use super::bucket::ALL_ENTRY;
398    use super::*;
399
400    use karyon_net::Addr;
401
402    use crate::{
403        bloom::Bloom,
404        message::{PeerAddr, Protocol},
405    };
406
    /// Shared fixture for the routing table tests.
    struct Setup {
        /// Key of the table under test.
        local_key: Key,
        /// Keys of the entries that `table()` inserts.
        keys: Vec<Key>,
    }
411
412    fn new_entry(key: &Key, addr: &Addr, port: u16, discovery_port: u16) -> Entry {
413        Entry {
414            key: *key,
415            addrs: vec![PeerAddr {
416                addr: addr.clone(),
417                port,
418                protocol: Protocol::Tcp,
419                priority: 0,
420            }],
421            discovery_addrs: vec![
422                PeerAddr {
423                    addr: addr.clone(),
424                    port: discovery_port,
425                    protocol: Protocol::Tcp,
426                    priority: 0,
427                },
428                PeerAddr {
429                    addr: addr.clone(),
430                    port: discovery_port,
431                    protocol: Protocol::Udp,
432                    priority: 1,
433                },
434            ],
435            protocols: Bloom::empty(),
436        }
437    }
438
439    impl Setup {
440        fn new() -> Self {
441            let keys = vec![
442                [
443                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
444                    0, 0, 0, 0, 0, 1,
445                ],
446                [
447                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
448                    1, 1, 0, 1, 1, 2,
449                ],
450                [
451                    0, 0, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
452                    0, 0, 0, 0, 0, 3,
453                ],
454                [
455                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 1, 18, 0, 0, 0,
456                    0, 0, 0, 0, 0, 4,
457                ],
458                [
459                    223, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
460                    0, 0, 0, 0, 0, 5,
461                ],
462                [
463                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 50, 1, 18, 0, 0, 0,
464                    0, 0, 0, 0, 0, 6,
465                ],
466                [
467                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 50, 1, 18, 0, 0,
468                    0, 0, 0, 0, 0, 0, 7,
469                ],
470                [
471                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 50, 1, 18, 0, 0,
472                    0, 0, 0, 0, 0, 0, 8,
473                ],
474                [
475                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10, 10, 10, 50, 1, 18, 0, 0,
476                    0, 0, 0, 0, 0, 0, 9,
477                ],
478            ];
479
480            Self {
481                local_key: [
482                    0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
483                    0, 0, 0, 0, 0, 0,
484                ],
485                keys,
486            }
487        }
488
489        fn entries(&self) -> Vec<Entry> {
490            let mut entries = vec![];
491            for (i, key) in self.keys.iter().enumerate() {
492                entries.push(new_entry(
493                    key,
494                    &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
495                    3000,
496                    3010,
497                ));
498            }
499            entries
500        }
501
502        fn table(&self) -> RoutingTable {
503            let table = RoutingTable::new(self.local_key);
504
505            for entry in self.entries() {
506                let res = table.add_entry(entry);
507                assert!(matches!(res, AddEntryResult::Added));
508            }
509
510            table
511        }
512    }
513
514    #[test]
515    fn test_bucket_index() {
516        let setup = Setup::new();
517        let table = setup.table();
518
519        assert_eq!(table.bucket_index(&setup.local_key), None);
520        assert_eq!(table.bucket_index(&setup.keys[0]), Some(0));
521        assert_eq!(table.bucket_index(&setup.keys[1]), Some(5));
522        assert_eq!(table.bucket_index(&setup.keys[2]), Some(26));
523        assert_eq!(table.bucket_index(&setup.keys[3]), Some(11));
524        assert_eq!(table.bucket_index(&setup.keys[4]), Some(31));
525        assert_eq!(table.bucket_index(&setup.keys[5]), Some(11));
526        assert_eq!(table.bucket_index(&setup.keys[6]), Some(12));
527        assert_eq!(table.bucket_index(&setup.keys[7]), Some(13));
528        assert_eq!(table.bucket_index(&setup.keys[8]), Some(14));
529    }
530
531    #[test]
532    fn test_closest_entries() {
533        let setup = Setup::new();
534        let table = setup.table();
535        let entries = setup.entries();
536
537        assert_eq!(
538            table.closest_entries(&setup.keys[5], 8),
539            vec![
540                entries[5].clone(),
541                entries[3].clone(),
542                entries[1].clone(),
543                entries[6].clone(),
544                entries[7].clone(),
545                entries[8].clone(),
546                entries[2].clone(),
547            ]
548        );
549
550        assert_eq!(
551            table.closest_entries(&setup.keys[4], 2),
552            vec![entries[4].clone(), entries[2].clone()]
553        );
554    }
555
556    #[test]
557    fn test_random_entry() {
558        let setup = Setup::new();
559        let table = setup.table();
560        let entries = setup.entries();
561
562        let entry = table.random_entry(ALL_ENTRY);
563        assert!(entry.is_some());
564
565        let entry = table.random_entry(CONNECTED_ENTRY);
566        assert!(entry.is_none());
567
568        for entry in entries {
569            table.remove_entry(&entry.key);
570        }
571
572        let entry = table.random_entry(ALL_ENTRY);
573        assert!(entry.is_none());
574    }
575
576    #[test]
577    fn test_add_entries() {
578        let setup = Setup::new();
579        let table = setup.table();
580
581        let key = [
582            0, 0, 0, 0, 0, 0, 0, 1, 3, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
583            0, 0, 5,
584        ];
585
586        let key2 = [
587            0, 0, 0, 0, 0, 0, 0, 1, 2, 4, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
588            0, 0, 5,
589        ];
590
591        let entry1 = new_entry(&key, &Addr::Ip("240.120.3.1".parse().unwrap()), 3000, 3010);
592        assert!(matches!(
593            table.add_entry(entry1.clone()),
594            AddEntryResult::Added
595        ));
596
597        assert!(matches!(table.add_entry(entry1), AddEntryResult::Exists));
598
599        let entry2 = new_entry(&key2, &Addr::Ip("240.120.3.2".parse().unwrap()), 3000, 3010);
600        assert!(matches!(
601            table.add_entry(entry2),
602            AddEntryResult::Restricted
603        ));
604
605        let mut key: [u8; 32] = [0; 32];
606
607        for i in 0..BUCKET_SIZE {
608            key[i] += 1;
609            let entry = new_entry(
610                &key,
611                &Addr::Ip(format!("127.0.{i}.1").parse().unwrap()),
612                3000,
613                3010,
614            );
615            table.add_entry(entry);
616        }
617
618        key[BUCKET_SIZE] += 1;
619        let entry = new_entry(&key, &Addr::Ip("125.20.0.1".parse().unwrap()), 3000, 3010);
620        assert!(matches!(table.add_entry(entry), AddEntryResult::Ignored));
621    }
622
623    use std::net::{Ipv4Addr, Ipv6Addr};
624    #[test]
625    fn check_subnet_match() {
626        let addr_v4 = Addr::Ip(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)));
627        let other_addr_v4 = Addr::Ip(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)));
628
629        let addr_v6 = Addr::Ip(IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 1)));
630        let other_addr_v6 = Addr::Ip(IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb8, 0, 0, 0, 0, 0, 2)));
631        let diff_addr_v6 = Addr::Ip(IpAddr::V6(Ipv6Addr::new(0x2001, 0xdb7, 0, 0, 0, 0, 0, 2)));
632
633        assert!(subnet_match(&addr_v4, &other_addr_v4));
634        assert!(subnet_match(&addr_v6, &other_addr_v6));
635        assert!(!subnet_match(&addr_v6, &diff_addr_v6));
636    }
637}