Skip to main content

atrg_firehose/
backoff.rs

1//! Exponential backoff with jitter for firehose reconnection.
2
3use std::time::Duration;
4
5/// Exponential backoff starting at 500ms and capping at 30 seconds.
6///
7/// Each call to [`next_delay`](Backoff::next_delay) doubles the delay
8/// (with jitter) up to the configured maximum. Call [`reset`](Backoff::reset)
9/// after a successful connection to start over.
10pub struct Backoff {
11    attempt: u32,
12    base_ms: u64,
13    max_ms: u64,
14}
15
16impl Backoff {
17    /// Create a new backoff starting at 500ms, capping at 30s.
18    pub fn new() -> Self {
19        Self {
20            attempt: 0,
21            base_ms: 500,
22            max_ms: 30_000,
23        }
24    }
25
26    /// Get the next backoff duration and advance the state.
27    ///
28    /// The delay is `base_ms * 2^attempt`, clamped to `max_ms`, with a
29    /// small deterministic jitter derived from the attempt number.
30    pub fn next_delay(&mut self) -> Duration {
31        let shift = self.attempt.min(31);
32        let delay_ms = self
33            .base_ms
34            .saturating_mul(1u64.checked_shl(shift).unwrap_or(u64::MAX));
35        let clamped = delay_ms.min(self.max_ms);
36
37        // Deterministic jitter: add up to ~12% based on attempt parity.
38        let jitter_ms =
39            (clamped / 8).wrapping_mul((self.attempt as u64 % 3) + 1) % (clamped / 4 + 1);
40        let total = clamped.saturating_add(jitter_ms).min(self.max_ms);
41
42        self.attempt = self.attempt.saturating_add(1);
43        Duration::from_millis(total)
44    }
45
46    /// Reset the backoff to the initial value (call on successful connection).
47    pub fn reset(&mut self) {
48        self.attempt = 0;
49    }
50
51    /// Get the current backoff duration in milliseconds (without advancing).
52    pub fn current_ms(&self) -> u64 {
53        let shift = self.attempt.min(31);
54        let delay_ms = self
55            .base_ms
56            .saturating_mul(1u64.checked_shl(shift).unwrap_or(u64::MAX));
57        delay_ms.min(self.max_ms)
58    }
59}
60
61impl Default for Backoff {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
67#[cfg(test)]
68mod tests {
69    use super::*;
70
71    #[test]
72    fn backoff_increases() {
73        let mut b = Backoff::new();
74        let d1 = b.next_delay();
75        let d2 = b.next_delay();
76        let d3 = b.next_delay();
77        // Each delay should be >= the previous base (500, 1000, 2000).
78        assert!(d1.as_millis() >= 500);
79        assert!(d2.as_millis() >= 1000);
80        assert!(d3.as_millis() >= 2000);
81    }
82
83    #[test]
84    fn backoff_caps_at_max() {
85        let mut b = Backoff::new();
86        for _ in 0..30 {
87            b.next_delay();
88        }
89        let d = b.next_delay();
90        assert!(d.as_millis() <= 30_000);
91    }
92
93    #[test]
94    fn backoff_resets() {
95        let mut b = Backoff::new();
96        b.next_delay();
97        b.next_delay();
98        b.next_delay();
99        b.reset();
100        assert_eq!(b.current_ms(), 500);
101        let d = b.next_delay();
102        assert!(d.as_millis() >= 500);
103        assert!(d.as_millis() <= 1000);
104    }
105
106    #[test]
107    fn current_ms_reflects_state() {
108        let mut b = Backoff::new();
109        assert_eq!(b.current_ms(), 500);
110        b.next_delay();
111        assert_eq!(b.current_ms(), 1000);
112        b.next_delay();
113        assert_eq!(b.current_ms(), 2000);
114    }
115
116    #[test]
117    fn default_is_same_as_new() {
118        let b = Backoff::default();
119        assert_eq!(b.current_ms(), 500);
120    }
121}