Skip to main content

atrg_firehose/
lib.rs

1#![deny(unsafe_code)]
2#![warn(missing_docs)]
3//! AT Protocol firehose consumer for at-rust-go.
4//!
5//! Subscribes to `com.atproto.sync.subscribeRepos` on a relay and delivers
6//! decoded events through a bounded channel with backpressure.
7//!
8//! This crate is deliberately independent of `atrg-core` to avoid cyclic
9//! dependencies. It defines its own [`FirehoseConfig`] that `atrg-core` maps
10//! its firehose configuration into before calling [`spawn_firehose`].
11
12pub mod backoff;
13pub mod car;
14pub mod consumer;
15pub mod event;
16pub mod metrics;
17
18pub use consumer::spawn_firehose;
19pub use event::{FirehoseCommit, FirehoseEvent, OpAction, RepoOp};
20pub use metrics::FirehoseMetrics;
21
22use std::sync::Arc;
23
24use futures::future::BoxFuture;
25
26/// Configuration for the firehose consumer.
27///
28/// This mirrors firehose-related fields that might live in `atrg-core`'s
29/// config but exists in this crate so that `atrg-firehose` has zero
30/// dependency on `atrg-core`.
31#[derive(Debug, Clone)]
32pub struct FirehoseConfig {
33    /// Relay WebSocket URL, e.g. `"wss://bsky.network"`.
34    pub relay: String,
35    /// Cursor (sequence number) to resume from. `None` means start from the
36    /// relay's current head.
37    pub cursor: Option<i64>,
38    /// Bounded back-pressure channel capacity (default: 1024).
39    pub channel_capacity: usize,
40}
41
42impl Default for FirehoseConfig {
43    fn default() -> Self {
44        Self {
45            relay: "wss://bsky.network".to_string(),
46            cursor: None,
47            channel_capacity: 1024,
48        }
49    }
50}
51
52/// Type alias for firehose event handler functions.
53///
54/// The handler receives a [`FirehoseEvent`] and a clone of whatever state
55/// object the caller supplied to [`spawn_firehose`]. The state type must be
56/// `Clone + Send + Sync + 'static`.
57pub type FirehoseHandler<S> =
58    Arc<dyn Fn(FirehoseEvent, S) -> BoxFuture<'static, anyhow::Result<()>> + Send + Sync>;
59
60#[cfg(test)]
61mod tests {
62    use super::*;
63
64    #[test]
65    fn default_config() {
66        let config = FirehoseConfig::default();
67        assert_eq!(config.relay, "wss://bsky.network");
68        assert!(config.cursor.is_none());
69        assert_eq!(config.channel_capacity, 1024);
70    }
71}