Skip to main content

karyon_jsonrpc/server/
quic.rs

1//! QUIC transport: one request per stream, with separate
2//! notification streaming for pubsub.
3
4use std::sync::Arc;
5
6use log::{debug, error};
7
8use karyon_core::async_util::{select, Either, TaskResult};
9
10use karyon_net::{framed, quic::QuicConn, FramedConn, StreamMux};
11
12use crate::{
13    codec::JsonCodec,
14    error::Result,
15    message,
16    server::{
17        channel::Channel,
18        dispatch::{sanity_check, Handler, NewRequest, SanityCheckResult},
19        Server, CHANNEL_SUBSCRIPTION_BUFFER_SIZE,
20    },
21};
22
23impl Server {
24    /// Accept a QUIC connection; each incoming stream is handled
25    /// as an independent request.
26    pub(super) fn handle_quic_conn(self: &Arc<Self>, quic_conn: QuicConn) -> Result<()> {
27        let peer = quic_conn.peer_endpoint().ok();
28        debug!("Handle QUIC connection {peer:?}");
29
30        self.task_group.spawn(
31            quic_accept_streams_task(self.clone(), Arc::new(quic_conn)),
32            move |result: TaskResult<Result<()>>| async move {
33                if let TaskResult::Completed(Err(err)) = result {
34                    debug!("QUIC conn {peer:?} dropped: {err}");
35                } else {
36                    debug!("QUIC conn {peer:?} dropped");
37                }
38            },
39        );
40
41        Ok(())
42    }
43}
44
45async fn quic_accept_streams_task(server: Arc<Server>, quic_conn: Arc<QuicConn>) -> Result<()> {
46    let codec = JsonCodec::default();
47    loop {
48        let stream = quic_conn.accept_stream().await?;
49        let conn = framed(stream, codec.clone());
50        server.task_group.spawn(
51            quic_handle_stream_task(server.clone(), conn),
52            |_: TaskResult<Result<()>>| async {},
53        );
54    }
55}
56
57async fn quic_handle_stream_task(server: Arc<Server>, conn: FramedConn<JsonCodec>) -> Result<()> {
58    if let Err(err) = handle_quic_stream(server, conn).await {
59        error!("Handle QUIC stream: {err}");
60    }
61    Ok(())
62}
63
64/// Handle a single QUIC stream: one request, one response.
65/// Upgrades to pubsub notification streaming if the method matches.
66async fn handle_quic_stream(server: Arc<Server>, mut conn: FramedConn<JsonCodec>) -> Result<()> {
67    let msg = conn.recv_msg().await?;
68
69    let req = match sanity_check(msg) {
70        SanityCheckResult::NewReq(req) => req,
71        SanityCheckResult::ErrRes(res) => {
72            conn.send_msg(serde_json::json!(res)).await?;
73            return Ok(());
74        }
75    };
76
77    // Pubsub handlers need a dedicated stream for streaming notifications.
78    // Everything else (regular RPC, method not found) can reuse the
79    // shared dispatch path.
80    let is_pubsub = matches!(
81        server.resolve_handler(&req.srvc_name, &req.method_name, true),
82        Handler::Pubsub(_)
83    );
84
85    if is_pubsub {
86        return handle_quic_subscription(server, conn, req).await;
87    }
88
89    let msg = serde_json::to_value(&req.msg).expect("serializable request");
90    let response = server.handle_request(None, msg).await;
91    debug!("--> {response}");
92    conn.send_msg(serde_json::json!(response)).await?;
93    Ok(())
94}
95
96/// Pubsub over QUIC: split the stream so notifications stream
97/// from the writer while the reader waits for an unsubscribe.
98async fn handle_quic_subscription(
99    server: Arc<Server>,
100    conn: FramedConn<JsonCodec>,
101    req: NewRequest,
102) -> Result<()> {
103    let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_SUBSCRIPTION_BUFFER_SIZE);
104    let channel = Channel::new(ch_tx);
105
106    let method = match server.resolve_handler(&req.srvc_name, &req.method_name, true) {
107        Handler::Pubsub(m) => m,
108        _ => unreachable!("pubsub method presence checked by caller"),
109    };
110
111    let params = req.msg.params.unwrap_or(serde_json::json!(()));
112    let result = method(channel.clone(), req.msg.method, params).await;
113
114    let response = match result {
115        Ok(res) => message::Response {
116            result: Some(res),
117            id: Some(req.msg.id),
118            ..Default::default()
119        },
120        Err(err) => {
121            let mut conn = conn;
122            let response = err.to_response(Some(req.msg.id), None);
123            conn.send_msg(serde_json::json!(response)).await?;
124            return Ok(());
125        }
126    };
127
128    debug!("--> {response}");
129
130    let (mut reader, mut writer) = conn.split();
131    writer.send_msg(serde_json::json!(response)).await?;
132
133    let notification_encoder = server.config.notification_encoder;
134
135    loop {
136        match select(ch_rx.recv(), reader.recv_msg()).await {
137            Either::Left(nt) => {
138                let nt = nt?;
139                let notification = notification_encoder(nt);
140                debug!("--> {notification}");
141                writer.send_msg(serde_json::json!(notification)).await?;
142            }
143            Either::Right(msg) => match msg {
144                Ok(msg) => {
145                    let response = server.handle_request(Some(channel.clone()), msg).await;
146                    debug!("--> {response}");
147                    writer.send_msg(serde_json::json!(response)).await?;
148                    channel.close();
149                    return Ok(());
150                }
151                Err(_) => {
152                    channel.close();
153                    return Ok(());
154                }
155            },
156        }
157    }
158}