1use std::sync::atomic::Ordering;
9use std::sync::Arc;
10
11use futures::StreamExt;
12use tokio::sync::mpsc;
13use tokio_tungstenite::tungstenite::Message;
14
15use crate::backoff::Backoff;
16use crate::car;
17use crate::event::{FirehoseCommit, FirehoseEvent, OpAction, RepoOp};
18use crate::metrics::MetricsCounter;
19use crate::FirehoseConfig;
20use crate::FirehoseHandler;
21
22pub async fn spawn_firehose<S>(
40 config: &FirehoseConfig,
41 state: S,
42 handler: FirehoseHandler<S>,
43) -> anyhow::Result<tokio::task::JoinHandle<()>>
44where
45 S: Clone + Send + Sync + 'static,
46{
47 let metrics = MetricsCounter::new();
48 let channel_capacity = config.channel_capacity;
49
50 let url = build_ws_url(&config.relay, config.cursor);
51
52 tracing::info!(
53 url = %url,
54 channel_capacity = channel_capacity,
55 cursor = ?config.cursor,
56 "starting firehose consumer"
57 );
58
59 let (tx, rx) = mpsc::channel::<FirehoseEvent>(channel_capacity);
60
61 spawn_dispatcher(rx, handler, state, metrics.clone());
63
64 let handle = spawn_reader(url, tx, metrics);
66
67 Ok(handle)
68}
69
/// Builds the `com.atproto.sync.subscribeRepos` WebSocket URL for `relay`.
///
/// Trailing `/` characters on `relay` are removed before the XRPC path is
/// appended. When `cursor` is `Some`, it is added as a `cursor` query
/// parameter; otherwise the URL has no query string.
fn build_ws_url(relay: &str, cursor: Option<i64>) -> String {
    let mut url = String::from(relay.trim_end_matches('/'));
    url.push_str("/xrpc/com.atproto.sync.subscribeRepos");

    if let Some(seq) = cursor {
        url.push_str("?cursor=");
        url.push_str(&seq.to_string());
    }

    url
}
80
81fn spawn_dispatcher<S>(
83 mut rx: mpsc::Receiver<FirehoseEvent>,
84 handler: FirehoseHandler<S>,
85 state: S,
86 metrics: Arc<MetricsCounter>,
87) where
88 S: Clone + Send + Sync + 'static,
89{
90 tokio::spawn(async move {
91 while let Some(event) = rx.recv().await {
92 if let Err(e) = handler(event, state.clone()).await {
93 tracing::error!(error = %e, "firehose event handler error");
94 metrics.errors.fetch_add(1, Ordering::Relaxed);
95 }
96 }
97 tracing::info!("firehose dispatcher task exiting");
98 });
99}
100
101fn spawn_reader(
103 url: String,
104 tx: mpsc::Sender<FirehoseEvent>,
105 metrics: Arc<MetricsCounter>,
106) -> tokio::task::JoinHandle<()> {
107 tokio::spawn(async move {
108 let mut backoff = Backoff::new();
109
110 loop {
111 match connect_and_read(&url, &tx, &metrics).await {
112 Ok(()) => {
113 tracing::info!("firehose WebSocket closed cleanly");
114 }
115 Err(e) => {
116 metrics.reconnects.fetch_add(1, Ordering::Relaxed);
117 tracing::warn!(error = %e, "firehose connection error, will reconnect");
118 }
119 }
120
121 let delay = backoff.next_delay();
122 metrics
123 .current_backoff_ms
124 .store(delay.as_millis() as u64, Ordering::Relaxed);
125 tracing::info!(delay_ms = %delay.as_millis(), "reconnecting to firehose");
126 tokio::time::sleep(delay).await;
127 }
128 })
129}
130
131async fn connect_and_read(
133 url: &str,
134 tx: &mpsc::Sender<FirehoseEvent>,
135 metrics: &Arc<MetricsCounter>,
136) -> anyhow::Result<()> {
137 let (ws_stream, _response) = tokio_tungstenite::connect_async(url).await?;
138 tracing::info!(url = %url, "connected to firehose relay");
139
140 metrics.current_backoff_ms.store(0, Ordering::Relaxed);
142
143 let (_write, mut read) = ws_stream.split();
144
145 while let Some(msg_result) = read.next().await {
146 let msg = msg_result?;
147 match msg {
148 Message::Binary(data) => {
149 handle_binary_frame(&data, tx, metrics);
150 }
151 Message::Close(_) => {
152 tracing::info!("firehose WebSocket closed by server");
153 break;
154 }
155 _ => {}
158 }
159 }
160
161 Ok(())
162}
163
164fn handle_binary_frame(
166 data: &[u8],
167 tx: &mpsc::Sender<FirehoseEvent>,
168 metrics: &Arc<MetricsCounter>,
169) {
170 metrics.events_received.fetch_add(1, Ordering::Relaxed);
171 update_last_event_timestamp(metrics);
172
173 let event = match decode_frame(data) {
174 Ok(Some(ev)) => ev,
175 Ok(None) => {
176 return;
178 }
179 Err(e) => {
180 tracing::debug!(error = %e, "failed to decode firehose frame");
181 metrics.errors.fetch_add(1, Ordering::Relaxed);
182 return;
183 }
184 };
185
186 if let Some(seq) = event.seq() {
188 metrics.last_seq.store(seq, Ordering::Relaxed);
189 }
190
191 if tx.capacity() == 0 {
193 metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
194 tracing::debug!("firehose channel full, dropping event");
195 return;
196 }
197
198 if tx.try_send(event).is_err() {
199 metrics.events_dropped.fetch_add(1, Ordering::Relaxed);
200 tracing::debug!("firehose channel full on try_send, dropping event");
201 }
202}
203
204fn decode_frame(data: &[u8]) -> anyhow::Result<Option<FirehoseEvent>> {
212 let mut cursor = data;
213
214 let header: ciborium::Value = ciborium::from_reader(&mut cursor)
216 .map_err(|e| anyhow::anyhow!("failed to decode CBOR header: {}", e))?;
217
218 let header_map =
219 cbor_as_map(&header).ok_or_else(|| anyhow::anyhow!("CBOR header is not a map"))?;
220
221 let op = cbor_map_get_int(header_map, "op").unwrap_or(0);
222 let frame_type = cbor_map_get_str(header_map, "t").unwrap_or_default();
223
224 if op == -1 {
226 return decode_error_frame(&mut cursor);
227 }
228
229 if op != 1 {
230 return Ok(None);
231 }
232
233 let body: ciborium::Value = ciborium::from_reader(&mut cursor)
235 .map_err(|e| anyhow::anyhow!("failed to decode CBOR body: {}", e))?;
236
237 let body_map = cbor_as_map(&body).ok_or_else(|| anyhow::anyhow!("CBOR body is not a map"))?;
238
239 match frame_type.as_str() {
240 "#commit" => decode_commit(body_map).map(Some),
241 "#handle" => decode_handle(body_map).map(Some),
242 "#identity" => decode_identity(body_map).map(Some),
243 "#tombstone" => decode_tombstone(body_map).map(Some),
244 "#info" => decode_info(body_map).map(Some),
245 _ => {
246 tracing::trace!(frame_type = %frame_type, "unknown firehose frame type");
247 Ok(None)
248 }
249 }
250}
251
252fn decode_error_frame(cursor: &mut &[u8]) -> anyhow::Result<Option<FirehoseEvent>> {
254 let body: ciborium::Value = ciborium::from_reader(cursor)
255 .map_err(|e| anyhow::anyhow!("failed to decode error body: {}", e))?;
256
257 let body_map = cbor_as_map(&body).unwrap_or_default();
258 let name = cbor_map_get_str(body_map, "error").unwrap_or_else(|| "UnknownError".to_string());
259 let message = cbor_map_get_str(body_map, "message");
260
261 tracing::warn!(name = %name, message = ?message, "firehose error frame");
262
263 Ok(Some(FirehoseEvent::Info { name, message }))
264}
265
266fn decode_commit(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
268 let seq = cbor_map_get_int(body, "seq").ok_or_else(|| anyhow::anyhow!("commit missing seq"))?;
269 let repo = cbor_map_get_str(body, "repo").unwrap_or_default();
270 let rev = cbor_map_get_str(body, "rev").unwrap_or_default();
271 let time = cbor_map_get_str(body, "time").unwrap_or_default();
272
273 let raw_ops = cbor_map_get_array(body, "ops").unwrap_or_default();
275
276 let blocks_data = cbor_map_get_bytes(body, "blocks").unwrap_or_default();
278
279 let car_decoded = if blocks_data.is_empty() {
280 car::CarDecoded::empty()
281 } else {
282 car::decode_car(&blocks_data).unwrap_or_else(|e| {
283 tracing::debug!(error = %e, "failed to decode CAR blocks");
284 car::CarDecoded::empty()
285 })
286 };
287
288 let mut ops = Vec::with_capacity(raw_ops.len());
289 for op_val in &raw_ops {
290 if let Some(op_map) = cbor_as_map(op_val) {
291 let action_str = cbor_map_get_str(op_map, "action").unwrap_or_default();
292 let action = match action_str.as_str() {
293 "create" => OpAction::Create,
294 "update" => OpAction::Update,
295 "delete" => OpAction::Delete,
296 _ => continue,
297 };
298
299 let path = cbor_map_get_str(op_map, "path").unwrap_or_default();
300 let cid = extract_cid_string(op_map);
301
302 let record = cid
304 .as_deref()
305 .and_then(|c| car_decoded.blocks.get(c).cloned());
306
307 ops.push(RepoOp {
308 action,
309 path,
310 record,
311 cid,
312 });
313 }
314 }
315
316 Ok(FirehoseEvent::Commit(FirehoseCommit {
317 seq,
318 repo,
319 rev,
320 ops,
321 time,
322 }))
323}
324
325fn extract_cid_string(op_map: &[(ciborium::Value, ciborium::Value)]) -> Option<String> {
330 let cid_val = cbor_map_get_value(op_map, "cid")?;
331 match cid_val {
332 ciborium::Value::Tag(_tag, inner) => {
333 if let ciborium::Value::Bytes(b) = inner.as_ref() {
334 let bytes = if b.first() == Some(&0x00) { &b[1..] } else { b };
336 Some(hex_encode(bytes))
337 } else {
338 None
339 }
340 }
341 ciborium::Value::Bytes(b) => {
342 let bytes = if b.first() == Some(&0x00) { &b[1..] } else { b };
343 Some(hex_encode(bytes))
344 }
345 ciborium::Value::Map(m) => cbor_map_get_str(m, "$link"),
346 _ => None,
347 }
348}
349
350fn decode_handle(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
352 let seq =
353 cbor_map_get_int(body, "seq").ok_or_else(|| anyhow::anyhow!("handle event missing seq"))?;
354 let did = cbor_map_get_str(body, "did").unwrap_or_default();
355 let handle = cbor_map_get_str(body, "handle").unwrap_or_default();
356
357 Ok(FirehoseEvent::Handle { seq, did, handle })
358}
359
360fn decode_identity(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
362 let seq = cbor_map_get_int(body, "seq")
363 .ok_or_else(|| anyhow::anyhow!("identity event missing seq"))?;
364 let did = cbor_map_get_str(body, "did").unwrap_or_default();
365
366 Ok(FirehoseEvent::Identity { seq, did })
367}
368
369fn decode_tombstone(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
371 let seq = cbor_map_get_int(body, "seq")
372 .ok_or_else(|| anyhow::anyhow!("tombstone event missing seq"))?;
373 let did = cbor_map_get_str(body, "did").unwrap_or_default();
374
375 Ok(FirehoseEvent::Tombstone { seq, did })
376}
377
378fn decode_info(body: &[(ciborium::Value, ciborium::Value)]) -> anyhow::Result<FirehoseEvent> {
380 let name = cbor_map_get_str(body, "name").unwrap_or_else(|| "Unknown".to_string());
381 let message = cbor_map_get_str(body, "message");
382
383 Ok(FirehoseEvent::Info { name, message })
384}
385
386fn cbor_as_map(val: &ciborium::Value) -> Option<&[(ciborium::Value, ciborium::Value)]> {
392 match val {
393 ciborium::Value::Map(pairs) => Some(pairs.as_slice()),
394 _ => None,
395 }
396}
397
398fn cbor_map_get_value<'a>(
400 map: &'a [(ciborium::Value, ciborium::Value)],
401 key: &str,
402) -> Option<&'a ciborium::Value> {
403 map.iter().find_map(|(k, v)| {
404 if let ciborium::Value::Text(s) = k {
405 if s == key {
406 return Some(v);
407 }
408 }
409 None
410 })
411}
412
413fn cbor_map_get_str(map: &[(ciborium::Value, ciborium::Value)], key: &str) -> Option<String> {
415 cbor_map_get_value(map, key).and_then(|v| {
416 if let ciborium::Value::Text(s) = v {
417 Some(s.clone())
418 } else {
419 None
420 }
421 })
422}
423
424fn cbor_map_get_int(map: &[(ciborium::Value, ciborium::Value)], key: &str) -> Option<i64> {
426 cbor_map_get_value(map, key).and_then(|v| match v {
427 ciborium::Value::Integer(i) => {
428 let val: i128 = (*i).into();
429 i64::try_from(val).ok()
430 }
431 _ => None,
432 })
433}
434
435fn cbor_map_get_bytes(map: &[(ciborium::Value, ciborium::Value)], key: &str) -> Option<Vec<u8>> {
437 cbor_map_get_value(map, key).and_then(|v| {
438 if let ciborium::Value::Bytes(b) = v {
439 Some(b.clone())
440 } else {
441 None
442 }
443 })
444}
445
446fn cbor_map_get_array(
448 map: &[(ciborium::Value, ciborium::Value)],
449 key: &str,
450) -> Option<Vec<ciborium::Value>> {
451 cbor_map_get_value(map, key).and_then(|v| {
452 if let ciborium::Value::Array(a) = v {
453 Some(a.clone())
454 } else {
455 None
456 }
457 })
458}
459
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice produces an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
469
470fn update_last_event_timestamp(metrics: &Arc<MetricsCounter>) {
472 let now_ms = std::time::SystemTime::now()
473 .duration_since(std::time::UNIX_EPOCH)
474 .unwrap_or_default()
475 .as_millis() as u64;
476 metrics.last_event_at.store(now_ms, Ordering::Relaxed);
477}
478
#[cfg(test)]
mod tests {
    use super::*;
    use ciborium::Value;

