karyon_jsonrpc/server/http/
h1h2.rs1use std::{net::SocketAddr, sync::Arc};
4
5use bytes::Bytes;
6use http_body_util::{BodyExt, Full, LengthLimitError, Limited};
7use hyper::{
8 body::{Body, Incoming},
9 service::service_fn,
10 Method, Request, Response, StatusCode,
11};
12use hyper_util::server::conn::auto::Builder as HttpConnBuilder;
13use log::{debug, error};
14
15use karyon_core::{
16 async_runtime::net::{TcpListener, TcpStream},
17 async_util::TaskResult,
18};
19
20use crate::{
21 error::Result,
22 message,
23 server::{
24 http::{
25 json_response, json_response_bytes, HyperExecutor, ERR_BODY_TOO_LARGE,
26 ERR_METHOD_NOT_ALLOWED, ERR_READ_BODY, MAX_HTTP_BODY_SIZE,
27 },
28 Server, FAILED_TO_PARSE_ERROR_MSG,
29 },
30};
31
32pub(super) async fn accept_tcp(server: &Arc<Server>, listener: &TcpListener) {
33 match listener.accept().await {
34 Ok((stream, peer_addr)) => {
35 server.task_group.spawn(
36 serve_task(server.clone(), stream, peer_addr),
37 |_: TaskResult<Result<()>>| async {},
38 );
39 }
40 Err(err) => {
41 error!("Accept TCP connection: {err}");
42 }
43 }
44}
45
46async fn serve_task(server: Arc<Server>, stream: TcpStream, peer_addr: SocketAddr) -> Result<()> {
47 if let Err(err) = serve_conn(server, stream, peer_addr).await {
48 error!("HTTP/1-2 from {peer_addr}: {err}");
49 }
50 Ok(())
51}
52
53async fn serve_conn(server: Arc<Server>, stream: TcpStream, peer_addr: SocketAddr) -> Result<()> {
54 debug!("New HTTP/1-2 connection from {peer_addr}");
55
56 let task_group = server.task_group.clone();
57 let service = service_fn(move |req: Request<Incoming>| {
58 let server = server.clone();
59 async move { handle_hyper_request(server, req).await }
60 });
61
62 #[cfg(feature = "smol")]
63 let io = smol_hyper::rt::FuturesIo::new(stream);
64 #[cfg(feature = "tokio")]
65 let io = hyper_util::rt::TokioIo::new(stream);
66
67 let builder = HttpConnBuilder::new(HyperExecutor::new(task_group));
68 let conn = builder.serve_connection(io, service);
69
70 if let Err(err) = conn.await {
71 debug!("HTTP/1-2 from {peer_addr} closed: {err}");
72 }
73
74 Ok(())
75}
76
77async fn handle_hyper_request(
78 server: Arc<Server>,
79 req: Request<Incoming>,
80) -> std::result::Result<Response<Full<Bytes>>, hyper::Error> {
81 if req.method() != Method::POST {
82 return Ok(json_response(
83 StatusCode::METHOD_NOT_ALLOWED,
84 ERR_METHOD_NOT_ALLOWED,
85 ));
86 }
87
88 if let Some(len) = req.body().size_hint().upper() {
90 if len > MAX_HTTP_BODY_SIZE {
91 return Ok(json_response(
92 StatusCode::PAYLOAD_TOO_LARGE,
93 ERR_BODY_TOO_LARGE,
94 ));
95 }
96 }
97
98 let limited = Limited::new(req.into_body(), MAX_HTTP_BODY_SIZE as usize);
100 let body = match limited.collect().await {
101 Ok(collected) => collected.to_bytes(),
102 Err(err) => {
103 if err.downcast_ref::<LengthLimitError>().is_some() {
104 return Ok(json_response(
105 StatusCode::PAYLOAD_TOO_LARGE,
106 ERR_BODY_TOO_LARGE,
107 ));
108 }
109 return Ok(json_response(StatusCode::BAD_REQUEST, ERR_READ_BODY));
110 }
111 };
112
113 let msg: serde_json::Value = match serde_json::from_slice(&body) {
114 Ok(v) => v,
115 Err(_) => {
116 let resp = message::Response {
117 error: Some(message::Error {
118 code: message::PARSE_ERROR_CODE,
119 message: FAILED_TO_PARSE_ERROR_MSG.to_string(),
120 data: None,
121 }),
122 ..Default::default()
123 };
124 let json = serde_json::to_vec(&resp).unwrap();
125 return Ok(json_response_bytes(StatusCode::OK, json));
126 }
127 };
128
129 let response = server.handle_request(None, msg).await;
130 debug!("--> {response}");
131
132 let json = serde_json::to_vec(&response).unwrap();
133 Ok(json_response_bytes(StatusCode::OK, json))
134}