karyon_jsonrpc/client/http/
h1.rs1use std::net::SocketAddr;
5
6use bytes::Bytes;
7use http_body_util::Full;
8use hyper::Request;
9
10use karyon_core::{
11 async_runtime::net::TcpStream,
12 async_util::{TaskGroup, TaskResult},
13};
14use karyon_net::Endpoint;
15use smol_hyper::rt::FuturesIo;
16
17use crate::{
18 client::http::parse_response,
19 error::{Error, Result},
20 message,
21};
22
23pub(super) async fn send(
24 endpoint: &Endpoint,
25 msg: serde_json::Value,
26 task_group: &TaskGroup,
27) -> Result<message::Response> {
28 let body = serde_json::to_vec(&msg)?;
29
30 let addr = SocketAddr::try_from(endpoint.clone())?;
33
34 let stream = TcpStream::connect(addr).await?;
35 let io = FuturesIo::new(stream);
36
37 let (mut sender, conn) = hyper::client::conn::http1::handshake(io)
38 .await
39 .map_err(|e| Error::HttpError(e.to_string()))?;
40
41 task_group.spawn(driver_task(conn), |_: TaskResult<()>| async {});
46
47 let req = Request::post(endpoint.to_string())
48 .header("Content-Type", "application/json")
49 .body(Full::new(Bytes::from(body)))
50 .map_err(|e| Error::HttpError(e.to_string()))?;
51
52 let response = sender
53 .send_request(req)
54 .await
55 .map_err(|e| Error::HttpError(e.to_string()))?;
56
57 parse_response(response).await
58}
59
60async fn driver_task<T>(conn: hyper::client::conn::http1::Connection<T, Full<Bytes>>)
61where
62 T: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
63{
64 if let Err(err) = conn.await {
65 log::error!("HTTP client connection error: {err}");
66 }
67}