    /// Entries of a CBOR map, in the slice form the decoders accept.
    type Pairs = Vec<(Value, Value)>;

    // ---------------------------------------------------------------------
    // Fixture helpers
    // ---------------------------------------------------------------------

    /// CBOR text value.
    fn text(s: &str) -> Value {
        Value::Text(s.to_string())
    }

    /// CBOR integer value.
    fn int(n: i64) -> Value {
        Value::Integer(n.into())
    }

    /// Map entries with text keys.
    fn pairs(entries: Vec<(&str, Value)>) -> Pairs {
        entries.into_iter().map(|(k, v)| (text(k), v)).collect()
    }

    /// CBOR map value with text keys.
    fn map_value(entries: Vec<(&str, Value)>) -> Value {
        Value::Map(pairs(entries))
    }

    /// Serializes `header` immediately followed by `body`, as one frame.
    fn frame_bytes(header: &Value, body: &Value) -> Vec<u8> {
        let mut buf = Vec::new();
        for part in [header, body] {
            ciborium::into_writer(part, &mut buf).unwrap();
        }
        buf
    }

    /// Header of a regular message frame (`op == 1`) of type `kind`.
    fn message_header(kind: &str) -> Value {
        map_value(vec![("op", int(1)), ("t", text(kind))])
    }

