karyon_jsonrpc/client/
builder.rs

1use std::sync::Arc;
2
3#[cfg(feature = "tls")]
4use karyon_net::async_rustls::rustls;
5
6use crate::{
7    codec::{ClonableJsonCodec, JsonCodec},
8    error::Result,
9    net::ToEndpoint,
10};
11#[cfg(feature = "tcp")]
12use crate::{error::Error, net::Endpoint, net::TcpConfig};
13
14use super::{Client, ClientConfig};
15
16const DEFAULT_TIMEOUT: u64 = 3000; // 3s
17
18const DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE: usize = 20000;
19
20/// Builder for constructing an RPC [`Client`].
21pub struct ClientBuilder<C> {
22    inner: ClientConfig,
23    codec: C,
24}
25
26impl ClientBuilder<JsonCodec> {
27    /// Creates a new [`ClientBuilder`]
28    ///
29    /// This function initializes a `ClientBuilder` with the specified endpoint.
30    ///
31    /// # Example
32    ///
33    /// ```
34    /// use karyon_jsonrpc::client::ClientBuilder;
35    ///  
36    /// async {
37    ///     let builder = ClientBuilder::new("ws://127.0.0.1:3000")
38    ///         .expect("Create a new client builder");
39    ///     let client = builder.build().await
40    ///         .expect("Build a new client");
41    /// };
42    /// ```
43    pub fn new(endpoint: impl ToEndpoint) -> Result<ClientBuilder<JsonCodec>> {
44        ClientBuilder::new_with_codec(endpoint, JsonCodec {})
45    }
46}
47
48impl<C> ClientBuilder<C>
49where
50    C: ClonableJsonCodec + 'static,
51{
52    /// Creates a new [`ClientBuilder`]
53    ///
54    /// This function initializes a `ClientBuilder` with the specified endpoint
55    /// and the given json codec.
56    /// # Example
57    ///
58    /// ```
59    ///
60    /// #[cfg(feature = "ws")]
61    /// use karyon_jsonrpc::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
62    /// #[cfg(feature = "ws")]
63    /// use async_tungstenite::tungstenite::Message;
64    /// use serde_json::Value;
65    ///
66    /// use karyon_jsonrpc::{
67    ///     client::ClientBuilder, codec::{Codec, Decoder, Encoder, ByteBuffer},
68    ///     error::{Error, Result}
69    /// };
70    ///
71    /// #[derive(Clone)]
72    /// pub struct CustomJsonCodec {}
73    ///
74    /// impl Codec for CustomJsonCodec {
75    ///     type Message = serde_json::Value;
76    ///     type Error = Error;
77    /// }
78    ///
79    /// #[cfg(feature = "ws")]
80    /// impl WebSocketCodec for CustomJsonCodec {
81    ///     type Message = serde_json::Value;
82    ///     type Error = Error;
83    /// }
84    ///
85    /// impl Encoder for CustomJsonCodec {
86    ///     type EnMessage = serde_json::Value;
87    ///     type EnError = Error;
88    ///     fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize> {
89    ///         let msg = match serde_json::to_string(src) {
90    ///             Ok(m) => m,
91    ///             Err(err) => return Err(Error::Encode(err.to_string())),
92    ///         };
93    ///         let buf = msg.as_bytes();
94    ///         dst.extend_from_slice(buf);
95    ///         Ok(buf.len())
96    ///     }
97    /// }
98    ///
99    /// impl Decoder for CustomJsonCodec {
100    ///     type DeMessage = serde_json::Value;
101    ///     type DeError = Error;
102    ///     fn decode(&self, src: &mut ByteBuffer) -> Result<Option<(usize, Self::DeMessage)>> {
103    ///         let de = serde_json::Deserializer::from_slice(src.as_ref());
104    ///         let mut iter = de.into_iter::<serde_json::Value>();
105    ///
106    ///         let item = match iter.next() {
107    ///             Some(Ok(item)) => item,
108    ///             Some(Err(ref e)) if e.is_eof() => return Ok(None),
109    ///             Some(Err(e)) => return Err(Error::Decode(e.to_string())),
110    ///             None => return Ok(None),
111    ///         };
112    ///
113    ///         Ok(Some((iter.byte_offset(), item)))
114    ///     }
115    /// }
116    ///
117    /// #[cfg(feature = "ws")]
118    /// impl WebSocketEncoder for CustomJsonCodec {
119    ///     type EnMessage = serde_json::Value;
120    ///     type EnError = Error;
121    ///
122    ///     fn encode(&self, src: &Self::EnMessage) -> Result<Message> {
123    ///         let msg = match serde_json::to_string(src) {
124    ///             Ok(m) => m,
125    ///             Err(err) => return Err(Error::Encode(err.to_string())),
126    ///         };
127    ///         Ok(Message::Text(msg))
128    ///     }
129    /// }
130    ///
131    /// #[cfg(feature = "ws")]
132    /// impl WebSocketDecoder for CustomJsonCodec {
133    ///     type DeMessage = serde_json::Value;
134    ///     type DeError = Error;
135    ///     fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>> {
136    ///          match src {
137    ///              Message::Text(s) => match serde_json::from_str(s) {
138    ///                  Ok(m) => Ok(Some(m)),
139    ///                  Err(err) => Err(Error::Decode(err.to_string())),
140    ///              },
141    ///              Message::Binary(s) => match serde_json::from_slice(s) {
142    ///                  Ok(m) => Ok(m),
143    ///                  Err(err) => Err(Error::Decode(err.to_string())),
144    ///              },
145    ///              Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
146    ///              m => Err(Error::Decode(format!(
147    ///                  "Receive unexpected message: {:?}",
148    ///                  m
149    ///              ))),
150    ///          }
151    ///      }
152    /// }
153    ///
154    /// async {
155    ///     let builder = ClientBuilder::new_with_codec("tcp://127.0.0.1:3000", CustomJsonCodec {})
156    ///         .expect("Create a new client builder with a custom json codec");
157    ///     let client = builder.build().await
158    ///         .expect("Build a new client");
159    /// };
160    /// ```
161    pub fn new_with_codec(endpoint: impl ToEndpoint, codec: C) -> Result<ClientBuilder<C>> {
162        let endpoint = endpoint.to_endpoint()?;
163        Ok(ClientBuilder {
164            inner: ClientConfig {
165                endpoint,
166                timeout: Some(DEFAULT_TIMEOUT),
167                #[cfg(feature = "tcp")]
168                tcp_config: Default::default(),
169                #[cfg(feature = "tls")]
170                tls_config: None,
171                subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE,
172            },
173            codec,
174        })
175    }
176
177    /// Set timeout for receiving messages, in milliseconds. Requests will
178    /// fail if it takes longer.
179    ///
180    /// # Example
181    ///
182    /// ```
183    /// use karyon_jsonrpc::client::ClientBuilder;
184    ///  
185    /// async {
186    ///     let client = ClientBuilder::new("ws://127.0.0.1:3000")
187    ///         .expect("Create a new client builder")
188    ///         .set_timeout(5000)
189    ///         .build().await
190    ///         .expect("Build a new client");
191    /// };
192    /// ```
193    pub fn set_timeout(mut self, timeout: u64) -> Self {
194        self.inner.timeout = Some(timeout);
195        self
196    }
197
198    /// Set max size for the subscription buffer.
199    ///
200    /// The client will stop when the subscriber cannot keep up.
201    /// When subscribing to a method, a new channel with the provided buffer
202    /// size is initialized. Once the buffer is full and the subscriber doesn't
203    /// process the messages in the buffer, the client will disconnect and
204    /// raise an error.
205    ///
206    /// # Example
207    ///
208    /// ```
209    /// use karyon_jsonrpc::client::ClientBuilder;
210    ///  
211    /// async {
212    ///     let client = ClientBuilder::new("ws://127.0.0.1:3000")
213    ///         .expect("Create a new client builder")
214    ///         .set_max_subscription_buffer_size(10000)
215    ///         .build().await
216    ///         .expect("Build a new client");
217    /// };
218    /// ```
219    pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self {
220        self.inner.subscription_buffer_size = size;
221        self
222    }
223
224    /// Configure TCP settings for the client.
225    ///
226    /// # Example
227    ///
228    /// ```
229    /// use karyon_jsonrpc::{client::ClientBuilder, net::TcpConfig};
230    ///  
231    /// async {
232    ///     let tcp_config = TcpConfig::default();
233    ///
234    ///     let client = ClientBuilder::new("ws://127.0.0.1:3000")
235    ///         .expect("Create a new client builder")
236    ///         .tcp_config(tcp_config)
237    ///         .expect("Add tcp config")
238    ///         .build().await
239    ///         .expect("Build a new client");
240    /// };
241    /// ```
242    ///
243    /// This function will return an error if the endpoint does not support TCP protocols.
244    #[cfg(feature = "tcp")]
245    pub fn tcp_config(mut self, config: TcpConfig) -> Result<Self> {
246        match self.inner.endpoint {
247            Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
248                self.inner.tcp_config = config;
249                Ok(self)
250            }
251            _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())),
252        }
253    }
254
255    /// Configure TLS settings for the client.
256    ///
257    /// # Example
258    ///
259    /// ```ignore
260    /// use karyon_jsonrpc::client::ClientBuilder;
261    /// use futures_rustls::rustls;
262    ///  
263    /// async {
264    ///     let tls_config = rustls::ClientConfig::new(...);
265    ///
266    ///     let client_builder = ClientBuilder::new("ws://127.0.0.1:3000")
267    ///         .expect("Create a new client builder")
268    ///         .tls_config(tls_config, "example.com")
269    ///         .expect("Add tls config")
270    ///         .build().await
271    ///         .expect("Build a new client");
272    /// };
273    /// ```
274    ///
275    /// This function will return an error if the endpoint does not support TLS protocols.
276    #[cfg(feature = "tls")]
277    pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
278        match self.inner.endpoint {
279            Endpoint::Tls(..) | Endpoint::Wss(..) => {
280                self.inner.tls_config = Some((config, dns_name.to_string()));
281                Ok(self)
282            }
283            _ => Err(Error::UnsupportedProtocol(format!(
284                "Invalid tls config for endpoint: {}",
285                self.inner.endpoint
286            ))),
287        }
288    }
289
290    /// Build RPC client from [`ClientBuilder`].
291    ///
292    /// This function creates a new RPC client using the configurations
293    /// specified in the `ClientBuilder`. It returns a `Arc<Client>` on success.
294    ///
295    /// # Example
296    ///
297    /// ```
298    /// use karyon_jsonrpc::{client::ClientBuilder, net::TcpConfig};
299    ///  
300    /// async {
301    ///     let tcp_config = TcpConfig::default();
302    ///     let client = ClientBuilder::new("ws://127.0.0.1:3000")
303    ///         .expect("Create a new client builder")
304    ///         .tcp_config(tcp_config)
305    ///         .expect("Add tcp config")
306    ///         .set_timeout(5000)
307    ///         .build().await
308    ///         .expect("Build a new client");
309    /// };
310    ///
311    /// ```
312    pub async fn build(self) -> Result<Arc<Client<C>>> {
313        let client = Client::init(self.inner, self.codec).await?;
314        Ok(client)
315    }
316}