karyon_net/transports/
unix.rs1use std::{
2 io,
3 path::PathBuf,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use karyon_core::async_runtime::{
9 io::{AsyncRead, AsyncWrite},
10 net::{UnixListener as AsyncUnixListener, UnixStream},
11};
12
13#[cfg(feature = "tokio")]
14use tokio::io::ReadBuf;
15
16use crate::{ByteStream, Endpoint, Error, Result};
17
18pub struct UnixByteStream {
20 inner: UnixStream,
21 peer_endpoint: Option<Endpoint>,
22 local_endpoint: Option<Endpoint>,
23}
24
25impl ByteStream for UnixByteStream {
26 fn peer_endpoint(&self) -> Option<Endpoint> {
27 self.peer_endpoint.clone()
28 }
29 fn local_endpoint(&self) -> Option<Endpoint> {
30 self.local_endpoint.clone()
31 }
32}
33
34pub async fn connect(endpoint: &Endpoint) -> Result<Box<dyn ByteStream>> {
36 let path: PathBuf = endpoint.clone().try_into()?;
37 let stream = UnixStream::connect(&path).await?;
38 let peer = stream
39 .peer_addr()
40 .ok()
41 .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
42 let local = stream
43 .local_addr()
44 .ok()
45 .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
46 Ok(Box::new(UnixByteStream {
47 inner: stream,
48 peer_endpoint: peer,
49 local_endpoint: local,
50 }))
51}
52
53pub struct UnixListener {
55 inner: AsyncUnixListener,
56}
57
58impl UnixListener {
59 pub fn bind(endpoint: &Endpoint) -> Result<Self> {
61 let path: PathBuf = endpoint.clone().try_into()?;
62 let inner = AsyncUnixListener::bind(path)?;
63 Ok(Self { inner })
64 }
65
66 pub async fn accept(&self) -> Result<Box<dyn ByteStream>> {
68 let (stream, _) = self.inner.accept().await?;
69 let peer = stream
70 .peer_addr()
71 .ok()
72 .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
73 let local = stream
74 .local_addr()
75 .ok()
76 .and_then(|a| a.as_pathname().map(Endpoint::new_unix_addr));
77 Ok(Box::new(UnixByteStream {
78 inner: stream,
79 peer_endpoint: peer,
80 local_endpoint: local,
81 }))
82 }
83
84 pub fn local_endpoint(&self) -> Result<Endpoint> {
86 let addr = self.inner.local_addr()?;
87 addr.as_pathname()
88 .map(Endpoint::new_unix_addr)
89 .ok_or_else(|| {
90 Error::IO(io::Error::new(
91 io::ErrorKind::AddrNotAvailable,
92 "unnamed unix socket",
93 ))
94 })
95 }
96}
97
#[cfg(feature = "smol")]
impl AsyncRead for UnixByteStream {
    /// Delegates the read to the wrapped socket stream.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_read(cx, buf)
    }
}
110
#[cfg(feature = "tokio")]
impl AsyncRead for UnixByteStream {
    /// Delegates the read to the wrapped socket stream, filling `buf`.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_read(cx, buf)
    }
}
121
#[cfg(feature = "smol")]
impl AsyncWrite for UnixByteStream {
    /// Delegates the write to the wrapped socket stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_write(cx, buf)
    }

    /// Delegates the flush to the wrapped socket stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_flush(cx)
    }

    /// Delegates the close to the wrapped socket stream.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_close(cx)
    }
}
140
#[cfg(feature = "tokio")]
impl AsyncWrite for UnixByteStream {
    /// Delegates the write to the wrapped socket stream.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_write(cx, buf)
    }

    /// Delegates the flush to the wrapped socket stream.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_flush(cx)
    }

    /// Delegates the shutdown to the wrapped socket stream.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        let stream = &mut self.get_mut().inner;
        Pin::new(stream).poll_shutdown(cx)
    }
}