    /// Encodes and decodes a regular message frame, expecting an event.
    fn decode_message(kind: &str, body: Value) -> FirehoseEvent {
        let data = frame_bytes(&message_header(kind), &body);
        decode_frame(&data).unwrap().unwrap()
    }

    // ---------------------------------------------------------------------
    // build_ws_url / hex_encode / basic CBOR helpers
    // ---------------------------------------------------------------------

    #[test]
    fn build_ws_url_no_cursor() {
        assert_eq!(
            build_ws_url("wss://bsky.network", None),
            "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
        );
    }

    #[test]
    fn build_ws_url_with_cursor() {
        assert_eq!(
            build_ws_url("wss://bsky.network", Some(12345)),
            "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=12345"
        );
    }

    #[test]
    fn build_ws_url_strips_trailing_slash() {
        assert_eq!(
            build_ws_url("wss://bsky.network/", None),
            "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
        );
    }

    #[test]
    fn hex_encode_works() {
        assert_eq!(hex_encode(&[0xde, 0xad, 0xbe, 0xef]), "deadbeef");
        assert_eq!(hex_encode(&[]), "");
        assert_eq!(hex_encode(&[0x00, 0x01, 0xff]), "0001ff");
    }

    #[test]
    fn cbor_helpers_work() {
        let map = pairs(vec![("name", text("hello")), ("seq", int(42))]);

        assert_eq!(cbor_map_get_str(&map, "name").as_deref(), Some("hello"));
        assert_eq!(cbor_map_get_int(&map, "seq"), Some(42));
        assert_eq!(cbor_map_get_str(&map, "missing"), None);
        assert_eq!(cbor_map_get_int(&map, "missing"), None);
    }

