Skip to main content

karyon_jsonrpc/client/http/
mod.rs

1//! HTTP client backend: HTTP/1 (smol), HTTP/1+2 (tokio), and HTTP/3 (QUIC).
2
3#[cfg(feature = "smol")]
4mod h1;
5#[cfg(feature = "tokio")]
6mod h2;
7#[cfg(feature = "http3")]
8mod h3;
9
10use std::sync::Arc;
11use std::time::Duration;
12
13#[cfg(feature = "tokio")]
14use bytes::Bytes;
15use http_body_util::BodyExt;
16#[cfg(feature = "tokio")]
17use http_body_util::Full;
18use hyper::StatusCode;
19use log::info;
20use serde::{de::DeserializeOwned, Serialize};
21use serde_json::json;
22
23#[cfg(feature = "tokio")]
24use hyper_util::client::legacy::Client as HyperClient;
25
26#[cfg(feature = "tokio")]
27use crate::hyper_exec::HyperExecutor;
28
29use karyon_core::{
30    async_util::{timeout, TaskGroup},
31    util::random_32,
32};
33
34#[cfg(feature = "http3")]
35use karyon_core::async_util::TaskResult;
36
37use karyon_net::Endpoint;
38
39use crate::{
40    client::{Client, ClientBackend, ClientConfig, RequestID, WsCodec},
41    codec::JsonRpcCodec,
42    error::{Error, Result},
43    message::{self, SubscriptionID},
44};
45
46#[cfg(feature = "http3")]
47use crate::client::subscriptions::{Subscription, Subscriptions};
48
49/// HTTP client backend for JSON-RPC.
50pub(crate) struct HttpClientBackend {
51    inner: HttpTransport,
52    /// Shared with the Client so driver tasks (h1 per-request, h3
53    /// connection) get cancelled when the Client stops.
54    task_group: Arc<TaskGroup>,
55}
56
57// smol uses HTTP/1.1 only via `hyper::client::conn::http1`, which
58// is the smallest hyper API that compiles against `smol-hyper`.
59// tokio gets HTTP/1.1 + HTTP/2 with connection pooling for free
60// through `hyper-util`'s legacy `Client`. The hyper client is boxed
61// because it is much larger than the other variants.
62enum HttpTransport {
63    #[cfg(feature = "smol")]
64    Smol { endpoint: Endpoint },
65    #[cfg(feature = "tokio")]
66    Tokio {
67        endpoint: Endpoint,
68        client: Box<HyperClient<hyper_util::client::legacy::connect::HttpConnector, Full<Bytes>>>,
69    },
70    #[cfg(feature = "http3")]
71    H3 { send_request: h3::H3SendRequest },
72}
73
74impl HttpClientBackend {
75    /// Create an HTTP/1.1 + HTTP/2 client.
76    pub(crate) fn new(endpoint: &Endpoint, task_group: Arc<TaskGroup>) -> Result<Self> {
77        #[cfg(feature = "smol")]
78        let inner = HttpTransport::Smol {
79            endpoint: endpoint.clone(),
80        };
81
82        #[cfg(feature = "tokio")]
83        let inner = HttpTransport::Tokio {
84            endpoint: endpoint.clone(),
85            client: Box::new(
86                HyperClient::builder(HyperExecutor::new(task_group.clone())).build_http(),
87            ),
88        };
89
90        Ok(Self { inner, task_group })
91    }
92
93    /// Create an HTTP/3 client (QUIC). The driver task is spawned via
94    /// `task_group` so it gets cancelled when the Client stops.
95    #[cfg(feature = "http3")]
96    pub(crate) async fn new_h3(
97        endpoint: &Endpoint,
98        quic_config: &karyon_net::quic::ClientQuicConfig,
99        task_group: Arc<TaskGroup>,
100    ) -> Result<Self> {
101        let send_request = h3::connect(endpoint, quic_config, task_group.clone()).await?;
102        Ok(Self {
103            inner: HttpTransport::H3 { send_request },
104            task_group,
105        })
106    }
107
108    /// Send a JSON-RPC request and return the response.
109    pub(crate) async fn send_request(&self, msg: serde_json::Value) -> Result<message::Response> {
110        let _ = &self.task_group;
111        match &self.inner {
112            #[cfg(feature = "smol")]
113            HttpTransport::Smol { endpoint } => h1::send(endpoint, msg, &self.task_group).await,
114            #[cfg(feature = "tokio")]
115            HttpTransport::Tokio { endpoint, client } => h2::send(client, endpoint, msg).await,
116            #[cfg(feature = "http3")]
117            HttpTransport::H3 { send_request } => h3::send(send_request.clone(), msg).await,
118        }
119    }
120
121    /// Subscribe over HTTP/3. Returns the initial response + recv stream.
122    #[cfg(feature = "http3")]
123    pub(crate) async fn subscribe_h3(
124        &self,
125        msg: serde_json::Value,
126    ) -> Result<(message::Response, h3::H3RecvStream)> {
127        let HttpTransport::H3 { send_request } = &self.inner else {
128            return Err(Error::UnsupportedProtocol(
129                "subscribe_h3 requires HTTP/3".into(),
130            ));
131        };
132        h3::subscribe(send_request.clone(), msg).await
133    }
134}
135
136/// Shared HTTP response parser (used by h1 and h2).
137pub(super) async fn parse_response<B>(response: hyper::Response<B>) -> Result<message::Response>
138where
139    B: hyper::body::Body,
140    B::Error: std::fmt::Display,
141{
142    if response.status() != StatusCode::OK {
143        return Err(Error::HttpError(format!(
144            "HTTP error: {}",
145            response.status()
146        )));
147    }
148
149    let body = response
150        .collect()
151        .await
152        .map_err(|e| Error::HttpError(e.to_string()))?
153        .to_bytes();
154
155    let rpc_response: message::Response = serde_json::from_slice(&body)?;
156    Ok(rpc_response)
157}
158
159/// Build the HTTP backend, optionally with HTTP/3 support.
160pub(super) async fn build_backend(
161    config: &ClientConfig,
162    task_group: Arc<TaskGroup>,
163) -> Result<ClientBackend> {
164    #[cfg(feature = "http3")]
165    let http_backend = match config.quic_config.as_ref() {
166        Some(qc) => HttpClientBackend::new_h3(&config.endpoint, qc, task_group).await?,
167        None => HttpClientBackend::new(&config.endpoint, task_group)?,
168    };
169    #[cfg(not(feature = "http3"))]
170    let http_backend = HttpClientBackend::new(&config.endpoint, task_group)?;
171
172    info!("HTTP client configured for endpoint: {}", config.endpoint);
173
174    Ok(ClientBackend::Http {
175        http_backend: Box::new(http_backend),
176        #[cfg(feature = "http3")]
177        subscriptions: Subscriptions::new(config.subscription_buffer_size),
178    })
179}
180
181// -- Client impl (HTTP-specific) --
182
183impl<B, W> Client<B, W>
184where
185    B: JsonRpcCodec,
186    W: WsCodec,
187{
188    pub(super) async fn http_call<T: Serialize + DeserializeOwned>(
189        &self,
190        http_backend: &HttpClientBackend,
191        method: &str,
192        params: T,
193    ) -> Result<message::Response> {
194        let id: RequestID = random_32();
195        let request = message::Request {
196            jsonrpc: message::JSONRPC_VERSION.to_string(),
197            id: json!(id),
198            method: method.to_string(),
199            params: Some(json!(params)),
200        };
201        let msg = serde_json::to_value(request)?;
202
203        let response = match self.config.timeout {
204            Some(t) => timeout(Duration::from_millis(t), http_backend.send_request(msg)).await?,
205            None => http_backend.send_request(msg).await,
206        }?;
207
208        Ok(response)
209    }
210
211    /// Subscribe over HTTP/3.
212    #[cfg(feature = "http3")]
213    pub(super) async fn h3_subscribe<T: Serialize + DeserializeOwned>(
214        self: &Arc<Self>,
215        http_backend: &HttpClientBackend,
216        method: &str,
217        params: T,
218    ) -> Result<Arc<Subscription>> {
219        let ClientBackend::Http { subscriptions, .. } = &self.backend else {
220            return Err(Error::InvalidState("not in HTTP mode".into()));
221        };
222
223        let id: RequestID = random_32();
224        let request = message::Request {
225            jsonrpc: message::JSONRPC_VERSION.to_string(),
226            id: json!(id),
227            method: method.to_string(),
228            params: Some(json!(params)),
229        };
230        let msg = serde_json::to_value(request)?;
231
232        let (response, recv_stream) = http_backend.subscribe_h3(msg).await?;
233
234        if let Some(error) = response.error {
235            return Err(Error::SubscribeError(error.code, error.message));
236        }
237
238        let sub_id = match response.result {
239            Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
240            None => return Err(Error::InvalidMsg("Invalid subscription id".into())),
241        };
242
243        let sub = subscriptions.subscribe(sub_id).await;
244
245        self.task_group.spawn(
246            h3::notification_reader_task(recv_stream, sub.clone()),
247            |res: TaskResult<Result<()>>| async move {
248                log::debug!("H3 notification reader task ended: {res}");
249            },
250        );
251
252        Ok(sub)
253    }
254
255    /// Unsubscribe over HTTP/3.
256    #[cfg(feature = "http3")]
257    pub(super) async fn h3_unsubscribe(
258        &self,
259        http_backend: &HttpClientBackend,
260        method: &str,
261        sub_id: SubscriptionID,
262    ) -> Result<()> {
263        let ClientBackend::Http { subscriptions, .. } = &self.backend else {
264            return Err(Error::InvalidState("not in HTTP mode".into()));
265        };
266
267        let response = self.http_call(http_backend, method, sub_id).await?;
268        if let Some(error) = response.error {
269            return Err(Error::SubscribeError(error.code, error.message));
270        }
271        subscriptions.unsubscribe(&sub_id).await;
272        Ok(())
273    }
274}