1use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
7use std::sync::Arc;
8
9#[derive(Debug, Clone, serde::Serialize)]
11pub struct FirehoseMetrics {
12 pub events_received: u64,
14 pub events_dropped: u64,
16 pub errors: u64,
18 pub reconnects: u64,
20 pub last_event_at: u64,
22 pub current_backoff_ms: u64,
24 pub last_seq: i64,
26 pub queue_depth: u64,
28}
29
30#[derive(Debug)]
32pub struct MetricsCounter {
33 pub(crate) events_received: AtomicU64,
35 pub(crate) events_dropped: AtomicU64,
37 pub(crate) errors: AtomicU64,
39 pub(crate) reconnects: AtomicU64,
41 pub(crate) last_event_at: AtomicU64,
43 pub(crate) current_backoff_ms: AtomicU64,
45 pub(crate) last_seq: AtomicI64,
47}
48
49impl MetricsCounter {
50 pub fn new() -> Arc<Self> {
52 Arc::new(Self::default())
53 }
54
55 pub fn snapshot(&self, queue_depth: u64) -> FirehoseMetrics {
57 FirehoseMetrics {
58 events_received: self.events_received.load(Ordering::Relaxed),
59 events_dropped: self.events_dropped.load(Ordering::Relaxed),
60 errors: self.errors.load(Ordering::Relaxed),
61 reconnects: self.reconnects.load(Ordering::Relaxed),
62 last_event_at: self.last_event_at.load(Ordering::Relaxed),
63 current_backoff_ms: self.current_backoff_ms.load(Ordering::Relaxed),
64 last_seq: self.last_seq.load(Ordering::Relaxed),
65 queue_depth,
66 }
67 }
68}
69
70impl Default for MetricsCounter {
71 fn default() -> Self {
72 Self {
73 events_received: AtomicU64::new(0),
74 events_dropped: AtomicU64::new(0),
75 errors: AtomicU64::new(0),
76 reconnects: AtomicU64::new(0),
77 last_event_at: AtomicU64::new(0),
78 current_backoff_ms: AtomicU64::new(0),
79 last_seq: AtomicI64::new(0),
80 }
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use super::*;
87
88 #[test]
89 fn metrics_counter_starts_at_zero() {
90 let counter = MetricsCounter::new();
91 let snapshot = counter.snapshot(0);
92 assert_eq!(snapshot.events_received, 0);
93 assert_eq!(snapshot.events_dropped, 0);
94 assert_eq!(snapshot.errors, 0);
95 assert_eq!(snapshot.reconnects, 0);
96 assert_eq!(snapshot.last_event_at, 0);
97 assert_eq!(snapshot.current_backoff_ms, 0);
98 assert_eq!(snapshot.last_seq, 0);
99 assert_eq!(snapshot.queue_depth, 0);
100 }
101
102 #[test]
103 fn metrics_counter_increments() {
104 let counter = MetricsCounter::new();
105 counter.events_received.fetch_add(5, Ordering::Relaxed);
106 counter.errors.fetch_add(1, Ordering::Relaxed);
107 counter.last_seq.store(42, Ordering::Relaxed);
108 let snapshot = counter.snapshot(3);
109 assert_eq!(snapshot.events_received, 5);
110 assert_eq!(snapshot.errors, 1);
111 assert_eq!(snapshot.last_seq, 42);
112 assert_eq!(snapshot.queue_depth, 3);
113 }
114
115 #[test]
116 fn metrics_snapshot_is_independent() {
117 let counter = MetricsCounter::new();
118 counter.events_received.fetch_add(1, Ordering::Relaxed);
119 let snap1 = counter.snapshot(0);
120 counter.events_received.fetch_add(1, Ordering::Relaxed);
121 let snap2 = counter.snapshot(0);
122 assert_eq!(snap1.events_received, 1);
123 assert_eq!(snap2.events_received, 2);
124 }
125
126 #[test]
127 fn last_seq_tracks_negative_values() {
128 let counter = MetricsCounter::new();
129 counter.last_seq.store(-1, Ordering::Relaxed);
130 let snapshot = counter.snapshot(0);
131 assert_eq!(snapshot.last_seq, -1);
132 }
133}