karyon_jsonrpc/client/http/
h3.rs1use std::net::SocketAddr;
4use std::sync::Arc;
5
6use bytes::{Buf, Bytes};
7use hyper::{Request, StatusCode};
8use log::debug;
9
10use karyon_core::async_util::{TaskGroup, TaskResult};
11use karyon_net::Endpoint;
12
13use crate::{
14 client::subscriptions::Subscription,
15 error::{Error, Result},
16 message,
17};
18
19pub(super) type H3SendRequest = h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>;
20pub(super) type H3RecvStream = h3::client::RequestStream<h3_quinn::RecvStream, Bytes>;
21
22pub(super) async fn connect(
25 endpoint: &Endpoint,
26 quic_config: &karyon_net::quic::ClientQuicConfig,
27 task_group: Arc<TaskGroup>,
28) -> Result<H3SendRequest> {
29 let quic_ep = Endpoint::new_quic_addr(SocketAddr::try_from(endpoint.clone())?);
30 let quic_conn = karyon_net::quic::QuicEndpoint::dial(&quic_ep, quic_config.clone()).await?;
31
32 let h3_conn = h3_quinn::Connection::new(quic_conn.inner().clone());
33 let (driver, send_request) = h3::client::new(h3_conn)
34 .await
35 .map_err(|e| Error::HttpError(format!("H3 handshake: {e}")))?;
36
37 task_group.spawn(driver_task(driver), |_: TaskResult<()>| async {});
42
43 Ok(send_request)
44}
45
46async fn driver_task(mut driver: h3::client::Connection<h3_quinn::Connection, Bytes>) {
47 let closed = futures_util::future::poll_fn(|cx| driver.poll_close(cx)).await;
48 debug!("H3 client driver closed: {closed}");
49}
50
51pub(super) async fn send(
52 mut send_request: H3SendRequest,
53 msg: serde_json::Value,
54) -> Result<message::Response> {
55 let body = serde_json::to_vec(&msg)?;
56
57 let req = Request::post("https://localhost/")
61 .header("Content-Type", "application/json")
62 .body(())
63 .map_err(|e| Error::HttpError(e.to_string()))?;
64
65 let mut stream = send_request
66 .send_request(req)
67 .await
68 .map_err(|e| Error::HttpError(format!("H3 send: {e}")))?;
69
70 stream
71 .send_data(Bytes::from(body))
72 .await
73 .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?;
74
75 stream
76 .finish()
77 .await
78 .map_err(|e| Error::HttpError(format!("H3 finish: {e}")))?;
79
80 let resp = stream
81 .recv_response()
82 .await
83 .map_err(|e| Error::HttpError(format!("H3 recv: {e}")))?;
84
85 if resp.status() != StatusCode::OK {
86 return Err(Error::HttpError(format!("HTTP/3 error: {}", resp.status())));
87 }
88
89 let mut resp_body = Vec::new();
90 while let Some(chunk) = stream
91 .recv_data()
92 .await
93 .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?
94 {
95 resp_body.extend_from_slice(Buf::chunk(&chunk));
96 }
97
98 let rpc_response: message::Response = serde_json::from_slice(&resp_body)?;
99 Ok(rpc_response)
100}
101
102pub(super) async fn subscribe(
105 mut send_request: H3SendRequest,
106 msg: serde_json::Value,
107) -> Result<(message::Response, H3RecvStream)> {
108 let body = serde_json::to_vec(&msg)?;
109
110 let req = Request::post("https://localhost/")
111 .header("Content-Type", "application/json")
112 .body(())
113 .map_err(|e| Error::HttpError(e.to_string()))?;
114
115 let mut stream = send_request
116 .send_request(req)
117 .await
118 .map_err(|e| Error::HttpError(format!("H3 send: {e}")))?;
119
120 stream
121 .send_data(Bytes::from(body))
122 .await
123 .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?;
124
125 stream
126 .finish()
127 .await
128 .map_err(|e| Error::HttpError(format!("H3 finish: {e}")))?;
129
130 let resp = stream
131 .recv_response()
132 .await
133 .map_err(|e| Error::HttpError(format!("H3 recv: {e}")))?;
134
135 if resp.status() != StatusCode::OK {
136 return Err(Error::HttpError(format!("HTTP/3 error: {}", resp.status())));
137 }
138
139 let first = stream
140 .recv_data()
141 .await
142 .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?
143 .ok_or_else(|| Error::HttpError("H3 stream closed before response".into()))?;
144
145 let rpc_response: message::Response = serde_json::from_slice(Buf::chunk(&first))?;
146 let recv_stream = stream.split().1;
147 Ok((rpc_response, recv_stream))
148}
149
150pub(super) async fn notification_reader_task(
151 recv: H3RecvStream,
152 sub: Arc<Subscription>,
153) -> Result<()> {
154 read_notifications(recv, &sub).await
155}
156
157async fn read_notifications(mut recv: H3RecvStream, sub: &Subscription) -> Result<()> {
158 while let Some(chunk) = recv
159 .recv_data()
160 .await
161 .map_err(|e| Error::HttpError(format!("H3 notification: {e}")))?
162 {
163 let nt: message::Notification = match serde_json::from_slice(Buf::chunk(&chunk)) {
164 Ok(v) => v,
165 Err(_) => continue,
166 };
167 debug!("<-- {nt}");
168 let nt_res: message::NotificationResult = match nt.params {
169 Some(ref p) => match serde_json::from_value(p.clone()) {
170 Ok(r) => r,
171 Err(_) => continue,
172 },
173 None => continue,
174 };
175 let val = nt_res.result.unwrap_or(serde_json::json!(""));
176 if sub.notify(val).await.is_err() {
177 break;
178 }
179 }
180 Ok(())
181}