1#![deny(unsafe_code)]
2#![warn(missing_docs)]
3use std::time::Duration;
10
11#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
13pub struct ResolvedIdentity {
14 pub did: String,
16 pub handle: String,
18 pub pds_endpoint: Option<String>,
20 pub did_document: Option<serde_json::Value>,
22}
23
24#[derive(Debug, Clone)]
26pub struct IdentityMetrics {
27 pub hits: u64,
29 pub misses: u64,
31 pub entry_count: u64,
33}
34
35#[derive(Debug, Clone)]
37pub struct IdentityConfig {
38 pub cache_capacity: u64,
40 pub cache_ttl_secs: u64,
42 pub plc_directory: String,
44}
45
46impl Default for IdentityConfig {
47 fn default() -> Self {
48 Self {
49 cache_capacity: 10_000,
50 cache_ttl_secs: 3600,
51 plc_directory: "https://plc.directory".to_string(),
52 }
53 }
54}
55
56pub struct IdentityResolver {
61 cache: moka::future::Cache<String, ResolvedIdentity>,
62 http: reqwest::Client,
63 plc_directory: String,
64 hits: std::sync::atomic::AtomicU64,
65 misses: std::sync::atomic::AtomicU64,
66}
67
68impl IdentityResolver {
69 pub fn new(config: &IdentityConfig, http: reqwest::Client) -> Self {
71 let cache = moka::future::Cache::builder()
72 .max_capacity(config.cache_capacity)
73 .time_to_live(Duration::from_secs(config.cache_ttl_secs))
74 .build();
75
76 Self {
77 cache,
78 http,
79 plc_directory: config.plc_directory.clone(),
80 hits: std::sync::atomic::AtomicU64::new(0),
81 misses: std::sync::atomic::AtomicU64::new(0),
82 }
83 }
84
85 pub fn with_defaults(http: reqwest::Client) -> Self {
87 Self::new(&IdentityConfig::default(), http)
88 }
89
90 pub async fn resolve(&self, subject: &str) -> anyhow::Result<ResolvedIdentity> {
95 if let Some(cached) = self.cache.get(subject).await {
96 self.hits.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
97 return Ok(cached);
98 }
99
100 self.misses
101 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
102 let resolved = self.resolve_uncached(subject).await?;
103
104 self.cache
106 .insert(resolved.did.clone(), resolved.clone())
107 .await;
108 self.cache
109 .insert(resolved.handle.clone(), resolved.clone())
110 .await;
111
112 Ok(resolved)
113 }
114
115 pub async fn invalidate(&self, subject: &str) {
117 self.cache.invalidate(subject).await;
118 }
119
120 pub fn metrics(&self) -> IdentityMetrics {
122 IdentityMetrics {
123 hits: self.hits.load(std::sync::atomic::Ordering::Relaxed),
124 misses: self.misses.load(std::sync::atomic::Ordering::Relaxed),
125 entry_count: self.cache.entry_count(),
126 }
127 }
128
129 async fn resolve_uncached(&self, subject: &str) -> anyhow::Result<ResolvedIdentity> {
131 if subject.starts_with("did:") {
132 self.resolve_did(subject).await
133 } else {
134 self.resolve_handle(subject).await
135 }
136 }
137
138 async fn resolve_did(&self, did: &str) -> anyhow::Result<ResolvedIdentity> {
140 let doc = if did.starts_with("did:plc:") {
141 let url = format!("{}/{}", self.plc_directory.trim_end_matches('/'), did);
142 tracing::debug!(did = %did, url = %url, "resolving DID via PLC directory");
143 let resp = self.http.get(&url).send().await?;
144 if !resp.status().is_success() {
145 anyhow::bail!("PLC directory returned {} for DID {}", resp.status(), did);
146 }
147 resp.json::<serde_json::Value>().await?
148 } else if did.starts_with("did:web:") {
149 let domain = did.strip_prefix("did:web:").unwrap_or(did);
150 let url = format!("https://{}/.well-known/did.json", domain);
151 tracing::debug!(did = %did, url = %url, "resolving did:web");
152 let resp = self.http.get(&url).send().await?;
153 if !resp.status().is_success() {
154 anyhow::bail!("did:web resolution returned {} for {}", resp.status(), did);
155 }
156 resp.json::<serde_json::Value>().await?
157 } else {
158 anyhow::bail!("unsupported DID method: {}", did);
159 };
160
161 let handle = doc["alsoKnownAs"]
163 .as_array()
164 .and_then(|arr| {
165 arr.iter().find_map(|v| {
166 v.as_str()
167 .and_then(|s| s.strip_prefix("at://"))
168 .map(|s| s.to_string())
169 })
170 })
171 .unwrap_or_default();
172
173 let pds_endpoint = doc["service"].as_array().and_then(|arr| {
175 arr.iter().find_map(|svc| {
176 if svc["id"].as_str() == Some("#atproto_pds") {
177 svc["serviceEndpoint"].as_str().map(|s| s.to_string())
178 } else {
179 None
180 }
181 })
182 });
183
184 Ok(ResolvedIdentity {
185 did: did.to_string(),
186 handle,
187 pds_endpoint,
188 did_document: Some(doc),
189 })
190 }
191
192 async fn resolve_handle(&self, handle: &str) -> anyhow::Result<ResolvedIdentity> {
194 let url = format!(
196 "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle={}",
197 handle
198 );
199 tracing::debug!(handle = %handle, "resolving handle");
200
201 let resp = self.http.get(&url).send().await?;
202 if !resp.status().is_success() {
203 anyhow::bail!(
204 "handle resolution returned {} for {}",
205 resp.status(),
206 handle
207 );
208 }
209
210 let body: serde_json::Value = resp.json().await?;
211 let did = body["did"]
212 .as_str()
213 .ok_or_else(|| anyhow::anyhow!("handle resolution response missing 'did' field"))?;
214
215 let mut identity = self.resolve_did(did).await?;
217 if identity.handle.is_empty() {
219 identity.handle = handle.to_string();
220 }
221 Ok(identity)
222 }
223}
224
225#[cfg(test)]
226mod tests {
227 use super::*;
228
229 #[test]
230 fn default_config() {
231 let config = IdentityConfig::default();
232 assert_eq!(config.cache_capacity, 10_000);
233 assert_eq!(config.cache_ttl_secs, 3600);
234 assert_eq!(config.plc_directory, "https://plc.directory");
235 }
236
237 #[tokio::test]
238 async fn resolver_creation() {
239 let resolver = IdentityResolver::with_defaults(reqwest::Client::new());
240 let metrics = resolver.metrics();
241 assert_eq!(metrics.hits, 0);
242 assert_eq!(metrics.misses, 0);
243 assert_eq!(metrics.entry_count, 0);
244 }
245
246 #[tokio::test]
247 async fn invalidate_nonexistent_key() {
248 let resolver = IdentityResolver::with_defaults(reqwest::Client::new());
249 resolver.invalidate("did:plc:nonexistent").await;
251 }
252}