Skip to main content

karyon_jsonrpc/server/
builder.rs

1use std::{collections::HashMap, sync::Arc};
2
3use karyon_core::async_runtime::Executor;
4
5use karyon_net::ToEndpoint;
6
7#[cfg(any(feature = "tcp", feature = "tls", feature = "quic"))]
8use karyon_net::Endpoint;
9
10#[cfg(feature = "tcp")]
11use karyon_net::tcp::TcpConfig;
12
13#[cfg(feature = "quic")]
14use karyon_net::quic::ServerQuicConfig;
15
16use crate::{
17    codec::{JsonCodec, JsonRpcCodec},
18    error::Result,
19    message::{Notification, NotificationResult, JSONRPC_VERSION},
20    server::channel::NewNotification,
21    server::PubSubRPCService,
22    server::RPCService,
23    server::WsCodec,
24};
25
26#[cfg(any(feature = "tcp", feature = "tls", feature = "quic"))]
27use crate::error::Error;
28
29use super::{Server, ServerConfig};
30
31/// Builder for constructing an RPC [`Server`].
32///
33/// # Example
34///
35/// ```no_run
36/// use std::sync::Arc;
37/// use serde_json::Value;
38/// use karyon_jsonrpc::{error::RPCError, rpc_impl, server::ServerBuilder};
39///
40/// struct Ping {}
41///
42/// #[rpc_impl]
43/// impl Ping {
44///     async fn ping(&self, _params: Value) -> Result<Value, RPCError> {
45///         Ok(serde_json::json!("pong"))
46///     }
47/// }
48///
49/// async {
50///     let server = ServerBuilder::new("tcp://127.0.0.1:60000")
51///         .expect("create builder")
52///         .service(Arc::new(Ping {}))
53///         .build().await
54///         .expect("build server");
55///
56///     server.start_block().await.expect("run server");
57/// };
58/// ```
59pub struct ServerBuilder<B, W = JsonCodec> {
60    config: ServerConfig,
61    byte_codec: B,
62    ws_codec: W,
63    executor: Option<Executor>,
64}
65
66impl<B, W> ServerBuilder<B, W>
67where
68    B: JsonRpcCodec,
69    W: WsCodec,
70{
71    /// Add an RPC service.
72    pub fn service(mut self, service: Arc<dyn RPCService>) -> Self {
73        self.config.services.insert(service.name(), service);
74        self
75    }
76
77    /// Add a PubSub RPC service.
78    pub fn pubsub_service(mut self, service: Arc<dyn PubSubRPCService>) -> Self {
79        self.config.pubsub_services.insert(service.name(), service);
80        self
81    }
82
83    /// Set TCP config.
84    #[cfg(feature = "tcp")]
85    pub fn tcp_config(mut self, config: TcpConfig) -> Result<Self> {
86        match self.config.endpoint {
87            Endpoint::Tcp(..) | Endpoint::Tls(..) | Endpoint::Ws(..) | Endpoint::Wss(..) => {
88                self.config.tcp_config = config;
89                Ok(self)
90            }
91            _ => Err(Error::UnsupportedProtocol(self.config.endpoint.to_string())),
92        }
93    }
94
95    /// Set TLS config.
96    #[cfg(feature = "tls")]
97    pub fn tls_config(mut self, config: karyon_net::tls::ServerTlsConfig) -> Result<Self> {
98        match self.config.endpoint {
99            Endpoint::Tls(..) | Endpoint::Wss(..) => {
100                self.config.tls_config = Some(config);
101                Ok(self)
102            }
103            _ => Err(Error::UnsupportedProtocol(format!(
104                "Invalid tls config for endpoint: {}",
105                self.config.endpoint
106            ))),
107        }
108    }
109
110    /// Set QUIC config.
111    #[cfg(feature = "quic")]
112    pub fn quic_config(mut self, config: ServerQuicConfig) -> Result<Self> {
113        match self.config.endpoint {
114            Endpoint::Quic(..) => {
115                self.config.quic_config = Some(config);
116                Ok(self)
117            }
118            #[cfg(feature = "http3")]
119            Endpoint::Http(..) => {
120                self.config.quic_config = Some(config);
121                Ok(self)
122            }
123            _ => Err(Error::UnsupportedProtocol(format!(
124                "Invalid quic config for endpoint: {}",
125                self.config.endpoint
126            ))),
127        }
128    }
129
130    /// Set an executor.
131    pub async fn with_executor(mut self, ex: Executor) -> Self {
132        self.executor = Some(ex);
133        self
134    }
135
136    /// Set a custom notification encoder.
137    pub fn with_notification_encoder(
138        mut self,
139        encoder: fn(NewNotification) -> Notification,
140    ) -> Self {
141        self.config.notification_encoder = encoder;
142        self
143    }
144
145    /// Override the WebSocket codec. Only meaningful for `ws://` /
146    /// `wss://` endpoints.
147    #[cfg(feature = "ws")]
148    pub fn with_ws_codec<W2: WsCodec>(self, ws_codec: W2) -> ServerBuilder<B, W2> {
149        ServerBuilder {
150            config: self.config,
151            byte_codec: self.byte_codec,
152            ws_codec,
153            executor: self.executor,
154        }
155    }
156
157    /// Build the server.
158    pub async fn build(self) -> Result<Arc<Server>> {
159        Server::init(self.config, self.executor, self.byte_codec, self.ws_codec).await
160    }
161}
162
163impl<B: JsonRpcCodec> ServerBuilder<B, JsonCodec> {
164    /// Create a builder with a custom byte-stream codec. The default
165    /// `JsonCodec` is used for `ws://` / `wss://` endpoints; override
166    /// with `with_ws_codec`.
167    pub fn new_with_codec(
168        endpoint: impl ToEndpoint,
169        codec: B,
170    ) -> Result<ServerBuilder<B, JsonCodec>> {
171        let endpoint = endpoint.to_endpoint()?;
172        Ok(ServerBuilder {
173            config: ServerConfig {
174                endpoint,
175                services: HashMap::new(),
176                pubsub_services: HashMap::new(),
177                #[cfg(feature = "tcp")]
178                tcp_config: Default::default(),
179                #[cfg(feature = "tls")]
180                tls_config: None,
181                #[cfg(feature = "quic")]
182                quic_config: None,
183                notification_encoder: default_notification_encoder,
184            },
185            byte_codec: codec,
186            ws_codec: JsonCodec::default(),
187            executor: None,
188        })
189    }
190}
191
192impl ServerBuilder<JsonCodec, JsonCodec> {
193    /// Create a builder with the default JSON codec.
194    pub fn new(endpoint: impl ToEndpoint) -> Result<ServerBuilder<JsonCodec, JsonCodec>> {
195        Self::new_with_codec(endpoint, JsonCodec::default())
196    }
197}
198
199fn default_notification_encoder(nt: NewNotification) -> Notification {
200    let params = Some(serde_json::json!(NotificationResult {
201        subscription: nt.sub_id,
202        result: Some(nt.result),
203    }));
204
205    Notification {
206        jsonrpc: JSONRPC_VERSION.to_string(),
207        method: nt.method,
208        params,
209    }
210}