1#[cfg(feature = "ws")]
2use async_tungstenite::tungstenite::Message;
3
4pub use karyon_net::codec::{ByteBuffer, Codec, Decoder, Encoder};
5
6#[cfg(feature = "ws")]
7pub use karyon_net::codec::{WebSocketCodec, WebSocketDecoder, WebSocketEncoder};
8
9use crate::error::Error;
10
11#[cfg(not(feature = "ws"))]
12pub trait ClonableJsonCodec: Codec<Message = serde_json::Value, Error = Error> + Clone {}
13#[cfg(not(feature = "ws"))]
14impl<T: Codec<Message = serde_json::Value, Error = Error> + Clone> ClonableJsonCodec for T {}
15
/// Marker trait for cloneable JSON codecs (build with the `ws` feature).
///
/// It adds no methods of its own; it bundles the bounds `Clone`, `Codec` and
/// `WebSocketCodec`, where both codec traits use `serde_json::Value` messages
/// and the crate's `Error` type.
#[cfg(feature = "ws")]
pub trait ClonableJsonCodec:
    Clone
    + Codec<Message = serde_json::Value, Error = Error>
    + WebSocketCodec<Message = serde_json::Value, Error = Error>
{
}

// Blanket implementation: any type that satisfies the bounds is automatically
// a `ClonableJsonCodec`.
#[cfg(feature = "ws")]
impl<T> ClonableJsonCodec for T
where
    T: Clone
        + Codec<Message = serde_json::Value, Error = Error>
        + WebSocketCodec<Message = serde_json::Value, Error = Error>,
{
}
31
/// Field-less codec type whose trait implementations in this file encode and
/// decode `serde_json::Value` messages.
///
/// It holds no state, so cloning it is trivial.
#[derive(Clone)]
pub struct JsonCodec {}
34
35impl Codec for JsonCodec {
36 type Message = serde_json::Value;
37 type Error = Error;
38}
39
40impl Encoder for JsonCodec {
41 type EnMessage = serde_json::Value;
42 type EnError = Error;
43 fn encode(&self, src: &Self::EnMessage, dst: &mut ByteBuffer) -> Result<usize, Self::EnError> {
44 let msg = match serde_json::to_string(src) {
45 Ok(m) => m,
46 Err(err) => return Err(Error::Encode(err.to_string())),
47 };
48 let buf = msg.as_bytes();
49 dst.extend_from_slice(buf);
50 Ok(buf.len())
51 }
52}
53
54impl Decoder for JsonCodec {
55 type DeMessage = serde_json::Value;
56 type DeError = Error;
57 fn decode(
58 &self,
59 src: &mut ByteBuffer,
60 ) -> Result<Option<(usize, Self::DeMessage)>, Self::DeError> {
61 let de = serde_json::Deserializer::from_slice(src.as_ref());
62 let mut iter = de.into_iter::<serde_json::Value>();
63
64 let item = match iter.next() {
65 Some(Ok(item)) => item,
66 Some(Err(ref e)) if e.is_eof() => return Ok(None),
67 Some(Err(e)) => return Err(Error::Decode(e.to_string())),
68 None => return Ok(None),
69 };
70
71 Ok(Some((iter.byte_offset(), item)))
72 }
73}
74
/// Field-less, cloneable codec type available only with the `ws` feature.
///
/// NOTE(review): no trait implementations for this type appear in this part
/// of the file; the WebSocket codec traits below are implemented on
/// `JsonCodec` instead. Confirm whether this type is still needed.
#[cfg(feature = "ws")]
#[derive(Clone)]
pub struct WsJsonCodec {}
78
/// Binds the `WebSocketCodec` associated types for `JsonCodec`.
#[cfg(feature = "ws")]
impl WebSocketCodec for JsonCodec {
    /// Failures are reported with the crate's `Error` type.
    type Error = Error;
    /// Messages are arbitrary JSON values.
    type Message = serde_json::Value;
}
84
/// WebSocket encoder for JSON values.
#[cfg(feature = "ws")]
impl WebSocketEncoder for JsonCodec {
    type EnMessage = serde_json::Value;
    type EnError = Error;

    /// Serializes `src` to JSON text and wraps it in a `Message::Text` frame.
    ///
    /// # Errors
    ///
    /// Returns `Error::Encode` carrying the serializer's message when
    /// `serde_json::to_string` fails.
    fn encode(&self, src: &Self::EnMessage) -> Result<Message, Self::EnError> {
        serde_json::to_string(src)
            .map(|text| Message::Text(text.into()))
            .map_err(|e| Error::Encode(e.to_string()))
    }
}
98
/// WebSocket decoder for JSON values.
#[cfg(feature = "ws")]
impl WebSocketDecoder for JsonCodec {
    type DeMessage = serde_json::Value;
    type DeError = Error;

    /// Decodes a WebSocket frame into a JSON value.
    ///
    /// `Text` and `Binary` frames are both parsed as a single JSON document
    /// and yield `Ok(Some(value))`.
    ///
    /// # Errors
    ///
    /// * `Error::Decode` when the payload is not valid JSON, or when the
    ///   frame is of any kind other than `Text`, `Binary` or `Close`.
    /// * `Error::IO` with `ErrorKind::ConnectionAborted` for a `Close` frame.
    fn decode(&self, src: &Message) -> Result<Option<Self::DeMessage>, Self::DeError> {
        match src {
            Message::Text(s) => match serde_json::from_str(s) {
                Ok(m) => Ok(Some(m)),
                Err(err) => Err(Error::Decode(err.to_string())),
            },
            Message::Binary(s) => match serde_json::from_slice(s) {
                // Wrap in `Some` so the payload is parsed as a `Value`, the
                // same as the text branch. Returning `Ok(m)` directly would
                // make serde parse an `Option<Value>` and silently turn a
                // JSON `null` into `None`.
                Ok(m) => Ok(Some(m)),
                Err(err) => Err(Error::Decode(err.to_string())),
            },
            Message::Close(_) => Err(Error::IO(std::io::ErrorKind::ConnectionAborted.into())),
            m => Err(Error::Decode(format!(
                "Receive unexpected message: {:?}",
                m
            ))),
        }
    }
}