karyon_jsonrpc/
codec.rs

1#[cfg(feature = "ws")]
2use async_tungstenite::tungstenite::Message;
3
4pub use karyon_net::codec::{ByteBuffer, Codec, Decoder, Encoder};
5
6#[cfg(feature = "ws")]
7pub use karyon_net::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
8
9use crate::error::Error;
10
11const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024; // 4MB
12
13#[cfg(not(feature = "ws"))]
14pub trait ClonableJsonCodec: Codec<Message = serde_json::Value, Error = Error> + Clone {}
15#[cfg(not(feature = "ws"))]
16impl<T: Codec<Message = serde_json::Value, Error = Error> + Clone> ClonableJsonCodec for T {}
17
18#[cfg(feature = "ws")]
19pub trait ClonableJsonCodec:
20    Codec<Message = serde_json::Value, Error = Error>
21    + WebSocketCodec<Message = serde_json::Value, Error = Error>
22    + Clone
23{
24}
25#[cfg(feature = "ws")]
26impl<
27        T: Codec<Message = serde_json::Value, Error = Error>
28            + WebSocketCodec<Message = serde_json::Value, Error = Error>
29            + Clone,
30    > ClonableJsonCodec for T
31{
32}
33
34#[derive(Clone)]
35pub struct JsonCodec {
36    max_size: usize,
37}
38
39impl Default for JsonCodec {
40    fn default() -> Self {
41        Self {
42            max_size: DEFAULT_MAX_BUFFER_SIZE,
43        }
44    }
45}
46
47impl JsonCodec {
48    pub fn new(max_payload_size: usize) -> Self {
49        Self {
50            max_size: max_payload_size,
51        }
52    }
53}
54
55impl Codec for JsonCodec {
56    type Message = serde_json::Value;
57    type Error = Error;
58}
59
60impl Encoder for JsonCodec {
61    type EnMessage = serde_json::Value;
62    type EnError = Error;
63    fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize, Self::EnError> {
64        let msg = match serde_json::to_string(src) {
65            Ok(m) => m,
66            Err(err) => return Err(Error::Encode(err.to_string())),
67        };
68        let buf = msg.as_bytes();
69
70        if buf.len() > self.max_size {
71            return Err(Error::BufferFull(format!(
72                "Buffer size {} exceeds maximum {}",
73                buf.len(),
74                self.max_size
75            )));
76        }
77
78        dst.extend_from_slice(buf);
79        Ok(buf.len())
80    }
81}
82
83impl Decoder for JsonCodec {
84    type DeMessage = serde_json::Value;
85    type DeError = Error;
86    fn decode(
87        &self,
88        src: &mut ByteBuffer,
89    ) -> Result<Option<(usize, Self::DeMessage)>, Self::DeError> {
90        if src.len() > self.max_size {
91            return Err(Error::BufferFull(format!(
92                "Buffer size {} exceeds maximum {}",
93                src.len(),
94                self.max_size
95            )));
96        }
97
98        let de = serde_json::Deserializer::from_slice(src.as_ref());
99        let mut iter = de.into_iter::<serde_json::Value>();
100
101        let item = match iter.next() {
102            Some(Ok(item)) => item,
103            Some(Err(ref e)) if e.is_eof() => return Ok(None),
104            Some(Err(e)) => return Err(Error::Decode(e.to_string())),
105            None => return Ok(None),
106        };
107
108        Ok(Some((iter.byte_offset(), item)))
109    }
110}
111
112#[cfg(feature = "ws")]
113#[derive(Clone)]
114pub struct WsJsonCodec {}
115
116#[cfg(feature = "ws")]
117impl WebSocketCodec for JsonCodec {
118    type Message = serde_json::Value;
119    type Error = Error;
120}
121
122#[cfg(feature = "ws")]
123impl WebSocketEncoder for JsonCodec {
124    type EnMessage = serde_json::Value;
125    type EnError = Error;
126
127    fn encode(&self, src: &Self::EnMessage) -> Result<Message, Self::EnError> {
128        let msg = match serde_json::to_string(src) {
129            Ok(m) => m,
130            Err(err) => return Err(Error::Encode(err.to_string())),
131        };
132        Ok(Message::Text(msg.into()))
133    }
134}
135
136#[cfg(feature = "ws")]
137impl WebSocketDecoder for JsonCodec {
138    type DeMessage = serde_json::Value;
139    type DeError = Error;
140
141    fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>, Self::DeError> {
142        match src {
143            Message::Text(s) => match serde_json::from_str(s) {
144                Ok(m) => Ok(Some(m)),
145                Err(err) => Err(Error::Decode(err.to_string())),
146            },
147            Message::Binary(s) => match serde_json::from_slice(s) {
148                Ok(m) => Ok(m),
149                Err(err) => Err(Error::Decode(err.to_string())),
150            },
151            Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
152            m => Err(Error::Decode(format!(
153                "Receive unexpected message: {:?}",
154                m
155            ))),
156        }
157    }
158}