karyon_jsonrpc/
codec.rs

1#[cfg(feature = "ws")]
2use async_tungstenite::tungstenite::Message;
3
4pub use karyon_net::codec::{ByteBuffer, Codec, Decoder, Encoder};
5
6#[cfg(feature = "ws")]
7pub use karyon_net::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
8
9use crate::error::Error;
10
11#[cfg(not(feature = "ws"))]
12pub trait ClonableJsonCodec: Codec<Message = serde_json::Value, Error = Error> + Clone {}
13#[cfg(not(feature = "ws"))]
14impl<T: Codec<Message = serde_json::Value, Error = Error> + Clone> ClonableJsonCodec for T {}
15
16#[cfg(feature = "ws")]
17pub trait ClonableJsonCodec:
18    Codec<Message = serde_json::Value, Error = Error>
19    + WebSocketCodec<Message = serde_json::Value, Error = Error>
20    + Clone
21{
22}
23#[cfg(feature = "ws")]
24impl<
25        T: Codec<Message = serde_json::Value, Error = Error>
26            + WebSocketCodec<Message = serde_json::Value, Error = Error>
27            + Clone,
28    > ClonableJsonCodec for T
29{
30}
31
32#[derive(Clone)]
33pub struct JsonCodec {}
34
35impl Codec for JsonCodec {
36    type Message = serde_json::Value;
37    type Error = Error;
38}
39
40impl Encoder for JsonCodec {
41    type EnMessage = serde_json::Value;
42    type EnError = Error;
43    fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize, Self::EnError> {
44        let msg = match serde_json::to_string(src) {
45            Ok(m) => m,
46            Err(err) => return Err(Error::Encode(err.to_string())),
47        };
48        let buf = msg.as_bytes();
49        dst.extend_from_slice(buf);
50        Ok(buf.len())
51    }
52}
53
54impl Decoder for JsonCodec {
55    type DeMessage = serde_json::Value;
56    type DeError = Error;
57    fn decode(
58        &self,
59        src: &mut ByteBuffer,
60    ) -> Result<Option<(usize, Self::DeMessage)>, Self::DeError> {
61        let de = serde_json::Deserializer::from_slice(src.as_ref());
62        let mut iter = de.into_iter::<serde_json::Value>();
63
64        let item = match iter.next() {
65            Some(Ok(item)) => item,
66            Some(Err(ref e)) if e.is_eof() => return Ok(None),
67            Some(Err(e)) => return Err(Error::Decode(e.to_string())),
68            None => return Ok(None),
69        };
70
71        Ok(Some((iter.byte_offset(), item)))
72    }
73}
74
75#[cfg(feature = "ws")]
76#[derive(Clone)]
77pub struct WsJsonCodec {}
78
79#[cfg(feature = "ws")]
80impl WebSocketCodec for JsonCodec {
81    type Message = serde_json::Value;
82    type Error = Error;
83}
84
85#[cfg(feature = "ws")]
86impl WebSocketEncoder for JsonCodec {
87    type EnMessage = serde_json::Value;
88    type EnError = Error;
89
90    fn encode(&self, src: &Self::EnMessage) -> Result<Message, Self::EnError> {
91        let msg = match serde_json::to_string(src) {
92            Ok(m) => m,
93            Err(err) => return Err(Error::Encode(err.to_string())),
94        };
95        Ok(Message::Text(msg.into()))
96    }
97}
98
99#[cfg(feature = "ws")]
100impl WebSocketDecoder for JsonCodec {
101    type DeMessage = serde_json::Value;
102    type DeError = Error;
103
104    fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>, Self::DeError> {
105        match src {
106            Message::Text(s) => match serde_json::from_str(s) {
107                Ok(m) => Ok(Some(m)),
108                Err(err) => Err(Error::Decode(err.to_string())),
109            },
110            Message::Binary(s) => match serde_json::from_slice(s) {
111                Ok(m) => Ok(m),
112                Err(err) => Err(Error::Decode(err.to_string())),
113            },
114            Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
115            m => Err(Error::Decode(format!(
116                "Receive unexpected message: {:?}",
117                m
118            ))),
119        }
120    }
121}