1use std::sync::atomic::{AtomicU64, Ordering};
4use std::sync::Arc;
5
6#[derive(Debug, Clone)]
8pub struct JetstreamMetrics {
9 pub events_received: u64,
11 pub events_dropped: u64,
13 pub errors: u64,
15 pub reconnects: u64,
17 pub last_event_at: u64,
19 pub current_backoff_ms: u64,
21 pub queue_depth: u64,
23}
24
25#[derive(Debug)]
27pub struct MetricsCounter {
28 pub(crate) events_received: AtomicU64,
30 pub(crate) events_dropped: AtomicU64,
32 pub(crate) errors: AtomicU64,
34 pub(crate) reconnects: AtomicU64,
36 pub(crate) last_event_at: AtomicU64,
38 pub(crate) current_backoff_ms: AtomicU64,
40}
41
42impl MetricsCounter {
43 pub fn new() -> Arc<Self> {
45 Arc::new(Self::default())
46 }
47
48 pub fn snapshot(&self, queue_depth: u64) -> JetstreamMetrics {
50 JetstreamMetrics {
51 events_received: self.events_received.load(Ordering::Relaxed),
52 events_dropped: self.events_dropped.load(Ordering::Relaxed),
53 errors: self.errors.load(Ordering::Relaxed),
54 reconnects: self.reconnects.load(Ordering::Relaxed),
55 last_event_at: self.last_event_at.load(Ordering::Relaxed),
56 current_backoff_ms: self.current_backoff_ms.load(Ordering::Relaxed),
57 queue_depth,
58 }
59 }
60}
61
62impl Default for MetricsCounter {
63 fn default() -> Self {
64 Self {
65 events_received: AtomicU64::new(0),
66 events_dropped: AtomicU64::new(0),
67 errors: AtomicU64::new(0),
68 reconnects: AtomicU64::new(0),
69 last_event_at: AtomicU64::new(0),
70 current_backoff_ms: AtomicU64::new(0),
71 }
72 }
73}
74
75#[cfg(test)]
76mod tests {
77 use super::*;
78
79 #[test]
80 fn metrics_counter_starts_at_zero() {
81 let counter = MetricsCounter::new();
82 let snapshot = counter.snapshot(0);
83 assert_eq!(snapshot.events_received, 0);
84 assert_eq!(snapshot.events_dropped, 0);
85 assert_eq!(snapshot.errors, 0);
86 assert_eq!(snapshot.reconnects, 0);
87 assert_eq!(snapshot.last_event_at, 0);
88 assert_eq!(snapshot.current_backoff_ms, 0);
89 assert_eq!(snapshot.queue_depth, 0);
90 }
91
92 #[test]
93 fn metrics_counter_increments() {
94 let counter = MetricsCounter::new();
95 counter.events_received.fetch_add(5, Ordering::Relaxed);
96 counter.errors.fetch_add(1, Ordering::Relaxed);
97 let snapshot = counter.snapshot(3);
98 assert_eq!(snapshot.events_received, 5);
99 assert_eq!(snapshot.errors, 1);
100 assert_eq!(snapshot.queue_depth, 3);
101 }
102
103 #[test]
104 fn metrics_snapshot_is_independent() {
105 let counter = MetricsCounter::new();
106 counter.events_received.fetch_add(1, Ordering::Relaxed);
107 let snap1 = counter.snapshot(0);
108 counter.events_received.fetch_add(1, Ordering::Relaxed);
109 let snap2 = counter.snapshot(0);
110 assert_eq!(snap1.events_received, 1);
111 assert_eq!(snap2.events_received, 2);
112 }
113}