1use std::io::{self, ErrorKind};
2
3use karyon_core::async_runtime::io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
4
5use crate::{
6 codec::Codec,
7 message::{MessageRx, MessageTx},
8 stream::ByteStream,
9 Buffer, ByteBuffer, Endpoint, Result,
10};
11
/// Size in bytes (64 KiB) of the scratch buffer handed to each read call
/// made by a `FramedReader` created through `framed`.
const DEFAULT_READ_CHUNK_SIZE: usize = 1 << 16;
14
15pub fn framed<C>(stream: Box<dyn ByteStream>, codec: C) -> FramedConn<C>
19where
20 C: Codec<ByteBuffer> + Clone + 'static,
21 C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
22 C::Message: Send + Sync + 'static,
23{
24 let peer = stream.peer_endpoint();
25 let local = stream.local_endpoint();
26 let (rh, wh) = split(stream);
27
28 FramedConn {
29 reader: FramedReader {
30 inner: rh,
31 decoder: codec.clone(),
32 buffer: Buffer::new(),
33 read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
34 peer_endpoint: peer.clone(),
35 local_endpoint: local.clone(),
36 },
37 writer: FramedWriter {
38 inner: wh,
39 encoder: codec,
40 buffer: Buffer::new(),
41 peer_endpoint: peer,
42 local_endpoint: local,
43 },
44 }
45}
46
47pub struct FramedConn<C> {
49 reader: FramedReader<C>,
50 writer: FramedWriter<C>,
51}
52
53impl<C> FramedConn<C>
54where
55 C: Codec<ByteBuffer> + Clone + Send + Sync + 'static,
56 C::Message: Send + Sync + 'static,
57 C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
58{
59 pub async fn recv_msg(&mut self) -> Result<C::Message> {
61 self.reader.recv_msg().await
62 }
63
64 pub async fn send_msg(&mut self, msg: C::Message) -> Result<()> {
66 self.writer.send_msg(msg).await
67 }
68
69 pub fn peer_endpoint(&self) -> Option<Endpoint> {
71 self.reader.peer_endpoint.clone()
72 }
73
74 pub fn local_endpoint(&self) -> Option<Endpoint> {
76 self.reader.local_endpoint.clone()
77 }
78
79 pub fn split(self) -> (FramedReader<C>, FramedWriter<C>) {
81 (self.reader, self.writer)
82 }
83}
84
85pub struct FramedReader<C> {
87 inner: ReadHalf<Box<dyn ByteStream>>,
88 decoder: C,
89 buffer: ByteBuffer,
90 read_chunk_size: usize,
91 peer_endpoint: Option<Endpoint>,
92 local_endpoint: Option<Endpoint>,
93}
94
95impl<C> FramedReader<C>
96where
97 C: Codec<ByteBuffer> + Send + Sync,
98 C::Message: Send + Sync,
99 C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
100{
101 pub fn peer_endpoint(&self) -> Option<Endpoint> {
103 self.peer_endpoint.clone()
104 }
105
106 pub fn local_endpoint(&self) -> Option<Endpoint> {
108 self.local_endpoint.clone()
109 }
110
111 pub async fn recv_msg(&mut self) -> Result<C::Message> {
113 if let Some((n, item)) = self.decoder.decode(&mut self.buffer).map_err(Into::into)? {
115 self.buffer.advance(n);
116 return Ok(item);
117 }
118
119 loop {
120 let mut buf = vec![0u8; self.read_chunk_size];
121 let n = self.inner.read(&mut buf).await?;
122
123 if n == 0 {
124 if self.buffer.is_empty() {
125 return Err(io::Error::from(ErrorKind::ConnectionAborted).into());
126 } else {
127 return Err(io::Error::new(
128 ErrorKind::UnexpectedEof,
129 "bytes remaining in buffer",
130 )
131 .into());
132 }
133 }
134
135 self.buffer.extend_from_slice(&buf[..n]);
136
137 if let Some((cn, item)) = self.decoder.decode(&mut self.buffer).map_err(Into::into)? {
138 self.buffer.advance(cn);
139 return Ok(item);
140 }
141 }
142 }
143}
144
145impl<C> MessageRx for FramedReader<C>
146where
147 C: Codec<ByteBuffer> + Send + Sync,
148 C::Message: Send + Sync,
149 C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
150{
151 type Message = C::Message;
152
153 fn recv_msg(&mut self) -> impl std::future::Future<Output = Result<Self::Message>> + Send {
154 FramedReader::recv_msg(self)
155 }
156
157 fn peer_endpoint(&self) -> Option<Endpoint> {
158 FramedReader::peer_endpoint(self)
159 }
160}
161
162pub struct FramedWriter<C> {
164 inner: WriteHalf<Box<dyn ByteStream>>,
165 encoder: C,
166 buffer: ByteBuffer,
167 peer_endpoint: Option<Endpoint>,
168 local_endpoint: Option<Endpoint>,
169}
170
171impl<C> FramedWriter<C>
172where
173 C: Codec<ByteBuffer> + Send + Sync,
174 C::Message: Send + Sync,
175 C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
176{
177 pub fn peer_endpoint(&self) -> Option<Endpoint> {
179 self.peer_endpoint.clone()
180 }
181
182 pub fn local_endpoint(&self) -> Option<Endpoint> {
184 self.local_endpoint.clone()
185 }
186
187 pub async fn send_msg(&mut self, msg: C::Message) -> Result<()> {
189 self.encoder
190 .encode(&msg, &mut self.buffer)
191 .map_err(Into::into)?;
192
193 while !self.buffer.is_empty() {
194 let n = self.inner.write(self.buffer.as_ref()).await?;
195 if n == 0 {
196 return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
197 }
198 self.buffer.advance(n);
199 }
200
201 self.inner.flush().await?;
202 Ok(())
203 }
204}
205
206impl<C> MessageTx for FramedWriter<C>
207where
208 C: Codec<ByteBuffer> + Send + Sync,
209 C::Message: Send + Sync,
210 C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
211{
212 type Message = C::Message;
213
214 fn send_msg(
215 &mut self,
216 msg: Self::Message,
217 ) -> impl std::future::Future<Output = Result<()>> + Send {
218 FramedWriter::send_msg(self, msg)
219 }
220
221 fn peer_endpoint(&self) -> Option<Endpoint> {
222 FramedWriter::peer_endpoint(self)
223 }
224}