Skip to main content

atrg_firehose/
consumer.rs

1//! Firehose WebSocket consumer with bounded backpressure.
2//!
3//! The consumer connects to a relay's `com.atproto.sync.subscribeRepos`
4//! endpoint over WebSocket, decodes binary CBOR frames, extracts records
5//! from CAR blocks, and dispatches [`FirehoseEvent`]s through a bounded
6//! `mpsc` channel to a user-supplied handler.
7
8use std::sync::atomic::Ordering;
9use std::sync::Arc;
10
11use futures::StreamExt;
12use tokio::sync::mpsc;
13use tokio_tungstenite::tungstenite::Message;
14
15use crate::backoff::Backoff;
16use crate::car;
17use crate::event::{FirehoseCommit, FirehoseEvent, OpAction, RepoOp};
18use crate::metrics::MetricsCounter;
19use crate::FirehoseConfig;
20use crate::FirehoseHandler;
21
22/// Spawn the firehose consumer as a pair of background tasks.
23///
24/// Returns a join handle for the reader task. The consumer architecture:
25///
26/// 1. **Reader task** — connects to the relay WebSocket, decodes incoming
27///    binary CBOR frames into [`FirehoseEvent`]s, and sends them into a
28///    bounded `mpsc` channel. Reconnects with exponential backoff on error.
29/// 2. **Dispatcher task** — reads events from the channel and invokes the
30///    user-supplied handler for each one.
31///
32/// Backpressure: when the channel is full, the reader drops events and
33/// increments the `events_dropped` metric counter.
34///
35/// The `state` parameter is an arbitrary `Clone + Send + 'static` value
36/// that is forwarded to the handler on every event. In a typical atrg app
37/// this is `AppState`, but the consumer itself does not depend on
38/// `atrg-core` to avoid a cyclic dependency.
39pub async fn spawn_firehose<S>(
40    config: &FirehoseConfig,
41    state: S,
42    handler: FirehoseHandler<S>,
43) -> anyhow::Result<tokio::task::JoinHandle<()>>
44where
45    S: Clone + Send + Sync + 'static,
46{
47    let metrics = MetricsCounter::new();
48    let channel_capacity = config.channel_capacity;
49
50    let url = build_ws_url(&config.relay, config.cursor);
51
52    tracing::info!(
53        url = %url,
54        channel_capacity = channel_capacity,
55        cursor = ?config.cursor,
56        "starting firehose consumer"
57    );
58
59    let (tx, rx) = mpsc::channel::<FirehoseEvent>(channel_capacity);
60
61    // Spawn the dispatcher task.
62    spawn_dispatcher(rx, handler, state, metrics.clone());
63
64    // Spawn the reader task.
65    let handle = spawn_reader(url, tx, metrics);
66
67    Ok(handle)
68}
69
/// Build the firehose WebSocket subscription URL.
///
/// Trailing slashes on `relay` are removed before the
/// `com.atproto.sync.subscribeRepos` XRPC path is appended. When `cursor`
/// is `Some`, it is added as the `cursor` query parameter.
fn build_ws_url(relay: &str, cursor: Option<i64>) -> String {
    let mut url = String::from(relay.trim_end_matches('/'));
    url.push_str("/xrpc/com.atproto.sync.subscribeRepos");

    if let Some(seq) = cursor {
        url.push_str("?cursor=");
        url.push_str(&seq.to_string());
    }

    url
}
80
81/// Spawn the dispatcher task that reads from the channel and calls the handler.
82fn spawn_dispatcher<S>(
83    mut rx: mpsc::Receiver<FirehoseEvent>,
84    handler: FirehoseHandler<S>,
85    state: S,
86    metrics: Arc<MetricsCounter>,
87) where
88    S: Clone + Send + Sync + 'static,
89{
90    tokio::spawn(async move {
91        while let Some(event) = rx.recv().await {
92            if let Err(e) = handler(event, state.clone()).await {
93                tracing::error!(error = %e, "firehose event handler error");
94                metrics.errors.fetch_add(1, Ordering::Relaxed);
95            }
96        }
97        tracing::info!("firehose dispatcher task exiting");
98    });
99}
100
101/// Spawn the reader task that connects to the WebSocket and feeds the channel.
102fn spawn_reader(
103    url: String,
104    tx: mpsc::Sender<FirehoseEvent>,
105    metrics: Arc<MetricsCounter>,
106) -> tokio::task::JoinHandle<()> {
107    tokio::spawn(async move {
108        let mut backoff = Backoff::new();
109
110        loop {
111            match connect_and_read(&url, &tx, &metrics).await {
112                Ok(()) => {
113                    tracing::info!("firehose WebSocket closed cleanly");
114                }
115                Err(e) => {
116                    metrics.reconnects.fetch_add(1, Ordering::Relaxed);
117                    tracing::warn!(error = %e, "firehose connection error, will reconnect");
118                }
119            }
120
121            let delay = backoff.next_delay();
122            metrics
123                .current_backoff_ms
124                .store(delay.as_millis() as u64, Ordering::Relaxed);
125            tracing::info!(delay_ms = %delay.as_millis(), "reconnecting to firehose");
126            tokio::time::sleep(delay).await;
127        }
128    })
129}
130
131/// Connect to the WebSocket and read events until the connection drops.
132async fn connect_and_read(
133    url: &str,
134    tx: &mpsc::Sender<FirehoseEvent>,
135    metrics: &Arc<MetricsCounter>,
136) -> anyhow::Result<()> {
137    let (ws_stream, _response) = tokio_tungstenite::connect_async(url).await?;
138    tracing::info!(url = %url, "connected to firehose relay");
139
140    // Reset backoff on successful connection.
141    metrics.current_backoff_ms.store(0, Ordering::Relaxed);
142
143    let (_write, mut read) = ws_stream.split();
144
145    while let Some(msg_result) = read.next().await {
146        let msg = msg_result?;
147        match msg {
148            Message::Binary(data) => {
149                handle_binary_frame(&data, tx, metrics);
150            }
151            Message::Close(_) => {
152                tracing::info!("firehose WebSocket closed by server");
153                break;
154            }
155            // Ping/Pong are handled automatically by tungstenite.
156            // Text frames are not expected from the firehose.
157            _ => {}
158        }
159    }
160
161    Ok(())
162}
163
164/// Decode and dispatch a single binary CBOR frame from the firehose.
165fn handle_binary_frame(
166    data: &[u8],
167    tx: &mpsc::Sender<FirehoseEvent>,
168    metrics: &Arc<MetricsCounter>,
169) {
170    metrics.events_received.fetch_add(1, Ordering::Relaxed);
171    update_last_event_timestamp(metrics);
172
173    let event = match decode_frame(data) {
174        Ok(Some(ev)) => ev,
175        Ok(None) => {
176            // Unknown or unsupported frame type — silently skip.
177            return;
178        }
179        Err(e) => {
180            tracing::debug!(error = %e, "failed to decode firehose frame");
181            metrics.errors.fetch_add(1, Ordering::Relaxed);
182            return;
183        }
184    };
185
186    // Update last_seq for cursor tracking.
187    if let Some(seq) = event.seq() {
188        metrics.last_seq.store(seq, Ordering::Relaxed);
189    }
190
191    // Backpressure: drop if channel is full.
192    if tx.capacity() == 0 {
193        metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
194        tracing::debug!("firehose channel full, dropping event");
195        return;
196    }
197
198    if tx.try_send(event).is_err() {
199        metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
200        tracing::debug!("firehose channel full on try_send, dropping event");
201    }
202}
203
204/// Decode a binary CBOR frame into a [`FirehoseEvent`].
205///
206/// The firehose wire format is two concatenated CBOR values:
207/// 1. Header: `{ "op": 1, "t": "#commit" }` (or other type tag)
208/// 2. Body: the event payload as a CBOR map
209///
210/// Returns `Ok(None)` for unrecognized event types.
211fn decode_frame(data: &[u8]) -> anyhow::Result<Option<FirehoseEvent>> {
212    let mut cursor = data;
213
214    // Decode the header.
215    let header: ciborium::Value = ciborium::from_reader(&mut cursor)
216        .map_err(|e| anyhow::anyhow!("failed to decode CBOR header: {}", e))?;
217
218    let header_map =
219        cbor_as_map(&header).ok_or_else(|| anyhow::anyhow!("CBOR header is not a map"))?;
220
221    let op = cbor_map_get_int(header_map, "op").unwrap_or(0);
222    let frame_type = cbor_map_get_str(header_map, "t").unwrap_or_default();
223
224    // op=1 is a regular message frame, op=-1 is an error frame.
225    if op == -1 {
226        return decode_error_frame(&mut cursor);
227    }
228
229    if op != 1 {
230        return Ok(None);
231    }
232
233    // Decode the body.
234    let body: ciborium::Value = ciborium::from_reader(&mut cursor)
235        .map_err(|e| anyhow::anyhow!("failed to decode CBOR body: {}", e))?;
236
237    let body_map = cbor_as_map(&body).ok_or_else(|| anyhow::anyhow!("CBOR body is not a map"))?;
238
239    match frame_type.as_str() {
240        "#commit" => decode_commit(body_map).map(Some),
241        "#handle" => decode_handle(body_map).map(Some),
242        "#identity" => decode_identity(body_map).map(Some),
243        "#tombstone" => decode_tombstone(body_map).map(Some),
244        "#info" => decode_info(body_map).map(Some),
245        _ => {
246            tracing::trace!(frame_type = %frame_type, "unknown firehose frame type");
247            Ok(None)
248        }
249    }
250}
251
252/// Decode an error frame (op=-1) into an info event.
253fn decode_error_frame(cursor: &mut &[u8]) -> anyhow::Result<Option<FirehoseEvent>> {
254    let body: ciborium::Value = ciborium::from_reader(cursor)
255        .map_err(|e| anyhow::anyhow!("failed to decode error body: {}", e))?;
256
257    let body_map = cbor_as_map(&body).unwrap_or_default();
258    let name = cbor_map_get_str(body_map, "error").unwrap_or_else(|| "UnknownError".to_string());
259    let message = cbor_map_get_str(body_map, "message");
260
261    tracing::warn!(name = %name, message = ?message, "firehose error frame");
262
263    Ok(Some(FirehoseEvent::Info { name, message }))
264}
265
266/// Decode a `#commit` frame body.
267fn decode_commit(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
268    let seq = cbor_map_get_int(body, "seq").ok_or_else(|| anyhow::anyhow!("commit missing seq"))?;
269    let repo = cbor_map_get_str(body, "repo").unwrap_or_default();
270    let rev = cbor_map_get_str(body, "rev").unwrap_or_default();
271    let time = cbor_map_get_str(body, "time").unwrap_or_default();
272
273    // Decode the ops array.
274    let raw_ops = cbor_map_get_array(body, "ops").unwrap_or_default();
275
276    // Decode the CAR blocks.
277    let blocks_data = cbor_map_get_bytes(body, "blocks").unwrap_or_default();
278
279    let car_decoded = if blocks_data.is_empty() {
280        car::CarDecoded::empty()
281    } else {
282        car::decode_car(&blocks_data).unwrap_or_else(|e| {
283            tracing::debug!(error = %e, "failed to decode CAR blocks");
284            car::CarDecoded::empty()
285        })
286    };
287
288    let mut ops = Vec::with_capacity(raw_ops.len());
289    for op_val in &raw_ops {
290        if let Some(op_map) = cbor_as_map(op_val) {
291            let action_str = cbor_map_get_str(op_map, "action").unwrap_or_default();
292            let action = match action_str.as_str() {
293                "create" => OpAction::Create,
294                "update" => OpAction::Update,
295                "delete" => OpAction::Delete,
296                _ => continue,
297            };
298
299            let path = cbor_map_get_str(op_map, "path").unwrap_or_default();
300            let cid = extract_cid_string(op_map);
301
302            // Look up the record in CAR blocks by CID.
303            let record = cid
304                .as_deref()
305                .and_then(|c| car_decoded.blocks.get(c).cloned());
306
307            ops.push(RepoOp {
308                action,
309                path,
310                record,
311                cid,
312            });
313        }
314    }
315
316    Ok(FirehoseEvent::Commit(FirehoseCommit {
317        seq,
318        repo,
319        rev,
320        ops,
321        time,
322    }))
323}
324
325/// Extract the CID string from an op's `cid` field.
326///
327/// In the CBOR encoding, the CID is represented as a CBOR tag 42 wrapping bytes,
328/// or sometimes as a map with a `$link` field. We hex-encode the raw bytes.
329fn extract_cid_string(op_map: &[(ciborium::Value, ciborium::Value)]) -> Option<String> {
330    let cid_val = cbor_map_get_value(op_map, "cid")?;
331    match cid_val {
332        ciborium::Value::Tag(_tag, inner) => {
333            if let ciborium::Value::Bytes(b) = inner.as_ref() {
334                // Skip the leading 0x00 identity multibase prefix if present.
335                let bytes = if b.first() == Some(&0x00) { &b[1..] } else { b };
336                Some(hex_encode(bytes))
337            } else {
338                None
339            }
340        }
341        ciborium::Value::Bytes(b) => {
342            let bytes = if b.first() == Some(&0x00) { &b[1..] } else { b };
343            Some(hex_encode(bytes))
344        }
345        ciborium::Value::Map(m) => cbor_map_get_str(m, "$link"),
346        _ => None,
347    }
348}
349
350/// Decode a `#handle` frame body.
351fn decode_handle(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
352    let seq =
353        cbor_map_get_int(body, "seq").ok_or_else(|| anyhow::anyhow!("handle event missing seq"))?;
354    let did = cbor_map_get_str(body, "did").unwrap_or_default();
355    let handle = cbor_map_get_str(body, "handle").unwrap_or_default();
356
357    Ok(FirehoseEvent::Handle { seq, did, handle })
358}
359
360/// Decode an `#identity` frame body.
361fn decode_identity(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
362    let seq = cbor_map_get_int(body, "seq")
363        .ok_or_else(|| anyhow::anyhow!("identity event missing seq"))?;
364    let did = cbor_map_get_str(body, "did").unwrap_or_default();
365
366    Ok(FirehoseEvent::Identity { seq, did })
367}
368
369/// Decode a `#tombstone` frame body.
370fn decode_tombstone(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
371    let seq = cbor_map_get_int(body, "seq")
372        .ok_or_else(|| anyhow::anyhow!("tombstone event missing seq"))?;
373    let did = cbor_map_get_str(body, "did").unwrap_or_default();
374
375    Ok(FirehoseEvent::Tombstone { seq, did })
376}
377
378/// Decode an `#info` frame body.
379fn decode_info(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
380    let name = cbor_map_get_str(body, "name").unwrap_or_else(|| "Unknown".to_string());
381    let message = cbor_map_get_str(body, "message");
382
383    Ok(FirehoseEvent::Info { name, message })
384}
385
386// ---------------------------------------------------------------------------
387// CBOR value helpers
388// ---------------------------------------------------------------------------
389
390/// Try to interpret a `ciborium::Value` as a map (list of key-value pairs).
391fn cbor_as_map(val: &ciborium::Value) -> Option<&[(ciborium::Value, ciborium::Value)]> {
392    match val {
393        ciborium::Value::Map(pairs) => Some(pairs.as_slice()),
394        _ => None,
395    }
396}
397
398/// Look up a string key in a CBOR map and return the value.
399fn cbor_map_get_value<'a>(
400    map: &'a [(ciborium::Value, ciborium::Value)],
401    key: &str,
402) -> Option<&'a ciborium::Value> {
403    map.iter().find_map(|(k, v)| {
404        if let ciborium::Value::Text(s) = k {
405            if s == key {
406                return Some(v);
407            }
408        }
409        None
410    })
411}
412
413/// Look up a string key in a CBOR map and return it as a string.
414fn cbor_map_get_str(map: &[(ciborium::Value, ciborium::Value)], key: &str) -> Option<String> {
415    cbor_map_get_value(map, key).and_then(|v| {
416        if let ciborium::Value::Text(s) = v {
417            Some(s.clone())
418        } else {
419            None
420        }
421    })
422}
423
424/// Look up a string key in a CBOR map and return it as an i64.
425fn cbor_map_get_int(map: &[(ciborium::Value, ciborium::Value)], key: &str) -> Option<i64> {
426    cbor_map_get_value(map, key).and_then(|v| match v {
427        ciborium::Value::Integer(i) => {
428            let val: i128 = (*i).into();
429            i64::try_from(val).ok()
430        }
431        _ => None,
432    })
433}
434
435/// Look up a string key in a CBOR map and return it as a byte vector.
436fn cbor_map_get_bytes(map: &[(ciborium::Value, ciborium::Value)], key: &str) -> Option<Vec<u8>> {
437    cbor_map_get_value(map, key).and_then(|v| {
438        if let ciborium::Value::Bytes(b) = v {
439            Some(b.clone())
440        } else {
441            None
442        }
443    })
444}
445
446/// Look up a string key in a CBOR map and return it as an array of values.
447fn cbor_map_get_array(
448    map: &[(ciborium::Value, ciborium::Value)],
449    key: &str,
450) -> Option<Vec<ciborium::Value>> {
451    cbor_map_get_value(map, key).and_then(|v| {
452        if let ciborium::Value::Array(a) = v {
453            Some(a.clone())
454        } else {
455            None
456        }
457    })
458}
459
/// Hex-encode a byte slice.
///
/// Produces two lowercase hexadecimal digits per input byte, with no
/// separators or prefix. An empty slice yields an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
469
470/// Record the current wall-clock time as the last-event timestamp.
471fn update_last_event_timestamp(metrics: &Arc<MetricsCounter>) {
472    let now_ms = std::time::SystemTime::now()
473        .duration_since(std::time::UNIX_EPOCH)
474        .unwrap_or_default()
475        .as_millis() as u64;
476    metrics.last_event_at.store(now_ms, Ordering::Relaxed);
477}
478
479#[cfg(test)]
480mod tests {
481    use super::*;
482
483    #[test]
484    fn build_ws_url_no_cursor() {
485        let url = build_ws_url("wss://bsky.network", None);
486        assert_eq!(
487            url,
488            "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
489        );
490    }
491
492    #[test]
493    fn build_ws_url_with_cursor() {
494        let url = build_ws_url("wss://bsky.network", Some(12345));
495        assert_eq!(
496            url,
497            "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=12345"
498        );
499    }
500
501    #[test]
502    fn build_ws_url_strips_trailing_slash() {
503        let url = build_ws_url("wss://bsky.network/", None);
504        assert_eq!(
505            url,
506            "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
507        );
508    }
509
510    #[test]
511    fn hex_encode_works() {
512        assert_eq!(hex_encode(&[0xde, 0xad, 0xbe, 0xef]), "deadbeef");
513        assert_eq!(hex_encode(&[]), "");
514        assert_eq!(hex_encode(&[0x00, 0x01, 0xff]), "0001ff");
515    }
516
517    #[test]
518    fn cbor_helpers_work() {
519        let map = vec![
520            (
521                ciborium::Value::Text("name".to_string()),
522                ciborium::Value::Text("hello".to_string()),
523            ),
524            (
525                ciborium::Value::Text("seq".to_string()),
526                ciborium::Value::Integer(42.into()),
527            ),
528        ];
529
530        assert_eq!(cbor_map_get_str(&map, "name"), Some("hello".to_string()));
531        assert_eq!(cbor_map_get_int(&map, "seq"), Some(42));
532        assert_eq!(cbor_map_get_str(&map, "missing"), None);
533        assert_eq!(cbor_map_get_int(&map, "missing"), None);
534    }
535
536    // -----------------------------------------------------------------------
537    // Helpers for constructing CBOR frames
538    // -----------------------------------------------------------------------
539
540    fn encode_frame(header: &ciborium::Value, body: &ciborium::Value) -> Vec<u8> {
541        let mut buf = Vec::new();
542        ciborium::into_writer(header, &mut buf).unwrap();
543        ciborium::into_writer(body, &mut buf).unwrap();
544        buf
545    }
546
547    fn cbor_map(entries: Vec<(&str, ciborium::Value)>) -> ciborium::Value {
548        ciborium::Value::Map(
549            entries
550                .into_iter()
551                .map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
552                .collect(),
553        )
554    }
555
556    fn cbor_map_pairs(
557        entries: Vec<(&str, ciborium::Value)>,
558    ) -> Vec<(ciborium::Value, ciborium::Value)> {
559        entries
560            .into_iter()
561            .map(|(k, v)| (ciborium::Value::Text(k.to_string()), v))
562            .collect()
563    }
564
565    // -----------------------------------------------------------------------
566    // decode_frame tests
567    // -----------------------------------------------------------------------
568
569    #[test]
570    fn decode_frame_commit() {
571        let header = cbor_map(vec![
572            ("op", ciborium::Value::Integer(1.into())),
573            ("t", ciborium::Value::Text("#commit".into())),
574        ]);
575        let body = cbor_map(vec![
576            ("seq", ciborium::Value::Integer(42.into())),
577            ("repo", ciborium::Value::Text("did:plc:abc".into())),
578            ("rev", ciborium::Value::Text("rev1".into())),
579            ("time", ciborium::Value::Text("2024-01-01".into())),
580            ("ops", ciborium::Value::Array(vec![])),
581            ("blocks", ciborium::Value::Bytes(vec![])),
582        ]);
583        let data = encode_frame(&header, &body);
584        let result = decode_frame(&data).unwrap().unwrap();
585        match result {
586            FirehoseEvent::Commit(c) => {
587                assert_eq!(c.seq, 42);
588                assert_eq!(c.repo, "did:plc:abc");
589                assert_eq!(c.rev, "rev1");
590                assert_eq!(c.time, "2024-01-01");
591                assert!(c.ops.is_empty());
592            }
593            other => panic!("expected Commit, got {:?}", other),
594        }
595    }
596
597    #[test]
598    fn decode_frame_handle() {
599        let header = cbor_map(vec![
600            ("op", ciborium::Value::Integer(1.into())),
601            ("t", ciborium::Value::Text("#handle".into())),
602        ]);
603        let body = cbor_map(vec![
604            ("seq", ciborium::Value::Integer(1.into())),
605            ("did", ciborium::Value::Text("did:plc:abc".into())),
606            ("handle", ciborium::Value::Text("alice.bsky.social".into())),
607        ]);
608        let data = encode_frame(&header, &body);
609        let result = decode_frame(&data).unwrap().unwrap();
610        match result {
611            FirehoseEvent::Handle { seq, did, handle } => {
612                assert_eq!(seq, 1);
613                assert_eq!(did, "did:plc:abc");
614                assert_eq!(handle, "alice.bsky.social");
615            }
616            other => panic!("expected Handle, got {:?}", other),
617        }
618    }
619
620    #[test]
621    fn decode_frame_identity() {
622        let header = cbor_map(vec![
623            ("op", ciborium::Value::Integer(1.into())),
624            ("t", ciborium::Value::Text("#identity".into())),
625        ]);
626        let body = cbor_map(vec![
627            ("seq", ciborium::Value::Integer(2.into())),
628            ("did", ciborium::Value::Text("did:plc:abc".into())),
629        ]);
630        let data = encode_frame(&header, &body);
631        let result = decode_frame(&data).unwrap().unwrap();
632        match result {
633            FirehoseEvent::Identity { seq, did } => {
634                assert_eq!(seq, 2);
635                assert_eq!(did, "did:plc:abc");
636            }
637            other => panic!("expected Identity, got {:?}", other),
638        }
639    }
640
641    #[test]
642    fn decode_frame_tombstone() {
643        let header = cbor_map(vec![
644            ("op", ciborium::Value::Integer(1.into())),
645            ("t", ciborium::Value::Text("#tombstone".into())),
646        ]);
647        let body = cbor_map(vec![
648            ("seq", ciborium::Value::Integer(3.into())),
649            ("did", ciborium::Value::Text("did:plc:abc".into())),
650        ]);
651        let data = encode_frame(&header, &body);
652        let result = decode_frame(&data).unwrap().unwrap();
653        match result {
654            FirehoseEvent::Tombstone { seq, did } => {
655                assert_eq!(seq, 3);
656                assert_eq!(did, "did:plc:abc");
657            }
658            other => panic!("expected Tombstone, got {:?}", other),
659        }
660    }
661
662    #[test]
663    fn decode_frame_info() {
664        let header = cbor_map(vec![
665            ("op", ciborium::Value::Integer(1.into())),
666            ("t", ciborium::Value::Text("#info".into())),
667        ]);
668        let body = cbor_map(vec![
669            ("name", ciborium::Value::Text("OutdatedCursor".into())),
670            ("message", ciborium::Value::Text("you are behind".into())),
671        ]);
672        let data = encode_frame(&header, &body);
673        let result = decode_frame(&data).unwrap().unwrap();
674        match result {
675            FirehoseEvent::Info { name, message } => {
676                assert_eq!(name, "OutdatedCursor");
677                assert_eq!(message, Some("you are behind".to_string()));
678            }
679            other => panic!("expected Info, got {:?}", other),
680        }
681    }
682
683    #[test]
684    fn decode_frame_error_op_minus_one() {
685        let header = cbor_map(vec![("op", ciborium::Value::Integer((-1).into()))]);
686        let body = cbor_map(vec![
687            ("error", ciborium::Value::Text("FutureCursor".into())),
688            ("message", ciborium::Value::Text("bad cursor".into())),
689        ]);
690        let data = encode_frame(&header, &body);
691        let result = decode_frame(&data).unwrap().unwrap();
692        match result {
693            FirehoseEvent::Info { name, message } => {
694                assert_eq!(name, "FutureCursor");
695                assert_eq!(message, Some("bad cursor".to_string()));
696            }
697            other => panic!("expected Info from error frame, got {:?}", other),
698        }
699    }
700
701    #[test]
702    fn decode_frame_unknown_type_returns_none() {
703        let header = cbor_map(vec![
704            ("op", ciborium::Value::Integer(1.into())),
705            ("t", ciborium::Value::Text("#unknown".into())),
706        ]);
707        let body = cbor_map(vec![]);
708        let data = encode_frame(&header, &body);
709        assert!(decode_frame(&data).unwrap().is_none());
710    }
711
712    #[test]
713    fn decode_frame_unknown_op_returns_none() {
714        let header = cbor_map(vec![
715            ("op", ciborium::Value::Integer(99.into())),
716            ("t", ciborium::Value::Text("#commit".into())),
717        ]);
718        // Body isn't read for unknown op, but we need valid CBOR after header
719        // Actually for op != 1 and op != -1, we return Ok(None) before reading body.
720        let mut data = Vec::new();
721        ciborium::into_writer(&header, &mut data).unwrap();
722        assert!(decode_frame(&data).unwrap().is_none());
723    }
724
725    // -----------------------------------------------------------------------
726    // decode_commit tests
727    // -----------------------------------------------------------------------
728
729    #[test]
730    fn decode_commit_with_ops() {
731        let op_create = cbor_map(vec![
732            ("action", ciborium::Value::Text("create".into())),
733            (
734                "path",
735                ciborium::Value::Text("app.bsky.feed.post/abc".into()),
736            ),
737        ]);
738        let op_update = cbor_map(vec![
739            ("action", ciborium::Value::Text("update".into())),
740            (
741                "path",
742                ciborium::Value::Text("app.bsky.feed.post/def".into()),
743            ),
744        ]);
745        let op_delete = cbor_map(vec![
746            ("action", ciborium::Value::Text("delete".into())),
747            (
748                "path",
749                ciborium::Value::Text("app.bsky.feed.post/ghi".into()),
750            ),
751        ]);
752
753        let body = cbor_map_pairs(vec![
754            ("seq", ciborium::Value::Integer(10.into())),
755            ("repo", ciborium::Value::Text("did:plc:xyz".into())),
756            ("rev", ciborium::Value::Text("r2".into())),
757            ("time", ciborium::Value::Text("2024-06-01".into())),
758            (
759                "ops",
760                ciborium::Value::Array(vec![op_create, op_update, op_delete]),
761            ),
762            ("blocks", ciborium::Value::Bytes(vec![])),
763        ]);
764
765        let result = decode_commit(&body).unwrap();
766        match result {
767            FirehoseEvent::Commit(c) => {
768                assert_eq!(c.seq, 10);
769                assert_eq!(c.repo, "did:plc:xyz");
770                assert_eq!(c.ops.len(), 3);
771                assert_eq!(c.ops[0].action, OpAction::Create);
772                assert_eq!(c.ops[0].path, "app.bsky.feed.post/abc");
773                assert_eq!(c.ops[1].action, OpAction::Update);
774                assert_eq!(c.ops[2].action, OpAction::Delete);
775            }
776            other => panic!("expected Commit, got {:?}", other),
777        }
778    }
779
780    #[test]
781    fn decode_commit_empty_ops() {
782        let body = cbor_map_pairs(vec![
783            ("seq", ciborium::Value::Integer(5.into())),
784            ("repo", ciborium::Value::Text("did:plc:abc".into())),
785            ("rev", ciborium::Value::Text("r1".into())),
786            ("time", ciborium::Value::Text("2024-01-01".into())),
787            ("ops", ciborium::Value::Array(vec![])),
788            ("blocks", ciborium::Value::Bytes(vec![])),
789        ]);
790        let result = decode_commit(&body).unwrap();
791        match result {
792            FirehoseEvent::Commit(c) => assert!(c.ops.is_empty()),
793            other => panic!("expected Commit, got {:?}", other),
794        }
795    }
796
797    #[test]
798    fn decode_commit_missing_seq_errors() {
799        let body = cbor_map_pairs(vec![("repo", ciborium::Value::Text("did:plc:abc".into()))]);
800        assert!(decode_commit(&body).is_err());
801    }
802
803    // -----------------------------------------------------------------------
804    // decode_handle / decode_identity / decode_tombstone / decode_info tests
805    // -----------------------------------------------------------------------
806
807    #[test]
808    fn decode_handle_success() {
809        let body = cbor_map_pairs(vec![
810            ("seq", ciborium::Value::Integer(7.into())),
811            ("did", ciborium::Value::Text("did:plc:h".into())),
812            ("handle", ciborium::Value::Text("bob.test".into())),
813        ]);
814        match decode_handle(&body).unwrap() {
815            FirehoseEvent::Handle { seq, did, handle } => {
816                assert_eq!(seq, 7);
817                assert_eq!(did, "did:plc:h");
818                assert_eq!(handle, "bob.test");
819            }
820            other => panic!("expected Handle, got {:?}", other),
821        }
822    }
823
824    #[test]
825    fn decode_handle_missing_seq_errors() {
826        let body = cbor_map_pairs(vec![("did", ciborium::Value::Text("did:plc:h".into()))]);
827        assert!(decode_handle(&body).is_err());
828    }
829
830    #[test]
831    fn decode_identity_success() {
832        let body = cbor_map_pairs(vec![
833            ("seq", ciborium::Value::Integer(8.into())),
834            ("did", ciborium::Value::Text("did:plc:i".into())),
835        ]);
836        match decode_identity(&body).unwrap() {
837            FirehoseEvent::Identity { seq, did } => {
838                assert_eq!(seq, 8);
839                assert_eq!(did, "did:plc:i");
840            }
841            other => panic!("expected Identity, got {:?}", other),
842        }
843    }
844
845    #[test]
846    fn decode_identity_missing_seq_errors() {
847        let body = cbor_map_pairs(vec![("did", ciborium::Value::Text("did:plc:i".into()))]);
848        assert!(decode_identity(&body).is_err());
849    }
850
851    #[test]
852    fn decode_tombstone_success() {
853        let body = cbor_map_pairs(vec![
854            ("seq", ciborium::Value::Integer(9.into())),
855            ("did", ciborium::Value::Text("did:plc:t".into())),
856        ]);
857        match decode_tombstone(&body).unwrap() {
858            FirehoseEvent::Tombstone { seq, did } => {
859                assert_eq!(seq, 9);
860                assert_eq!(did, "did:plc:t");
861            }
862            other => panic!("expected Tombstone, got {:?}", other),
863        }
864    }
865
866    #[test]
867    fn decode_tombstone_missing_seq_errors() {
868        let body = cbor_map_pairs(vec![("did", ciborium::Value::Text("did:plc:t".into()))]);
869        assert!(decode_tombstone(&body).is_err());
870    }
871
872    #[test]
873    fn decode_info_success() {
874        let body = cbor_map_pairs(vec![
875            ("name", ciborium::Value::Text("OutdatedCursor".into())),
876            ("message", ciborium::Value::Text("old cursor".into())),
877        ]);
878        match decode_info(&body).unwrap() {
879            FirehoseEvent::Info { name, message } => {
880                assert_eq!(name, "OutdatedCursor");
881                assert_eq!(message, Some("old cursor".to_string()));
882            }
883            other => panic!("expected Info, got {:?}", other),
884        }
885    }
886
887    #[test]
888    fn decode_info_no_message() {
889        let body = cbor_map_pairs(vec![("name", ciborium::Value::Text("SomeInfo".into()))]);
890        match decode_info(&body).unwrap() {
891            FirehoseEvent::Info { name, message } => {
892                assert_eq!(name, "SomeInfo");
893                assert_eq!(message, None);
894            }
895            other => panic!("expected Info, got {:?}", other),
896        }
897    }
898
899    // -----------------------------------------------------------------------
900    // extract_cid_string tests
901    // -----------------------------------------------------------------------
902
903    #[test]
904    fn extract_cid_tag42_with_prefix() {
905        let map = cbor_map_pairs(vec![(
906            "cid",
907            ciborium::Value::Tag(42, Box::new(ciborium::Value::Bytes(vec![0x00, 0xde, 0xad]))),
908        )]);
909        assert_eq!(extract_cid_string(&map), Some("dead".to_string()));
910    }
911
912    #[test]
913    fn extract_cid_tag42_no_prefix() {
914        let map = cbor_map_pairs(vec![(
915            "cid",
916            ciborium::Value::Tag(42, Box::new(ciborium::Value::Bytes(vec![0xab, 0xcd]))),
917        )]);
918        assert_eq!(extract_cid_string(&map), Some("abcd".to_string()));
919    }
920
921    #[test]
922    fn extract_cid_raw_bytes() {
923        let map = cbor_map_pairs(vec![(
924            "cid",
925            ciborium::Value::Bytes(vec![0x00, 0x01, 0x02]),
926        )]);
927        assert_eq!(extract_cid_string(&map), Some("0102".to_string()));
928    }
929
930    #[test]
931    fn extract_cid_link_map() {
932        let link_map = ciborium::Value::Map(vec![(
933            ciborium::Value::Text("$link".into()),
934            ciborium::Value::Text("bafyabc".into()),
935        )]);
936        let map = cbor_map_pairs(vec![("cid", link_map)]);
937        assert_eq!(extract_cid_string(&map), Some("bafyabc".to_string()));
938    }
939
940    #[test]
941    fn extract_cid_other_type_returns_none() {
942        let map = cbor_map_pairs(vec![("cid", ciborium::Value::Integer(123.into()))]);
943        assert_eq!(extract_cid_string(&map), None);
944    }
945
946    #[test]
947    fn extract_cid_missing_key_returns_none() {
948        let map = cbor_map_pairs(vec![("other", ciborium::Value::Text("x".into()))]);
949        assert_eq!(extract_cid_string(&map), None);
950    }
951
952    // -----------------------------------------------------------------------
953    // cbor_map_get_bytes / cbor_map_get_array tests
954    // -----------------------------------------------------------------------
955
956    #[test]
957    fn cbor_map_get_bytes_present() {
958        let map = cbor_map_pairs(vec![("data", ciborium::Value::Bytes(vec![1, 2, 3]))]);
959        assert_eq!(cbor_map_get_bytes(&map, "data"), Some(vec![1, 2, 3]));
960    }
961
962    #[test]
963    fn cbor_map_get_bytes_missing() {
964        let map = cbor_map_pairs(vec![("other", ciborium::Value::Integer(1.into()))]);
965        assert_eq!(cbor_map_get_bytes(&map, "data"), None);
966    }
967
968    #[test]
969    fn cbor_map_get_array_present() {
970        let map = cbor_map_pairs(vec![(
971            "items",
972            ciborium::Value::Array(vec![
973                ciborium::Value::Integer(1.into()),
974                ciborium::Value::Integer(2.into()),
975            ]),
976        )]);
977        let arr = cbor_map_get_array(&map, "items").unwrap();
978        assert_eq!(arr.len(), 2);
979    }
980
981    #[test]
982    fn cbor_map_get_array_missing() {
983        let map = cbor_map_pairs(vec![("other", ciborium::Value::Integer(1.into()))]);
984        assert_eq!(cbor_map_get_array(&map, "items"), None);
985    }
986
987    // -----------------------------------------------------------------------
988    // handle_binary_frame tests
989    // -----------------------------------------------------------------------
990
991    #[test]
992    fn handle_binary_frame_sends_event() {
993        let header = cbor_map(vec![
994            ("op", ciborium::Value::Integer(1.into())),
995            ("t", ciborium::Value::Text("#identity".into())),
996        ]);
997        let body = cbor_map(vec![
998            ("seq", ciborium::Value::Integer(77.into())),
999            ("did", ciborium::Value::Text("did:plc:test".into())),
1000        ]);
1001        let data = encode_frame(&header, &body);
1002
1003        let (tx, mut rx) = mpsc::channel(16);
1004        let metrics = MetricsCounter::new();
1005
1006        handle_binary_frame(&data, &tx, &metrics);
1007
1008        let event = rx.try_recv().unwrap();
1009        match event {
1010            FirehoseEvent::Identity { seq, did } => {
1011                assert_eq!(seq, 77);
1012                assert_eq!(did, "did:plc:test");
1013            }
1014            other => panic!("expected Identity, got {:?}", other),
1015        }
1016        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 1);
1017        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 0);
1018    }
1019
1020    #[test]
1021    fn handle_binary_frame_drops_when_channel_full() {
1022        let header = cbor_map(vec![
1023            ("op", ciborium::Value::Integer(1.into())),
1024            ("t", ciborium::Value::Text("#identity".into())),
1025        ]);
1026        let body = cbor_map(vec![
1027            ("seq", ciborium::Value::Integer(1.into())),
1028            ("did", ciborium::Value::Text("did:plc:x".into())),
1029        ]);
1030        let data = encode_frame(&header, &body);
1031
1032        // Channel with capacity 1, pre-fill it.
1033        let (tx, _rx) = mpsc::channel(1);
1034        let metrics = MetricsCounter::new();
1035
1036        // Fill the channel.
1037        handle_binary_frame(&data, &tx, &metrics);
1038        // Second send should drop.
1039        handle_binary_frame(&data, &tx, &metrics);
1040
1041        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 2);
1042        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 1);
1043    }
1044
1045    #[test]
1046    fn handle_binary_frame_bad_data_increments_errors() {
1047        let (tx, _rx) = mpsc::channel(16);
1048        let metrics = MetricsCounter::new();
1049
1050        handle_binary_frame(&[0xff, 0xff], &tx, &metrics);
1051
1052        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 1);
1053        assert_eq!(metrics.errors.load(Ordering::Relaxed), 1);
1054    }
1055
1056    // -----------------------------------------------------------------------
1057    // update_last_event_timestamp tests
1058    // -----------------------------------------------------------------------
1059
1060    #[test]
1061    fn update_last_event_timestamp_sets_nonzero() {
1062        let metrics = MetricsCounter::new();
1063        assert_eq!(metrics.last_event_at.load(Ordering::Relaxed), 0);
1064        update_last_event_timestamp(&metrics);
1065        assert_ne!(metrics.last_event_at.load(Ordering::Relaxed), 0);
1066    }
1067}