// atrg_firehose/event.rs

//! Firehose event types for `com.atproto.sync.subscribeRepos`.
//!
//! These types model the decoded CBOR frames received from the AT Protocol
//! relay firehose. Each frame is decoded from a binary WebSocket message
//! containing a DAG-CBOR encoded header and body.

use serde::{Deserialize, Serialize};

9/// A decoded firehose event from the relay.
10#[derive(Debug, Clone)]
11pub enum FirehoseEvent {
12    /// A repository commit containing record operations.
13    Commit(FirehoseCommit),
14    /// A handle change event.
15    Handle {
16        /// Sequence number for cursor tracking.
17        seq: i64,
18        /// DID of the account whose handle changed.
19        did: String,
20        /// The new handle.
21        handle: String,
22    },
23    /// An identity update event.
24    Identity {
25        /// Sequence number for cursor tracking.
26        seq: i64,
27        /// DID of the account whose identity was updated.
28        did: String,
29    },
30    /// A repository tombstone (account deletion).
31    Tombstone {
32        /// Sequence number for cursor tracking.
33        seq: i64,
34        /// DID of the deleted account.
35        did: String,
36    },
37    /// Informational message from the relay.
38    Info {
39        /// The info event name.
40        name: String,
41        /// Optional human-readable message.
42        message: Option<String>,
43    },
44}
45
46impl FirehoseEvent {
47    /// Return the sequence number if present.
48    ///
49    /// `Info` events do not carry a sequence number and return `None`.
50    pub fn seq(&self) -> Option<i64> {
51        match self {
52            Self::Commit(c) => Some(c.seq),
53            Self::Handle { seq, .. } => Some(*seq),
54            Self::Identity { seq, .. } => Some(*seq),
55            Self::Tombstone { seq, .. } => Some(*seq),
56            Self::Info { .. } => None,
57        }
58    }
59}
60
61/// A repository commit with decoded operations.
62#[derive(Debug, Clone)]
63pub struct FirehoseCommit {
64    /// Sequence number for cursor tracking.
65    pub seq: i64,
66    /// DID of the repository owner.
67    pub repo: String,
68    /// The repository revision.
69    pub rev: String,
70    /// Operations in this commit.
71    pub ops: Vec<RepoOp>,
72    /// Wall-clock time of the commit (ISO 8601).
73    pub time: String,
74}
75
76/// A single operation within a commit.
77#[derive(Debug, Clone)]
78pub struct RepoOp {
79    /// The action performed.
80    pub action: OpAction,
81    /// The path (collection/rkey).
82    pub path: String,
83    /// The decoded record value (if action is Create or Update).
84    pub record: Option<serde_json::Value>,
85    /// The CID of the record as a hex string.
86    pub cid: Option<String>,
87}
88
89impl RepoOp {
90    /// Extract the collection NSID from the path.
91    ///
92    /// The path format is `collection/rkey`. Returns an empty string if
93    /// the path does not contain a slash.
94    pub fn collection(&self) -> &str {
95        self.path.split('/').next().unwrap_or("")
96    }
97
98    /// Extract the record key from the path.
99    ///
100    /// The path format is `collection/rkey`. Returns an empty string if
101    /// the path does not contain a second segment.
102    pub fn rkey(&self) -> &str {
103        self.path.split('/').nth(1).unwrap_or("")
104    }
105}
106
107/// The type of operation in a repository commit.
108#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109pub enum OpAction {
110    /// A new record was created.
111    Create,
112    /// An existing record was updated.
113    Update,
114    /// A record was deleted.
115    Delete,
116}
117
118impl OpAction {
119    /// Parse an action string from the firehose CBOR payload.
120    ///
121    /// Returns `None` for unrecognised action strings.
122    pub fn parse(s: &str) -> Option<Self> {
123        match s {
124            "create" => Some(Self::Create),
125            "update" => Some(Self::Update),
126            "delete" => Some(Self::Delete),
127            _ => None,
128        }
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;

    // --- FirehoseEvent::seq -------------------------------------------------

    #[test]
    fn seq_from_commit() {
        let event = FirehoseEvent::Commit(FirehoseCommit {
            seq: 42,
            repo: "did:plc:abc".to_string(),
            rev: "rev1".to_string(),
            ops: vec![],
            time: "2024-01-01T00:00:00Z".to_string(),
        });
        assert_eq!(event.seq(), Some(42));
    }

    #[test]
    fn seq_from_handle() {
        let event = FirehoseEvent::Handle {
            seq: 99,
            did: "did:plc:abc".to_string(),
            handle: "alice.test".to_string(),
        };
        assert_eq!(event.seq(), Some(99));
    }

    #[test]
    fn seq_from_identity() {
        let event = FirehoseEvent::Identity {
            seq: 7,
            did: "did:plc:abc".to_string(),
        };
        assert_eq!(event.seq(), Some(7));
    }

    #[test]
    fn seq_from_tombstone() {
        let event = FirehoseEvent::Tombstone {
            seq: 100,
            did: "did:plc:abc".to_string(),
        };
        assert_eq!(event.seq(), Some(100));
    }

    #[test]
    fn seq_from_info_is_none() {
        let event = FirehoseEvent::Info {
            name: "OutdatedCursor".to_string(),
            message: Some("cursor too old".to_string()),
        };
        assert_eq!(event.seq(), None);
    }

    // --- RepoOp path helpers ------------------------------------------------

    #[test]
    fn repo_op_collection_and_rkey() {
        let op = RepoOp {
            action: OpAction::Create,
            path: "app.bsky.feed.post/3k2la7fx2as2a".to_string(),
            record: None,
            cid: None,
        };
        assert_eq!(op.collection(), "app.bsky.feed.post");
        assert_eq!(op.rkey(), "3k2la7fx2as2a");
    }

    #[test]
    fn repo_op_empty_path() {
        let op = RepoOp {
            action: OpAction::Delete,
            path: String::new(),
            record: None,
            cid: None,
        };
        assert_eq!(op.collection(), "");
        assert_eq!(op.rkey(), "");
    }

    #[test]
    fn repo_op_no_rkey() {
        let op = RepoOp {
            action: OpAction::Update,
            path: "app.bsky.feed.post".to_string(),
            record: None,
            cid: None,
        };
        assert_eq!(op.collection(), "app.bsky.feed.post");
        assert_eq!(op.rkey(), "");
    }

    #[test]
    fn repo_op_extra_and_leading_slashes() {
        // Only the first two segments are ever reported.
        let extra = RepoOp {
            action: OpAction::Create,
            path: "a/b/c".to_string(),
            record: None,
            cid: None,
        };
        assert_eq!(extra.collection(), "a");
        assert_eq!(extra.rkey(), "b");

        let leading = RepoOp {
            action: OpAction::Create,
            path: "/x".to_string(),
            record: None,
            cid: None,
        };
        assert_eq!(leading.collection(), "");
        assert_eq!(leading.rkey(), "x");
    }

    // --- OpAction::parse ----------------------------------------------------

    #[test]
    fn op_action_from_str() {
        assert_eq!(OpAction::parse("create"), Some(OpAction::Create));
        assert_eq!(OpAction::parse("update"), Some(OpAction::Update));
        assert_eq!(OpAction::parse("delete"), Some(OpAction::Delete));
        assert_eq!(OpAction::parse("unknown"), None);
        assert_eq!(OpAction::parse(""), None);
    }
}