karyon_jsonrpc/client/builder.rs
1use std::sync::Arc;
2
3#[cfg(feature = "tls")]
4use karyon_net::async_rustls::rustls;
5
6use crate::{
7 codec::{ClonableJsonCodec, JsonCodec},
8 error::Result,
9 net::ToEndpoint,
10};
11#[cfg(feature = "tcp")]
12use crate::{error::Error, net::Endpoint, net::TcpConfig};
13
14use super::{Client, ClientConfig};
15
16const DEFAULT_TIMEOUT: u64 = 3000; // 3s
17
18const DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE: usize = 20000;
19
20/// Builder for constructing an RPC [`Client`].
21pub struct ClientBuilder<C> {
22 inner: ClientConfig,
23 codec: C,
24}
25
26impl ClientBuilder<JsonCodec> {
27 /// Creates a new [`ClientBuilder`]
28 ///
29 /// This function initializes a `ClientBuilder` with the specified endpoint.
30 ///
31 /// # Example
32 ///
33 /// ```
34 /// use karyon_jsonrpc::client::ClientBuilder;
35 ///
36 /// async {
37 /// let builder = ClientBuilder::new("ws://127.0.0.1:3000")
38 /// .expect("Create a new client builder");
39 /// let client = builder.build().await
40 /// .expect("Build a new client");
41 /// };
42 /// ```
43 pub fn new(endpoint: impl ToEndpoint) -> Result<ClientBuilder<JsonCodec>> {
44 ClientBuilder::new_with_codec(endpoint, JsonCodec {})
45 }
46}
47
48impl<C> ClientBuilder<C>
49where
50 C: ClonableJsonCodec + 'static,
51{
52 /// Creates a new [`ClientBuilder`]
53 ///
54 /// This function initializes a `ClientBuilder` with the specified endpoint
55 /// and the given json codec.
56 /// # Example
57 ///
58 /// ```
59 ///
60 /// #[cfg(feature = "ws")]
61 /// use karyon_jsonrpc::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
62 /// #[cfg(feature = "ws")]
63 /// use async_tungstenite::tungstenite::Message;
64 /// use serde_json::Value;
65 ///
66 /// use karyon_jsonrpc::{
67 /// client::ClientBuilder, codec::{Codec, Decoder, Encoder, ByteBuffer},
68 /// error::{Error, Result}
69 /// };
70 ///
71 /// #[derive(Clone)]
72 /// pub struct CustomJsonCodec {}
73 ///
74 /// impl Codec for CustomJsonCodec {
75 /// type Message = serde_json::Value;
76 /// type Error = Error;
77 /// }
78 ///
79 /// #[cfg(feature = "ws")]
80 /// impl WebSocketCodec for CustomJsonCodec {
81 /// type Message = serde_json::Value;
82 /// type Error = Error;
83 /// }
84 ///
85 /// impl Encoder for CustomJsonCodec {
86 /// type EnMessage = serde_json::Value;
87 /// type EnError = Error;
88 /// fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize> {
89 /// let msg = match serde_json::to_string(src) {
90 /// Ok(m) => m,
91 /// Err(err) => return Err(Error::Encode(err.to_string())),
92 /// };
93 /// let buf = msg.as_bytes();
94 /// dst.extend_from_slice(buf);
95 /// Ok(buf.len())
96 /// }
97 /// }
98 ///
99 /// impl Decoder for CustomJsonCodec {
100 /// type DeMessage = serde_json::Value;
101 /// type DeError = Error;
102 /// fn decode(&self, src: &mut ByteBuffer) -> Result<Option<(usize, Self::DeMessage)>> {
103 /// let de = serde_json::Deserializer::from_slice(src.as_ref());
104 /// let mut iter = de.into_iter::<serde_json::Value>();
105 ///
106 /// let item = match iter.next() {
107 /// Some(Ok(item)) => item,
108 /// Some(Err(ref e)) if e.is_eof() => return Ok(None),
109 /// Some(Err(e)) => return Err(Error::Decode(e.to_string())),
110 /// None => return Ok(None),
111 /// };
112 ///
113 /// Ok(Some((iter.byte_offset(), item)))
114 /// }
115 /// }
116 ///
117 /// #[cfg(feature = "ws")]
118 /// impl WebSocketEncoder for CustomJsonCodec {
119 /// type EnMessage = serde_json::Value;
120 /// type EnError = Error;
121 ///
122 /// fn encode(&self, src: &Self::EnMessage) -> Result<Message> {
123 /// let msg = match serde_json::to_string(src) {
124 /// Ok(m) => m,
125 /// Err(err) => return Err(Error::Encode(err.to_string())),
126 /// };
127 /// Ok(Message::Text(msg))
128 /// }
129 /// }
130 ///
131 /// #[cfg(feature = "ws")]
132 /// impl WebSocketDecoder for CustomJsonCodec {
133 /// type DeMessage = serde_json::Value;
134 /// type DeError = Error;
135 /// fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>> {
136 /// match src {
137 /// Message::Text(s) => match serde_json::from_str(s) {
138 /// Ok(m) => Ok(Some(m)),
139 /// Err(err) => Err(Error::Decode(err.to_string())),
140 /// },
141 /// Message::Binary(s) => match serde_json::from_slice(s) {
142 /// Ok(m) => Ok(m),
143 /// Err(err) => Err(Error::Decode(err.to_string())),
144 /// },
145 /// Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
146 /// m => Err(Error::Decode(format!(
147 /// "Receive unexpected message: {:?}",
148 /// m
149 /// ))),
150 /// }
151 /// }
152 /// }
153 ///
154 /// async {
155 /// let builder = ClientBuilder::new_with_codec("tcp://127.0.0.1:3000", CustomJsonCodec {})
156 /// .expect("Create a new client builder with a custom json codec");
157 /// let client = builder.build().await
158 /// .expect("Build a new client");
159 /// };
160 /// ```
161 pub fn new_with_codec(endpoint: impl ToEndpoint, codec: C) -> Result<ClientBuilder<C>> {
162 let endpoint = endpoint.to_endpoint()?;
163 Ok(ClientBuilder {
164 inner: ClientConfig {
165 endpoint,
166 timeout: Some(DEFAULT_TIMEOUT),
167 #[cfg(feature = "tcp")]
168 tcp_config: Default::default(),
169 #[cfg(feature = "tls")]
170 tls_config: None,
171 subscription_buffer_size: DEFAULT_MAX_SUBSCRIPTION_BUFFER_SIZE,
172 },
173 codec,
174 })
175 }
176
177 /// Set timeout for receiving messages, in milliseconds. Requests will
178 /// fail if it takes longer.
179 ///
180 /// # Example
181 ///
182 /// ```
183 /// use karyon_jsonrpc::client::ClientBuilder;
184 ///
185 /// async {
186 /// let client = ClientBuilder::new("ws://127.0.0.1:3000")
187 /// .expect("Create a new client builder")
188 /// .set_timeout(5000)
189 /// .build().await
190 /// .expect("Build a new client");
191 /// };
192 /// ```
193 pub fn set_timeout(mut self, timeout: u64) -> Self {
194 self.inner.timeout = Some(timeout);
195 self
196 }
197
198 /// Set max size for the subscription buffer.
199 ///
200 /// The client will stop when the subscriber cannot keep up.
201 /// When subscribing to a method, a new channel with the provided buffer
202 /// size is initialized. Once the buffer is full and the subscriber doesn't
203 /// process the messages in the buffer, the client will disconnect and
204 /// raise an error.
205 ///
206 /// # Example
207 ///
208 /// ```
209 /// use karyon_jsonrpc::client::ClientBuilder;
210 ///
211 /// async {
212 /// let client = ClientBuilder::new("ws://127.0.0.1:3000")
213 /// .expect("Create a new client builder")
214 /// .set_max_subscription_buffer_size(10000)
215 /// .build().await
216 /// .expect("Build a new client");
217 /// };
218 /// ```
219 pub fn set_max_subscription_buffer_size(mut self, size: usize) -> Self {
220 self.inner.subscription_buffer_size = size;
221 self
222 }
223
224 /// Configure TCP settings for the client.
225 ///
226 /// # Example
227 ///
228 /// ```
229 /// use karyon_jsonrpc::{client::ClientBuilder, net::TcpConfig};
230 ///
231 /// async {
232 /// let tcp_config = TcpConfig::default();
233 ///
234 /// let client = ClientBuilder::new("ws://127.0.0.1:3000")
235 /// .expect("Create a new client builder")
236 /// .tcp_config(tcp_config)
237 /// .expect("Add tcp config")
238 /// .build().await
239 /// .expect("Build a new client");
240 /// };
241 /// ```
242 ///
243 /// This function will return an error if the endpoint does not support TCP protocols.
244 #[cfg(feature = "tcp")]
245 pub fn tcp_config(mut self, config: TcpConfig) -> Result<Self> {
246 match self.inner.endpoint {
247 Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
248 self.inner.tcp_config = config;
249 Ok(self)
250 }
251 _ => Err(Error::UnsupportedProtocol(self.inner.endpoint.to_string())),
252 }
253 }
254
255 /// Configure TLS settings for the client.
256 ///
257 /// # Example
258 ///
259 /// ```ignore
260 /// use karyon_jsonrpc::client::ClientBuilder;
261 /// use futures_rustls::rustls;
262 ///
263 /// async {
264 /// let tls_config = rustls::ClientConfig::new(...);
265 ///
266 /// let client_builder = ClientBuilder::new("ws://127.0.0.1:3000")
267 /// .expect("Create a new client builder")
268 /// .tls_config(tls_config, "example.com")
269 /// .expect("Add tls config")
270 /// .build().await
271 /// .expect("Build a new client");
272 /// };
273 /// ```
274 ///
275 /// This function will return an error if the endpoint does not support TLS protocols.
276 #[cfg(feature = "tls")]
277 pub fn tls_config(mut self, config: rustls::ClientConfig, dns_name: &str) -> Result<Self> {
278 match self.inner.endpoint {
279 Endpoint::Tls(..) | Endpoint::Wss(..) => {
280 self.inner.tls_config = Some((config, dns_name.to_string()));
281 Ok(self)
282 }
283 _ => Err(Error::UnsupportedProtocol(format!(
284 "Invalid tls config for endpoint: {}",
285 self.inner.endpoint
286 ))),
287 }
288 }
289
290 /// Build RPC client from [`ClientBuilder`].
291 ///
292 /// This function creates a new RPC client using the configurations
293 /// specified in the `ClientBuilder`. It returns a `Arc<Client>` on success.
294 ///
295 /// # Example
296 ///
297 /// ```
298 /// use karyon_jsonrpc::{client::ClientBuilder, net::TcpConfig};
299 ///
300 /// async {
301 /// let tcp_config = TcpConfig::default();
302 /// let client = ClientBuilder::new("ws://127.0.0.1:3000")
303 /// .expect("Create a new client builder")
304 /// .tcp_config(tcp_config)
305 /// .expect("Add tcp config")
306 /// .set_timeout(5000)
307 /// .build().await
308 /// .expect("Build a new client");
309 /// };
310 ///
311 /// ```
312 pub async fn build(self) -> Result<Arc<Client<C>>> {
313 let client = Client::init(self.inner, self.codec).await?;
314 Ok(client)
315 }
316}