    // ---------------------------------------------------------------------
    // decode_frame
    // ---------------------------------------------------------------------

    #[test]
    fn decode_frame_commit() {
        let body = map_value(vec![
            ("seq", int(42)),
            ("repo", text("did:plc:abc")),
            ("rev", text("rev1")),
            ("time", text("2024-01-01")),
            ("ops", Value::Array(vec![])),
            ("blocks", Value::Bytes(vec![])),
        ]);

        match decode_message("#commit", body) {
            FirehoseEvent::Commit(c) => {
                assert_eq!(c.seq, 42);
                assert_eq!(c.repo, "did:plc:abc");
                assert_eq!(c.rev, "rev1");
                assert_eq!(c.time, "2024-01-01");
                assert!(c.ops.is_empty());
            }
            other => panic!("expected Commit, got {:?}", other),
        }
    }

    #[test]
    fn decode_frame_handle() {
        let body = map_value(vec![
            ("seq", int(1)),
            ("did", text("did:plc:abc")),
            ("handle", text("alice.bsky.social")),
        ]);

        match decode_message("#handle", body) {
            FirehoseEvent::Handle { seq, did, handle } => {
                assert_eq!(seq, 1);
                assert_eq!(did, "did:plc:abc");
                assert_eq!(handle, "alice.bsky.social");
            }
            other => panic!("expected Handle, got {:?}", other),
        }
    }

