Skip to main content

karyon_jsonrpc/server/http/
h1h2.rs

1//! HTTP/1.1 and HTTP/2 over TCP via hyper.
2
3use std::{net::SocketAddr, sync::Arc};
4
5use bytes::Bytes;
6use http_body_util::{BodyExt, Full, LengthLimitError, Limited};
7use hyper::{
8    body::{Body, Incoming},
9    service::service_fn,
10    Method, Request, Response, StatusCode,
11};
12use hyper_util::server::conn::auto::Builder as HttpConnBuilder;
13use log::{debug, error};
14
15use karyon_core::{
16    async_runtime::net::{TcpListener, TcpStream},
17    async_util::TaskResult,
18};
19
20use crate::{
21    error::Result,
22    message,
23    server::{
24        http::{
25            json_response, json_response_bytes, HyperExecutor, ERR_BODY_TOO_LARGE,
26            ERR_METHOD_NOT_ALLOWED, ERR_READ_BODY, MAX_HTTP_BODY_SIZE,
27        },
28        Server, FAILED_TO_PARSE_ERROR_MSG,
29    },
30};
31
32pub(super) async fn accept_tcp(server: &Arc<Server>, listener: &TcpListener) {
33    match listener.accept().await {
34        Ok((stream, peer_addr)) => {
35            server.task_group.spawn(
36                serve_task(server.clone(), stream, peer_addr),
37                |_: TaskResult<Result<()>>| async {},
38            );
39        }
40        Err(err) => {
41            error!("Accept TCP connection: {err}");
42        }
43    }
44}
45
46async fn serve_task(server: Arc<Server>, stream: TcpStream, peer_addr: SocketAddr) -> Result<()> {
47    if let Err(err) = serve_conn(server, stream, peer_addr).await {
48        error!("HTTP/1-2 from {peer_addr}: {err}");
49    }
50    Ok(())
51}
52
53async fn serve_conn(server: Arc<Server>, stream: TcpStream, peer_addr: SocketAddr) -> Result<()> {
54    debug!("New HTTP/1-2 connection from {peer_addr}");
55
56    let task_group = server.task_group.clone();
57    let service = service_fn(move |req: Request<Incoming>| {
58        let server = server.clone();
59        async move { handle_hyper_request(server, req).await }
60    });
61
62    #[cfg(feature = "smol")]
63    let io = smol_hyper::rt::FuturesIo::new(stream);
64    #[cfg(feature = "tokio")]
65    let io = hyper_util::rt::TokioIo::new(stream);
66
67    let builder = HttpConnBuilder::new(HyperExecutor::new(task_group));
68    let conn = builder.serve_connection(io, service);
69
70    if let Err(err) = conn.await {
71        debug!("HTTP/1-2 from {peer_addr} closed: {err}");
72    }
73
74    Ok(())
75}
76
77async fn handle_hyper_request(
78    server: Arc<Server>,
79    req: Request<Incoming>,
80) -> std::result::Result<Response<Full<Bytes>>, hyper::Error> {
81    if req.method() != Method::POST {
82        return Ok(json_response(
83            StatusCode::METHOD_NOT_ALLOWED,
84            ERR_METHOD_NOT_ALLOWED,
85        ));
86    }
87
88    // Fast reject when Content-Length already exceeds the cap.
89    if let Some(len) = req.body().size_hint().upper() {
90        if len > MAX_HTTP_BODY_SIZE {
91            return Ok(json_response(
92                StatusCode::PAYLOAD_TOO_LARGE,
93                ERR_BODY_TOO_LARGE,
94            ));
95        }
96    }
97
98    // Limited enforces the cap during read regardless of Content-Length.
99    let limited = Limited::new(req.into_body(), MAX_HTTP_BODY_SIZE as usize);
100    let body = match limited.collect().await {
101        Ok(collected) => collected.to_bytes(),
102        Err(err) => {
103            if err.downcast_ref::<LengthLimitError>().is_some() {
104                return Ok(json_response(
105                    StatusCode::PAYLOAD_TOO_LARGE,
106                    ERR_BODY_TOO_LARGE,
107                ));
108            }
109            return Ok(json_response(StatusCode::BAD_REQUEST, ERR_READ_BODY));
110        }
111    };
112
113    let msg: serde_json::Value = match serde_json::from_slice(&body) {
114        Ok(v) => v,
115        Err(_) => {
116            let resp = message::Response {
117                error: Some(message::Error {
118                    code: message::PARSE_ERROR_CODE,
119                    message: FAILED_TO_PARSE_ERROR_MSG.to_string(),
120                    data: None,
121                }),
122                ..Default::default()
123            };
124            let json = serde_json::to_vec(&resp).unwrap();
125            return Ok(json_response_bytes(StatusCode::OK, json));
126        }
127    };
128
129    let response = server.handle_request(None, msg).await;
130    debug!("--> {response}");
131
132    let json = serde_json::to_vec(&response).unwrap();
133    Ok(json_response_bytes(StatusCode::OK, json))
134}