Skip to main content

karyon_jsonrpc/server/
channel.rs

1use std::collections::HashSet;
2use std::sync::{Arc, Weak};
3
4use karyon_core::{async_runtime::lock::RwLock, util::random_32};
5
6use crate::{
7    error::{Error, Result},
8    message::SubscriptionID,
9};
10
/// A notification produced by a [`Subscription`] and pushed onto the
/// channel's `async_channel::Sender`.
#[derive(Debug)]
pub struct NewNotification {
    /// Id of the subscription that emitted this notification.
    pub sub_id: SubscriptionID,
    /// JSON payload passed to [`Subscription::notify`].
    pub result: serde_json::Value,
    /// Method name the emitting subscription was created with.
    pub method: String,
}
17
/// Represents a new subscription
///
/// A handle created by [`Channel::new_subscription`]; it is used to push
/// notifications for one subscription id through the channel's sender.
#[derive(Clone)]
pub struct Subscription {
    /// Id under which this subscription is registered in the parent channel.
    id: SubscriptionID,
    /// Weak handle to the [`Channel`] that created this subscription; used to
    /// check whether the id is still registered before sending.
    parent: Weak<Channel>,
    /// Clone of the parent channel's sender; notifications are sent through it.
    chan: async_channel::Sender<NewNotification>,
    /// Method name copied into every notification sent by this subscription.
    method: String,
}
26
27impl Subscription {
28    /// Creates a new [`Subscription`]
29    fn new(
30        parent: Weak<Channel>,
31        id: SubscriptionID,
32        chan: async_channel::Sender<NewNotification>,
33        method: &str,
34    ) -> Self {
35        Self {
36            parent,
37            id,
38            chan,
39            method: method.to_string(),
40        }
41    }
42
43    /// Returns the subscription id.
44    pub fn id(&self) -> SubscriptionID {
45        self.id
46    }
47
48    /// Sends a notification to the subscriber
49    pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
50        if self.still_subscribed().await {
51            let nt = NewNotification {
52                sub_id: self.id,
53                result: res,
54                method: self.method.clone(),
55            };
56            self.chan.send(nt).await?;
57            Ok(())
58        } else {
59            Err(Error::SubscriptionNotFound(self.id.to_string()))
60        }
61    }
62
63    /// Checks from the partent if this subscription is still subscribed
64    async fn still_subscribed(&self) -> bool {
65        match self.parent.upgrade() {
66            Some(parent) => parent.subs.read().await.contains(&self.id),
67            None => false,
68        }
69    }
70}
71
/// Represents a connection channel for creating/removing subscriptions
pub struct Channel {
    /// Sender cloned into every [`Subscription`] created by this channel and
    /// closed by [`Channel::close`].
    chan: async_channel::Sender<NewNotification>,
    /// Ids of the currently registered subscriptions, behind an async
    /// read/write lock.
    subs: RwLock<HashSet<SubscriptionID>>,
}
77
78impl Channel {
79    /// Creates a new [`Channel`]
80    pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
81        Arc::new(Self {
82            chan,
83            subs: RwLock::new(HashSet::new()),
84        })
85    }
86
87    /// Creates a new [`Subscription`]
88    pub async fn new_subscription(
89        self: &Arc<Self>,
90        method: &str,
91        sub_id: Option<SubscriptionID>,
92    ) -> Result<Subscription> {
93        let sub_id = sub_id.unwrap_or_else(random_32);
94        if !self.subs.write().await.insert(sub_id) {
95            return Err(Error::SubscriptionDuplicated(sub_id.to_string()));
96        }
97
98        let sub = Subscription::new(Arc::downgrade(self), sub_id, self.chan.clone(), method);
99        Ok(sub)
100    }
101
102    /// Removes a [`Subscription`]
103    pub async fn remove_subscription(&self, id: &SubscriptionID) -> Result<()> {
104        let mut subs = self.subs.write().await;
105        if !subs.remove(id) {
106            return Err(Error::SubscriptionNotFound(id.to_string()));
107        }
108        Ok(())
109    }
110
111    /// Closes the [`Channel`]
112    pub(crate) fn close(&self) {
113        self.chan.close();
114    }
115}