1#[cfg(feature = "ws")]
2use async_tungstenite::tungstenite::Message;
3
4pub use karyon_net::codec::{ByteBuffer, Codec, Decoder, Encoder};
5
6#[cfg(feature = "ws")]
7pub use karyon_net::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
8
9use crate::error::Error;
10
11const DEFAULT_MAX_BUFFER_SIZE: usize = 4 * 1024 * 1024; #[cfg(not(feature = "ws"))]
14pub trait ClonableJsonCodec: Codec<Message = serde_json::Value, Error = Error> + Clone {}
15#[cfg(not(feature = "ws"))]
16impl<T: Codec<Message = serde_json::Value, Error = Error> + Clone> ClonableJsonCodec for T {}
17
18#[cfg(feature = "ws")]
19pub trait ClonableJsonCodec:
20 Codec<Message = serde_json::Value, Error = Error>
21 + WebSocketCodec<Message = serde_json::Value, Error = Error>
22 + Clone
23{
24}
25#[cfg(feature = "ws")]
26impl<
27 T: Codec<Message = serde_json::Value, Error = Error>
28 + WebSocketCodec<Message = serde_json::Value, Error = Error>
29 + Clone,
30 > ClonableJsonCodec for T
31{
32}
33
34#[derive(Clone)]
35pub struct JsonCodec {
36 max_size: usize,
37}
38
39impl Default for JsonCodec {
40 fn default() -> Self {
41 Self {
42 max_size: DEFAULT_MAX_BUFFER_SIZE,
43 }
44 }
45}
46
47impl JsonCodec {
48 pub fn new(max_payload_size: usize) -> Self {
49 Self {
50 max_size: max_payload_size,
51 }
52 }
53}
54
55impl Codec for JsonCodec {
56 type Message = serde_json::Value;
57 type Error = Error;
58}
59
60impl Encoder for JsonCodec {
61 type EnMessage = serde_json::Value;
62 type EnError = Error;
63 fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize, Self::EnError> {
64 let msg = match serde_json::to_string(src) {
65 Ok(m) => m,
66 Err(err) => return Err(Error::Encode(err.to_string())),
67 };
68 let buf = msg.as_bytes();
69
70 if buf.len() > self.max_size {
71 return Err(Error::BufferFull(format!(
72 "Buffer size {} exceeds maximum {}",
73 buf.len(),
74 self.max_size
75 )));
76 }
77
78 dst.extend_from_slice(buf);
79 Ok(buf.len())
80 }
81}
82
83impl Decoder for JsonCodec {
84 type DeMessage = serde_json::Value;
85 type DeError = Error;
86 fn decode(
87 &self,
88 src: &mut ByteBuffer,
89 ) -> Result<Option<(usize, Self::DeMessage)>, Self::DeError> {
90 if src.len() > self.max_size {
91 return Err(Error::BufferFull(format!(
92 "Buffer size {} exceeds maximum {}",
93 src.len(),
94 self.max_size
95 )));
96 }
97
98 let de = serde_json::Deserializer::from_slice(src.as_ref());
99 let mut iter = de.into_iter::<serde_json::Value>();
100
101 let item = match iter.next() {
102 Some(Ok(item)) => item,
103 Some(Err(ref e)) if e.is_eof() => return Ok(None),
104 Some(Err(e)) => return Err(Error::Decode(e.to_string())),
105 None => return Ok(None),
106 };
107
108 Ok(Some((iter.byte_offset(), item)))
109 }
110}
111
112#[cfg(feature = "ws")]
113#[derive(Clone)]
114pub struct WsJsonCodec {}
115
116#[cfg(feature = "ws")]
117impl WebSocketCodec for JsonCodec {
118 type Message = serde_json::Value;
119 type Error = Error;
120}
121
122#[cfg(feature = "ws")]
123impl WebSocketEncoder for JsonCodec {
124 type EnMessage = serde_json::Value;
125 type EnError = Error;
126
127 fn encode(&self, src: &Self::EnMessage) -> Result<Message, Self::EnError> {
128 let msg = match serde_json::to_string(src) {
129 Ok(m) => m,
130 Err(err) => return Err(Error::Encode(err.to_string())),
131 };
132 Ok(Message::Text(msg.into()))
133 }
134}
135
136#[cfg(feature = "ws")]
137impl WebSocketDecoder for JsonCodec {
138 type DeMessage = serde_json::Value;
139 type DeError = Error;
140
141 fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>, Self::DeError> {
142 match src {
143 Message::Text(s) => match serde_json::from_str(s) {
144 Ok(m) => Ok(Some(m)),
145 Err(err) => Err(Error::Decode(err.to_string())),
146 },
147 Message::Binary(s) => match serde_json::from_slice(s) {
148 Ok(m) => Ok(m),
149 Err(err) => Err(Error::Decode(err.to_string())),
150 },
151 Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
152 m => Err(Error::Decode(format!(
153 "Receive unexpected message: {:?}",
154 m
155 ))),
156 }
157 }
158}