1use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone)]
11pub enum FirehoseEvent {
12 Commit(FirehoseCommit),
14 Handle {
16 seq: i64,
18 did: String,
20 handle: String,
22 },
23 Identity {
25 seq: i64,
27 did: String,
29 },
30 Tombstone {
32 seq: i64,
34 did: String,
36 },
37 Info {
39 name: String,
41 message: Option<String>,
43 },
44}
45
46impl FirehoseEvent {
47 pub fn seq(&self) -> Option<i64> {
51 match self {
52 Self::Commit(c) => Some(c.seq),
53 Self::Handle { seq, .. } => Some(*seq),
54 Self::Identity { seq, .. } => Some(*seq),
55 Self::Tombstone { seq, .. } => Some(*seq),
56 Self::Info { .. } => None,
57 }
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct FirehoseCommit {
64 pub seq: i64,
66 pub repo: String,
68 pub rev: String,
70 pub ops: Vec<RepoOp>,
72 pub time: String,
74}
75
76#[derive(Debug, Clone)]
78pub struct RepoOp {
79 pub action: OpAction,
81 pub path: String,
83 pub record: Option<serde_json::Value>,
85 pub cid: Option<String>,
87}
88
89impl RepoOp {
90 pub fn collection(&self) -> &str {
95 self.path.split('/').next().unwrap_or("")
96 }
97
98 pub fn rkey(&self) -> &str {
103 self.path.split('/').nth(1).unwrap_or("")
104 }
105}
106
107#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
109pub enum OpAction {
110 Create,
112 Update,
114 Delete,
116}
117
118impl OpAction {
119 pub fn parse(s: &str) -> Option<Self> {
123 match s {
124 "create" => Some(Self::Create),
125 "update" => Some(Self::Update),
126 "delete" => Some(Self::Delete),
127 _ => None,
128 }
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135
136 #[test]
137 fn seq_from_commit() {
138 let event = FirehoseEvent::Commit(FirehoseCommit {
139 seq: 42,
140 repo: "did:plc:abc".to_string(),
141 rev: "rev1".to_string(),
142 ops: vec![],
143 time: "2024-01-01T00:00:00Z".to_string(),
144 });
145 assert_eq!(event.seq(), Some(42));
146 }
147
148 #[test]
149 fn seq_from_handle() {
150 let event = FirehoseEvent::Handle {
151 seq: 99,
152 did: "did:plc:abc".to_string(),
153 handle: "alice.test".to_string(),
154 };
155 assert_eq!(event.seq(), Some(99));
156 }
157
158 #[test]
159 fn seq_from_identity() {
160 let event = FirehoseEvent::Identity {
161 seq: 7,
162 did: "did:plc:abc".to_string(),
163 };
164 assert_eq!(event.seq(), Some(7));
165 }
166
167 #[test]
168 fn seq_from_tombstone() {
169 let event = FirehoseEvent::Tombstone {
170 seq: 100,
171 did: "did:plc:abc".to_string(),
172 };
173 assert_eq!(event.seq(), Some(100));
174 }
175
176 #[test]
177 fn seq_from_info_is_none() {
178 let event = FirehoseEvent::Info {
179 name: "OutdatedCursor".to_string(),
180 message: Some("cursor too old".to_string()),
181 };
182 assert_eq!(event.seq(), None);
183 }
184
185 #[test]
186 fn repo_op_collection_and_rkey() {
187 let op = RepoOp {
188 action: OpAction::Create,
189 path: "app.bsky.feed.post/3k2la7fx2as2a".to_string(),
190 record: None,
191 cid: None,
192 };
193 assert_eq!(op.collection(), "app.bsky.feed.post");
194 assert_eq!(op.rkey(), "3k2la7fx2as2a");
195 }
196
197 #[test]
198 fn repo_op_empty_path() {
199 let op = RepoOp {
200 action: OpAction::Delete,
201 path: String::new(),
202 record: None,
203 cid: None,
204 };
205 assert_eq!(op.collection(), "");
206 assert_eq!(op.rkey(), "");
207 }
208
209 #[test]
210 fn repo_op_no_rkey() {
211 let op = RepoOp {
212 action: OpAction::Update,
213 path: "app.bsky.feed.post".to_string(),
214 record: None,
215 cid: None,
216 };
217 assert_eq!(op.collection(), "app.bsky.feed.post");
218 assert_eq!(op.rkey(), "");
219 }
220
221 #[test]
222 fn op_action_from_str() {
223 assert_eq!(OpAction::parse("create"), Some(OpAction::Create));
224 assert_eq!(OpAction::parse("update"), Some(OpAction::Update));
225 assert_eq!(OpAction::parse("delete"), Some(OpAction::Delete));
226 assert_eq!(OpAction::parse("unknown"), None);
227 assert_eq!(OpAction::parse(""), None);
228 }
229}