Skip to main content

karyon_jsonrpc/client/
mod.rs

1pub mod builder;
2mod message_dispatcher;
3mod multiplexed;
4mod subscriptions;
5
6#[cfg(feature = "http")]
7mod http;
8#[cfg(feature = "quic")]
9mod quic_stream;
10
11use std::{
12    marker::PhantomData,
13    sync::{atomic::AtomicBool, Arc},
14};
15
16use async_channel::{Receiver, Sender};
17use log::info;
18use serde::{de::DeserializeOwned, Serialize};
19use serde_json::json;
20
21use karyon_core::{async_runtime::Executor, async_util::TaskGroup, util::random_32};
22
23use karyon_net::Endpoint;
24
25#[cfg(feature = "tcp")]
26use karyon_net::tcp::TcpConfig;
27
28use crate::{
29    codec::{JsonCodec, JsonRpcCodec},
30    error::{Error, Result},
31    message::{self, SubscriptionID},
32};
33
34#[cfg(feature = "quic")]
35use karyon_net::quic::ClientQuicConfig;
36#[cfg(feature = "quic")]
37use parking_lot::Mutex;
38#[cfg(feature = "quic")]
39use std::collections::HashMap;
40
41#[cfg(feature = "ws")]
42use crate::codec::JsonRpcWsCodec;
43
44pub use builder::ClientBuilder;
45pub use subscriptions::Subscription;
46
47use message_dispatcher::MessageDispatcher;
48use subscriptions::Subscriptions;
49
50type RequestID = u32;
51
52/// Bound on the WebSocket codec generic. With the `ws` feature it
53/// requires `JsonRpcWsCodec`; otherwise it accepts any clonable type
54/// (the codec is unused) so callers can pass `JsonCodec` unchanged.
55#[cfg(feature = "ws")]
56pub trait WsCodec: JsonRpcWsCodec {}
57#[cfg(feature = "ws")]
58impl<T: JsonRpcWsCodec> WsCodec for T {}
59
60#[cfg(not(feature = "ws"))]
61pub trait WsCodec: Clone + Send + Sync + 'static {}
62#[cfg(not(feature = "ws"))]
63impl<T: Clone + Send + Sync + 'static> WsCodec for T {}
64
65pub(crate) struct ClientConfig {
66    pub endpoint: Endpoint,
67    #[cfg(feature = "tcp")]
68    pub tcp_config: TcpConfig,
69    #[cfg(feature = "tls")]
70    pub tls_config: Option<karyon_net::tls::ClientTlsConfig>,
71    #[cfg(feature = "quic")]
72    pub quic_config: Option<ClientQuicConfig>,
73    pub timeout: Option<u64>,
74    pub subscription_buffer_size: usize,
75}
76
77pub(crate) enum ClientBackend {
78    /// One persistent message connection used by many concurrent calls
79    /// and subscriptions. Each request is tagged with a unique id; the
80    /// reader task multiplexes responses back to the right caller.
81    /// Used for TCP, TLS, WS, WSS, and Unix.
82    Multiplexed {
83        message_dispatcher: MessageDispatcher,
84        subscriptions: Arc<Subscriptions>,
85        // TODO what is the reason for this  ?
86        // AND why not using Async queue ?
87        send_chan: (Sender<serde_json::Value>, Receiver<serde_json::Value>),
88    },
89    #[cfg(feature = "quic")]
90    QuicStream {
91        quic_conn: Arc<karyon_net::quic::QuicConn>,
92        subscriptions: Arc<Subscriptions>,
93        /// Per-subscription unsubscribe signal: send the unsubscribe
94        /// JSON; the reader task forwards it and exits.
95        // TODO why not using async queue here ?
96        unsub_chans: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
97    },
98    #[cfg(feature = "http")]
99    Http {
100        // Boxed because the HTTP backend (with hyper Client) is much
101        // larger than the other ClientBackend variants.
102        http_backend: Box<http::HttpClientBackend>,
103        #[cfg(feature = "http3")]
104        subscriptions: Arc<Subscriptions>,
105    },
106}
107
108/// An RPC client that connects to a JSON-RPC 2.0 server.
109///
110/// `B` is the byte-stream codec used for TCP/TLS/Unix/QUIC/HTTP.
111/// `W` is the WebSocket codec used for `ws://` / `wss://`.
112/// Both default to `JsonCodec`.
113pub struct Client<B = JsonCodec, W = JsonCodec> {
114    pub(crate) disconnect: AtomicBool,
115    pub(crate) task_group: Arc<TaskGroup>,
116    pub(crate) config: ClientConfig,
117    pub(crate) backend: ClientBackend,
118    _codecs: PhantomData<(B, W)>,
119}
120
121impl<B, W> Client<B, W>
122where
123    B: JsonRpcCodec,
124    W: WsCodec,
125{
126    /// Call a method, wait for response.
127    pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
128        &self,
129        method: &str,
130        params: T,
131    ) -> Result<V> {
132        let response = match &self.backend {
133            #[cfg(feature = "http")]
134            ClientBackend::Http { http_backend, .. } => {
135                self.http_call(http_backend, method, params).await?
136            }
137            #[cfg(feature = "quic")]
138            ClientBackend::QuicStream { quic_conn, .. } => {
139                quic_stream::call(self, quic_conn, method, params).await?
140            }
141            _ => multiplexed::send_request(self, method, params).await?,
142        };
143
144        if let Some(error) = response.error {
145            return Err(Error::CallError(error.code, error.message));
146        }
147
148        match response.result {
149            Some(result) => Ok(serde_json::from_value::<V>(result)?),
150            None => Err(Error::InvalidMsg("Invalid response result".to_string())),
151        }
152    }
153
154    /// Subscribe to a method.
155    pub async fn subscribe<T: Serialize + DeserializeOwned>(
156        self: &Arc<Self>,
157        method: &str,
158        params: T,
159    ) -> Result<Arc<Subscription>> {
160        match &self.backend {
161            #[cfg(all(feature = "http", not(feature = "http3")))]
162            ClientBackend::Http { .. } => Err(Error::UnsupportedProtocol(
163                "Subscriptions not supported over HTTP/1-2".to_string(),
164            )),
165            #[cfg(feature = "http3")]
166            ClientBackend::Http { http_backend, .. } => {
167                self.h3_subscribe(http_backend, method, params).await
168            }
169            #[cfg(feature = "quic")]
170            ClientBackend::QuicStream {
171                quic_conn,
172                subscriptions,
173                unsub_chans,
174            } => {
175                quic_stream::subscribe(self, quic_conn, subscriptions, unsub_chans, method, params)
176                    .await
177            }
178            ClientBackend::Multiplexed { subscriptions, .. } => {
179                let response = multiplexed::send_request(self, method, params).await?;
180                let sub_id = match response.result {
181                    Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
182                    None => return Err(Error::InvalidMsg("Invalid subscription id".to_string())),
183                };
184                let sub = subscriptions.subscribe(sub_id).await;
185                Ok(sub)
186            }
187        }
188    }
189
190    /// Unsubscribe.
191    pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
192        match &self.backend {
193            #[cfg(all(feature = "http", not(feature = "http3")))]
194            ClientBackend::Http { .. } => Err(Error::UnsupportedProtocol(
195                "Subscriptions not supported over HTTP/1-2".to_string(),
196            )),
197            #[cfg(feature = "http3")]
198            ClientBackend::Http { http_backend, .. } => {
199                self.h3_unsubscribe(http_backend, method, sub_id).await
200            }
201            #[cfg(feature = "quic")]
202            ClientBackend::QuicStream {
203                subscriptions,
204                unsub_chans,
205                ..
206            } => {
207                // The QUIC subscription stream's reader and writer halves
208                // are owned by the spawned task. We can't write directly
209                // from here, so we hand the unsubscribe message to that
210                // task via a channel; the task forwards it on the wire
211                // and exits.
212                // Bind the removed sender in its own `let` so the parking_lot
213                // MutexGuard temporary doesn't live across the `.await`.
214                let ch = unsub_chans.lock().remove(&sub_id);
215                if let Some(ch) = ch {
216                    let request = message::Request {
217                        jsonrpc: message::JSONRPC_VERSION.to_string(),
218                        id: json!(random_32()),
219                        method: method.to_string(),
220                        params: Some(json!(sub_id)),
221                    };
222                    let _ = ch.send(serde_json::to_value(request)?).await;
223                }
224                subscriptions.unsubscribe(&sub_id).await;
225                Ok(())
226            }
227            ClientBackend::Multiplexed { subscriptions, .. } => {
228                let _ = multiplexed::send_request(self, method, sub_id).await?;
229                subscriptions.unsubscribe(&sub_id).await;
230                Ok(())
231            }
232        }
233    }
234
235    /// Disconnect the client.
236    pub async fn stop(&self) {
237        self.task_group.cancel().await;
238    }
239
240    /// Pick the right backend based on endpoint kind, then build the
241    /// `Client`. Each backend module returns just the `ClientBackend`;
242    /// assembly happens here. Multiplexed mode also needs an extra
243    /// step to start its background reader/writer loop.
244    pub(super) async fn init(
245        config: ClientConfig,
246        byte_codec: B,
247        ws_codec: W,
248        executor: Option<Executor>,
249    ) -> Result<Arc<Self>> {
250        info!("Connecting to RPC endpoint: {}", config.endpoint);
251
252        // Build task_group early so backends (HTTP driver tasks) can
253        // spawn into the same group that Client::stop cancels.
254        let task_group = Arc::new(match executor {
255            Some(ex) => TaskGroup::with_executor(ex),
256            None => TaskGroup::new(),
257        });
258
259        #[cfg(feature = "http")]
260        if let Endpoint::Http(..) = &config.endpoint {
261            let backend = http::build_backend(&config, task_group.clone()).await?;
262            return Ok(Self::with_backend(config, task_group, backend));
263        }
264
265        #[cfg(feature = "quic")]
266        if let Endpoint::Quic(..) = &config.endpoint {
267            let backend = quic_stream::build_backend(&config).await?;
268            return Ok(Self::with_backend(config, task_group, backend));
269        }
270
271        #[cfg(feature = "ws")]
272        if matches!(&config.endpoint, Endpoint::Ws(..) | Endpoint::Wss(..)) {
273            let (backend, conn) = multiplexed::build_ws_backend(&config, ws_codec).await?;
274            let client = Self::with_backend(config, task_group, backend);
275            let (reader, writer) = conn.split();
276            multiplexed::start_io_loop(&client, reader, writer);
277            return Ok(client);
278        }
279
280        let (backend, conn) = multiplexed::build_byte_backend(&config, byte_codec).await?;
281        let client = Self::with_backend(config, task_group, backend);
282        let (reader, writer) = conn.split();
283        multiplexed::start_io_loop(&client, reader, writer);
284        Ok(client)
285    }
286
287    fn with_backend(
288        config: ClientConfig,
289        task_group: Arc<TaskGroup>,
290        backend: ClientBackend,
291    ) -> Arc<Self> {
292        Arc::new(Client {
293            disconnect: AtomicBool::new(false),
294            task_group,
295            config,
296            backend,
297            _codecs: PhantomData,
298        })
299    }
300}