    #[test]
    fn decode_frame_identity() {
        let body = map_value(vec![("seq", int(2)), ("did", text("did:plc:abc"))]);

        match decode_message("#identity", body) {
            FirehoseEvent::Identity { seq, did } => {
                assert_eq!(seq, 2);
                assert_eq!(did, "did:plc:abc");
            }
            other => panic!("expected Identity, got {:?}", other),
        }
    }

    #[test]
    fn decode_frame_tombstone() {
        let body = map_value(vec![("seq", int(3)), ("did", text("did:plc:abc"))]);

        match decode_message("#tombstone", body) {
            FirehoseEvent::Tombstone { seq, did } => {
                assert_eq!(seq, 3);
                assert_eq!(did, "did:plc:abc");
            }
            other => panic!("expected Tombstone, got {:?}", other),
        }
    }

    #[test]
    fn decode_frame_info() {
        let body = map_value(vec![
            ("name", text("OutdatedCursor")),
            ("message", text("you are behind")),
        ]);

        match decode_message("#info", body) {
            FirehoseEvent::Info { name, message } => {
                assert_eq!(name, "OutdatedCursor");
                assert_eq!(message.as_deref(), Some("you are behind"));
            }
            other => panic!("expected Info, got {:?}", other),
        }
    }

    #[test]
    fn decode_frame_error_op_minus_one() {
        let header = map_value(vec![("op", int(-1))]);
        let body = map_value(vec![
            ("error", text("FutureCursor")),
            ("message", text("bad cursor")),
        ]);
        let data = frame_bytes(&header, &body);

        match decode_frame(&data).unwrap().unwrap() {
            FirehoseEvent::Info { name, message } => {
                assert_eq!(name, "FutureCursor");
                assert_eq!(message.as_deref(), Some("bad cursor"));
            }
            other => panic!("expected Info from error frame, got {:?}", other),
        }
    }

