1use std::{collections::VecDeque, sync::Arc};
2
3use async_channel::Sender;
4
5use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
6
7use karyon_net::{Endpoint, FramedConn, FramedReader, FramedWriter};
8
9#[cfg(feature = "quic")]
10use karyon_net::quic::QuicConn;
11
12use crate::{codec::PeerNetMsgCodec, peer::ConnDirection, Error, PeerID, Result};
13
14type PeerConnRef = FramedConn<PeerNetMsgCodec>;
16
17pub struct QueuedConn {
20 pub reader: FramedReader<PeerNetMsgCodec>,
21 pub writer: FramedWriter<PeerNetMsgCodec>,
22 pub direction: ConnDirection,
23 pub remote_endpoint: Endpoint,
24 pub disconnect_signal: Sender<crate::Result<()>>,
25 pub verified_peer_id: Option<PeerID>,
30 #[cfg(feature = "quic")]
31 pub quic_conn: Option<QuicConn>,
32}
33
34pub struct ConnQueue {
36 queue: Mutex<VecDeque<QueuedConn>>,
37 conn_available: CondVar,
38}
39
40impl ConnQueue {
41 pub fn new() -> Arc<Self> {
42 Arc::new(Self {
43 queue: Mutex::new(VecDeque::new()),
44 conn_available: CondVar::new(),
45 })
46 }
47
48 pub async fn handle(
51 &self,
52 conn: PeerConnRef,
53 direction: ConnDirection,
54 verified_peer_id: Option<PeerID>,
55 ) -> Result<()> {
56 let endpoint = conn
57 .peer_endpoint()
58 .ok_or(Error::InvalidEndpoint("missing peer endpoint".into()))?;
59 let (reader, writer) = conn.split();
60
61 let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
62 let queued = QueuedConn {
63 reader,
64 writer,
65 direction,
66 remote_endpoint: endpoint,
67 disconnect_signal: disconnect_tx,
68 verified_peer_id,
69 #[cfg(feature = "quic")]
70 quic_conn: None,
71 };
72
73 self.queue.lock().await.push_back(queued);
74 self.conn_available.signal();
75
76 if let Ok(result) = disconnect_rx.recv().await {
77 return result;
78 }
79
80 Ok(())
81 }
82
83 #[cfg(feature = "quic")]
86 pub async fn handle_quic(
87 &self,
88 conn: PeerConnRef,
89 quic_conn: QuicConn,
90 direction: ConnDirection,
91 verified_peer_id: Option<PeerID>,
92 ) -> Result<()> {
93 let endpoint = conn
94 .peer_endpoint()
95 .ok_or(Error::InvalidEndpoint("missing peer endpoint".into()))?;
96 let (reader, writer) = conn.split();
97
98 let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
99 let queued = QueuedConn {
100 reader,
101 writer,
102 direction,
103 remote_endpoint: endpoint,
104 disconnect_signal: disconnect_tx,
105 verified_peer_id,
106 quic_conn: Some(quic_conn),
107 };
108
109 self.queue.lock().await.push_back(queued);
110 self.conn_available.signal();
111
112 if let Ok(result) = disconnect_rx.recv().await {
113 return result;
114 }
115
116 Ok(())
117 }
118
119 pub async fn next(&self) -> QueuedConn {
121 let mut queue = self.queue.lock().await;
122 while queue.is_empty() {
123 queue = self.conn_available.wait(queue).await;
124 }
125 queue.pop_front().unwrap()
126 }
127}