1use std::{
4 collections::HashMap,
5 sync::{atomic::Ordering, Arc},
6};
7
8use async_channel::Sender;
9use log::{debug, info};
10use parking_lot::Mutex;
11use serde::{de::DeserializeOwned, Serialize};
12use serde_json::json;
13
14use karyon_core::{
15 async_util::{select, Either, TaskResult},
16 util::random_32,
17};
18
19use karyon_net::{
20 framed,
21 quic::{QuicConn, QuicEndpoint},
22 FramedReader, FramedWriter, StreamMux,
23};
24
25use crate::{
26 client::{
27 subscriptions::{Subscription, Subscriptions},
28 Client, ClientBackend, ClientConfig, RequestID, WsCodec,
29 },
30 codec::{JsonCodec, JsonRpcCodec},
31 error::{Error, Result},
32 message::{self, SubscriptionID},
33};
34
35pub(super) async fn build_backend(config: &ClientConfig) -> Result<ClientBackend> {
37 let quic_config = config
38 .quic_config
39 .clone()
40 .ok_or(Error::QUICConfigRequired)?;
41 let quic_conn = QuicEndpoint::dial(&config.endpoint, quic_config).await?;
42
43 info!(
44 "Successfully connected to the RPC server: {:?}",
45 quic_conn.peer_endpoint().ok()
46 );
47
48 Ok(ClientBackend::QuicStream {
49 quic_conn: Arc::new(quic_conn),
50 subscriptions: Subscriptions::new(config.subscription_buffer_size),
51 unsub_chans: Mutex::new(HashMap::new()),
52 })
53}
54
55pub(super) async fn call<B, W, T>(
56 client: &Client<B, W>,
57 quic_conn: &QuicConn,
58 method: &str,
59 params: T,
60) -> Result<message::Response>
61where
62 B: JsonRpcCodec,
63 W: WsCodec,
64 T: Serialize + DeserializeOwned,
65{
66 if client.disconnect.load(Ordering::Relaxed) {
67 return Err(Error::ClientDisconnected);
68 }
69
70 let stream = quic_conn.open_stream().await?;
71 let mut conn = framed(stream, JsonCodec::default());
72
73 let id: RequestID = random_32();
74 let request = message::Request {
75 jsonrpc: message::JSONRPC_VERSION.to_string(),
76 id: json!(id),
77 method: method.to_string(),
78 params: Some(json!(params)),
79 };
80
81 conn.send_msg(serde_json::to_value(request)?).await?;
82 let msg = conn.recv_msg().await?;
83 let response: message::Response = serde_json::from_value(msg)?;
84 Ok(response)
85}
86
87pub(super) async fn subscribe<B, W, T>(
88 client: &Arc<Client<B, W>>,
89 quic_conn: &QuicConn,
90 subscriptions: &Subscriptions,
91 unsub_chans: &Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
92 method: &str,
93 params: T,
94) -> Result<Arc<Subscription>>
95where
96 B: JsonRpcCodec,
97 W: WsCodec,
98 T: Serialize + DeserializeOwned,
99{
100 if client.disconnect.load(Ordering::Relaxed) {
101 return Err(Error::ClientDisconnected);
102 }
103
104 let stream = quic_conn.open_stream().await?;
105 let mut conn = framed(stream, JsonCodec::default());
106
107 let id: RequestID = random_32();
108 let request = message::Request {
109 jsonrpc: message::JSONRPC_VERSION.to_string(),
110 id: json!(id),
111 method: method.to_string(),
112 params: Some(json!(params)),
113 };
114
115 conn.send_msg(serde_json::to_value(request)?).await?;
116
117 let msg = conn.recv_msg().await?;
118 let response: message::Response = serde_json::from_value(msg)?;
119
120 if let Some(error) = response.error {
121 return Err(Error::SubscribeError(error.code, error.message));
122 }
123
124 let sub_id = match response.result {
125 Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
126 None => return Err(Error::InvalidMsg("Invalid subscription id".to_string())),
127 };
128
129 let (unsub_tx, unsub_rx) = async_channel::bounded::<serde_json::Value>(1);
130 unsub_chans.lock().insert(sub_id, unsub_tx);
131
132 let sub = subscriptions.subscribe(sub_id).await;
133
134 let (reader, writer) = conn.split();
135
136 client.task_group.spawn(
137 subscription_stream_task(reader, writer, unsub_rx, sub.clone()),
138 |res: TaskResult<Result<()>>| async move {
139 debug!("QUIC subscription stream task ended: {res}");
140 },
141 );
142
143 Ok(sub)
144}
145
146async fn subscription_stream_task(
147 mut reader: FramedReader<JsonCodec>,
148 mut writer: FramedWriter<JsonCodec>,
149 unsub_rx: async_channel::Receiver<serde_json::Value>,
150 sub: Arc<Subscription>,
151) -> Result<()> {
152 loop {
153 match select(reader.recv_msg(), unsub_rx.recv()).await {
154 Either::Left(msg) => {
155 let msg = match msg {
156 Ok(m) => m,
157 Err(_) => break,
158 };
159 let nt: message::Notification = match serde_json::from_value(msg) {
160 Ok(v) => v,
161 Err(_) => continue,
162 };
163 debug!("<-- {nt}");
164 let nt_res: message::NotificationResult = match nt.params {
165 Some(ref p) => match serde_json::from_value(p.clone()) {
166 Ok(r) => r,
167 Err(_) => continue,
168 },
169 None => continue,
170 };
171 let val = nt_res.result.unwrap_or(serde_json::json!(""));
172 if sub.notify(val).await.is_err() {
173 break;
174 }
175 }
176 Either::Right(unsub_msg) => {
177 if let Ok(msg) = unsub_msg {
178 let _ = writer.send_msg(msg).await;
179 }
180 break;
181 }
182 }
183 }
184 Ok(())
185}