    #[test]
    fn decode_frame_unknown_type_returns_none() {
        let data = frame_bytes(&message_header("#unknown"), &map_value(vec![]));
        assert!(decode_frame(&data).unwrap().is_none());
    }

    #[test]
    fn decode_frame_unknown_op_returns_none() {
        // Header only: an unknown op must be ignored before any body is read.
        let header = map_value(vec![("op", int(99)), ("t", text("#commit"))]);
        let mut data = Vec::new();
        ciborium::into_writer(&header, &mut data).unwrap();

        assert!(decode_frame(&data).unwrap().is_none());
    }

    // ---------------------------------------------------------------------
    // decode_commit
    // ---------------------------------------------------------------------

    #[test]
    fn decode_commit_with_ops() {
        let op = |action: &str, path: &str| {
            map_value(vec![("action", text(action)), ("path", text(path))])
        };
        let body = pairs(vec![
            ("seq", int(10)),
            ("repo", text("did:plc:xyz")),
            ("rev", text("r2")),
            ("time", text("2024-06-01")),
            (
                "ops",
                Value::Array(vec![
                    op("create", "app.bsky.feed.post/abc"),
                    op("update", "app.bsky.feed.post/def"),
                    op("delete", "app.bsky.feed.post/ghi"),
                ]),
            ),
            ("blocks", Value::Bytes(vec![])),
        ]);

        match decode_commit(&body).unwrap() {
            FirehoseEvent::Commit(c) => {
                assert_eq!(c.seq, 10);
                assert_eq!(c.repo, "did:plc:xyz");
                assert_eq!(c.ops.len(), 3);
                assert_eq!(c.ops[0].action, OpAction::Create);
                assert_eq!(c.ops[0].path, "app.bsky.feed.post/abc");
                assert_eq!(c.ops[1].action, OpAction::Update);
                assert_eq!(c.ops[2].action, OpAction::Delete);
            }
            other => panic!("expected Commit, got {:?}", other),
        }
    }

    #[test]
    fn decode_commit_empty_ops() {
        let body = pairs(vec![
            ("seq", int(5)),
            ("repo", text("did:plc:abc")),
            ("rev", text("r1")),
            ("time", text("2024-01-01")),
            ("ops", Value::Array(vec![])),
            ("blocks", Value::Bytes(vec![])),
        ]);

        match decode_commit(&body).unwrap() {
            FirehoseEvent::Commit(c) => assert!(c.ops.is_empty()),
            other => panic!("expected Commit, got {:?}", other),
        }
    }

