karyon_jsonrpc/client/
subscriptions.rs

1use std::{collections::HashMap, sync::Arc};
2
3use async_channel::{Receiver, Sender};
4use serde_json::json;
5use serde_json::Value;
6
7use karyon_core::async_runtime::lock::Mutex;
8
9use crate::{
10    error::{Error, Result},
11    message::{Notification, NotificationResult, SubscriptionID},
12};
13
14/// A subscription established when the client's subscribe to a method
15pub struct Subscription {
16    id: SubscriptionID,
17    rx: Receiver<Value>,
18    tx: Sender<Value>,
19}
20
21impl Subscription {
22    fn new(id: SubscriptionID, buffer_size: usize) -> Arc<Self> {
23        let (tx, rx) = async_channel::bounded(buffer_size);
24        Arc::new(Self { tx, id, rx })
25    }
26
27    pub async fn recv(&self) -> Result<Value> {
28        self.rx.recv().await.map_err(|_| Error::SubscriptionClosed)
29    }
30
31    pub fn id(&self) -> SubscriptionID {
32        self.id
33    }
34
35    async fn notify(&self, val: Value) -> Result<()> {
36        if self.tx.is_full() {
37            return Err(Error::SubscriptionBufferFull);
38        }
39        self.tx.send(val).await?;
40        Ok(())
41    }
42
43    fn close(&self) {
44        self.tx.close();
45    }
46}
47
48/// Manages subscriptions for the client.
49pub(super) struct Subscriptions {
50    subs: Mutex<HashMap<SubscriptionID, Arc<Subscription>>>,
51    sub_buffer_size: usize,
52}
53
54impl Subscriptions {
55    /// Creates a new [`Subscriptions`].
56    pub(super) fn new(sub_buffer_size: usize) -> Arc<Self> {
57        Arc::new(Self {
58            subs: Mutex::new(HashMap::new()),
59            sub_buffer_size,
60        })
61    }
62
63    /// Returns a new [`Subscription`]
64    pub(super) async fn subscribe(&self, id: SubscriptionID) -> Arc<Subscription> {
65        let sub = Subscription::new(id, self.sub_buffer_size);
66        self.subs.lock().await.insert(id, sub.clone());
67        sub
68    }
69
70    /// Closes subscription channels and clear the inner map.
71    pub(super) async fn clear(&self) {
72        let mut subs = self.subs.lock().await;
73        for (_, sub) in subs.iter() {
74            sub.close();
75        }
76        subs.clear();
77    }
78
79    /// Unsubscribe from the provided subscription id.
80    pub(super) async fn unsubscribe(&self, id: &SubscriptionID) {
81        if let Some(sub) = self.subs.lock().await.remove(id) {
82            sub.close();
83        }
84    }
85
86    /// Notifies the subscription about the given notification.
87    pub(super) async fn notify(&self, nt: Notification) -> Result<()> {
88        let nt_res: NotificationResult = match nt.params {
89            Some(ref p) => serde_json::from_value(p.clone())?,
90            None => return Err(Error::InvalidMsg("Invalid notification msg".to_string())),
91        };
92
93        match self.subs.lock().await.get(&nt_res.subscription) {
94            Some(s) => s.notify(nt_res.result.unwrap_or(json!(""))).await?,
95            None => {
96                return Err(Error::InvalidMsg("Unknown notification".to_string()));
97            }
98        }
99
100        Ok(())
101    }
102}