karyon_core/async_util/
condwait.rs

1use super::CondVar;
2use crate::async_runtime::lock::Mutex;
3
/// CondWait pairs a `CondVar` with a mutex-protected boolean flag.
///
/// The flag starts as `false`. `signal` and `broadcast` set it to `true`
/// before notifying the condition variable, and `wait` only blocks on the
/// condition variable while the flag is `false`. The flag stays `true`
/// until `reset` is called, so a `wait` issued after a signal returns
/// without waiting on the condition variable.
///
/// # Example
///
/// ```
/// use std::sync::Arc;
///
/// use karyon_core::async_util::CondWait;
/// use karyon_core::async_runtime::spawn;
///
///  async {
///     let cond_wait = Arc::new(CondWait::new());
///     let task = spawn({
///         let cond_wait = cond_wait.clone();
///         async move {
///             cond_wait.wait().await;
///             // ...
///         }
///     });
///
///     cond_wait.signal().await;
///  };
///
/// ```
pub struct CondWait {
    /// Condition variable that `wait` blocks on and that `signal` /
    /// `broadcast` notify.
    condvar: CondVar,
    /// Flag guarded by the mutex: `true` once a signal or broadcast has been
    /// sent and `reset` has not been called since.
    w: Mutex<bool>,
}
35
36impl CondWait {
37    /// Creates a new CondWait.
38    pub fn new() -> Self {
39        Self {
40            condvar: CondVar::new(),
41            w: Mutex::new(false),
42        }
43    }
44
45    /// Waits for a signal or broadcast.
46    pub async fn wait(&self) {
47        let mut w = self.w.lock().await;
48
49        // While the boolean flag is false, wait for a signal.
50        while !*w {
51            w = self.condvar.wait(w).await;
52        }
53    }
54
55    /// Signal a waiting task.
56    pub async fn signal(&self) {
57        *self.w.lock().await = true;
58        self.condvar.signal();
59    }
60
61    /// Signal all waiting tasks.
62    pub async fn broadcast(&self) {
63        *self.w.lock().await = true;
64        self.condvar.broadcast();
65    }
66
67    /// Reset the boolean flag value to false.
68    pub async fn reset(&self) {
69        *self.w.lock().await = false;
70    }
71}
72
73impl Default for CondWait {
74    fn default() -> Self {
75        Self::new()
76    }
77}
78
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use crate::async_runtime::{block_on, spawn};

    use super::*;

    /// Exercises `signal`, `reset` and `broadcast`, counting how many waiting
    /// tasks get past `wait`.
    #[test]
    fn test_cond_wait() {
        block_on(async {
            let cond_wait = Arc::new(CondWait::new());
            let count = Arc::new(AtomicUsize::new(0));

            // Spawns a task that waits on the shared CondWait and then bumps
            // the shared counter.
            let spawn_waiter = || {
                let cond_wait = Arc::clone(&cond_wait);
                let count = Arc::clone(&count);
                spawn(async move {
                    cond_wait.wait().await;
                    count.fetch_add(1, Ordering::Relaxed);
                    // do something
                })
            };

            // A single waiter released by a signal.
            let task = spawn_waiter();
            cond_wait.signal().await;
            let _ = task.await;

            // Clear the flag so the next waiters have to wait again.
            cond_wait.reset().await;

            assert_eq!(count.load(Ordering::Relaxed), 1);

            // Two waiters released together by a broadcast.
            let task1 = spawn_waiter();
            let task2 = spawn_waiter();
            cond_wait.broadcast().await;

            let _ = task1.await;
            let _ = task2.await;
            assert_eq!(count.load(Ordering::Relaxed), 3);
        });
    }
}