    #[test]
    fn decode_commit_missing_seq_errors() {
        let body = pairs(vec![("repo", text("did:plc:abc"))]);
        assert!(decode_commit(&body).is_err());
    }

    // ---------------------------------------------------------------------
    // decode_handle / decode_identity / decode_tombstone / decode_info
    // ---------------------------------------------------------------------

    #[test]
    fn decode_handle_success() {
        let body = pairs(vec![
            ("seq", int(7)),
            ("did", text("did:plc:h")),
            ("handle", text("bob.test")),
        ]);

        match decode_handle(&body).unwrap() {
            FirehoseEvent::Handle { seq, did, handle } => {
                assert_eq!(seq, 7);
                assert_eq!(did, "did:plc:h");
                assert_eq!(handle, "bob.test");
            }
            other => panic!("expected Handle, got {:?}", other),
        }
    }

    #[test]
    fn decode_handle_missing_seq_errors() {
        let body = pairs(vec![("did", text("did:plc:h"))]);
        assert!(decode_handle(&body).is_err());
    }

    #[test]
    fn decode_identity_success() {
        let body = pairs(vec![("seq", int(8)), ("did", text("did:plc:i"))]);

        match decode_identity(&body).unwrap() {
            FirehoseEvent::Identity { seq, did } => {
                assert_eq!(seq, 8);
                assert_eq!(did, "did:plc:i");
            }
            other => panic!("expected Identity, got {:?}", other),
        }
    }

    #[test]
    fn decode_identity_missing_seq_errors() {
        let body = pairs(vec![("did", text("did:plc:i"))]);
        assert!(decode_identity(&body).is_err());
    }

    #[test]
    fn decode_tombstone_success() {
        let body = pairs(vec![("seq", int(9)), ("did", text("did:plc:t"))]);

        match decode_tombstone(&body).unwrap() {
            FirehoseEvent::Tombstone { seq, did } => {
                assert_eq!(seq, 9);
                assert_eq!(did, "did:plc:t");
            }
            other => panic!("expected Tombstone, got {:?}", other),
        }
    }

    #[test]
    fn decode_tombstone_missing_seq_errors() {
        let body = pairs(vec![("did", text("did:plc:t"))]);
        assert!(decode_tombstone(&body).is_err());
    }

    #[test]
    fn decode_info_success() {
        let body = pairs(vec![
            ("name", text("OutdatedCursor")),
            ("message", text("old cursor")),
        ]);

        match decode_info(&body).unwrap() {
            FirehoseEvent::Info { name, message } => {
                assert_eq!(name, "OutdatedCursor");
                assert_eq!(message.as_deref(), Some("old cursor"));
            }
            other => panic!("expected Info, got {:?}", other),
        }
    }

    #[test]
    fn decode_info_no_message() {
        let body = pairs(vec![("name", text("SomeInfo"))]);

        match decode_info(&body).unwrap() {
            FirehoseEvent::Info { name, message } => {
                assert_eq!(name, "SomeInfo");
                assert_eq!(message, None);
            }
            other => panic!("expected Info, got {:?}", other),
        }
    }

    // ---------------------------------------------------------------------
    // extract_cid_string
    // ---------------------------------------------------------------------

    #[test]
    fn extract_cid_tag42_with_prefix() {
        let tagged = Value::Tag(42, Box::new(Value::Bytes(vec![0x00, 0xde, 0xad])));
        let map = pairs(vec![("cid", tagged)]);
        assert_eq!(extract_cid_string(&map).as_deref(), Some("dead"));
    }

    #[test]
    fn extract_cid_tag42_no_prefix() {
        let tagged = Value::Tag(42, Box::new(Value::Bytes(vec![0xab, 0xcd])));
        let map = pairs(vec![("cid", tagged)]);
        assert_eq!(extract_cid_string(&map).as_deref(), Some("abcd"));
    }

