Skip to main content

atrg_stream/
metrics.rs

1//! Jetstream consumer metrics.
2
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
/// Point-in-time, plain-value copy of the Jetstream consumer metrics.
///
/// Instances are produced by [`MetricsCounter::snapshot`]. Every field is an
/// ordinary `u64`, so a snapshot is detached from the live counters and does
/// not change after it has been taken.
///
/// `PartialEq`/`Eq` allow whole snapshots to be compared (for example with
/// `assert_eq!`), and `Default` yields an all-zero snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct JetstreamMetrics {
    /// Total events received from the WebSocket.
    pub events_received: u64,
    /// Events dropped due to backpressure.
    pub events_dropped: u64,
    /// Errors encountered during processing.
    pub errors: u64,
    /// Number of reconnections.
    pub reconnects: u64,
    /// Timestamp of last event (unix ms).
    pub last_event_at: u64,
    /// Current backoff duration in ms.
    pub current_backoff_ms: u64,
    /// Current queue depth, as passed to [`MetricsCounter::snapshot`] by the
    /// caller (it is not stored in the shared counters).
    pub queue_depth: u64,
}
24
25/// Shared, atomic counters for the consumer.
26#[derive(Debug)]
27pub struct MetricsCounter {
28    /// Total events received from the WebSocket.
29    pub(crate) events_received: AtomicU64,
30    /// Events dropped due to backpressure.
31    pub(crate) events_dropped: AtomicU64,
32    /// Errors encountered during processing.
33    pub(crate) errors: AtomicU64,
34    /// Number of reconnections.
35    pub(crate) reconnects: AtomicU64,
36    /// Timestamp of last event (unix ms).
37    pub(crate) last_event_at: AtomicU64,
38    /// Current backoff duration in ms.
39    pub(crate) current_backoff_ms: AtomicU64,
40}
41
42impl MetricsCounter {
43    /// Create a new zeroed counter set.
44    pub fn new() -> Arc<Self> {
45        Arc::new(Self::default())
46    }
47
48    /// Snapshot the current metrics.
49    pub fn snapshot(&self, queue_depth: u64) -> JetstreamMetrics {
50        JetstreamMetrics {
51            events_received: self.events_received.load(Ordering::Relaxed),
52            events_dropped: self.events_dropped.load(Ordering::Relaxed),
53            errors: self.errors.load(Ordering::Relaxed),
54            reconnects: self.reconnects.load(Ordering::Relaxed),
55            last_event_at: self.last_event_at.load(Ordering::Relaxed),
56            current_backoff_ms: self.current_backoff_ms.load(Ordering::Relaxed),
57            queue_depth,
58        }
59    }
60}
61
62impl Default for MetricsCounter {
63    fn default() -> Self {
64        Self {
65            events_received: AtomicU64::new(0),
66            events_dropped: AtomicU64::new(0),
67            errors: AtomicU64::new(0),
68            reconnects: AtomicU64::new(0),
69            last_event_at: AtomicU64::new(0),
70            current_backoff_ms: AtomicU64::new(0),
71        }
72    }
73}
74
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed counter reports zero in every snapshot field.
    #[test]
    fn metrics_counter_starts_at_zero() {
        let fresh = MetricsCounter::new().snapshot(0);

        // Counters first, then the gauge-like values.
        assert_eq!(fresh.events_received, 0);
        assert_eq!(fresh.events_dropped, 0);
        assert_eq!(fresh.errors, 0);
        assert_eq!(fresh.reconnects, 0);

        assert_eq!(fresh.last_event_at, 0);
        assert_eq!(fresh.current_backoff_ms, 0);
        assert_eq!(fresh.queue_depth, 0);
    }

    /// Increments applied to the atomics, and the caller-supplied queue
    /// depth, are visible in the next snapshot.
    #[test]
    fn metrics_counter_increments() {
        let shared = MetricsCounter::new();
        shared.events_received.fetch_add(5, Ordering::Relaxed);
        shared.errors.fetch_add(1, Ordering::Relaxed);

        let JetstreamMetrics {
            events_received,
            errors,
            queue_depth,
            ..
        } = shared.snapshot(3);

        assert_eq!(events_received, 5);
        assert_eq!(errors, 1);
        assert_eq!(queue_depth, 3);
    }

    /// A snapshot taken earlier keeps its value when the counter is
    /// incremented afterwards.
    #[test]
    fn metrics_snapshot_is_independent() {
        let shared = MetricsCounter::new();

        shared.events_received.fetch_add(1, Ordering::Relaxed);
        let before = shared.snapshot(0);

        shared.events_received.fetch_add(1, Ordering::Relaxed);
        let after = shared.snapshot(0);

        assert_eq!(before.events_received, 1);
        assert_eq!(after.events_received, 2);
    }
}