Skip to main content

karyon_net/
framed.rs

1use std::io::{self, ErrorKind};
2
3use karyon_core::async_runtime::io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
4
5use crate::{
6    codec::Codec,
7    message::{MessageRx, MessageTx},
8    stream::ByteStream,
9    Buffer, ByteBuffer, Endpoint, Result,
10};
11
/// Default size, in bytes, of the scratch buffer handed to a single read
/// on the underlying stream (64 KiB).
const DEFAULT_READ_CHUNK_SIZE: usize = 64 << 10;
14
15/// Convert a ByteStream + codec into a FramedConn.
16/// The codec handles both framing and serialization over the
17/// byte stream. Call `split()` for concurrent read/write.
18pub fn framed<C>(stream: Box<dyn ByteStream>, codec: C) -> FramedConn<C>
19where
20    C: Codec<ByteBuffer> + Clone + 'static,
21    C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
22    C::Message: Send + Sync + 'static,
23{
24    let peer = stream.peer_endpoint();
25    let local = stream.local_endpoint();
26    let (rh, wh) = split(stream);
27
28    FramedConn {
29        reader: FramedReader {
30            inner: rh,
31            decoder: codec.clone(),
32            buffer: Buffer::new(),
33            read_chunk_size: DEFAULT_READ_CHUNK_SIZE,
34            peer_endpoint: peer.clone(),
35            local_endpoint: local.clone(),
36        },
37        writer: FramedWriter {
38            inner: wh,
39            encoder: codec,
40            buffer: Buffer::new(),
41            peer_endpoint: peer,
42            local_endpoint: local,
43        },
44    }
45}
46
47/// Framed message connection over a byte stream.
48pub struct FramedConn<C> {
49    reader: FramedReader<C>,
50    writer: FramedWriter<C>,
51}
52
53impl<C> FramedConn<C>
54where
55    C: Codec<ByteBuffer> + Clone + Send + Sync + 'static,
56    C::Message: Send + Sync + 'static,
57    C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
58{
59    /// Receive one complete message.
60    pub async fn recv_msg(&mut self) -> Result<C::Message> {
61        self.reader.recv_msg().await
62    }
63
64    /// Send one complete message.
65    pub async fn send_msg(&mut self, msg: C::Message) -> Result<()> {
66        self.writer.send_msg(msg).await
67    }
68
69    /// Remote peer address.
70    pub fn peer_endpoint(&self) -> Option<Endpoint> {
71        self.reader.peer_endpoint.clone()
72    }
73
74    /// Local address.
75    pub fn local_endpoint(&self) -> Option<Endpoint> {
76        self.reader.local_endpoint.clone()
77    }
78
79    /// Split into independent reader and writer halves.
80    pub fn split(self) -> (FramedReader<C>, FramedWriter<C>) {
81        (self.reader, self.writer)
82    }
83}
84
85/// Read half of a framed connection.
86pub struct FramedReader<C> {
87    inner: ReadHalf<Box<dyn ByteStream>>,
88    decoder: C,
89    buffer: ByteBuffer,
90    read_chunk_size: usize,
91    peer_endpoint: Option<Endpoint>,
92    local_endpoint: Option<Endpoint>,
93}
94
95impl<C> FramedReader<C>
96where
97    C: Codec<ByteBuffer> + Send + Sync,
98    C::Message: Send + Sync,
99    C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
100{
101    /// Remote peer address.
102    pub fn peer_endpoint(&self) -> Option<Endpoint> {
103        self.peer_endpoint.clone()
104    }
105
106    /// Local address.
107    pub fn local_endpoint(&self) -> Option<Endpoint> {
108        self.local_endpoint.clone()
109    }
110
111    /// Receive one complete message.
112    pub async fn recv_msg(&mut self) -> Result<C::Message> {
113        // Try decoding from buffered data first.
114        if let Some((n, item)) = self.decoder.decode(&mut self.buffer).map_err(Into::into)? {
115            self.buffer.advance(n);
116            return Ok(item);
117        }
118
119        loop {
120            let mut buf = vec![0u8; self.read_chunk_size];
121            let n = self.inner.read(&mut buf).await?;
122
123            if n == 0 {
124                if self.buffer.is_empty() {
125                    return Err(io::Error::from(ErrorKind::ConnectionAborted).into());
126                } else {
127                    return Err(io::Error::new(
128                        ErrorKind::UnexpectedEof,
129                        "bytes remaining in buffer",
130                    )
131                    .into());
132                }
133            }
134
135            self.buffer.extend_from_slice(&buf[..n]);
136
137            if let Some((cn, item)) = self.decoder.decode(&mut self.buffer).map_err(Into::into)? {
138                self.buffer.advance(cn);
139                return Ok(item);
140            }
141        }
142    }
143}
144
145impl<C> MessageRx for FramedReader<C>
146where
147    C: Codec<ByteBuffer> + Send + Sync,
148    C::Message: Send + Sync,
149    C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
150{
151    type Message = C::Message;
152
153    fn recv_msg(&mut self) -> impl std::future::Future<Output = Result<Self::Message>> + Send {
154        FramedReader::recv_msg(self)
155    }
156
157    fn peer_endpoint(&self) -> Option<Endpoint> {
158        FramedReader::peer_endpoint(self)
159    }
160}
161
162/// Write half of a framed connection.
163pub struct FramedWriter<C> {
164    inner: WriteHalf<Box<dyn ByteStream>>,
165    encoder: C,
166    buffer: ByteBuffer,
167    peer_endpoint: Option<Endpoint>,
168    local_endpoint: Option<Endpoint>,
169}
170
171impl<C> FramedWriter<C>
172where
173    C: Codec<ByteBuffer> + Send + Sync,
174    C::Message: Send + Sync,
175    C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
176{
177    /// Remote peer address.
178    pub fn peer_endpoint(&self) -> Option<Endpoint> {
179        self.peer_endpoint.clone()
180    }
181
182    /// Local address.
183    pub fn local_endpoint(&self) -> Option<Endpoint> {
184        self.local_endpoint.clone()
185    }
186
187    /// Send one complete message.
188    pub async fn send_msg(&mut self, msg: C::Message) -> Result<()> {
189        self.encoder
190            .encode(&msg, &mut self.buffer)
191            .map_err(Into::into)?;
192
193        while !self.buffer.is_empty() {
194            let n = self.inner.write(self.buffer.as_ref()).await?;
195            if n == 0 {
196                return Err(io::Error::from(ErrorKind::UnexpectedEof).into());
197            }
198            self.buffer.advance(n);
199        }
200
201        self.inner.flush().await?;
202        Ok(())
203    }
204}
205
206impl<C> MessageTx for FramedWriter<C>
207where
208    C: Codec<ByteBuffer> + Send + Sync,
209    C::Message: Send + Sync,
210    C::Error: From<io::Error> + Into<crate::Error> + Send + Sync,
211{
212    type Message = C::Message;
213
214    fn send_msg(
215        &mut self,
216        msg: Self::Message,
217    ) -> impl std::future::Future<Output = Result<()>> + Send {
218        FramedWriter::send_msg(self, msg)
219    }
220
221    fn peer_endpoint(&self) -> Option<Endpoint> {
222        FramedWriter::peer_endpoint(self)
223    }
224}