Skip to main content

karyon_jsonrpc/client/http/
h3.rs

1//! HTTP/3 client over QUIC via h3 / h3-quinn.
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5
6use bytes::{Buf, Bytes};
7use hyper::{Request, StatusCode};
8use log::debug;
9
10use karyon_core::async_util::{TaskGroup, TaskResult};
11use karyon_net::Endpoint;
12
13use crate::{
14    client::subscriptions::Subscription,
15    error::{Error, Result},
16    message,
17};
18
/// Request-sending handle of an h3 client connection whose streams are
/// opened through `h3_quinn`; `connect` returns one of these.
pub(super) type H3SendRequest = h3::client::SendRequest<h3_quinn::OpenStreams, Bytes>;
/// Receive-only h3 request stream backed by an `h3_quinn` receive
/// stream; `subscribe` returns one for reading streamed notifications.
pub(super) type H3RecvStream = h3::client::RequestStream<h3_quinn::RecvStream, Bytes>;
21
22/// Connect to an HTTP/3 server and return a request sender. A
23/// background task drives the connection until close.
24pub(super) async fn connect(
25    endpoint: &Endpoint,
26    quic_config: &karyon_net::quic::ClientQuicConfig,
27    task_group: Arc<TaskGroup>,
28) -> Result<H3SendRequest> {
29    let quic_ep = Endpoint::new_quic_addr(SocketAddr::try_from(endpoint.clone())?);
30    let quic_conn = karyon_net::quic::QuicEndpoint::dial(&quic_ep, quic_config.clone()).await?;
31
32    let h3_conn = h3_quinn::Connection::new(quic_conn.inner().clone());
33    let (driver, send_request) = h3::client::new(h3_conn)
34        .await
35        .map_err(|e| Error::HttpError(format!("H3 handshake: {e}")))?;
36
37    // h3 splits I/O from the request API: `driver` polls the QUIC
38    // connection in the background; without it `send_request` would
39    // never make progress. Spawn via the Client task_group so it
40    // gets cancelled on stop.
41    task_group.spawn(driver_task(driver), |_: TaskResult<()>| async {});
42
43    Ok(send_request)
44}
45
46async fn driver_task(mut driver: h3::client::Connection<h3_quinn::Connection, Bytes>) {
47    let closed = futures_util::future::poll_fn(|cx| driver.poll_close(cx)).await;
48    debug!("H3 client driver closed: {closed}");
49}
50
51pub(super) async fn send(
52    mut send_request: H3SendRequest,
53    msg: serde_json::Value,
54) -> Result<message::Response> {
55    let body = serde_json::to_vec(&msg)?;
56
57    // The QUIC connection is already established, so the URI's scheme
58    // and authority are only filled in to satisfy `Request::post`'s
59    // parser; the server only routes on `:path`.
60    let req = Request::post("https://localhost/")
61        .header("Content-Type", "application/json")
62        .body(())
63        .map_err(|e| Error::HttpError(e.to_string()))?;
64
65    let mut stream = send_request
66        .send_request(req)
67        .await
68        .map_err(|e| Error::HttpError(format!("H3 send: {e}")))?;
69
70    stream
71        .send_data(Bytes::from(body))
72        .await
73        .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?;
74
75    stream
76        .finish()
77        .await
78        .map_err(|e| Error::HttpError(format!("H3 finish: {e}")))?;
79
80    let resp = stream
81        .recv_response()
82        .await
83        .map_err(|e| Error::HttpError(format!("H3 recv: {e}")))?;
84
85    if resp.status() != StatusCode::OK {
86        return Err(Error::HttpError(format!("HTTP/3 error: {}", resp.status())));
87    }
88
89    let mut resp_body = Vec::new();
90    while let Some(chunk) = stream
91        .recv_data()
92        .await
93        .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?
94    {
95        resp_body.extend_from_slice(Buf::chunk(&chunk));
96    }
97
98    let rpc_response: message::Response = serde_json::from_slice(&resp_body)?;
99    Ok(rpc_response)
100}
101
102/// Subscribe over HTTP/3: send the request, read the initial response,
103/// and return the recv half for streaming notifications.
104pub(super) async fn subscribe(
105    mut send_request: H3SendRequest,
106    msg: serde_json::Value,
107) -> Result<(message::Response, H3RecvStream)> {
108    let body = serde_json::to_vec(&msg)?;
109
110    let req = Request::post("https://localhost/")
111        .header("Content-Type", "application/json")
112        .body(())
113        .map_err(|e| Error::HttpError(e.to_string()))?;
114
115    let mut stream = send_request
116        .send_request(req)
117        .await
118        .map_err(|e| Error::HttpError(format!("H3 send: {e}")))?;
119
120    stream
121        .send_data(Bytes::from(body))
122        .await
123        .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?;
124
125    stream
126        .finish()
127        .await
128        .map_err(|e| Error::HttpError(format!("H3 finish: {e}")))?;
129
130    let resp = stream
131        .recv_response()
132        .await
133        .map_err(|e| Error::HttpError(format!("H3 recv: {e}")))?;
134
135    if resp.status() != StatusCode::OK {
136        return Err(Error::HttpError(format!("HTTP/3 error: {}", resp.status())));
137    }
138
139    let first = stream
140        .recv_data()
141        .await
142        .map_err(|e| Error::HttpError(format!("H3 data: {e}")))?
143        .ok_or_else(|| Error::HttpError("H3 stream closed before response".into()))?;
144
145    let rpc_response: message::Response = serde_json::from_slice(Buf::chunk(&first))?;
146    let recv_stream = stream.split().1;
147    Ok((rpc_response, recv_stream))
148}
149
150pub(super) async fn notification_reader_task(
151    recv: H3RecvStream,
152    sub: Arc<Subscription>,
153) -> Result<()> {
154    read_notifications(recv, &sub).await
155}
156
157async fn read_notifications(mut recv: H3RecvStream, sub: &Subscription) -> Result<()> {
158    while let Some(chunk) = recv
159        .recv_data()
160        .await
161        .map_err(|e| Error::HttpError(format!("H3 notification: {e}")))?
162    {
163        let nt: message::Notification = match serde_json::from_slice(Buf::chunk(&chunk)) {
164            Ok(v) => v,
165            Err(_) => continue,
166        };
167        debug!("<-- {nt}");
168        let nt_res: message::NotificationResult = match nt.params {
169            Some(ref p) => match serde_json::from_value(p.clone()) {
170                Ok(r) => r,
171                Err(_) => continue,
172            },
173            None => continue,
174        };
175        let val = nt_res.result.unwrap_or(serde_json::json!(""));
176        if sub.notify(val).await.is_err() {
177            break;
178        }
179    }
180    Ok(())
181}