atrg_testing/
fake_jetstream.rs1use tokio::sync::mpsc;
4
5pub struct FakeJetstream {
10 tx: mpsc::Sender<serde_json::Value>,
11 rx: Option<mpsc::Receiver<serde_json::Value>>,
12}
13
14impl FakeJetstream {
15 pub fn new(capacity: usize) -> Self {
17 let (tx, rx) = mpsc::channel(capacity);
18 Self { tx, rx: Some(rx) }
19 }
20
21 pub fn take_receiver(&mut self) -> Option<mpsc::Receiver<serde_json::Value>> {
23 self.rx.take()
24 }
25
26 pub async fn emit(&self, event: serde_json::Value) -> anyhow::Result<()> {
28 self.tx.send(event).await?;
29 Ok(())
30 }
31
32 pub async fn emit_post(&self, did: &str, rkey: &str, text: &str) -> anyhow::Result<()> {
34 self.emit(serde_json::json!({
35 "did": did,
36 "time_us": chrono_now_us(),
37 "kind": "commit",
38 "commit": {
39 "collection": "app.bsky.feed.post",
40 "rkey": rkey,
41 "operation": "create",
42 "record": {
43 "$type": "app.bsky.feed.post",
44 "text": text,
45 "createdAt": now_rfc3339(),
46 },
47 },
48 }))
49 .await
50 }
51
52 pub async fn emit_follow(&self, did: &str, rkey: &str, target_did: &str) -> anyhow::Result<()> {
54 self.emit(serde_json::json!({
55 "did": did,
56 "time_us": chrono_now_us(),
57 "kind": "commit",
58 "commit": {
59 "collection": "app.bsky.graph.follow",
60 "rkey": rkey,
61 "operation": "create",
62 "record": {
63 "$type": "app.bsky.graph.follow",
64 "subject": target_did,
65 "createdAt": now_rfc3339(),
66 },
67 },
68 }))
69 .await
70 }
71
72 pub async fn emit_like(&self, did: &str, rkey: &str, subject_uri: &str) -> anyhow::Result<()> {
74 self.emit(serde_json::json!({
75 "did": did,
76 "time_us": chrono_now_us(),
77 "kind": "commit",
78 "commit": {
79 "collection": "app.bsky.feed.like",
80 "rkey": rkey,
81 "operation": "create",
82 "record": {
83 "$type": "app.bsky.feed.like",
84 "subject": {
85 "uri": subject_uri,
86 },
87 "createdAt": now_rfc3339(),
88 },
89 },
90 }))
91 .await
92 }
93}
94
/// Returns the system clock's current time as microseconds since the Unix
/// epoch.
///
/// Despite the name, only `std::time` is involved. If the clock reports a time
/// before the epoch, the elapsed duration falls back to zero. The `u128`
/// microsecond count is narrowed to `i64` with a plain cast.
fn chrono_now_us() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_micros() as i64
}
101
/// Returns the current UTC time as an RFC 3339 timestamp with second
/// precision, e.g. `2024-05-17T09:30:00Z`.
///
/// Uses only `std::time`. If the system clock reports a time before the Unix
/// epoch, the result is the epoch itself (`1970-01-01T00:00:00Z`).
fn now_rfc3339() -> String {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    rfc3339_from_unix_secs(secs)
}

/// Formats whole seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn rfc3339_from_unix_secs(secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let days = secs / SECS_PER_DAY;
    let rem = secs % SECS_PER_DAY;
    let (hour, minute, second) = (rem / 3_600, rem % 3_600 / 60, rem % 60);

    // Days-to-civil-date conversion (proleptic Gregorian calendar). The
    // calendar is shifted to start on 1 March of year 0 so the leap day falls
    // at the end of the shifted year; 719_468 is the day count from that
    // origin to 1970-01-01, and 146_097 is the number of days in 400 years.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March … 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };
    // January and February belong to the following civil year.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!("{year:04}-{month:02}-{day:02}T{hour:02}:{minute:02}:{second:02}Z")
}
111
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fake with room for ten events and detaches its receiver.
    fn fake_with_receiver() -> (
        FakeJetstream,
        tokio::sync::mpsc::Receiver<serde_json::Value>,
    ) {
        let mut fake = FakeJetstream::new(10);
        let rx = fake.take_receiver().unwrap();
        (fake, rx)
    }

    /// A raw event passed to `emit` arrives unchanged on the receiver.
    #[tokio::test]
    async fn emit_and_receive() {
        let (fake, mut rx) = fake_with_receiver();

        let payload = serde_json::json!({"test": true});
        fake.emit(payload).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received["test"], true);
    }

    /// `emit_post` fills in author, collection, record key and text.
    #[tokio::test]
    async fn emit_post_has_correct_shape() {
        let (fake, mut rx) = fake_with_receiver();

        let sent = fake.emit_post("did:plc:test", "abc123", "hello world").await;
        sent.unwrap();

        let received = rx.recv().await.unwrap();
        let commit = &received["commit"];
        assert_eq!(received["did"], "did:plc:test");
        assert_eq!(commit["collection"], "app.bsky.feed.post");
        assert_eq!(commit["rkey"], "abc123");
        assert_eq!(commit["record"]["text"], "hello world");
    }

    /// `emit_follow` targets the follow collection and records the subject DID.
    #[tokio::test]
    async fn emit_follow_has_correct_shape() {
        let (fake, mut rx) = fake_with_receiver();

        let sent = fake.emit_follow("did:plc:a", "f1", "did:plc:b").await;
        sent.unwrap();

        let received = rx.recv().await.unwrap();
        let commit = &received["commit"];
        assert_eq!(commit["collection"], "app.bsky.graph.follow");
        assert_eq!(commit["record"]["subject"], "did:plc:b");
    }

    /// `emit_like` targets the like collection and nests the subject URI.
    #[tokio::test]
    async fn emit_like_has_correct_shape() {
        let (fake, mut rx) = fake_with_receiver();

        let sent = fake
            .emit_like("did:plc:a", "l1", "at://did:plc:b/app.bsky.feed.post/abc")
            .await;
        sent.unwrap();

        let received = rx.recv().await.unwrap();
        let commit = &received["commit"];
        assert_eq!(commit["collection"], "app.bsky.feed.like");
        assert_eq!(
            commit["record"]["subject"]["uri"],
            "at://did:plc:b/app.bsky.feed.post/abc"
        );
    }

    /// The receiver can be taken exactly once.
    #[tokio::test]
    async fn take_receiver_returns_none_second_time() {
        let mut fake = FakeJetstream::new(10);
        let first = fake.take_receiver();
        let second = fake.take_receiver();
        assert!(first.is_some());
        assert!(second.is_none());
    }
}