karyon_jsonrpc/server/
channel.rs1use std::collections::HashSet;
2use std::sync::{Arc, Weak};
3
4use karyon_core::{async_runtime::lock::RwLock, util::random_32};
5
6use crate::{
7 error::{Error, Result},
8 message::SubscriptionID,
9};
10
11#[derive(Debug)]
12pub struct NewNotification {
13 pub sub_id: SubscriptionID,
14 pub result: serde_json::Value,
15 pub method: String,
16}
17
18#[derive(Clone)]
20pub struct Subscription {
21 id: SubscriptionID,
22 parent: Weak<Channel>,
23 chan: async_channel::Sender<NewNotification>,
24 method: String,
25}
26
27impl Subscription {
28 fn new(
30 parent: Weak<Channel>,
31 id: SubscriptionID,
32 chan: async_channel::Sender<NewNotification>,
33 method: &str,
34 ) -> Self {
35 Self {
36 parent,
37 id,
38 chan,
39 method: method.to_string(),
40 }
41 }
42
43 pub fn id(&self) -> SubscriptionID {
45 self.id
46 }
47
48 pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
50 if self.still_subscribed().await {
51 let nt = NewNotification {
52 sub_id: self.id,
53 result: res,
54 method: self.method.clone(),
55 };
56 self.chan.send(nt).await?;
57 Ok(())
58 } else {
59 Err(Error::SubscriptionNotFound(self.id.to_string()))
60 }
61 }
62
63 async fn still_subscribed(&self) -> bool {
65 match self.parent.upgrade() {
66 Some(parent) => parent.subs.read().await.contains(&self.id),
67 None => false,
68 }
69 }
70}
71
72pub struct Channel {
74 chan: async_channel::Sender<NewNotification>,
75 subs: RwLock<HashSet<SubscriptionID>>,
76}
77
78impl Channel {
79 pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
81 Arc::new(Self {
82 chan,
83 subs: RwLock::new(HashSet::new()),
84 })
85 }
86
87 pub async fn new_subscription(
89 self: &Arc<Self>,
90 method: &str,
91 sub_id: Option<SubscriptionID>,
92 ) -> Result<Subscription> {
93 let sub_id = sub_id.unwrap_or_else(random_32);
94 if !self.subs.write().await.insert(sub_id) {
95 return Err(Error::SubscriptionDuplicated(sub_id.to_string()));
96 }
97
98 let sub = Subscription::new(Arc::downgrade(self), sub_id, self.chan.clone(), method);
99 Ok(sub)
100 }
101
102 pub async fn remove_subscription(&self, id: &SubscriptionID) -> Result<()> {
104 let mut subs = self.subs.write().await;
105 if !subs.remove(id) {
106 return Err(Error::SubscriptionNotFound(id.to_string()));
107 }
108 Ok(())
109 }
110
111 pub(crate) fn close(&self) {
113 self.chan.close();
114 }
115}