Skip to main content

karyon_p2p/
conn_queue.rs

1use std::{collections::VecDeque, sync::Arc};
2
3use async_channel::Sender;
4
5use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
6
7use karyon_net::{Endpoint, FramedConn, FramedReader, FramedWriter};
8
9#[cfg(feature = "quic")]
10use karyon_net::quic::QuicConn;
11
12use crate::{codec::PeerNetMsgCodec, peer::ConnDirection, Error, PeerID, Result};
13
14/// Framed peer-data-plane connection used by the queue.
15type PeerConnRef = FramedConn<PeerNetMsgCodec>;
16
17/// A connection queued for the PeerPool to handshake and turn into
18/// a `Peer`. Carries the raw split halves plus metadata.
19pub struct QueuedConn {
20    pub reader: FramedReader<PeerNetMsgCodec>,
21    pub writer: FramedWriter<PeerNetMsgCodec>,
22    pub direction: ConnDirection,
23    pub remote_endpoint: Endpoint,
24    pub disconnect_signal: Sender<crate::Result<()>>,
25    /// PeerID derived from the secure transport (TLS cert).
26    /// `None` for unauthenticated transports (TCP, Unix).
27    /// The application handshake asserts `vermsg.peer_id == this` when
28    /// `Some`, so a peer can't claim a PeerID it can't prove.
29    pub verified_peer_id: Option<PeerID>,
30    #[cfg(feature = "quic")]
31    pub quic_conn: Option<QuicConn>,
32}
33
34/// FIFO of pending connections awaiting handshake.
35pub struct ConnQueue {
36    queue: Mutex<VecDeque<QueuedConn>>,
37    conn_available: CondVar,
38}
39
40impl ConnQueue {
41    pub fn new() -> Arc<Self> {
42        Arc::new(Self {
43            queue: Mutex::new(VecDeque::new()),
44            conn_available: CondVar::new(),
45        })
46    }
47
48    /// Handle a stream connection: split into halves, push, and
49    /// block until disconnect.
50    pub async fn handle(
51        &self,
52        conn: PeerConnRef,
53        direction: ConnDirection,
54        verified_peer_id: Option<PeerID>,
55    ) -> Result<()> {
56        let endpoint = conn
57            .peer_endpoint()
58            .ok_or(Error::InvalidEndpoint("missing peer endpoint".into()))?;
59        let (reader, writer) = conn.split();
60
61        let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
62        let queued = QueuedConn {
63            reader,
64            writer,
65            direction,
66            remote_endpoint: endpoint,
67            disconnect_signal: disconnect_tx,
68            verified_peer_id,
69            #[cfg(feature = "quic")]
70            quic_conn: None,
71        };
72
73        self.queue.lock().await.push_back(queued);
74        self.conn_available.signal();
75
76        if let Ok(result) = disconnect_rx.recv().await {
77            return result;
78        }
79
80        Ok(())
81    }
82
83    /// Handle a QUIC connection. The `conn` is the handshake stream;
84    /// `quic_conn` is the full QUIC connection for protocol streams.
85    #[cfg(feature = "quic")]
86    pub async fn handle_quic(
87        &self,
88        conn: PeerConnRef,
89        quic_conn: QuicConn,
90        direction: ConnDirection,
91        verified_peer_id: Option<PeerID>,
92    ) -> Result<()> {
93        let endpoint = conn
94            .peer_endpoint()
95            .ok_or(Error::InvalidEndpoint("missing peer endpoint".into()))?;
96        let (reader, writer) = conn.split();
97
98        let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
99        let queued = QueuedConn {
100            reader,
101            writer,
102            direction,
103            remote_endpoint: endpoint,
104            disconnect_signal: disconnect_tx,
105            verified_peer_id,
106            quic_conn: Some(quic_conn),
107        };
108
109        self.queue.lock().await.push_back(queued);
110        self.conn_available.signal();
111
112        if let Ok(result) = disconnect_rx.recv().await {
113            return result;
114        }
115
116        Ok(())
117    }
118
119    /// Waits for the next connection in the queue.
120    pub async fn next(&self) -> QueuedConn {
121        let mut queue = self.queue.lock().await;
122        while queue.is_empty() {
123            queue = self.conn_available.wait(queue).await;
124        }
125        queue.pop_front().unwrap()
126    }
127}