karyon_jsonrpc/server/
dispatch.rs1use std::sync::Arc;
4
5use log::debug;
6
7use crate::{
8 message,
9 server::{
10 channel::Channel, pubsub_service::PubSubRPCMethod, service::RPCMethod, Server,
11 FAILED_TO_PARSE_ERROR_MSG, INVALID_REQUEST_ERROR_MSG, METHOD_NOT_FOUND_ERROR_MSG,
12 UNSUPPORTED_JSONRPC_VERSION,
13 },
14};
15
16pub(super) struct NewRequest {
17 pub srvc_name: String,
18 pub method_name: String,
19 pub msg: message::Request,
20}
21
22pub(super) enum SanityCheckResult {
23 NewReq(NewRequest),
24 ErrRes(message::Response),
25}
26
27pub(super) enum Handler<'a> {
29 Pubsub(PubSubRPCMethod<'a>),
30 Rpc(RPCMethod<'a>),
31 None,
32}
33
34pub(super) fn sanity_check(request: serde_json::Value) -> SanityCheckResult {
36 let rpc_msg = match serde_json::from_value::<message::Request>(request) {
37 Ok(m) => m,
38 Err(_) => return err_res(None, message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG),
39 };
40
41 if rpc_msg.jsonrpc != message::JSONRPC_VERSION {
42 return err_res(
43 Some(rpc_msg.id),
44 message::INVALID_REQUEST_ERROR_CODE,
45 UNSUPPORTED_JSONRPC_VERSION,
46 );
47 }
48
49 debug!("<-- {rpc_msg}");
50
51 let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect();
52 if srvc_method.len() < 2 {
53 return err_res(
54 Some(rpc_msg.id),
55 message::INVALID_REQUEST_ERROR_CODE,
56 INVALID_REQUEST_ERROR_MSG,
57 );
58 }
59
60 let srvc_name = srvc_method[0].to_string();
61 let method_name = srvc_method[1..].join(".");
62 SanityCheckResult::NewReq(NewRequest {
63 srvc_name,
64 method_name,
65 msg: rpc_msg,
66 })
67}
68
69fn err_res(id: Option<serde_json::Value>, code: i32, message: &str) -> SanityCheckResult {
70 SanityCheckResult::ErrRes(message::Response {
71 error: Some(message::Error {
72 code,
73 message: message.to_string(),
74 data: None,
75 }),
76 id,
77 ..Default::default()
78 })
79}
80
81fn method_not_found(id: Option<serde_json::Value>) -> message::Response {
82 message::Response {
83 error: Some(message::Error {
84 code: message::METHOD_NOT_FOUND_ERROR_CODE,
85 message: METHOD_NOT_FOUND_ERROR_MSG.to_string(),
86 data: None,
87 }),
88 id,
89 ..Default::default()
90 }
91}
92
93impl Server {
94 pub(super) fn resolve_handler(
97 &self,
98 srvc_name: &str,
99 method_name: &str,
100 allow_pubsub: bool,
101 ) -> Handler<'_> {
102 if allow_pubsub {
103 if let Some(method) = self
104 .config
105 .pubsub_services
106 .get(srvc_name)
107 .and_then(|s| s.get_pubsub_method(method_name))
108 {
109 return Handler::Pubsub(method);
110 }
111 }
112 if let Some(method) = self
113 .config
114 .services
115 .get(srvc_name)
116 .and_then(|s| s.get_method(method_name))
117 {
118 return Handler::Rpc(method);
119 }
120 Handler::None
121 }
122
123 pub(crate) async fn handle_request(
126 &self,
127 channel: Option<Arc<Channel>>,
128 msg: serde_json::Value,
129 ) -> message::Response {
130 let req = match sanity_check(msg) {
131 SanityCheckResult::NewReq(req) => req,
132 SanityCheckResult::ErrRes(res) => return res,
133 };
134
135 let id = Some(req.msg.id.clone());
136 let params = req.msg.params.unwrap_or(serde_json::json!(()));
137
138 match self.resolve_handler(&req.srvc_name, &req.method_name, channel.is_some()) {
139 Handler::Pubsub(method) => {
140 let chan = channel.expect("channel required for pubsub");
141 match method(chan, req.msg.method, params).await {
142 Ok(res) => message::Response {
143 result: Some(res),
144 id,
145 ..Default::default()
146 },
147 Err(err) => err.to_response(id, None),
148 }
149 }
150 Handler::Rpc(method) => match method(params).await {
151 Ok(res) => message::Response {
152 result: Some(res),
153 id,
154 ..Default::default()
155 },
156 Err(err) => err.to_response(id, None),
157 },
158 Handler::None => method_not_found(id),
159 }
160 }
161}