karyon_jsonrpc/server/
channel.rs1use std::collections::HashSet;
2use std::sync::{Arc, Weak};
3
4use karyon_core::{async_runtime::lock::Mutex, util::random_32};
5
6use crate::{
7 error::{Error, Result},
8 message::SubscriptionID,
9};
10
11#[derive(Debug)]
12pub struct NewNotification {
13 pub sub_id: SubscriptionID,
14 pub result: serde_json::Value,
15 pub method: String,
16}
17
18#[derive(Clone)]
20pub struct Subscription {
21 pub id: SubscriptionID,
22 parent: Weak<Channel>,
23 chan: async_channel::Sender<NewNotification>,
24 method: String,
25}
26
27impl Subscription {
28 fn new(
30 parent: Weak<Channel>,
31 id: SubscriptionID,
32 chan: async_channel::Sender<NewNotification>,
33 method: &str,
34 ) -> Self {
35 Self {
36 parent,
37 id,
38 chan,
39 method: method.to_string(),
40 }
41 }
42
43 pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
45 if self.still_subscribed().await {
46 let nt = NewNotification {
47 sub_id: self.id,
48 result: res,
49 method: self.method.clone(),
50 };
51 self.chan.send(nt).await?;
52 Ok(())
53 } else {
54 Err(Error::SubscriptionNotFound(self.id.to_string()))
55 }
56 }
57
58 async fn still_subscribed(&self) -> bool {
60 match self.parent.upgrade() {
61 Some(parent) => parent.subs.lock().await.contains(&self.id),
62 None => false,
63 }
64 }
65}
66
67pub struct Channel {
69 chan: async_channel::Sender<NewNotification>,
70 subs: Mutex<HashSet<SubscriptionID>>,
71}
72
73impl Channel {
74 pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
76 Arc::new(Self {
77 chan,
78 subs: Mutex::new(HashSet::new()),
79 })
80 }
81
82 pub async fn new_subscription(
84 self: &Arc<Self>,
85 method: &str,
86 sub_id: Option<SubscriptionID>,
87 ) -> Result<Subscription> {
88 let sub_id = sub_id.unwrap_or_else(random_32);
89 if !self.subs.lock().await.insert(sub_id) {
90 return Err(Error::SubscriptionDuplicated(sub_id.to_string()));
91 }
92
93 let sub = Subscription::new(Arc::downgrade(self), sub_id, self.chan.clone(), method);
94 Ok(sub)
95 }
96
97 pub async fn remove_subscription(&self, id: &SubscriptionID) -> Result<()> {
99 let mut subs = self.subs.lock().await;
100 if !subs.remove(id) {
101 return Err(Error::SubscriptionNotFound(id.to_string()));
102 }
103 Ok(())
104 }
105
106 pub(crate) fn close(&self) {
108 self.chan.close();
109 }
110}