Skip to main content

karyon_jsonrpc/server/
dispatch.rs

1//! Request sanity check and method dispatch.
2
3use std::sync::Arc;
4
5use log::debug;
6
7use crate::{
8    message,
9    server::{
10        channel::Channel, pubsub_service::PubSubRPCMethod, service::RPCMethod, Server,
11        FAILED_TO_PARSE_ERROR_MSG, INVALID_REQUEST_ERROR_MSG, METHOD_NOT_FOUND_ERROR_MSG,
12        UNSUPPORTED_JSONRPC_VERSION,
13    },
14};
15
16pub(super) struct NewRequest {
17    pub srvc_name: String,
18    pub method_name: String,
19    pub msg: message::Request,
20}
21
22pub(super) enum SanityCheckResult {
23    NewReq(NewRequest),
24    ErrRes(message::Response),
25}
26
27/// Resolved handler for a request. Holds whichever method matched.
28pub(super) enum Handler<'a> {
29    Pubsub(PubSubRPCMethod<'a>),
30    Rpc(RPCMethod<'a>),
31    None,
32}
33
34/// Parse and validate a raw JSON-RPC request.
35pub(super) fn sanity_check(request: serde_json::Value) -> SanityCheckResult {
36    let rpc_msg = match serde_json::from_value::<message::Request>(request) {
37        Ok(m) => m,
38        Err(_) => return err_res(None, message::PARSE_ERROR_CODE, FAILED_TO_PARSE_ERROR_MSG),
39    };
40
41    if rpc_msg.jsonrpc != message::JSONRPC_VERSION {
42        return err_res(
43            Some(rpc_msg.id),
44            message::INVALID_REQUEST_ERROR_CODE,
45            UNSUPPORTED_JSONRPC_VERSION,
46        );
47    }
48
49    debug!("<-- {rpc_msg}");
50
51    let srvc_method: Vec<&str> = rpc_msg.method.split('.').collect();
52    if srvc_method.len() < 2 {
53        return err_res(
54            Some(rpc_msg.id),
55            message::INVALID_REQUEST_ERROR_CODE,
56            INVALID_REQUEST_ERROR_MSG,
57        );
58    }
59
60    let srvc_name = srvc_method[0].to_string();
61    let method_name = srvc_method[1..].join(".");
62    SanityCheckResult::NewReq(NewRequest {
63        srvc_name,
64        method_name,
65        msg: rpc_msg,
66    })
67}
68
69fn err_res(id: Option<serde_json::Value>, code: i32, message: &str) -> SanityCheckResult {
70    SanityCheckResult::ErrRes(message::Response {
71        error: Some(message::Error {
72            code,
73            message: message.to_string(),
74            data: None,
75        }),
76        id,
77        ..Default::default()
78    })
79}
80
81fn method_not_found(id: Option<serde_json::Value>) -> message::Response {
82    message::Response {
83        error: Some(message::Error {
84            code: message::METHOD_NOT_FOUND_ERROR_CODE,
85            message: METHOD_NOT_FOUND_ERROR_MSG.to_string(),
86            data: None,
87        }),
88        id,
89        ..Default::default()
90    }
91}
92
93impl Server {
94    /// Resolve a request to its handler. Pubsub handlers take priority.
95    /// If `allow_pubsub` is false, only regular RPC services are checked.
96    pub(super) fn resolve_handler(
97        &self,
98        srvc_name: &str,
99        method_name: &str,
100        allow_pubsub: bool,
101    ) -> Handler<'_> {
102        if allow_pubsub {
103            if let Some(method) = self
104                .config
105                .pubsub_services
106                .get(srvc_name)
107                .and_then(|s| s.get_pubsub_method(method_name))
108            {
109                return Handler::Pubsub(method);
110            }
111        }
112        if let Some(method) = self
113            .config
114            .services
115            .get(srvc_name)
116            .and_then(|s| s.get_method(method_name))
117        {
118            return Handler::Rpc(method);
119        }
120        Handler::None
121    }
122
123    /// Dispatch a JSON request. Pass `None` for `channel` when called
124    /// from a context without pubsub support (HTTP/1.x and HTTP/2).
125    pub(crate) async fn handle_request(
126        &self,
127        channel: Option<Arc<Channel>>,
128        msg: serde_json::Value,
129    ) -> message::Response {
130        let req = match sanity_check(msg) {
131            SanityCheckResult::NewReq(req) => req,
132            SanityCheckResult::ErrRes(res) => return res,
133        };
134
135        let id = Some(req.msg.id.clone());
136        let params = req.msg.params.unwrap_or(serde_json::json!(()));
137
138        match self.resolve_handler(&req.srvc_name, &req.method_name, channel.is_some()) {
139            Handler::Pubsub(method) => {
140                let chan = channel.expect("channel required for pubsub");
141                match method(chan, req.msg.method, params).await {
142                    Ok(res) => message::Response {
143                        result: Some(res),
144                        id,
145                        ..Default::default()
146                    },
147                    Err(err) => err.to_response(id, None),
148                }
149            }
150            Handler::Rpc(method) => match method(params).await {
151                Ok(res) => message::Response {
152                    result: Some(res),
153                    id,
154                    ..Default::default()
155                },
156                Err(err) => err.to_response(id, None),
157            },
158            Handler::None => method_not_found(id),
159        }
160    }
161}