Skip to main content

karyon_net/transports/
unix.rs

1use std::{
2    io,
3    path::PathBuf,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use karyon_core::async_runtime::{
9    io::{AsyncRead, AsyncWrite},
10    net::{UnixListener as AsyncUnixListener, UnixStream},
11};
12
13#[cfg(feature = "tokio")]
14use tokio::io::ReadBuf;
15
16use crate::{ByteStream, Endpoint, Error, Result};
17
18/// Unix stream implementing `ByteStream`.
19pub struct UnixByteStream {
20    inner: UnixStream,
21    peer_endpoint: Option<Endpoint>,
22    local_endpoint: Option<Endpoint>,
23}
24
25impl ByteStream for UnixByteStream {
26    fn peer_endpoint(&self) -> Option<Endpoint> {
27        self.peer_endpoint.clone()
28    }
29    fn local_endpoint(&self) -> Option<Endpoint> {
30        self.local_endpoint.clone()
31    }
32}
33
34/// Connect to a Unix socket.
35pub async fn connect(endpoint: &Endpoint) -> Result<Box<dyn ByteStream>> {
36    let path: PathBuf = endpoint.clone().try_into()?;
37    let stream = UnixStream::connect(&path).await?;
38    let peer = stream
39        .peer_addr()
40        .ok()
41        .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
42    let local = stream
43        .local_addr()
44        .ok()
45        .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
46    Ok(Box::new(UnixByteStream {
47        inner: stream,
48        peer_endpoint: peer,
49        local_endpoint: local,
50    }))
51}
52
53/// Unix socket listener.
54pub struct UnixListener {
55    inner: AsyncUnixListener,
56}
57
58impl UnixListener {
59    /// Bind to a Unix socket path.
60    pub fn bind(endpoint: &Endpoint) -> Result<Self> {
61        let path: PathBuf = endpoint.clone().try_into()?;
62        let inner = AsyncUnixListener::bind(path)?;
63        Ok(Self { inner })
64    }
65
66    /// Accept a new connection.
67    pub async fn accept(&self) -> Result<Box<dyn ByteStream>> {
68        let (stream, _) = self.inner.accept().await?;
69        let peer = stream
70            .peer_addr()
71            .ok()
72            .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
73        let local = stream
74            .local_addr()
75            .ok()
76            .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
77        Ok(Box::new(UnixByteStream {
78            inner: stream,
79            peer_endpoint: peer,
80            local_endpoint: local,
81        }))
82    }
83
84    /// Local endpoint.
85    pub fn local_endpoint(&self) -> Result<Endpoint> {
86        let addr = self.inner.local_addr()?;
87        addr.as_pathname()
88            .map(Endpoint::new_unix_addr)
89            .ok_or_else(|| {
90                Error::IO(io::Error::new(
91                    io::ErrorKind::AddrNotAvailable,
92                    "unnamed unix socket",
93                ))
94            })
95    }
96}
97
98// -- AsyncRead / AsyncWrite delegation --
99
100#[cfg(feature = "smol")]
101impl AsyncRead for UnixByteStream {
102    fn poll_read(
103        mut self: Pin<&mut Self>,
104        cx: &mut Context<'_>,
105        buf: &mut [u8],
106    ) -> Poll<io::Result<usize>> {
107        Pin::new(&mut self.inner).poll_read(cx, buf)
108    }
109}
110
111#[cfg(feature = "tokio")]
112impl AsyncRead for UnixByteStream {
113    fn poll_read(
114        mut self: Pin<&mut Self>,
115        cx: &mut Context<'_>,
116        buf: &mut ReadBuf<'_>,
117    ) -> Poll<io::Result<()>> {
118        Pin::new(&mut self.inner).poll_read(cx, buf)
119    }
120}
121
122#[cfg(feature = "smol")]
123impl AsyncWrite for UnixByteStream {
124    fn poll_write(
125        mut self: Pin<&mut Self>,
126        cx: &mut Context<'_>,
127        buf: &[u8],
128    ) -> Poll<io::Result<usize>> {
129        Pin::new(&mut self.inner).poll_write(cx, buf)
130    }
131
132    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
133        Pin::new(&mut self.inner).poll_flush(cx)
134    }
135
136    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
137        Pin::new(&mut self.inner).poll_close(cx)
138    }
139}
140
141#[cfg(feature = "tokio")]
142impl AsyncWrite for UnixByteStream {
143    fn poll_write(
144        mut self: Pin<&mut Self>,
145        cx: &mut Context<'_>,
146        buf: &[u8],
147    ) -> Poll<io::Result<usize>> {
148        Pin::new(&mut self.inner).poll_write(cx, buf)
149    }
150
151    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
152        Pin::new(&mut self.inner).poll_flush(cx)
153    }
154
155    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
156        Pin::new(&mut self.inner).poll_shutdown(cx)
157    }
158}