karyon_jsonrpc/server/
quic.rs1use std::sync::Arc;
5
6use log::{debug, error};
7
8use karyon_core::async_util::{select, Either, TaskResult};
9
10use karyon_net::{framed, quic::QuicConn, FramedConn, StreamMux};
11
12use crate::{
13 codec::JsonCodec,
14 error::Result,
15 message,
16 server::{
17 channel::Channel,
18 dispatch::{sanity_check, Handler, NewRequest, SanityCheckResult},
19 Server, CHANNEL_SUBSCRIPTION_BUFFER_SIZE,
20 },
21};
22
23impl Server {
24 pub(super) fn handle_quic_conn(self: &Arc<Self>, quic_conn: QuicConn) -> Result<()> {
27 let peer = quic_conn.peer_endpoint().ok();
28 debug!("Handle QUIC connection {peer:?}");
29
30 self.task_group.spawn(
31 quic_accept_streams_task(self.clone(), Arc::new(quic_conn)),
32 move |result: TaskResult<Result<()>>| async move {
33 if let TaskResult::Completed(Err(err)) = result {
34 debug!("QUIC conn {peer:?} dropped: {err}");
35 } else {
36 debug!("QUIC conn {peer:?} dropped");
37 }
38 },
39 );
40
41 Ok(())
42 }
43}
44
45async fn quic_accept_streams_task(server: Arc<Server>, quic_conn: Arc<QuicConn>) -> Result<()> {
46 let codec = JsonCodec::default();
47 loop {
48 let stream = quic_conn.accept_stream().await?;
49 let conn = framed(stream, codec.clone());
50 server.task_group.spawn(
51 quic_handle_stream_task(server.clone(), conn),
52 |_: TaskResult<Result<()>>| async {},
53 );
54 }
55}
56
57async fn quic_handle_stream_task(server: Arc<Server>, conn: FramedConn<JsonCodec>) -> Result<()> {
58 if let Err(err) = handle_quic_stream(server, conn).await {
59 error!("Handle QUIC stream: {err}");
60 }
61 Ok(())
62}
63
64async fn handle_quic_stream(server: Arc<Server>, mut conn: FramedConn<JsonCodec>) -> Result<()> {
67 let msg = conn.recv_msg().await?;
68
69 let req = match sanity_check(msg) {
70 SanityCheckResult::NewReq(req) => req,
71 SanityCheckResult::ErrRes(res) => {
72 conn.send_msg(serde_json::json!(res)).await?;
73 return Ok(());
74 }
75 };
76
77 let is_pubsub = matches!(
81 server.resolve_handler(&req.srvc_name, &req.method_name, true),
82 Handler::Pubsub(_)
83 );
84
85 if is_pubsub {
86 return handle_quic_subscription(server, conn, req).await;
87 }
88
89 let msg = serde_json::to_value(&req.msg).expect("serializable request");
90 let response = server.handle_request(None, msg).await;
91 debug!("--> {response}");
92 conn.send_msg(serde_json::json!(response)).await?;
93 Ok(())
94}
95
96async fn handle_quic_subscription(
99 server: Arc<Server>,
100 conn: FramedConn<JsonCodec>,
101 req: NewRequest,
102) -> Result<()> {
103 let (ch_tx, ch_rx) = async_channel::bounded(CHANNEL_SUBSCRIPTION_BUFFER_SIZE);
104 let channel = Channel::new(ch_tx);
105
106 let method = match server.resolve_handler(&req.srvc_name, &req.method_name, true) {
107 Handler::Pubsub(m) => m,
108 _ => unreachable!("pubsub method presence checked by caller"),
109 };
110
111 let params = req.msg.params.unwrap_or(serde_json::json!(()));
112 let result = method(channel.clone(), req.msg.method, params).await;
113
114 let response = match result {
115 Ok(res) => message::Response {
116 result: Some(res),
117 id: Some(req.msg.id),
118 ..Default::default()
119 },
120 Err(err) => {
121 let mut conn = conn;
122 let response = err.to_response(Some(req.msg.id), None);
123 conn.send_msg(serde_json::json!(response)).await?;
124 return Ok(());
125 }
126 };
127
128 debug!("--> {response}");
129
130 let (mut reader, mut writer) = conn.split();
131 writer.send_msg(serde_json::json!(response)).await?;
132
133 let notification_encoder = server.config.notification_encoder;
134
135 loop {
136 match select(ch_rx.recv(), reader.recv_msg()).await {
137 Either::Left(nt) => {
138 let nt = nt?;
139 let notification = notification_encoder(nt);
140 debug!("--> {notification}");
141 writer.send_msg(serde_json::json!(notification)).await?;
142 }
143 Either::Right(msg) => match msg {
144 Ok(msg) => {
145 let response = server.handle_request(Some(channel.clone()), msg).await;
146 debug!("--> {response}");
147 writer.send_msg(serde_json::json!(response)).await?;
148 channel.close();
149 return Ok(());
150 }
151 Err(_) => {
152 channel.close();
153 return Ok(());
154 }
155 },
156 }
157 }
158}