karyon_jsonrpc/server/
builder.rs

1use std::{collections::HashMap, sync::Arc};
2
3use karyon_core::async_runtime::Executor;
4
5#[cfg(feature = "tcp")]
6use crate::net::Endpoint;
7
8#[cfg(feature = "tls")]
9use karyon_net::async_rustls::rustls;
10
11#[cfg(feature = "tcp")]
12use crate::{error::Error, net::TcpConfig};
13
14use crate::{
15    codec::{ClonableJsonCodec, JsonCodec},
16    error::Result,
17    message::Notification,
18    net::ToEndpoint,
19    server::channel::NewNotification,
20    server::default_notification_encoder,
21    server::PubSubRPCService,
22    server::RPCService,
23};
24
25use super::{Server, ServerConfig};
26
27/// Builder for constructing an RPC [`Server`].
28pub struct ServerBuilder<C> {
29    config: ServerConfig,
30    codec: C,
31    executor: Option<Executor>,
32}
33
34impl<C> ServerBuilder<C>
35where
36    C: ClonableJsonCodec + 'static,
37{
38    /// Creates a new [`ServerBuilder`] With a custom codec.
39    ///
40    /// This function initializes a `ServerBuilder` with the specified endpoint
41    /// and custom codec.
42    ///
43    /// # Example
44    ///
45    /// ```
46    ///
47    /// #[cfg(feature = "ws")]
48    /// use async_tungstenite::tungstenite::Message;
49    /// use serde_json::Value;
50    /// #[cfg(feature = "ws")]
51    /// use karyon_jsonrpc::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
52    /// use karyon_jsonrpc::{server::ServerBuilder, codec::{Codec, Decoder, Encoder, ByteBuffer}, error::{Error, Result}};
53    ///
54    ///
55    /// #[derive(Clone)]
56    /// pub struct CustomJsonCodec {}
57    ///
58    /// impl Codec for CustomJsonCodec {
59    ///     type Message = serde_json::Value;
60    ///     type Error = Error;
61    /// }
62    ///
63    /// #[cfg(feature = "ws")]
64    /// impl WebSocketCodec for CustomJsonCodec {
65    ///     type Message = serde_json::Value;
66    ///     type Error = Error;
67    /// }
68    ///
69    /// impl Encoder for CustomJsonCodec {
70    ///     type EnMessage = serde_json::Value;
71    ///     type EnError = Error;
72    ///     fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize> {
73    ///         let msg = match serde_json::to_string(src) {
74    ///             Ok(m) => m,
75    ///             Err(err) => return Err(Error::Encode(err.to_string())),
76    ///         };
77    ///         let buf = msg.as_bytes();
78    ///         dst.extend_from_slice(buf);
79    ///         Ok(buf.len())
80    ///     }
81    /// }
82    ///
83    /// impl Decoder for CustomJsonCodec {
84    ///     type DeMessage = serde_json::Value;
85    ///     type DeError = Error;
86    ///     fn decode(&self, src: &mut ByteBuffer) -> Result<Option<(usize, Self::DeMessage)>> {
87    ///         let de = serde_json::Deserializer::from_slice(src.as_ref());
88    ///         let mut iter = de.into_iter::<serde_json::Value>();
89    ///
90    ///         let item = match iter.next() {
91    ///             Some(Ok(item)) => item,
92    ///             Some(Err(ref e)) if e.is_eof() => return Ok(None),
93    ///             Some(Err(e)) => return Err(Error::Decode(e.to_string())),
94    ///             None => return Ok(None),
95    ///         };
96    ///
97    ///         Ok(Some((iter.byte_offset(), item)))
98    ///     }
99    /// }
100    ///
101    ///
102    /// #[cfg(feature = "ws")]
103    /// impl WebSocketEncoder for CustomJsonCodec {
104    ///     type EnMessage = serde_json::Value;
105    ///     type EnError = Error;
106    ///
107    ///     fn encode(&self, src: &Self::EnMessage) -> Result<Message> {
108    ///         let msg = match serde_json::to_string(src) {
109    ///             Ok(m) => m,
110    ///             Err(err) => return Err(Error::Encode(err.to_string())),
111    ///         };
112    ///         Ok(Message::Text(msg))
113    ///     }
114    /// }
115    ///
116    /// #[cfg(feature = "ws")]
117    /// impl WebSocketDecoder for CustomJsonCodec {
118    ///     type DeMessage = serde_json::Value;
119    ///     type DeError = Error;
120    ///     fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>> {
121    ///          match src {
122    ///              Message::Text(s) => match serde_json::from_str(s) {
123    ///                  Ok(m) => Ok(Some(m)),
124    ///                  Err(err) => Err(Error::Decode(err.to_string())),
125    ///              },
126    ///              Message::Binary(s) => match serde_json::from_slice(s) {
127    ///                  Ok(m) => Ok(m),
128    ///                  Err(err) => Err(Error::Decode(err.to_string())),
129    ///              },
130    ///              Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
131    ///              m => Err(Error::Decode(format!(
132    ///                  "Receive unexpected message: {:?}",
133    ///                  m
134    ///              ))),
135    ///          }
136    ///      }
137    /// }
138    ///
139    /// async {
140    ///     let server = ServerBuilder::new_with_codec("tcp://127.0.0.1:3000", CustomJsonCodec{})
141    ///         .expect("Create a new server builder")
142    ///         .build().await
143    ///         .expect("Build the server");
144    /// };
145    /// ```
146    ///
147    pub fn new_with_codec(endpoint: impl ToEndpoint, codec: C) -> Result<ServerBuilder<C>> {
148        let endpoint = endpoint.to_endpoint()?;
149        Ok(ServerBuilder {
150            config: ServerConfig {
151                endpoint,
152                services: HashMap::new(),
153                pubsub_services: HashMap::new(),
154                #[cfg(feature = "tcp")]
155                tcp_config: Default::default(),
156                #[cfg(feature = "tls")]
157                tls_config: None,
158                notification_encoder: default_notification_encoder,
159            },
160            codec,
161            executor: None,
162        })
163    }
164
165    /// Adds a new RPC service to the server.
166    ///
167    /// # Example
168    /// ```
169    /// use std::sync::Arc;
170    ///
171    /// use serde_json::Value;
172    ///
173    /// use karyon_jsonrpc::{rpc_impl, error::RPCError, server::ServerBuilder};
174    ///
175    /// struct Ping {}
176    ///
177    /// #[rpc_impl]
178    /// impl Ping {
179    ///     async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
180    ///         Ok(serde_json::json!("Pong"))
181    ///     }
182    /// }
183    ///
184    /// async {
185    ///     let server = ServerBuilder::new("ws://127.0.0.1:3000")
186    ///         .expect("Create a new server builder")
187    ///         .service(Arc::new(Ping{}))
188    ///         .build().await
189    ///         .expect("Build the server");
190    /// };
191    ///
192    /// ```
193    pub fn service(mut self, service: Arc<dyn RPCService>) -> Self {
194        self.config.services.insert(service.name(), service);
195        self
196    }
197
198    /// Adds a new PubSub RPC service to the server.
199    ///
200    /// # Example
201    /// ```
202    /// use std::sync::Arc;
203    ///
204    /// use serde_json::Value;
205    ///
206    /// use karyon_jsonrpc::{
207    ///     rpc_impl, rpc_pubsub_impl, error::RPCError, message::SubscriptionID,
208    ///     server::{ServerBuilder, Channel},
209    /// };
210    ///
211    /// struct Ping {}
212    ///
213    /// #[rpc_impl]
214    /// impl Ping {
215    ///     async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
216    ///         Ok(serde_json::json!("Pong"))
217    ///     }
218    /// }
219    ///
220    /// #[rpc_pubsub_impl]
221    /// impl Ping {
222    ///    async fn log_subscribe(
223    ///         &self,
224    ///         chan: Arc<Channel>,
225    ///         method: String,
226    ///         _params: Value,
227    ///     ) -> Result<Value, RPCError> {
228    ///         let sub = chan.new_subscription(&method, None).await.expect("Failed to subscribe");
229    ///         let sub_id = sub.id.clone();
230    ///         Ok(serde_json::json!(sub_id))
231    ///     }
232    ///
233    ///     async fn log_unsubscribe(
234    ///         &self,
235    ///         chan: Arc<Channel>,
236    ///         _method: String,
237    ///         params: Value,
238    ///     ) -> Result<Value, RPCError> {
239    ///         let sub_id: SubscriptionID = serde_json::from_value(params)?;
240    ///         chan.remove_subscription(&sub_id).await;
241    ///         Ok(serde_json::json!(true))
242    ///     }
243    /// }
244    ///
245    /// async {
246    ///     let ping_service = Arc::new(Ping{});
247    ///     let server = ServerBuilder::new("ws://127.0.0.1:3000")
248    ///         .expect("Create a new server builder")
249    ///         .service(ping_service.clone())
250    ///         .pubsub_service(ping_service)
251    ///         .build().await
252    ///         .expect("Build the server");
253    /// };
254    ///
255    /// ```
256    pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self {
257        self.config.pubsub_services.insert(service.name(), service);
258        self
259    }
260
261    /// Configure TCP settings for the server.
262    ///
263    /// # Example
264    ///
265    /// ```
266    /// use karyon_jsonrpc::{server::ServerBuilder, net::TcpConfig};
267    ///
268    /// async {
269    ///     let tcp_config = TcpConfig::default();
270    ///     let server = ServerBuilder::new("ws://127.0.0.1:3000")
271    ///         .expect("Create a new server builder")
272    ///         .tcp_config(tcp_config)
273    ///         .expect("Add tcp config")
274    ///         .build().await
275    ///         .expect("Build the server");
276    /// };
277    /// ```
278    ///
279    /// This function will return an error if the endpoint does not support TCP protocols.
280    #[cfg(feature = "tcp")]
281    pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder<C>> {
282        match self.config.endpoint {
283            Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
284                self.config.tcp_config = config;
285                Ok(self)
286            }
287            _ => Err(Error::UnsupportedProtocol(self.config.endpoint.to_string())),
288        }
289    }
290
291    /// Configure TLS settings for the server.
292    ///
293    /// # Example
294    ///
295    /// ```ignore
296    /// use karon_jsonrpc::ServerBuilder;
297    /// use futures_rustls::rustls;
298    ///
299    /// async {
300    ///     let tls_config = rustls::ServerConfig::new(...);
301    ///     let server = ServerBuilder::new("ws://127.0.0.1:3000")
302    ///         .expect("Create a new server builder")
303    ///         .tls_config(tls_config)
304    ///         .expect("Add tls config")
305    ///         .build().await
306    ///         .expect("Build the server");
307    /// };
308    /// ```
309    ///
310    /// This function will return an error if the endpoint does not support TLS protocols.
311    #[cfg(feature = "tls")]
312    pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder<C>> {
313        match self.config.endpoint {
314            Endpoint::Tls(..) | Endpoint::Wss(..) => {
315                self.config.tls_config = Some(config);
316                Ok(self)
317            }
318            _ => Err(Error::UnsupportedProtocol(format!(
319                "Invalid tls config for endpoint: {}",
320                self.config.endpoint
321            ))),
322        }
323    }
324
325    /// With an executor.
326    pub async fn with_executor(mut self, ex: Executor) -> Self {
327        self.executor = Some(ex);
328        self
329    }
330
331    /// With a custom notification encoder
332    pub fn with_notification_encoder(
333        mut self,
334        notification_encoder: fn(NewNotification) -> Notification,
335    ) -> Self {
336        self.config.notification_encoder = notification_encoder;
337        self
338    }
339
340    /// Builds the server with the configured options.
341    pub async fn build(self) -> Result<Arc<Server>> {
342        Server::init(self.config, self.executor, self.codec).await
343    }
344}
345
346impl ServerBuilder<JsonCodec> {
347    /// Creates a new [`ServerBuilder`]
348    ///
349    /// This function initializes a `ServerBuilder` with the specified endpoint.
350    ///
351    /// # Example
352    ///
353    /// ```
354    /// use karyon_jsonrpc::server::ServerBuilder;
355    ///
356    /// async {
357    ///     let server = ServerBuilder::new("ws://127.0.0.1:3000")
358    ///         .expect("Create a new server builder")
359    ///         .build().await
360    ///         .expect("Build the server");
361    /// };
362    /// ```
363    pub fn new(endpoint: impl ToEndpoint) -> Result<ServerBuilder<JsonCodec>> {
364        Self::new_with_codec(endpoint, JsonCodec {})
365    }
366}