karyon_jsonrpc/client/http/
mod.rs1#[cfg(feature = "smol")]
4mod h1;
5#[cfg(feature = "tokio")]
6mod h2;
7#[cfg(feature = "http3")]
8mod h3;
9
10use std::sync::Arc;
11use std::time::Duration;
12
13#[cfg(feature = "tokio")]
14use bytes::Bytes;
15use http_body_util::BodyExt;
16#[cfg(feature = "tokio")]
17use http_body_util::Full;
18use hyper::StatusCode;
19use log::info;
20use serde::{de::DeserializeOwned, Serialize};
21use serde_json::json;
22
23#[cfg(feature = "tokio")]
24use hyper_util::client::legacy::Client as HyperClient;
25
26#[cfg(feature = "tokio")]
27use crate::hyper_exec::HyperExecutor;
28
29use karyon_core::{
30 async_util::{timeout, TaskGroup},
31 util::random_32,
32};
33
34#[cfg(feature = "http3")]
35use karyon_core::async_util::TaskResult;
36
37use karyon_net::Endpoint;
38
39use crate::{
40 client::{Client, ClientBackend, ClientConfig, RequestID, WsCodec},
41 codec::JsonRpcCodec,
42 error::{Error, Result},
43 message::{self, SubscriptionID},
44};
45
46#[cfg(feature = "http3")]
47use crate::client::subscriptions::{Subscription, Subscriptions};
48
49pub(crate) struct HttpClientBackend {
51 inner: HttpTransport,
52 task_group: Arc<TaskGroup>,
55}
56
57enum HttpTransport {
63 #[cfg(feature = "smol")]
64 Smol { endpoint: Endpoint },
65 #[cfg(feature = "tokio")]
66 Tokio {
67 endpoint: Endpoint,
68 client: Box<HyperClient<hyper_util::client::legacy::connect::HttpConnector, Full<Bytes>>>,
69 },
70 #[cfg(feature = "http3")]
71 H3 { send_request: h3::H3SendRequest },
72}
73
74impl HttpClientBackend {
75 pub(crate) fn new(endpoint: &Endpoint, task_group: Arc<TaskGroup>) -> Result<Self> {
77 #[cfg(feature = "smol")]
78 let inner = HttpTransport::Smol {
79 endpoint: endpoint.clone(),
80 };
81
82 #[cfg(feature = "tokio")]
83 let inner = HttpTransport::Tokio {
84 endpoint: endpoint.clone(),
85 client: Box::new(
86 HyperClient::builder(HyperExecutor::new(task_group.clone())).build_http(),
87 ),
88 };
89
90 Ok(Self { inner, task_group })
91 }
92
93 #[cfg(feature = "http3")]
96 pub(crate) async fn new_h3(
97 endpoint: &Endpoint,
98 quic_config: &karyon_net::quic::ClientQuicConfig,
99 task_group: Arc<TaskGroup>,
100 ) -> Result<Self> {
101 let send_request = h3::connect(endpoint, quic_config, task_group.clone()).await?;
102 Ok(Self {
103 inner: HttpTransport::H3 { send_request },
104 task_group,
105 })
106 }
107
108 pub(crate) async fn send_request(&self, msg: serde_json::Value) -> Result<message::Response> {
110 let _ = &self.task_group;
111 match &self.inner {
112 #[cfg(feature = "smol")]
113 HttpTransport::Smol { endpoint } => h1::send(endpoint, msg, &self.task_group).await,
114 #[cfg(feature = "tokio")]
115 HttpTransport::Tokio { endpoint, client } => h2::send(client, endpoint, msg).await,
116 #[cfg(feature = "http3")]
117 HttpTransport::H3 { send_request } => h3::send(send_request.clone(), msg).await,
118 }
119 }
120
121 #[cfg(feature = "http3")]
123 pub(crate) async fn subscribe_h3(
124 &self,
125 msg: serde_json::Value,
126 ) -> Result<(message::Response, h3::H3RecvStream)> {
127 let HttpTransport::H3 { send_request } = &self.inner else {
128 return Err(Error::UnsupportedProtocol(
129 "subscribe_h3 requires HTTP/3".into(),
130 ));
131 };
132 h3::subscribe(send_request.clone(), msg).await
133 }
134}
135
136pub(super) async fn parse_response<B>(response: hyper::Response<B>) -> Result<message::Response>
138where
139 B: hyper::body::Body,
140 B::Error: std::fmt::Display,
141{
142 if response.status() != StatusCode::OK {
143 return Err(Error::HttpError(format!(
144 "HTTP error: {}",
145 response.status()
146 )));
147 }
148
149 let body = response
150 .collect()
151 .await
152 .map_err(|e| Error::HttpError(e.to_string()))?
153 .to_bytes();
154
155 let rpc_response: message::Response = serde_json::from_slice(&body)?;
156 Ok(rpc_response)
157}
158
159pub(super) async fn build_backend(
161 config: &ClientConfig,
162 task_group: Arc<TaskGroup>,
163) -> Result<ClientBackend> {
164 #[cfg(feature = "http3")]
165 let http_backend = match config.quic_config.as_ref() {
166 Some(qc) => HttpClientBackend::new_h3(&config.endpoint, qc, task_group).await?,
167 None => HttpClientBackend::new(&config.endpoint, task_group)?,
168 };
169 #[cfg(not(feature = "http3"))]
170 let http_backend = HttpClientBackend::new(&config.endpoint, task_group)?;
171
172 info!("HTTP client configured for endpoint: {}", config.endpoint);
173
174 Ok(ClientBackend::Http {
175 http_backend: Box::new(http_backend),
176 #[cfg(feature = "http3")]
177 subscriptions: Subscriptions::new(config.subscription_buffer_size),
178 })
179}
180
181impl<B, W> Client<B, W>
184where
185 B: JsonRpcCodec,
186 W: WsCodec,
187{
188 pub(super) async fn http_call<T: Serialize + DeserializeOwned>(
189 &self,
190 http_backend: &HttpClientBackend,
191 method: &str,
192 params: T,
193 ) -> Result<message::Response> {
194 let id: RequestID = random_32();
195 let request = message::Request {
196 jsonrpc: message::JSONRPC_VERSION.to_string(),
197 id: json!(id),
198 method: method.to_string(),
199 params: Some(json!(params)),
200 };
201 let msg = serde_json::to_value(request)?;
202
203 let response = match self.config.timeout {
204 Some(t) => timeout(Duration::from_millis(t), http_backend.send_request(msg)).await?,
205 None => http_backend.send_request(msg).await,
206 }?;
207
208 Ok(response)
209 }
210
211 #[cfg(feature = "http3")]
213 pub(super) async fn h3_subscribe<T: Serialize + DeserializeOwned>(
214 self: &Arc<Self>,
215 http_backend: &HttpClientBackend,
216 method: &str,
217 params: T,
218 ) -> Result<Arc<Subscription>> {
219 let ClientBackend::Http { subscriptions, .. } = &self.backend else {
220 return Err(Error::InvalidState("not in HTTP mode".into()));
221 };
222
223 let id: RequestID = random_32();
224 let request = message::Request {
225 jsonrpc: message::JSONRPC_VERSION.to_string(),
226 id: json!(id),
227 method: method.to_string(),
228 params: Some(json!(params)),
229 };
230 let msg = serde_json::to_value(request)?;
231
232 let (response, recv_stream) = http_backend.subscribe_h3(msg).await?;
233
234 if let Some(error) = response.error {
235 return Err(Error::SubscribeError(error.code, error.message));
236 }
237
238 let sub_id = match response.result {
239 Some(result) => serde_json::from_value::<SubscriptionID>(result)?,
240 None => return Err(Error::InvalidMsg("Invalid subscription id".into())),
241 };
242
243 let sub = subscriptions.subscribe(sub_id).await;
244
245 self.task_group.spawn(
246 h3::notification_reader_task(recv_stream, sub.clone()),
247 |res: TaskResult<Result<()>>| async move {
248 log::debug!("H3 notification reader task ended: {res}");
249 },
250 );
251
252 Ok(sub)
253 }
254
255 #[cfg(feature = "http3")]
257 pub(super) async fn h3_unsubscribe(
258 &self,
259 http_backend: &HttpClientBackend,
260 method: &str,
261 sub_id: SubscriptionID,
262 ) -> Result<()> {
263 let ClientBackend::Http { subscriptions, .. } = &self.backend else {
264 return Err(Error::InvalidState("not in HTTP mode".into()));
265 };
266
267 let response = self.http_call(http_backend, method, sub_id).await?;
268 if let Some(error) = response.error {
269 return Err(Error::SubscribeError(error.code, error.message));
270 }
271 subscriptions.unsubscribe(&sub_id).await;
272 Ok(())
273 }
274}