Skip to main content

atrg_stream/
event.rs

1//! Jetstream event types.
2//!
3//! These types model the JSON events emitted by the Jetstream firehose.
4//! They are defined here (rather than depending on `atproto-jetstream`)
5//! so that atrg controls the exact shape and serde behaviour.
6
7use serde::{Deserialize, Serialize};
8
9/// A single event from the Jetstream firehose.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct JetstreamEvent {
12    /// The DID of the account this event is about.
13    pub did: String,
14
15    /// Unix microseconds timestamp.
16    #[serde(default)]
17    pub time_us: i64,
18
19    /// The event kind (`"commit"`, `"identity"`, `"account"`).
20    #[serde(default)]
21    pub kind: String,
22
23    /// Commit data (present for `"commit"` events).
24    pub commit: Option<CommitData>,
25
26    /// Identity data (present for `"identity"` events).
27    pub identity: Option<serde_json::Value>,
28
29    /// Account data (present for `"account"` events).
30    pub account: Option<serde_json::Value>,
31}
32
33/// Data from a commit event.
34#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct CommitData {
36    /// The NSID collection (e.g. `"app.bsky.feed.post"`).
37    pub collection: String,
38
39    /// The record key.
40    pub rkey: String,
41
42    /// The operation: `"create"`, `"update"`, or `"delete"`.
43    #[serde(default)]
44    pub operation: String,
45
46    /// The record payload (present for create/update, absent for delete).
47    pub record: Option<serde_json::Value>,
48
49    /// The CID of the record.
50    pub cid: Option<String>,
51
52    /// Revision string.
53    pub rev: Option<String>,
54}
55
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a JSON fixture into a [`JetstreamEvent`], panicking with the
    /// serde error if the fixture is not a valid event.
    fn parse(json: &str) -> JetstreamEvent {
        serde_json::from_str(json).expect("fixture should be a valid JetstreamEvent")
    }

    /// A full commit event populates every commit field and leaves the other
    /// payloads empty.
    #[test]
    fn deserialize_commit_event() {
        let event = parse(
            r#"{
            "did": "did:plc:abc123",
            "time_us": 1700000000000000,
            "kind": "commit",
            "commit": {
                "collection": "app.bsky.feed.post",
                "rkey": "3k2la7fx2as2a",
                "operation": "create",
                "record": {"text": "hello world", "$type": "app.bsky.feed.post"},
                "cid": "bafyreig2",
                "rev": "123"
            }
        }"#,
        );

        assert_eq!(event.did, "did:plc:abc123");
        assert_eq!(event.kind, "commit");
        assert_eq!(event.time_us, 1700000000000000);
        assert!(event.identity.is_none());
        assert!(event.account.is_none());

        let commit = event.commit.expect("commit payload should be present");
        assert_eq!(commit.collection, "app.bsky.feed.post");
        assert_eq!(commit.rkey, "3k2la7fx2as2a");
        assert_eq!(commit.operation, "create");
        assert_eq!(commit.cid.as_deref(), Some("bafyreig2"));
        assert_eq!(commit.rev.as_deref(), Some("123"));

        let record = commit.record.expect("record payload should be present");
        assert_eq!(record["text"], "hello world");
        assert_eq!(record["$type"], "app.bsky.feed.post");
    }

    /// An identity event carries its payload as raw JSON and no commit.
    #[test]
    fn deserialize_identity_event() {
        let event = parse(
            r#"{
            "did": "did:plc:abc123",
            "time_us": 1700000000000000,
            "kind": "identity",
            "identity": {"handle": "alice.bsky.social"}
        }"#,
        );

        assert_eq!(event.kind, "identity");
        assert!(event.commit.is_none());
        assert!(event.account.is_none());

        let identity = event.identity.expect("identity payload should be present");
        assert_eq!(identity["handle"], "alice.bsky.social");
    }

    /// An account event carries its payload as raw JSON and no commit.
    #[test]
    fn deserialize_account_event() {
        let event = parse(
            r#"{
            "did": "did:plc:abc123",
            "time_us": 1700000000000000,
            "kind": "account",
            "account": {"active": true, "did": "did:plc:abc123"}
        }"#,
        );

        assert_eq!(event.kind, "account");
        assert!(event.commit.is_none());
        assert!(event.identity.is_none());

        let account = event.account.expect("account payload should be present");
        assert_eq!(account["active"].as_bool(), Some(true));
        assert_eq!(account["did"], "did:plc:abc123");
    }

    /// With only `did` present, defaulted fields take their `Default` value
    /// and every optional payload is `None`.
    #[test]
    fn deserialize_minimal_event() {
        let event = parse(r#"{"did": "did:plc:abc123"}"#);
        assert_eq!(event.did, "did:plc:abc123");
        assert_eq!(event.kind, "");
        assert_eq!(event.time_us, 0);
        assert!(event.commit.is_none());
        assert!(event.identity.is_none());
        assert!(event.account.is_none());
    }

    /// A delete commit omits `record`, `cid` and `rev`.
    #[test]
    fn deserialize_delete_commit() {
        let event = parse(
            r#"{
            "did": "did:plc:abc123",
            "kind": "commit",
            "commit": {
                "collection": "app.bsky.feed.post",
                "rkey": "3k2la7fx2as2a",
                "operation": "delete"
            }
        }"#,
        );

        let commit = event.commit.expect("commit payload should be present");
        assert_eq!(commit.operation, "delete");
        assert!(commit.record.is_none());
        assert!(commit.cid.is_none());
        assert!(commit.rev.is_none());
    }

    /// `did` has no serde default, so an event without it must be rejected.
    #[test]
    fn missing_did_is_rejected() {
        let result = serde_json::from_str::<JetstreamEvent>(r#"{"kind": "commit", "time_us": 1}"#);
        assert!(result.is_err());
    }

    /// `collection` and `rkey` have no serde default, so a commit object
    /// lacking either one fails the whole event.
    #[test]
    fn commit_requires_collection_and_rkey() {
        let no_collection = serde_json::from_str::<JetstreamEvent>(
            r#"{"did": "did:plc:abc123", "commit": {"rkey": "abc"}}"#,
        );
        assert!(no_collection.is_err());

        let no_rkey = serde_json::from_str::<JetstreamEvent>(
            r#"{"did": "did:plc:abc123", "commit": {"collection": "app.bsky.feed.post"}}"#,
        );
        assert!(no_rkey.is_err());
    }

    /// Serializing and re-parsing an event preserves every field.
    #[test]
    fn roundtrip_serialization() {
        let event = JetstreamEvent {
            did: "did:plc:test".to_string(),
            time_us: 123456,
            kind: "commit".to_string(),
            commit: Some(CommitData {
                collection: "app.bsky.feed.post".to_string(),
                rkey: "abc".to_string(),
                operation: "create".to_string(),
                record: Some(serde_json::json!({"text": "hi"})),
                cid: Some("bafytest".to_string()),
                rev: None,
            }),
            identity: None,
            account: None,
        };

        let serialized = serde_json::to_string(&event).unwrap();
        let deserialized: JetstreamEvent = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.did, event.did);
        assert_eq!(deserialized.time_us, event.time_us);
        assert_eq!(deserialized.kind, event.kind);
        assert_eq!(deserialized.identity, event.identity);
        assert_eq!(deserialized.account, event.account);

        // Compared field by field so the test does not depend on the structs
        // implementing `PartialEq`.
        let original = event.commit.as_ref().unwrap();
        let restored = deserialized.commit.as_ref().unwrap();
        assert_eq!(restored.collection, "app.bsky.feed.post");
        assert_eq!(restored.rkey, original.rkey);
        assert_eq!(restored.operation, original.operation);
        assert_eq!(restored.record, original.record);
        assert_eq!(restored.cid, original.cid);
        assert_eq!(restored.rev, original.rev);
    }
}