karyon_jsonrpc/server/
channel.rs

1use std::collections::HashSet;
2use std::sync::{Arc, Weak};
3
4use karyon_core::{async_runtime::lock::Mutex, util::random_32};
5
6use crate::{
7    error::{Error, Result},
8    message::SubscriptionID,
9};
10
11#[derive(Debug)]
12pub struct NewNotification {
13    pub sub_id: SubscriptionID,
14    pub result: serde_json::Value,
15    pub method: String,
16}
17
18/// Represents a new subscription
19#[derive(Clone)]
20pub struct Subscription {
21    pub id: SubscriptionID,
22    parent: Weak<Channel>,
23    chan: async_channel::Sender<NewNotification>,
24    method: String,
25}
26
27impl Subscription {
28    /// Creates a new [`Subscription`]
29    fn new(
30        parent: Weak<Channel>,
31        id: SubscriptionID,
32        chan: async_channel::Sender<NewNotification>,
33        method: &str,
34    ) -> Self {
35        Self {
36            parent,
37            id,
38            chan,
39            method: method.to_string(),
40        }
41    }
42
43    /// Sends a notification to the subscriber
44    pub async fn notify(&self, res: serde_json::Value) -> Result<()> {
45        if self.still_subscribed().await {
46            let nt = NewNotification {
47                sub_id: self.id,
48                result: res,
49                method: self.method.clone(),
50            };
51            self.chan.send(nt).await?;
52            Ok(())
53        } else {
54            Err(Error::SubscriptionNotFound(self.id.to_string()))
55        }
56    }
57
58    /// Checks from the partent if this subscription is still subscribed
59    async fn still_subscribed(&self) -> bool {
60        match self.parent.upgrade() {
61            Some(parent) => parent.subs.lock().await.contains(&self.id),
62            None => false,
63        }
64    }
65}
66
67/// Represents a connection channel for creating/removing subscriptions
68pub struct Channel {
69    chan: async_channel::Sender<NewNotification>,
70    subs: Mutex<HashSet<SubscriptionID>>,
71}
72
73impl Channel {
74    /// Creates a new [`Channel`]
75    pub(crate) fn new(chan: async_channel::Sender<NewNotification>) -> Arc<Channel> {
76        Arc::new(Self {
77            chan,
78            subs: Mutex::new(HashSet::new()),
79        })
80    }
81
82    /// Creates a new [`Subscription`]
83    pub async fn new_subscription(
84        self: &Arc<Self>,
85        method: &str,
86        sub_id: Option<SubscriptionID>,
87    ) -> Result<Subscription> {
88        let sub_id = sub_id.unwrap_or_else(random_32);
89        if !self.subs.lock().await.insert(sub_id) {
90            return Err(Error::SubscriptionDuplicated(sub_id.to_string()));
91        }
92
93        let sub = Subscription::new(Arc::downgrade(self), sub_id, self.chan.clone(), method);
94        Ok(sub)
95    }
96
97    /// Removes a [`Subscription`]
98    pub async fn remove_subscription(&self, id: &SubscriptionID) -> Result<()> {
99        let mut subs = self.subs.lock().await;
100        if !subs.remove(id) {
101            return Err(Error::SubscriptionNotFound(id.to_string()));
102        }
103        Ok(())
104    }
105
106    /// Closes the [`Channel`]
107    pub(crate) fn close(&self) {
108        self.chan.close();
109    }
110}