karyon_core/async_util/
backoff.rs

1use std::{
2    cmp::min,
3    sync::atomic::{AtomicBool, AtomicU32, Ordering},
4    time::Duration,
5};
6
7use super::sleep;
8
/// Exponential backoff
/// <https://en.wikipedia.org/wiki/Exponential_backoff>
///
/// Each call to `sleep` waits for `base_delay * 2^retries` milliseconds,
/// capped at `max_delay`. Once the cap has been reached, every further call
/// waits for `max_delay` until `reset` is called.
///
/// # Examples
///
/// ```
/// use karyon_core::async_util::Backoff;
///
/// async {
///     let backoff = Backoff::new(300, 3000);
///
///     loop {
///         backoff.sleep().await;
///
///         // do something
///         break;
///     }
///
///     backoff.reset();
///
///     // ....
/// };
/// ```
#[derive(Debug)]
pub struct Backoff {
    /// The base delay in milliseconds for the initial retry.
    base_delay: u64,
    /// The max delay in milliseconds allowed for a retry.
    max_delay: u64,
    /// Number of retries counted so far; used as the exponent when computing
    /// the next delay and set back to 0 by `reset`.
    retries: AtomicU32,
    /// Set once a computed delay has reached `max_delay`; while set, `sleep`
    /// uses `max_delay` directly instead of recomputing the delay.
    stop: AtomicBool,
}
44
45impl Backoff {
46    /// Creates a new Backoff.
47    pub fn new(base_delay: u64, max_delay: u64) -> Self {
48        Self {
49            base_delay,
50            max_delay,
51            retries: AtomicU32::new(0),
52            stop: AtomicBool::new(false),
53        }
54    }
55
56    /// Sleep based on the current retry count and delay values.
57    /// Retruns the delay value.
58    pub async fn sleep(&self) -> u64 {
59        if self.stop.load(Ordering::SeqCst) {
60            sleep(Duration::from_millis(self.max_delay)).await;
61            return self.max_delay;
62        }
63
64        let retries = self.retries.load(Ordering::SeqCst);
65        let delay = self.base_delay * (2_u64).pow(retries);
66        let delay = min(delay, self.max_delay);
67
68        if delay == self.max_delay {
69            self.stop.store(true, Ordering::SeqCst);
70        }
71
72        self.retries.store(retries + 1, Ordering::SeqCst);
73
74        sleep(Duration::from_millis(delay)).await;
75        delay
76    }
77
78    /// Reset the retry counter to 0.
79    pub fn reset(&self) {
80        self.retries.store(0, Ordering::SeqCst);
81        self.stop.store(false, Ordering::SeqCst);
82    }
83}
84
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::async_runtime::{block_on, spawn};

    use super::*;

    /// Delays double from the base value until they hit the cap, and a reset
    /// brings the next delay back to the base value.
    #[test]
    fn test_backoff() {
        block_on(async move {
            let shared = Arc::new(Backoff::new(5, 15));

            // First task: three sleeps in a row, the third one is capped
            // at the max delay.
            let growing = Arc::clone(&shared);
            spawn(async move {
                for expected in [5, 10, 15] {
                    let delay = growing.sleep().await;
                    assert_eq!(delay, expected);
                }
            })
            .await
            .unwrap();

            // Second task: after a reset the sequence starts over.
            spawn(async move {
                shared.reset();
                let delay = shared.sleep().await;
                assert_eq!(delay, 5);
            })
            .await
            .unwrap();
        });
    }
}