Skip to main content

atrg_testing/
fake_jetstream.rs

1//! Fake Jetstream for testing event handlers.
2
3use tokio::sync::mpsc;
4
5/// A fake Jetstream that can emit synthetic events for testing.
6///
7/// Events are sent through a channel and can be consumed by the
8/// application's event handler.
9pub struct FakeJetstream {
10    tx: mpsc::Sender<serde_json::Value>,
11    rx: Option<mpsc::Receiver<serde_json::Value>>,
12}
13
14impl FakeJetstream {
15    /// Create a new fake Jetstream with a bounded channel.
16    pub fn new(capacity: usize) -> Self {
17        let (tx, rx) = mpsc::channel(capacity);
18        Self { tx, rx: Some(rx) }
19    }
20
21    /// Take the receiver (for wiring into the consumer).
22    pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
23        self.rx.take()
24    }
25
26    /// Emit a raw event.
27    pub async fn emit(&self, event: serde_json::Value) -> anyhow::Result<()> {
28        self.tx.send(event).await?;
29        Ok(())
30    }
31
32    /// Emit a synthetic post commit event.
33    pub async fn emit_post(&self, did: &str, rkey: &str, text: &str) -> anyhow::Result<()> {
34        self.emit(serde_json::json!({
35            "did": did,
36            "time_us": chrono_now_us(),
37            "kind": "commit",
38            "commit": {
39                "collection": "app.bsky.feed.post",
40                "rkey": rkey,
41                "operation": "create",
42                "record": {
43                    "$type": "app.bsky.feed.post",
44                    "text": text,
45                    "createdAt": now_rfc3339(),
46                },
47            },
48        }))
49        .await
50    }
51
52    /// Emit a synthetic follow event.
53    pub async fn emit_follow(&self, did: &str, rkey: &str, target_did: &str) -> anyhow::Result<()> {
54        self.emit(serde_json::json!({
55            "did": did,
56            "time_us": chrono_now_us(),
57            "kind": "commit",
58            "commit": {
59                "collection": "app.bsky.graph.follow",
60                "rkey": rkey,
61                "operation": "create",
62                "record": {
63                    "$type": "app.bsky.graph.follow",
64                    "subject": target_did,
65                    "createdAt": now_rfc3339(),
66                },
67            },
68        }))
69        .await
70    }
71
72    /// Emit a synthetic like event.
73    pub async fn emit_like(&self, did: &str, rkey: &str, subject_uri: &str) -> anyhow::Result<()> {
74        self.emit(serde_json::json!({
75            "did": did,
76            "time_us": chrono_now_us(),
77            "kind": "commit",
78            "commit": {
79                "collection": "app.bsky.feed.like",
80                "rkey": rkey,
81                "operation": "create",
82                "record": {
83                    "$type": "app.bsky.feed.like",
84                    "subject": {
85                        "uri": subject_uri,
86                    },
87                    "createdAt": now_rfc3339(),
88                },
89            },
90        }))
91        .await
92    }
93}
94
95fn chrono_now_us() -> i64 {
96    std::time::SystemTime::now()
97        .duration_since(std::time::UNIX_EPOCH)
98        .unwrap_or_default()
99        .as_micros() as i64
100}
101
102fn now_rfc3339() -> String {
103    // Simple ISO 8601 timestamp
104    let d = std::time::SystemTime::now()
105        .duration_since(std::time::UNIX_EPOCH)
106        .unwrap_or_default();
107    let secs = d.as_secs();
108    // Good enough for testing — real code would use chrono
109    format!("1970-01-01T00:00:00Z+{secs}s")
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115
116    #[tokio::test]
117    async fn emit_and_receive() {
118        let mut fake = FakeJetstream::new(10);
119        let mut rx = fake.take_receiver().unwrap();
120
121        fake.emit(serde_json::json!({"test": true})).await.unwrap();
122
123        let event = rx.recv().await.unwrap();
124        assert_eq!(event["test"], true);
125    }
126
127    #[tokio::test]
128    async fn emit_post_has_correct_shape() {
129        let mut fake = FakeJetstream::new(10);
130        let mut rx = fake.take_receiver().unwrap();
131
132        fake.emit_post("did:plc:test", "abc123", "hello world")
133            .await
134            .unwrap();
135
136        let event = rx.recv().await.unwrap();
137        assert_eq!(event["did"], "did:plc:test");
138        assert_eq!(event["commit"]["collection"], "app.bsky.feed.post");
139        assert_eq!(event["commit"]["rkey"], "abc123");
140        assert_eq!(event["commit"]["record"]["text"], "hello world");
141    }
142
143    #[tokio::test]
144    async fn emit_follow_has_correct_shape() {
145        let mut fake = FakeJetstream::new(10);
146        let mut rx = fake.take_receiver().unwrap();
147
148        fake.emit_follow("did:plc:a", "f1", "did:plc:b")
149            .await
150            .unwrap();
151
152        let event = rx.recv().await.unwrap();
153        assert_eq!(event["commit"]["collection"], "app.bsky.graph.follow");
154        assert_eq!(event["commit"]["record"]["subject"], "did:plc:b");
155    }
156
157    #[tokio::test]
158    async fn emit_like_has_correct_shape() {
159        let mut fake = FakeJetstream::new(10);
160        let mut rx = fake.take_receiver().unwrap();
161
162        fake.emit_like("did:plc:a", "l1", "at://did:plc:b/app.bsky.feed.post/abc")
163            .await
164            .unwrap();
165
166        let event = rx.recv().await.unwrap();
167        assert_eq!(event["commit"]["collection"], "app.bsky.feed.like");
168        assert_eq!(
169            event["commit"]["record"]["subject"]["uri"],
170            "at://did:plc:b/app.bsky.feed.post/abc"
171        );
172    }
173
174    #[tokio::test]
175    async fn take_receiver_returns_none_second_time() {
176        let mut fake = FakeJetstream::new(10);
177        assert!(fake.take_receiver().is_some());
178        assert!(fake.take_receiver().is_none());
179    }
180}