Skip to main content

atrg_firehose/
car.rs

//! Minimal CAR v1 file decoder for extracting CBOR blocks.
//!
//! The firehose `#commit` frames include a `blocks` byte array that is
//! a CAR v1 archive. We decode it to extract the record CBOR blocks,
//! converting each from CBOR (via `ciborium`) into `serde_json::Value`.
6
7use std::collections::HashMap;
8
9/// Decoded blocks from a CAR v1 file, keyed by hex-encoded CID.
10#[derive(Debug)]
11pub struct CarDecoded {
12    /// Map from CID (hex-encoded) to decoded CBOR block value.
13    pub blocks: HashMap<String, serde_json::Value>,
14}
15
16impl CarDecoded {
17    /// Create an empty `CarDecoded` with no blocks.
18    pub fn empty() -> Self {
19        Self {
20            blocks: HashMap::new(),
21        }
22    }
23}
24
25/// Decode a CAR v1 byte slice into its constituent blocks.
26///
27/// Returns a map of CID (hex-encoded) → decoded JSON value for each block.
28/// Blocks that fail CBOR decoding are silently skipped.
29pub fn decode_car(data: &[u8]) -> anyhow::Result<CarDecoded> {
30    let mut cursor = data;
31
32    // 1. Read the CAR header: varint length, then CBOR header map.
33    let header_len = read_varint(&mut cursor)? as usize;
34    if cursor.len() < header_len {
35        anyhow::bail!("CAR header length {header_len} exceeds remaining data");
36    }
37
38    // Skip the header CBOR (we don't need roots for our purposes).
39    cursor = &cursor[header_len..];
40
41    // 2. Iterate blocks: each is varint(cid_len + data_len), then CID bytes, then block data.
42    let mut blocks = HashMap::new();
43
44    while !cursor.is_empty() {
45        let block_section_len = match read_varint(&mut cursor) {
46            Ok(len) => len as usize,
47            Err(_) => break,
48        };
49
50        if block_section_len == 0 || cursor.len() < block_section_len {
51            break;
52        }
53
54        let block_section = &cursor[..block_section_len];
55        cursor = &cursor[block_section_len..];
56
57        // Parse the CID from the block section to find where data starts.
58        let mut block_cursor = block_section;
59        let cid_hex = match read_cid(&mut block_cursor) {
60            Ok(cid) => cid,
61            Err(_) => continue,
62        };
63
64        // The remainder after reading the CID is the block data (CBOR).
65        if block_cursor.is_empty() {
66            continue;
67        }
68
69        // Attempt to decode the CBOR block into a serde_json::Value.
70        match ciborium::from_reader::<ciborium::Value, _>(block_cursor) {
71            Ok(cbor_val) => {
72                let json_val = cbor_to_json(&cbor_val);
73                blocks.insert(cid_hex, json_val);
74            }
75            Err(_) => {
76                // Silently skip blocks that fail CBOR decoding.
77            }
78        }
79    }
80
81    Ok(CarDecoded { blocks })
82}
83
84/// Read an unsigned varint (LEB128) from a byte slice, advancing the cursor.
85pub(crate) fn read_varint(reader: &mut &[u8]) -> anyhow::Result<u64> {
86    let mut value: u64 = 0;
87    let mut shift: u32 = 0;
88
89    loop {
90        if reader.is_empty() {
91            anyhow::bail!("unexpected end of data reading varint");
92        }
93
94        let byte = reader[0];
95        *reader = &reader[1..];
96
97        value |= u64::from(byte & 0x7F) << shift;
98
99        if byte & 0x80 == 0 {
100            return Ok(value);
101        }
102
103        shift += 7;
104        if shift >= 64 {
105            anyhow::bail!("varint overflow");
106        }
107    }
108}
109
110/// Read a CID from a byte slice, advancing the cursor past it.
111///
112/// Returns the CID bytes as a hex string. Supports CIDv1 (multicodec prefix)
113/// and CIDv0 (raw 34-byte SHA2-256 multihash starting with 0x12 0x20).
114fn read_cid(reader: &mut &[u8]) -> anyhow::Result<String> {
115    let start = *reader;
116
117    if reader.is_empty() {
118        anyhow::bail!("empty data reading CID");
119    }
120
121    // CIDv0: starts with 0x12 0x20 (SHA2-256 multihash, 32 bytes digest).
122    // Total 34 bytes.
123    if reader.len() >= 2 && reader[0] == 0x12 && reader[1] == 0x20 {
124        if reader.len() < 34 {
125            anyhow::bail!("truncated CIDv0");
126        }
127        let cid_bytes = &reader[..34];
128        *reader = &reader[34..];
129        return Ok(hex::encode(cid_bytes));
130    }
131
132    // CIDv1: varint(version) + varint(codec) + multihash.
133    let version = read_varint(reader)?;
134    if version != 1 {
135        anyhow::bail!("unsupported CID version: {version}");
136    }
137
138    let _codec = read_varint(reader)?;
139
140    // Multihash: varint(hash_fn) + varint(digest_size) + digest_bytes.
141    let _hash_fn = read_varint(reader)?;
142    let digest_size = read_varint(reader)? as usize;
143
144    if reader.len() < digest_size {
145        anyhow::bail!(
146            "truncated CID multihash digest: need {digest_size}, have {}",
147            reader.len()
148        );
149    }
150    *reader = &reader[digest_size..];
151
152    // The CID is everything from `start` to the current position.
153    let cid_len = start.len() - reader.len();
154    let cid_bytes = &start[..cid_len];
155    Ok(hex::encode(cid_bytes))
156}
157
158/// Convert a `ciborium::Value` to a `serde_json::Value`.
159///
160/// CBOR types that have no direct JSON equivalent (bytes, tags) are
161/// converted to string representations.
162fn cbor_to_json(val: &ciborium::Value) -> serde_json::Value {
163    match val {
164        ciborium::Value::Null => serde_json::Value::Null,
165        ciborium::Value::Bool(b) => serde_json::Value::Bool(*b),
166        ciborium::Value::Integer(i) => {
167            let n: i128 = (*i).into();
168            if let Ok(v) = i64::try_from(n) {
169                serde_json::Value::Number(v.into())
170            } else if let Ok(v) = u64::try_from(n) {
171                serde_json::Value::Number(v.into())
172            } else {
173                // Fallback for very large integers.
174                serde_json::Value::String(n.to_string())
175            }
176        }
177        ciborium::Value::Float(f) => serde_json::Number::from_f64(*f)
178            .map(serde_json::Value::Number)
179            .unwrap_or(serde_json::Value::Null),
180        ciborium::Value::Text(s) => serde_json::Value::String(s.clone()),
181        ciborium::Value::Bytes(b) => {
182            // AT Protocol uses CBOR bytes for CID links. Represent as an
183            // object with `$bytes` key (matching common DAG-CBOR conventions).
184            serde_json::json!({ "$bytes": hex::encode(b) })
185        }
186        ciborium::Value::Array(arr) => {
187            serde_json::Value::Array(arr.iter().map(cbor_to_json).collect())
188        }
189        ciborium::Value::Map(entries) => {
190            let mut map = serde_json::Map::new();
191            for (k, v) in entries {
192                let key = match k {
193                    ciborium::Value::Text(s) => s.clone(),
194                    other => format!("{other:?}"),
195                };
196                map.insert(key, cbor_to_json(v));
197            }
198            serde_json::Value::Object(map)
199        }
200        ciborium::Value::Tag(tag, inner) => {
201            // DAG-CBOR tag 42 is a CID link.
202            if *tag == 42 {
203                if let ciborium::Value::Bytes(b) = inner.as_ref() {
204                    // CID link bytes — skip the leading 0x00 identity multibase prefix if present.
205                    let cid_bytes = if b.first() == Some(&0x00) { &b[1..] } else { b };
206                    return serde_json::json!({ "$link": hex::encode(cid_bytes) });
207                }
208            }
209            // Generic tagged value fallback.
210            serde_json::json!({
211                "$tag": tag,
212                "$value": cbor_to_json(inner),
213            })
214        }
215        _ => serde_json::Value::Null,
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;

    // ── fixtures ────────────────────────────────────────────────────

    /// Append `val` to `buf` as an unsigned LEB128 varint.
    fn write_varint_to(buf: &mut Vec<u8>, mut val: u64) {
        loop {
            let mut byte = (val & 0x7F) as u8;
            val >>= 7;
            if val != 0 {
                byte |= 0x80;
            }
            buf.push(byte);
            if val == 0 {
                break;
            }
        }
    }

    /// Serialize a CBOR value to its byte encoding.
    fn cbor_bytes(value: &ciborium::Value) -> Vec<u8> {
        let mut out = Vec::new();
        ciborium::into_writer(value, &mut out).unwrap();
        out
    }

    /// A length-prefixed CAR v1 header encoding `{"version": 1, "roots": []}`.
    fn car_header() -> Vec<u8> {
        let header = cbor_bytes(&ciborium::Value::Map(vec![
            (
                ciborium::Value::Text("version".into()),
                ciborium::Value::Integer(1.into()),
            ),
            (
                ciborium::Value::Text("roots".into()),
                ciborium::Value::Array(vec![]),
            ),
        ]));

        let mut car = Vec::new();
        write_varint_to(&mut car, header.len() as u64);
        car.extend_from_slice(&header);
        car
    }

    /// A binary CIDv1 (dag-cbor, sha2-256) whose 32 digest bytes are all `fill`.
    fn cid_v1(fill: u8) -> Vec<u8> {
        let mut cid = Vec::new();
        write_varint_to(&mut cid, 1); // version
        write_varint_to(&mut cid, 0x71); // codec (dag-cbor)
        write_varint_to(&mut cid, 0x12); // hash fn (sha2-256)
        write_varint_to(&mut cid, 32); // digest size
        cid.extend_from_slice(&[fill; 32]); // digest
        cid
    }

    /// Append one length-prefixed block section (`cid || payload`) to `car`.
    fn push_block(car: &mut Vec<u8>, cid: &[u8], payload: &[u8]) {
        write_varint_to(car, (cid.len() + payload.len()) as u64);
        car.extend_from_slice(cid);
        car.extend_from_slice(payload);
    }

    // ── read_varint tests ───────────────────────────────────────────

    #[test]
    fn read_varint_single_byte() {
        let data = [0x05u8];
        let mut cursor: &[u8] = &data;
        let val = read_varint(&mut cursor).unwrap();
        assert_eq!(val, 5);
        assert!(cursor.is_empty());
    }

    #[test]
    fn read_varint_two_bytes() {
        // 300 = 0b100101100 → LEB128: 0xAC 0x02
        let data = [0xACu8, 0x02];
        let mut cursor: &[u8] = &data;
        let val = read_varint(&mut cursor).unwrap();
        assert_eq!(val, 300);
        assert!(cursor.is_empty());
    }

    #[test]
    fn read_varint_zero() {
        let data = [0x00u8];
        let mut cursor: &[u8] = &data;
        let val = read_varint(&mut cursor).unwrap();
        assert_eq!(val, 0);
        assert!(cursor.is_empty());
    }

    #[test]
    fn read_varint_empty_fails() {
        let data: &[u8] = &[];
        let mut cursor = data;
        assert!(read_varint(&mut cursor).is_err());
    }

    #[test]
    fn read_varint_max_single() {
        let data = [0x7Fu8]; // 127
        let mut cursor: &[u8] = &data;
        let val = read_varint(&mut cursor).unwrap();
        assert_eq!(val, 127);
        assert!(cursor.is_empty());
    }

    #[test]
    fn read_varint_128() {
        // 128 → LEB128: 0x80 0x01
        let data = [0x80u8, 0x01];
        let mut cursor: &[u8] = &data;
        let val = read_varint(&mut cursor).unwrap();
        assert_eq!(val, 128);
        assert!(cursor.is_empty());
    }

    #[test]
    fn read_varint_leaves_trailing_bytes() {
        // Only the varint itself is consumed; following bytes stay in the cursor.
        let data = [0x05u8, 0xFF];
        let mut cursor: &[u8] = &data;
        assert_eq!(read_varint(&mut cursor).unwrap(), 5);
        assert_eq!(cursor, &[0xFFu8][..]);
    }

    #[test]
    fn read_varint_unterminated_fails() {
        // Continuation bit set on the last available byte.
        let data = [0x80u8];
        let mut cursor: &[u8] = &data;
        assert!(read_varint(&mut cursor).is_err());
    }

    #[test]
    fn read_varint_too_many_continuation_bytes_fails() {
        // Eleven continuation bytes cannot fit in a u64.
        let data = [0x80u8; 11];
        let mut cursor: &[u8] = &data;
        assert!(read_varint(&mut cursor).is_err());
    }

    #[test]
    fn write_then_read_varint_round_trips() {
        for &val in &[0u64, 1, 127, 128, 300, 16_384, u64::from(u32::MAX)] {
            let mut buf = Vec::new();
            write_varint_to(&mut buf, val);
            let mut cursor: &[u8] = &buf;
            assert_eq!(read_varint(&mut cursor).unwrap(), val);
            assert!(cursor.is_empty());
        }
    }

    // ── read_cid tests ──────────────────────────────────────────────

    #[test]
    fn read_cid_v0() {
        // CIDv0: starts with 0x12 0x20, followed by 32 bytes of digest. Total 34 bytes.
        let mut data = vec![0x12u8, 0x20];
        data.extend_from_slice(&[0xAA; 32]);
        let mut cursor: &[u8] = &data;
        let cid = read_cid(&mut cursor).unwrap();
        // Should consume all 34 bytes and return them hex-encoded.
        assert!(cursor.is_empty());
        assert!(cid.starts_with("1220"));
        assert_eq!(cid, hex::encode(&data));
    }

    #[test]
    fn read_cid_v1() {
        let data = cid_v1(0xBB);
        let mut cursor: &[u8] = &data;
        let cid = read_cid(&mut cursor).unwrap();
        assert!(cursor.is_empty());
        assert_eq!(cid, hex::encode(&data));
    }

    #[test]
    fn read_cid_v1_leaves_block_data() {
        // Bytes after the digest belong to the block and must not be consumed.
        let mut data = cid_v1(0xBB);
        data.extend_from_slice(&[0x01, 0x02, 0x03]);
        let mut cursor: &[u8] = &data;
        let cid = read_cid(&mut cursor).unwrap();
        assert_eq!(cid, hex::encode(cid_v1(0xBB)));
        assert_eq!(cursor, &[0x01u8, 0x02, 0x03][..]);
    }

    #[test]
    fn read_cid_empty_fails() {
        let data: &[u8] = &[];
        let mut cursor = data;
        assert!(read_cid(&mut cursor).is_err());
    }

    #[test]
    fn read_cid_truncated_v0_fails() {
        // Only 10 bytes when CIDv0 needs 34
        let mut data = vec![0x12u8, 0x20];
        data.extend_from_slice(&[0xAA; 8]);
        let mut cursor: &[u8] = &data;
        assert!(read_cid(&mut cursor).is_err());
    }

    #[test]
    fn read_cid_unsupported_version_fails() {
        let mut data = cid_v1(0xBB);
        data[0] = 0x02; // version 2 is not supported
        let mut cursor: &[u8] = &data;
        assert!(read_cid(&mut cursor).is_err());
    }

    #[test]
    fn read_cid_truncated_v1_digest_fails() {
        // Declares a 32-byte digest but provides only 4 bytes of it.
        let mut data = cid_v1(0xBB);
        data.truncate(4 + 4);
        let mut cursor: &[u8] = &data;
        assert!(read_cid(&mut cursor).is_err());
    }

    // ── cbor_to_json tests ──────────────────────────────────────────

    #[test]
    fn cbor_to_json_null() {
        let val = ciborium::Value::Null;
        assert_eq!(cbor_to_json(&val), serde_json::Value::Null);
    }

    #[test]
    fn cbor_to_json_bool() {
        assert_eq!(
            cbor_to_json(&ciborium::Value::Bool(true)),
            serde_json::json!(true)
        );
        assert_eq!(
            cbor_to_json(&ciborium::Value::Bool(false)),
            serde_json::json!(false)
        );
    }

    #[test]
    fn cbor_to_json_integer() {
        assert_eq!(
            cbor_to_json(&ciborium::Value::Integer(42.into())),
            serde_json::json!(42)
        );
        assert_eq!(
            cbor_to_json(&ciborium::Value::Integer((-1).into())),
            serde_json::json!(-1)
        );
    }

    #[test]
    fn cbor_to_json_u64_max() {
        // Too large for i64 but still representable as an unsigned JSON number.
        assert_eq!(
            cbor_to_json(&ciborium::Value::Integer(u64::MAX.into())),
            serde_json::json!(u64::MAX)
        );
    }

    #[test]
    fn cbor_to_json_float() {
        let val = cbor_to_json(&ciborium::Value::Float(1.234));
        assert!(val.is_number());
        assert_eq!(val.as_f64(), Some(1.234));
    }

    #[test]
    fn cbor_to_json_float_nan() {
        // NaN can't be represented in JSON, should become null
        let val = cbor_to_json(&ciborium::Value::Float(f64::NAN));
        assert!(val.is_null());
    }

    #[test]
    fn cbor_to_json_string() {
        let val = ciborium::Value::Text("hello".to_string());
        assert_eq!(cbor_to_json(&val), serde_json::json!("hello"));
    }

    #[test]
    fn cbor_to_json_bytes() {
        let val = cbor_to_json(&ciborium::Value::Bytes(vec![0xDE, 0xAD]));
        assert_eq!(val, serde_json::json!({ "$bytes": "dead" }));
    }

    #[test]
    fn cbor_to_json_array() {
        let val = cbor_to_json(&ciborium::Value::Array(vec![
            ciborium::Value::Integer(1.into()),
            ciborium::Value::Text("two".into()),
        ]));
        assert_eq!(val, serde_json::json!([1, "two"]));
    }

    #[test]
    fn cbor_to_json_map() {
        let val = ciborium::Value::Map(vec![(
            ciborium::Value::Text("key".to_string()),
            ciborium::Value::Integer(42.into()),
        )]);
        assert_eq!(cbor_to_json(&val), serde_json::json!({"key": 42}));
    }

    #[test]
    fn cbor_to_json_tag_42_cid_link() {
        // Tag 42 with bytes (with leading 0x00 multibase prefix): the prefix
        // is dropped and the remaining CID bytes are hex-encoded.
        let cid = cid_v1(0xAA);
        let mut link_bytes = vec![0x00];
        link_bytes.extend_from_slice(&cid);
        let val = ciborium::Value::Tag(42, Box::new(ciborium::Value::Bytes(link_bytes)));
        assert_eq!(
            cbor_to_json(&val),
            serde_json::json!({ "$link": hex::encode(&cid) })
        );
    }

    #[test]
    fn cbor_to_json_tag_42_without_prefix() {
        // Without the leading 0x00 the bytes are used as-is.
        let cid = cid_v1(0xAA);
        let val = ciborium::Value::Tag(42, Box::new(ciborium::Value::Bytes(cid.clone())));
        assert_eq!(
            cbor_to_json(&val),
            serde_json::json!({ "$link": hex::encode(&cid) })
        );
    }

    #[test]
    fn cbor_to_json_generic_tag() {
        // Non-42 tag should use $tag/$value fallback
        let val = cbor_to_json(&ciborium::Value::Tag(
            99,
            Box::new(ciborium::Value::Text("inner".into())),
        ));
        assert_eq!(val, serde_json::json!({ "$tag": 99, "$value": "inner" }));
    }

    // ── CarDecoded / decode_car tests ───────────────────────────────

    #[test]
    fn empty_returns_no_blocks() {
        let decoded = CarDecoded::empty();
        assert!(decoded.blocks.is_empty());
    }

    #[test]
    fn decode_car_empty_blocks() {
        // Minimal CAR v1: header only, no blocks.
        let decoded = decode_car(&car_header()).unwrap();
        assert!(decoded.blocks.is_empty());
    }

    #[test]
    fn decode_car_with_one_block() {
        let cid = cid_v1(0xCC);
        let block_data = cbor_bytes(&ciborium::Value::Map(vec![(
            ciborium::Value::Text("hello".into()),
            ciborium::Value::Text("world".into()),
        )]));

        let mut car_data = car_header();
        push_block(&mut car_data, &cid, &block_data);

        let decoded = decode_car(&car_data).unwrap();
        assert_eq!(decoded.blocks.len(), 1);
        let value = decoded
            .blocks
            .get(&hex::encode(&cid))
            .expect("block should be keyed by its hex-encoded CID");
        assert_eq!(value["hello"], "world");
    }

    #[test]
    fn decode_car_skips_undecodable_block() {
        // First block carries truncated CBOR (a 1-byte text header with no
        // content) and is skipped; the following block is still decoded.
        let bad_cid = cid_v1(0x11);
        let good_cid = cid_v1(0x22);
        let good_data = cbor_bytes(&ciborium::Value::Map(vec![(
            ciborium::Value::Text("ok".into()),
            ciborium::Value::Bool(true),
        )]));

        let mut car_data = car_header();
        push_block(&mut car_data, &bad_cid, &[0x61]);
        push_block(&mut car_data, &good_cid, &good_data);

        let decoded = decode_car(&car_data).unwrap();
        assert_eq!(decoded.blocks.len(), 1);
        assert!(!decoded.blocks.contains_key(&hex::encode(&bad_cid)));
        assert_eq!(decoded.blocks[&hex::encode(&good_cid)]["ok"], true);
    }

    #[test]
    fn decode_car_truncated_header_fails() {
        // varint says header is 100 bytes but only 5 available
        let mut data = Vec::new();
        write_varint_to(&mut data, 100);
        data.extend_from_slice(&[0; 5]);
        assert!(decode_car(&data).is_err());
    }

    #[test]
    fn decode_car_empty_input_fails() {
        // No header length varint at all.
        assert!(decode_car(&[]).is_err());
    }

    #[test]
    fn decode_car_truncated_block_stops() {
        // A section that claims more bytes than remain ends decoding without
        // an error.
        let mut data = car_header();
        write_varint_to(&mut data, 100);
        data.extend_from_slice(&[0; 5]);

        let decoded = decode_car(&data).unwrap();
        assert!(decoded.blocks.is_empty());
    }

    #[test]
    fn decode_car_zero_length_block_skipped() {
        // A zero-length section is tolerated: decoding ends there without an
        // error and yields no blocks.
        let mut data = car_header();
        write_varint_to(&mut data, 0); // zero-length block

        let decoded = decode_car(&data).unwrap();
        assert!(decoded.blocks.is_empty());
    }
}