1pub mod builder;
2mod message_dispatcher;
3mod multiplexed;
4mod subscriptions;
5
6#[cfg(feature = "http")]
7mod http;
8#[cfg(feature = "quic")]
9mod quic_stream;
10
11use std::{
12 marker::PhantomData,
13 sync::{atomic::AtomicBool, Arc},
14};
15
16use async_channel::{Receiver, Sender};
17use log::info;
18use serde::{de::DeserializeOwned, Serialize};
19use serde_json::json;
20
21use karyon_core::{async_runtime::Executor, async_util::TaskGroup, util::random_32};
22
23use karyon_net::Endpoint;
24
25#[cfg(feature = "tcp")]
26use karyon_net::tcp::TcpConfig;
27
28use crate::{
29 codec::{JsonCodec, JsonRpcCodec},
30 error::{Error, Result},
31 message::{self, SubscriptionID},
32};
33
34#[cfg(feature = "quic")]
35use karyon_net::quic::ClientQuicConfig;
36#[cfg(feature = "quic")]
37use parking_lot::Mutex;
38#[cfg(feature = "quic")]
39use std::collections::HashMap;
40
41#[cfg(feature = "ws")]
42use crate::codec::JsonRpcWsCodec;
43
44pub use builder::ClientBuilder;
45pub use subscriptions::Subscription;
46
47use message_dispatcher::MessageDispatcher;
48use subscriptions::Subscriptions;
49
50type RequestID = u32;
51
52#[cfg(feature = "ws")]
56pub trait WsCodec: JsonRpcWsCodec {}
57#[cfg(feature = "ws")]
58impl<T: JsonRpcWsCodec> WsCodec for T {}
59
60#[cfg(not(feature = "ws"))]
61pub trait WsCodec: Clone + Send + Sync + 'static {}
62#[cfg(not(feature = "ws"))]
63impl<T: Clone + Send + Sync + 'static> WsCodec for T {}
64
65pub(crate) struct ClientConfig {
66 pub endpoint: Endpoint,
67 #[cfg(feature = "tcp")]
68 pub tcp_config: TcpConfig,
69 #[cfg(feature = "tls")]
70 pub tls_config: Option<karyon_net::tls::ClientTlsConfig>,
71 #[cfg(feature = "quic")]
72 pub quic_config: Option<ClientQuicConfig>,
73 pub timeout: Option<u64>,
74 pub subscription_buffer_size: usize,
75}
76
77pub(crate) enum ClientBackend {
78 Multiplexed {
83 message_dispatcher: MessageDispatcher,
84 subscriptions: Arc<Subscriptions>,
85 send_chan: (Sender<serde_json::Value>, Receiver<serde_json::Value>),
88 },
89 #[cfg(feature = "quic")]
90 QuicStream {
91 quic_conn: Arc<karyon_net::quic::QuicConn>,
92 subscriptions: Arc<Subscriptions>,
93 unsub_chans: Mutex<HashMap<SubscriptionID, Sender<serde_json::Value>>>,
97 },
98 #[cfg(feature = "http")]
99 Http {
100 http_backend: Box<http::HttpClientBackend>,
103 #[cfg(feature = "http3")]
104 subscriptions: Arc<Subscriptions>,
105 },
106}
107
108pub struct Client<B = JsonCodec, W = JsonCodec> {
114 pub(crate) disconnect: AtomicBool,
115 pub(crate) task_group: Arc<TaskGroup>,
116 pub(crate) config: ClientConfig,
117 pub(crate) backend: ClientBackend,
118 _codecs: PhantomData<(B, W)>,
119}
120
121impl<B, W> Client<B, W>
122where
123 B: JsonRpcCodec,
124 W: WsCodec,
125{
126 pub async fn call<T: Serialize + DeserializeOwned, V: DeserializeOwned>(
128 &self,
129 method: &str,
130 params: T,
131 ) -> Result<V> {
132 let response = match &self.backend {
133 #[cfg(feature = "http")]
134 ClientBackend::Http { http_backend, .. } => {
135 self.http_call(http_backend, method, params).await?
136 }
137 #[cfg(feature = "quic")]
138 ClientBackend::QuicStream { quic_conn, .. } => {
139 quic_stream::call(self, quic_conn, method, params).await?
140 }
141 _ => multiplexed::send_request(self, method, params).await?,
142 };
143
144 if let Some(error) = response.error {
145 return Err(Error::CallError(error.code, error.message));
146 }
147
148 match response.result {
149 Some(result) => Ok(serde_json::from_value::<V>(result)?),
150 None => Err(Error::InvalidMsg("Invalid response result".to_string())),
151 }
152 }
153
154 pub async fn subscribe<T: Serialize + DeserializeOwned>(
156 self: &Arc<Self>,
157 method: &str,
158 params: T,
159 ) -> Result<Arc<Subscription>> {
160 match &self.backend {
161 #[cfg(all(feature = "http", not(feature = "http3")))]
162 ClientBackend::Http { .. } => Err(Error::UnsupportedProtocol(
163 "Subscriptions not supported over HTTP/1-2".to_string(),
164 )),
165 #[cfg(feature = "http3")]
166 ClientBackend::Http { http_backend, .. } => {
167 self.h3_subscribe(http_backend, method, params).await
168 }
169 #[cfg(feature = "quic")]
170 ClientBackend::QuicStream {
171 quic_conn,
172 subscriptions,
173 unsub_chans,
174 } => {
175 quic_stream::subscribe(self, quic_conn, subscriptions, unsub_chans, method, params)
176 .await
177 }
178 ClientBackend::Multiplexed { subscriptions, .. } => {
179 let response = multiplexed::send_request(self, method, params).await?;
180 let sub_id = match response.result {
181 Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
182 None => return Err(Error::InvalidMsg("Invalid subscription id".to_string())),
183 };
184 let sub = subscriptions.subscribe(sub_id).await;
185 Ok(sub)
186 }
187 }
188 }
189
190 pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
192 match &self.backend {
193 #[cfg(all(feature = "http", not(feature = "http3")))]
194 ClientBackend::Http { .. } => Err(Error::UnsupportedProtocol(
195 "Subscriptions not supported over HTTP/1-2".to_string(),
196 )),
197 #[cfg(feature = "http3")]
198 ClientBackend::Http { http_backend, .. } => {
199 self.h3_unsubscribe(http_backend, method, sub_id).await
200 }
201 #[cfg(feature = "quic")]
202 ClientBackend::QuicStream {
203 subscriptions,
204 unsub_chans,
205 ..
206 } => {
207 let ch = unsub_chans.lock().remove(&sub_id);
215 if let Some(ch) = ch {
216 let request = message::Request {
217 jsonrpc: message::JSONRPC_VERSION.to_string(),
218 id: json!(random_32()),
219 method: method.to_string(),
220 params: Some(json!(sub_id)),
221 };
222 let _ = ch.send(serde_json::to_value(request)?).await;
223 }
224 subscriptions.unsubscribe(&sub_id).await;
225 Ok(())
226 }
227 ClientBackend::Multiplexed { subscriptions, .. } => {
228 let _ = multiplexed::send_request(self, method, sub_id).await?;
229 subscriptions.unsubscribe(&sub_id).await;
230 Ok(())
231 }
232 }
233 }
234
235 pub async fn stop(&self) {
237 self.task_group.cancel().await;
238 }
239
240 pub(super) async fn init(
245 config: ClientConfig,
246 byte_codec: B,
247 ws_codec: W,
248 executor: Option<Executor>,
249 ) -> Result<Arc<Self>> {
250 info!("Connecting to RPC endpoint: {}", config.endpoint);
251
252 let task_group = Arc::new(match executor {
255 Some(ex) => TaskGroup::with_executor(ex),
256 None => TaskGroup::new(),
257 });
258
259 #[cfg(feature = "http")]
260 if let Endpoint::Http(..) = &config.endpoint {
261 let backend = http::build_backend(&config, task_group.clone()).await?;
262 return Ok(Self::with_backend(config, task_group, backend));
263 }
264
265 #[cfg(feature = "quic")]
266 if let Endpoint::Quic(..) = &config.endpoint {
267 let backend = quic_stream::build_backend(&config).await?;
268 return Ok(Self::with_backend(config, task_group, backend));
269 }
270
271 #[cfg(feature = "ws")]
272 if matches!(&config.endpoint, Endpoint::Ws(..) | Endpoint::Wss(..)) {
273 let (backend, conn) = multiplexed::build_ws_backend(&config, ws_codec).await?;
274 let client = Self::with_backend(config, task_group, backend);
275 let (reader, writer) = conn.split();
276 multiplexed::start_io_loop(&client, reader, writer);
277 return Ok(client);
278 }
279
280 let (backend, conn) = multiplexed::build_byte_backend(&config, byte_codec).await?;
281 let client = Self::with_backend(config, task_group, backend);
282 let (reader, writer) = conn.split();
283 multiplexed::start_io_loop(&client, reader, writer);
284 Ok(client)
285 }
286
287 fn with_backend(
288 config: ClientConfig,
289 task_group: Arc<TaskGroup>,
290 backend: ClientBackend,
291 ) -> Arc<Self> {
292 Arc::new(Client {
293 disconnect: AtomicBool::new(false),
294 task_group,
295 config,
296 backend,
297 _codecs: PhantomData,
298 })
299 }
300}