karyon_core/pubsub.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
use std::{collections::HashMap, sync::Arc};
use futures_util::stream::{FuturesUnordered, StreamExt};
use log::error;
use crate::{async_runtime::lock::Mutex, util::random_32, Result};
/// Buffer size given to each subscription channel when a [`Publisher`] is
/// created with [`Publisher::new`].
const CHANNEL_BUFFER_SIZE: usize = 1000;
/// Identifier under which a [`Publisher`] registers a [`Subscription`].
pub type SubscriptionID = u32;
/// A simple publish-subscribe system.
///
/// # Example
///
/// ```
/// use karyon_core::pubsub::{Publisher};
///
/// async {
///     let publisher = Publisher::new();
///
///     let sub = publisher.subscribe().await;
///
///     publisher.notify(&String::from("MESSAGE")).await;
///
///     let msg = sub.recv().await;
///
///     // ....
/// };
///
/// ```
pub struct Publisher<T> {
    /// Sending half of every registered subscription's channel, keyed by
    /// subscription id.
    subs: Mutex<HashMap<SubscriptionID, async_channel::Sender<T>>>,
    /// Capacity of the bounded channel created for each new subscription.
    subscription_buffer_size: usize,
}
impl<T: Clone> Publisher<T> {
    /// Creates a new [`Publisher`] whose subscription channels use the
    /// default buffer size (`CHANNEL_BUFFER_SIZE`).
    pub fn new() -> Arc<Publisher<T>> {
        Self::with_buffer_size(CHANNEL_BUFFER_SIZE)
    }

    /// Creates a new [`Publisher`] with the provided buffer size for the
    /// [`Subscription`] channel.
    ///
    /// This is important to control the memory used by the [`Subscription`]
    /// channel. If a subscriber can't keep up with the incoming messages, its
    /// channel buffer fills up, and once the buffer is full
    /// [`Publisher::notify`] waits until the subscriber starts to process the
    /// buffered messages.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn with_buffer_size(size: usize) -> Arc<Publisher<T>> {
        // A bounded channel cannot be created with a capacity of zero. Reject
        // the value here, as documented, instead of letting the first
        // `subscribe` call panic later while it holds the subscriber lock.
        assert!(
            size > 0,
            "subscription buffer size must be greater than zero"
        );

        Arc::new(Self {
            subs: Mutex::new(HashMap::new()),
            subscription_buffer_size: size,
        })
    }

    /// Subscribes and returns a [`Subscription`].
    ///
    /// A bounded channel with the configured buffer size is created for the
    /// subscription. The sending half is stored in the publisher under a
    /// randomly generated id; the receiving half is handed to the returned
    /// [`Subscription`].
    pub async fn subscribe(self: &Arc<Self>) -> Subscription<T> {
        // The lock is held from the id lookup through the insertion so the
        // chosen id cannot be taken in between.
        let mut subs = self.subs.lock().await;

        let (sender, receiver) = async_channel::bounded(self.subscription_buffer_size);

        // Draw random ids until one is found that is not already registered.
        let mut sub_id = random_32();
        while subs.contains_key(&sub_id) {
            sub_id = random_32();
        }

        subs.insert(sub_id, sender);
        Subscription::new(sub_id, Arc::clone(self), receiver)
    }

    /// Unsubscribes by providing the subscription id.
    ///
    /// Removes the sender registered under `id`. Does nothing if no
    /// subscription with that id is registered.
    pub async fn unsubscribe(self: &Arc<Self>, id: &SubscriptionID) {
        self.subs.lock().await.remove(id);
    }

    /// Notifies all subscribers.
    ///
    /// Every registered subscriber is sent its own clone of `value`. All
    /// sends are driven concurrently, and this function returns once each of
    /// them has completed or failed. Subscribers whose send failed are logged
    /// and removed from the publisher.
    ///
    /// The subscriber map stays locked for the whole call, including the time
    /// spent waiting on subscribers whose buffers are full.
    pub async fn notify(self: &Arc<Self>, value: &T) {
        let mut subs = self.subs.lock().await;

        let mut results = FuturesUnordered::new();
        let mut closed_subs = vec![];

        for (sub_id, sub) in subs.iter() {
            let result = async { (*sub_id, sub.send(value.clone()).await) };
            results.push(result);
        }

        while let Some((id, fut_err)) = results.next().await {
            if let Err(err) = fut_err {
                error!("failed to notify {}: {}", id, err);
                closed_subs.push(id);
            }
        }

        // The pending futures borrow `subs`; release them before mutating it.
        drop(results);

        for sub_id in closed_subs {
            subs.remove(&sub_id);
        }
    }
}
/// A handle to a single subscription on a [`Publisher`].
///
/// Returned by [`Publisher::subscribe`]; messages are read with
/// [`Subscription::recv`].
pub struct Subscription<T> {
    /// Id under which this subscription is registered with the publisher.
    id: SubscriptionID,
    /// Receiving half of the channel the publisher sends messages into.
    recv_chan: async_channel::Receiver<T>,
    /// The publisher this subscription belongs to; used to unsubscribe.
    publisher: Arc<Publisher<T>>,
}
impl<T: Clone> Subscription<T> {
/// Creates a new [`Subscription`]
pub fn new(
id: SubscriptionID,
publisher: Arc<Publisher<T>>,
recv_chan: async_channel::Receiver<T>,
) -> Subscription<T> {
Self {
id,
recv_chan,
publisher,
}
}
/// Receive a message from the [`Publisher`]
pub async fn recv(&self) -> Result<T> {
let msg = self.recv_chan.recv().await?;
Ok(msg)
}
/// Unsubscribe from the [`Publisher`]
pub async fn unsubscribe(&self) {
self.publisher.unsubscribe(&self.id).await;
}
}