Skip to main content

atrg_firehose/
metrics.rs

1//! Firehose consumer metrics.
2//!
3//! Provides atomic counters that are updated by the consumer tasks and a
4//! serializable snapshot type for exposing metrics via health endpoints.
5
6use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
7use std::sync::Arc;
8
9/// Snapshot of firehose consumer metrics.
10#[derive(Debug, Clone, serde::Serialize)]
11pub struct FirehoseMetrics {
12    /// Total events received from the WebSocket.
13    pub events_received: u64,
14    /// Events dropped due to backpressure.
15    pub events_dropped: u64,
16    /// Errors encountered during processing.
17    pub errors: u64,
18    /// Number of reconnections.
19    pub reconnects: u64,
20    /// Timestamp of last event (unix ms).
21    pub last_event_at: u64,
22    /// Current backoff duration in ms.
23    pub current_backoff_ms: u64,
24    /// Last sequence number processed.
25    pub last_seq: i64,
26    /// Current queue depth.
27    pub queue_depth: u64,
28}
29
/// Live counters for the firehose consumer, stored as atomics.
///
/// Each field is an atomic integer, which means it can be read and updated
/// through a shared reference. The fields are `pub(crate)`, so code inside
/// this crate accesses them directly rather than through accessor methods.
#[derive(Debug)]
pub struct MetricsCounter {
    /// Running total of events received from the WebSocket.
    pub(crate) events_received: AtomicU64,
    /// Running total of events dropped due to backpressure.
    pub(crate) events_dropped: AtomicU64,
    /// Running total of errors encountered during processing.
    pub(crate) errors: AtomicU64,
    /// Running total of reconnections.
    pub(crate) reconnects: AtomicU64,
    /// Timestamp of the last event, in Unix milliseconds.
    pub(crate) last_event_at: AtomicU64,
    /// Current backoff duration, in milliseconds.
    pub(crate) current_backoff_ms: AtomicU64,
    /// Last sequence number processed (signed).
    pub(crate) last_seq: AtomicI64,
}
48
49impl MetricsCounter {
50    /// Create a new zeroed counter set.
51    pub fn new() -> Arc<Self> {
52        Arc::new(Self::default())
53    }
54
55    /// Snapshot the current metrics.
56    pub fn snapshot(&self, queue_depth: u64) -> FirehoseMetrics {
57        FirehoseMetrics {
58            events_received: self.events_received.load(Ordering::Relaxed),
59            events_dropped: self.events_dropped.load(Ordering::Relaxed),
60            errors: self.errors.load(Ordering::Relaxed),
61            reconnects: self.reconnects.load(Ordering::Relaxed),
62            last_event_at: self.last_event_at.load(Ordering::Relaxed),
63            current_backoff_ms: self.current_backoff_ms.load(Ordering::Relaxed),
64            last_seq: self.last_seq.load(Ordering::Relaxed),
65            queue_depth,
66        }
67    }
68}
69
70impl Default for MetricsCounter {
71    fn default() -> Self {
72        Self {
73            events_received: AtomicU64::new(0),
74            events_dropped: AtomicU64::new(0),
75            errors: AtomicU64::new(0),
76            reconnects: AtomicU64::new(0),
77            last_event_at: AtomicU64::new(0),
78            current_backoff_ms: AtomicU64::new(0),
79            last_seq: AtomicI64::new(0),
80        }
81    }
82}
83
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created counter set snapshots to zero in every field.
    #[test]
    fn metrics_counter_starts_at_zero() {
        let counter = MetricsCounter::new();
        let snapshot = counter.snapshot(0);
        assert_eq!(snapshot.events_received, 0);
        assert_eq!(snapshot.events_dropped, 0);
        assert_eq!(snapshot.errors, 0);
        assert_eq!(snapshot.reconnects, 0);
        assert_eq!(snapshot.last_event_at, 0);
        assert_eq!(snapshot.current_backoff_ms, 0);
        assert_eq!(snapshot.last_seq, 0);
        assert_eq!(snapshot.queue_depth, 0);
    }

    /// Updates made to the atomics show up in the next snapshot, and the
    /// caller-supplied queue depth is passed through.
    #[test]
    fn metrics_counter_increments() {
        let counter = MetricsCounter::new();
        counter.events_received.fetch_add(5, Ordering::Relaxed);
        counter.errors.fetch_add(1, Ordering::Relaxed);
        counter.last_seq.store(42, Ordering::Relaxed);
        let snapshot = counter.snapshot(3);
        assert_eq!(snapshot.events_received, 5);
        assert_eq!(snapshot.errors, 1);
        assert_eq!(snapshot.last_seq, 42);
        assert_eq!(snapshot.queue_depth, 3);
    }

    /// A snapshot keeps the values it was taken with even after the live
    /// counters move on.
    #[test]
    fn metrics_snapshot_is_independent() {
        let counter = MetricsCounter::new();
        counter.events_received.fetch_add(1, Ordering::Relaxed);
        let snap1 = counter.snapshot(0);
        counter.events_received.fetch_add(1, Ordering::Relaxed);
        let snap2 = counter.snapshot(0);
        assert_eq!(snap1.events_received, 1);
        assert_eq!(snap2.events_received, 2);
    }

    /// `last_seq` is signed, so a negative value must round-trip unchanged.
    #[test]
    fn last_seq_tracks_negative_values() {
        let counter = MetricsCounter::new();
        counter.last_seq.store(-1, Ordering::Relaxed);
        let snapshot = counter.snapshot(0);
        assert_eq!(snapshot.last_seq, -1);
    }

    /// Give every counter a distinct value so that a crossed mapping inside
    /// `snapshot` (one field read from another counter) cannot go unnoticed.
    #[test]
    fn snapshot_maps_every_counter_to_its_own_field() {
        let counter = MetricsCounter::new();
        counter.events_received.store(1, Ordering::Relaxed);
        counter.events_dropped.store(2, Ordering::Relaxed);
        counter.errors.store(3, Ordering::Relaxed);
        counter.reconnects.store(4, Ordering::Relaxed);
        counter.last_event_at.store(5, Ordering::Relaxed);
        counter.current_backoff_ms.store(6, Ordering::Relaxed);
        counter.last_seq.store(7, Ordering::Relaxed);
        let snapshot = counter.snapshot(8);
        assert_eq!(snapshot.events_received, 1);
        assert_eq!(snapshot.events_dropped, 2);
        assert_eq!(snapshot.errors, 3);
        assert_eq!(snapshot.reconnects, 4);
        assert_eq!(snapshot.last_event_at, 5);
        assert_eq!(snapshot.current_backoff_ms, 6);
        assert_eq!(snapshot.last_seq, 7);
        assert_eq!(snapshot.queue_depth, 8);
    }
}