karyon_jsonrpc/server/builder.rs
1use std::{collections::HashMap, sync::Arc};
2
3use karyon_core::async_runtime::Executor;
4
5#[cfg(feature = "tcp")]
6use crate::net::Endpoint;
7
8#[cfg(feature = "tls")]
9use karyon_net::async_rustls::rustls;
10
11#[cfg(feature = "tcp")]
12use crate::{error::Error, net::TcpConfig};
13
14use crate::{
15 codec::{ClonableJsonCodec, JsonCodec},
16 error::Result,
17 message::Notification,
18 net::ToEndpoint,
19 server::channel::NewNotification,
20 server::default_notification_encoder,
21 server::PubSubRPCService,
22 server::RPCService,
23};
24
25use super::{Server, ServerConfig};
26
27/// Builder for constructing an RPC [`Server`].
28pub struct ServerBuilder<C> {
29 config: ServerConfig,
30 codec: C,
31 executor: Option<Executor>,
32}
33
34impl<C> ServerBuilder<C>
35where
36 C: ClonableJsonCodec + 'static,
37{
38 /// Creates a new [`ServerBuilder`] With a custom codec.
39 ///
40 /// This function initializes a `ServerBuilder` with the specified endpoint
41 /// and custom codec.
42 ///
43 /// # Example
44 ///
45 /// ```
46 ///
47 /// #[cfg(feature = "ws")]
48 /// use async_tungstenite::tungstenite::Message;
49 /// use serde_json::Value;
50 /// #[cfg(feature = "ws")]
51 /// use karyon_jsonrpc::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
52 /// use karyon_jsonrpc::{server::ServerBuilder, codec::{Codec, Decoder, Encoder, ByteBuffer}, error::{Error, Result}};
53 ///
54 ///
55 /// #[derive(Clone)]
56 /// pub struct CustomJsonCodec {}
57 ///
58 /// impl Codec for CustomJsonCodec {
59 /// type Message = serde_json::Value;
60 /// type Error = Error;
61 /// }
62 ///
63 /// #[cfg(feature = "ws")]
64 /// impl WebSocketCodec for CustomJsonCodec {
65 /// type Message = serde_json::Value;
66 /// type Error = Error;
67 /// }
68 ///
69 /// impl Encoder for CustomJsonCodec {
70 /// type EnMessage = serde_json::Value;
71 /// type EnError = Error;
72 /// fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize> {
73 /// let msg = match serde_json::to_string(src) {
74 /// Ok(m) => m,
75 /// Err(err) => return Err(Error::Encode(err.to_string())),
76 /// };
77 /// let buf = msg.as_bytes();
78 /// dst.extend_from_slice(buf);
79 /// Ok(buf.len())
80 /// }
81 /// }
82 ///
83 /// impl Decoder for CustomJsonCodec {
84 /// type DeMessage = serde_json::Value;
85 /// type DeError = Error;
86 /// fn decode(&self, src: &mut ByteBuffer) -> Result<Option<(usize, Self::DeMessage)>> {
87 /// let de = serde_json::Deserializer::from_slice(src.as_ref());
88 /// let mut iter = de.into_iter::<serde_json::Value>();
89 ///
90 /// let item = match iter.next() {
91 /// Some(Ok(item)) => item,
92 /// Some(Err(ref e)) if e.is_eof() => return Ok(None),
93 /// Some(Err(e)) => return Err(Error::Decode(e.to_string())),
94 /// None => return Ok(None),
95 /// };
96 ///
97 /// Ok(Some((iter.byte_offset(), item)))
98 /// }
99 /// }
100 ///
101 ///
102 /// #[cfg(feature = "ws")]
103 /// impl WebSocketEncoder for CustomJsonCodec {
104 /// type EnMessage = serde_json::Value;
105 /// type EnError = Error;
106 ///
107 /// fn encode(&self, src: &Self::EnMessage) -> Result<Message> {
108 /// let msg = match serde_json::to_string(src) {
109 /// Ok(m) => m,
110 /// Err(err) => return Err(Error::Encode(err.to_string())),
111 /// };
112 /// Ok(Message::Text(msg))
113 /// }
114 /// }
115 ///
116 /// #[cfg(feature = "ws")]
117 /// impl WebSocketDecoder for CustomJsonCodec {
118 /// type DeMessage = serde_json::Value;
119 /// type DeError = Error;
120 /// fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>> {
121 /// match src {
122 /// Message::Text(s) => match serde_json::from_str(s) {
123 /// Ok(m) => Ok(Some(m)),
124 /// Err(err) => Err(Error::Decode(err.to_string())),
125 /// },
126 /// Message::Binary(s) => match serde_json::from_slice(s) {
127 /// Ok(m) => Ok(m),
128 /// Err(err) => Err(Error::Decode(err.to_string())),
129 /// },
130 /// Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
131 /// m => Err(Error::Decode(format!(
132 /// "Receive unexpected message: {:?}",
133 /// m
134 /// ))),
135 /// }
136 /// }
137 /// }
138 ///
139 /// async {
140 /// let server = ServerBuilder::new_with_codec("tcp://127.0.0.1:3000", CustomJsonCodec{})
141 /// .expect("Create a new server builder")
142 /// .build().await
143 /// .expect("Build the server");
144 /// };
145 /// ```
146 ///
147 pub fn new_with_codec(endpoint: impl ToEndpoint, codec: C) -> Result<ServerBuilder<C>> {
148 let endpoint = endpoint.to_endpoint()?;
149 Ok(ServerBuilder {
150 config: ServerConfig {
151 endpoint,
152 services: HashMap::new(),
153 pubsub_services: HashMap::new(),
154 #[cfg(feature = "tcp")]
155 tcp_config: Default::default(),
156 #[cfg(feature = "tls")]
157 tls_config: None,
158 notification_encoder: default_notification_encoder,
159 },
160 codec,
161 executor: None,
162 })
163 }
164
165 /// Adds a new RPC service to the server.
166 ///
167 /// # Example
168 /// ```
169 /// use std::sync::Arc;
170 ///
171 /// use serde_json::Value;
172 ///
173 /// use karyon_jsonrpc::{rpc_impl, error::RPCError, server::ServerBuilder};
174 ///
175 /// struct Ping {}
176 ///
177 /// #[rpc_impl]
178 /// impl Ping {
179 /// async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
180 /// Ok(serde_json::json!("Pong"))
181 /// }
182 /// }
183 ///
184 /// async {
185 /// let server = ServerBuilder::new("ws://127.0.0.1:3000")
186 /// .expect("Create a new server builder")
187 /// .service(Arc::new(Ping{}))
188 /// .build().await
189 /// .expect("Build the server");
190 /// };
191 ///
192 /// ```
193 pub fn service(mut self, service: Arc<dyn RPCService>) -> Self {
194 self.config.services.insert(service.name(), service);
195 self
196 }
197
198 /// Adds a new PubSub RPC service to the server.
199 ///
200 /// # Example
201 /// ```
202 /// use std::sync::Arc;
203 ///
204 /// use serde_json::Value;
205 ///
206 /// use karyon_jsonrpc::{
207 /// rpc_impl, rpc_pubsub_impl, error::RPCError, message::SubscriptionID,
208 /// server::{ServerBuilder, Channel},
209 /// };
210 ///
211 /// struct Ping {}
212 ///
213 /// #[rpc_impl]
214 /// impl Ping {
215 /// async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
216 /// Ok(serde_json::json!("Pong"))
217 /// }
218 /// }
219 ///
220 /// #[rpc_pubsub_impl]
221 /// impl Ping {
222 /// async fn log_subscribe(
223 /// &self,
224 /// chan: Arc<Channel>,
225 /// method: String,
226 /// _params: Value,
227 /// ) -> Result<Value, RPCError> {
228 /// let sub = chan.new_subscription(&method, None).await.expect("Failed to subscribe");
229 /// let sub_id = sub.id.clone();
230 /// Ok(serde_json::json!(sub_id))
231 /// }
232 ///
233 /// async fn log_unsubscribe(
234 /// &self,
235 /// chan: Arc<Channel>,
236 /// _method: String,
237 /// params: Value,
238 /// ) -> Result<Value, RPCError> {
239 /// let sub_id: SubscriptionID = serde_json::from_value(params)?;
240 /// chan.remove_subscription(&sub_id).await;
241 /// Ok(serde_json::json!(true))
242 /// }
243 /// }
244 ///
245 /// async {
246 /// let ping_service = Arc::new(Ping{});
247 /// let server = ServerBuilder::new("ws://127.0.0.1:3000")
248 /// .expect("Create a new server builder")
249 /// .service(ping_service.clone())
250 /// .pubsub_service(ping_service)
251 /// .build().await
252 /// .expect("Build the server");
253 /// };
254 ///
255 /// ```
256 pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self {
257 self.config.pubsub_services.insert(service.name(), service);
258 self
259 }
260
261 /// Configure TCP settings for the server.
262 ///
263 /// # Example
264 ///
265 /// ```
266 /// use karyon_jsonrpc::{server::ServerBuilder, net::TcpConfig};
267 ///
268 /// async {
269 /// let tcp_config = TcpConfig::default();
270 /// let server = ServerBuilder::new("ws://127.0.0.1:3000")
271 /// .expect("Create a new server builder")
272 /// .tcp_config(tcp_config)
273 /// .expect("Add tcp config")
274 /// .build().await
275 /// .expect("Build the server");
276 /// };
277 /// ```
278 ///
279 /// This function will return an error if the endpoint does not support TCP protocols.
280 #[cfg(feature = "tcp")]
281 pub fn tcp_config(mut self, config: TcpConfig) -> Result<ServerBuilder<C>> {
282 match self.config.endpoint {
283 Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
284 self.config.tcp_config = config;
285 Ok(self)
286 }
287 _ => Err(Error::UnsupportedProtocol(self.config.endpoint.to_string())),
288 }
289 }
290
291 /// Configure TLS settings for the server.
292 ///
293 /// # Example
294 ///
295 /// ```ignore
296 /// use karon_jsonrpc::ServerBuilder;
297 /// use futures_rustls::rustls;
298 ///
299 /// async {
300 /// let tls_config = rustls::ServerConfig::new(...);
301 /// let server = ServerBuilder::new("ws://127.0.0.1:3000")
302 /// .expect("Create a new server builder")
303 /// .tls_config(tls_config)
304 /// .expect("Add tls config")
305 /// .build().await
306 /// .expect("Build the server");
307 /// };
308 /// ```
309 ///
310 /// This function will return an error if the endpoint does not support TLS protocols.
311 #[cfg(feature = "tls")]
312 pub fn tls_config(mut self, config: rustls::ServerConfig) -> Result<ServerBuilder<C>> {
313 match self.config.endpoint {
314 Endpoint::Tls(..) | Endpoint::Wss(..) => {
315 self.config.tls_config = Some(config);
316 Ok(self)
317 }
318 _ => Err(Error::UnsupportedProtocol(format!(
319 "Invalid tls config for endpoint: {}",
320 self.config.endpoint
321 ))),
322 }
323 }
324
325 /// With an executor.
326 pub async fn with_executor(mut self, ex: Executor) -> Self {
327 self.executor = Some(ex);
328 self
329 }
330
331 /// With a custom notification encoder
332 pub fn with_notification_encoder(
333 mut self,
334 notification_encoder: fn(NewNotification) -> Notification,
335 ) -> Self {
336 self.config.notification_encoder = notification_encoder;
337 self
338 }
339
340 /// Builds the server with the configured options.
341 pub async fn build(self) -> Result<Arc<Server>> {
342 Server::init(self.config, self.executor, self.codec).await
343 }
344}
345
346impl ServerBuilder<JsonCodec> {
347 /// Creates a new [`ServerBuilder`]
348 ///
349 /// This function initializes a `ServerBuilder` with the specified endpoint.
350 ///
351 /// # Example
352 ///
353 /// ```
354 /// use karyon_jsonrpc::server::ServerBuilder;
355 ///
356 /// async {
357 /// let server = ServerBuilder::new("ws://127.0.0.1:3000")
358 /// .expect("Create a new server builder")
359 /// .build().await
360 /// .expect("Build the server");
361 /// };
362 /// ```
363 pub fn new(endpoint: impl ToEndpoint) -> Result<ServerBuilder<JsonCodec>> {
364 Self::new_with_codec(endpoint, JsonCodec {})
365 }
366}