    #[test]
    fn extract_cid_raw_bytes() {
        let map = pairs(vec![("cid", Value::Bytes(vec![0x00, 0x01, 0x02]))]);
        assert_eq!(extract_cid_string(&map).as_deref(), Some("0102"));
    }

    #[test]
    fn extract_cid_link_map() {
        let link = map_value(vec![("$link", text("bafyabc"))]);
        let map = pairs(vec![("cid", link)]);
        assert_eq!(extract_cid_string(&map).as_deref(), Some("bafyabc"));
    }

    #[test]
    fn extract_cid_other_type_returns_none() {
        let map = pairs(vec![("cid", int(123))]);
        assert_eq!(extract_cid_string(&map), None);
    }

    #[test]
    fn extract_cid_missing_key_returns_none() {
        let map = pairs(vec![("other", text("x"))]);
        assert_eq!(extract_cid_string(&map), None);
    }

    // ---------------------------------------------------------------------
    // cbor_map_get_bytes / cbor_map_get_array
    // ---------------------------------------------------------------------

    #[test]
    fn cbor_map_get_bytes_present() {
        let map = pairs(vec![("data", Value::Bytes(vec![1, 2, 3]))]);
        assert_eq!(cbor_map_get_bytes(&map, "data"), Some(vec![1, 2, 3]));
    }

    #[test]
    fn cbor_map_get_bytes_missing() {
        let map = pairs(vec![("other", int(1))]);
        assert_eq!(cbor_map_get_bytes(&map, "data"), None);
    }

    #[test]
    fn cbor_map_get_array_present() {
        let map = pairs(vec![("items", Value::Array(vec![int(1), int(2)]))]);
        let items = cbor_map_get_array(&map, "items").unwrap();
        assert_eq!(items.len(), 2);
    }

    #[test]
    fn cbor_map_get_array_missing() {
        let map = pairs(vec![("other", int(1))]);
        assert_eq!(cbor_map_get_array(&map, "items"), None);
    }

    // ---------------------------------------------------------------------
    // handle_binary_frame / update_last_event_timestamp
    // ---------------------------------------------------------------------

    /// An encoded `#identity` frame with the given sequence number and DID.
    fn identity_frame(seq: i64, did: &str) -> Vec<u8> {
        let body = map_value(vec![("seq", int(seq)), ("did", text(did))]);
        frame_bytes(&message_header("#identity"), &body)
    }

    #[test]
    fn handle_binary_frame_sends_event() {
        let data = identity_frame(77, "did:plc:test");
        let (tx, mut rx) = mpsc::channel(16);
        let metrics = MetricsCounter::new();

        handle_binary_frame(&data, &tx, &metrics);

        match rx.try_recv().unwrap() {
            FirehoseEvent::Identity { seq, did } => {
                assert_eq!(seq, 77);
                assert_eq!(did, "did:plc:test");
            }
            other => panic!("expected Identity, got {:?}", other),
        }
        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn handle_binary_frame_drops_when_channel_full() {
        let data = identity_frame(1, "did:plc:x");
        // Capacity 1 and a receiver that never drains: the first frame fills
        // the channel and the second has to be dropped.
        let (tx, _rx) = mpsc::channel(1);
        let metrics = MetricsCounter::new();

        for _ in 0..2 {
            handle_binary_frame(&data, &tx, &metrics);
        }

        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.events_dropped.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn handle_binary_frame_bad_data_increments_errors() {
        let (tx, _rx) = mpsc::channel(16);
        let metrics = MetricsCounter::new();

        handle_binary_frame(&[0xff, 0xff], &tx, &metrics);

        assert_eq!(metrics.events_received.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn update_last_event_timestamp_sets_nonzero() {
        let metrics = MetricsCounter::new();
        assert_eq!(metrics.last_event_at.load(Ordering::Relaxed), 0);

        update_last_event_timestamp(&metrics);

        assert_ne!(metrics.last_event_at.load(Ordering::Relaxed), 0);
    }
}