Skip to main content

karyon_jsonrpc/client/
quic_stream.rs

1//! QUIC client mode: one stream per call, one stream per subscription.
2
3use std::{
4    collections::HashMap,
5    sync::{atomic::Ordering, Arc},
6};
7
8use async_channel::Sender;
9use log::{debug, info};
10use parking_lot::Mutex;
11use serde::{de::DeserializeOwned, Serialize};
12use serde_json::json;
13
14use karyon_core::{
15    async_util::{select, Either, TaskResult},
16    util::random_32,
17};
18
19use karyon_net::{
20    framed,
21    quic::{QuicConn, QuicEndpoint},
22    FramedReader, FramedWriter, StreamMux,
23};
24
25use crate::{
26    client::{
27        subscriptions::{Subscription, Subscriptions},
28        Client, ClientBackend, ClientConfig, RequestID, WsCodec,
29    },
30    codec::{JsonCodec, JsonRpcCodec},
31    error::{Error, Result},
32    message::{self, SubscriptionID},
33};
34
35/// Build the QUIC backend by dialing the server.
36pub(super) async fn build_backend(config: &ClientConfig) -> Result<ClientBackend> {
37    let quic_config = config
38        .quic_config
39        .clone()
40        .ok_or(Error::QUICConfigRequired)?;
41    let quic_conn = QuicEndpoint::dial(&config.endpoint, quic_config).await?;
42
43    info!(
44        "Successfully connected to the RPC server: {:?}",
45        quic_conn.peer_endpoint().ok()
46    );
47
48    Ok(ClientBackend::QuicStream {
49        quic_conn: Arc::new(quic_conn),
50        subscriptions: Subscriptions::new(config.subscription_buffer_size),
51        unsub_chans: Mutex::new(HashMap::new()),
52    })
53}
54
55pub(super) async fn call<B, W, T>(
56    client: &Client<B, W>,
57    quic_conn: &QuicConn,
58    method: &str,
59    params: T,
60) -> Result<message::Response>
61where
62    B: JsonRpcCodec,
63    W: WsCodec,
64    T: Serialize + DeserializeOwned,
65{
66    if client.disconnect.load(Ordering::Relaxed) {
67        return Err(Error::ClientDisconnected);
68    }
69
70    let stream = quic_conn.open_stream().await?;
71    let mut conn = framed(stream, JsonCodec::default());
72
73    let id: RequestID = random_32();
74    let request = message::Request {
75        jsonrpc: message::JSONRPC_VERSION.to_string(),
76        id: json!(id),
77        method: method.to_string(),
78        params: Some(json!(params)),
79    };
80
81    conn.send_msg(serde_json::to_value(request)?).await?;
82    let msg = conn.recv_msg().await?;
83    let response: message::Response = serde_json::from_value(msg)?;
84    Ok(response)
85}
86
87pub(super) async fn subscribe<B, W, T>(
88    client: &Arc<Client<B, W>>,
89    quic_conn: &QuicConn,
90    subscriptions: &Subscriptions,
91    unsub_chans: &Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
92    method: &str,
93    params: T,
94) -> Result<Arc<Subscription>>
95where
96    B: JsonRpcCodec,
97    W: WsCodec,
98    T: Serialize + DeserializeOwned,
99{
100    if client.disconnect.load(Ordering::Relaxed) {
101        return Err(Error::ClientDisconnected);
102    }
103
104    let stream = quic_conn.open_stream().await?;
105    let mut conn = framed(stream, JsonCodec::default());
106
107    let id: RequestID = random_32();
108    let request = message::Request {
109        jsonrpc: message::JSONRPC_VERSION.to_string(),
110        id: json!(id),
111        method: method.to_string(),
112        params: Some(json!(params)),
113    };
114
115    conn.send_msg(serde_json::to_value(request)?).await?;
116
117    let msg = conn.recv_msg().await?;
118    let response: message::Response = serde_json::from_value(msg)?;
119
120    if let Some(error) = response.error {
121        return Err(Error::SubscribeError(error.code, error.message));
122    }
123
124    let sub_id = match response.result {
125        Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
126        None => return Err(Error::InvalidMsg("Invalid subscription id".to_string())),
127    };
128
129    let (unsub_tx, unsub_rx) = async_channel::bounded::<serde_json::Value>(1);
130    unsub_chans.lock().insert(sub_id, unsub_tx);
131
132    let sub = subscriptions.subscribe(sub_id).await;
133
134    let (reader, writer) = conn.split();
135
136    client.task_group.spawn(
137        subscription_stream_task(reader, writer, unsub_rx, sub.clone()),
138        |res: TaskResult<Result<()>>| async move {
139            debug!("QUIC subscription stream task ended: {res}");
140        },
141    );
142
143    Ok(sub)
144}
145
146async fn subscription_stream_task(
147    mut reader: FramedReader<JsonCodec>,
148    mut writer: FramedWriter<JsonCodec>,
149    unsub_rx: async_channel::Receiver<serde_json::Value>,
150    sub: Arc<Subscription>,
151) -> Result<()> {
152    loop {
153        match select(reader.recv_msg(), unsub_rx.recv()).await {
154            Either::Left(msg) => {
155                let msg = match msg {
156                    Ok(m) => m,
157                    Err(_) => break,
158                };
159                let nt: message::Notification = match serde_json::from_value(msg) {
160                    Ok(v) => v,
161                    Err(_) => continue,
162                };
163                debug!("<-- {nt}");
164                let nt_res: message::NotificationResult = match nt.params {
165                    Some(ref p) => match serde_json::from_value(p.clone()) {
166                        Ok(r) => r,
167                        Err(_) => continue,
168                    },
169                    None => continue,
170                };
171                let val = nt_res.result.unwrap_or(serde_json::json!(""));
172                if sub.notify(val).await.is_err() {
173                    break;
174                }
175            }
176            Either::Right(unsub_msg) => {
177                if let Ok(msg) = unsub_msg {
178                    let _ = writer.send_msg(msg).await;
179                }
180                break;
181            }
182        }
183    }
184    Ok(())
185}