Skip to main content

karyon_jsonrpc/client/http/
h1.rs

1//! HTTP/1.1 client over smol. No connection pooling: each request
2//! opens a fresh TCP connection.
3
4use std::net::SocketAddr;
5
6use bytes::Bytes;
7use http_body_util::Full;
8use hyper::Request;
9
10use karyon_core::{
11    async_runtime::net::TcpStream,
12    async_util::{TaskGroup, TaskResult},
13};
14use karyon_net::Endpoint;
15use smol_hyper::rt::FuturesIo;
16
17use crate::{
18    client::http::parse_response,
19    error::{Error, Result},
20    message,
21};
22
23pub(super) async fn send(
24    endpoint: &Endpoint,
25    msg: serde_json::Value,
26    task_group: &TaskGroup,
27) -> Result<message::Response> {
28    let body = serde_json::to_vec(&msg)?;
29
30    // Resolve the endpoint to a SocketAddr (handles both literal IPs
31    // and domain names via the system DNS).
32    let addr = SocketAddr::try_from(endpoint.clone())?;
33
34    let stream = TcpStream::connect(addr).await?;
35    let io = FuturesIo::new(stream);
36
37    let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
38        .await
39        .map_err(|e| Error::HttpError(e.to_string()))?;
40
41    // hyper's HTTP/1.1 client splits I/O from request building: the
42    // returned `conn` future drives the wire protocol and must be
43    // polled concurrently, otherwise `sender` blocks forever. Spawn
44    // it via the Client task_group so it gets cancelled on stop.
45    task_group.spawn(driver_task(conn), |_: TaskResult<()>| async {});
46
47    let req = Request::post(endpoint.to_string())
48        .header("Content-Type", "application/json")
49        .body(Full::new(Bytes::from(body)))
50        .map_err(|e| Error::HttpError(e.to_string()))?;
51
52    let response = sender
53        .send_request(req)
54        .await
55        .map_err(|e| Error::HttpError(e.to_string()))?;
56
57    parse_response(response).await
58}
59
60async fn driver_task<T>(conn: hyper::client::conn::http1::Connection<T, Full<Bytes>>)
61where
62    T: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
63{
64    if let Err(err) = conn.await {
65        log::error!("HTTP client connection error: {err}");
66    }
67}