Skip to content

Commit 458f382

Browse files
authored
Merge pull request #123 from utkarshgupta137/rsky-relay/fix-export
rsky-relay: handle plc export edge case; fix queue drain at start; optimize plc_directory
2 parents 17cc3c0 + b28a485 commit 458f382

File tree

7 files changed

+262
-176
lines changed

7 files changed

+262
-176
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ Rocket.toml
1414
**/data/
1515
db/
1616
*.db
17+
*.db-shm
18+
*.db-wal
1719
*.db-journal
1820
rsky-relay.log.*
1921

rsky-relay/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ p256 = "0.13"
3030
reqwest = { version = "0.12", default-features = false, features = ["blocking", "gzip", "hickory-dns", "http2", "json", "rustls-tls-webpki-roots-no-provider"] }
3131
rs-car-sync = "0.4"
3232
rtrb = "0.3"
33-
rusqlite = { version = "0.35", features = ["bundled", "chrono"] }
33+
rusqlite = { version = "0.36", features = ["bundled", "chrono"] }
3434
rustls = "0.23"
3535
rustls-pemfile = "2"
3636
serde = { version = "1", features = ["derive"] }

rsky-relay/crawler.py

Lines changed: 92 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,66 @@
1414
def create_database():
1515
"""Create SQLite database and table if they don't exist."""
1616
conn = sqlite3.connect(DB_FILE)
17+
conn.execute("""PRAGMA auto_vacuum = INCREMENTAL""")
18+
conn.execute("""PRAGMA journal_mode = WAL""")
1719
cursor = conn.cursor()
1820

21+
# Set PRAGMAs
22+
conn.execute("""PRAGMA cache_size = -64000""")
23+
conn.execute("""PRAGMA journal_size_limit = 6144000""")
24+
conn.execute("""PRAGMA mmap_size = 268435456""")
25+
conn.execute("""PRAGMA secure_delete = OFF""")
26+
conn.execute("""PRAGMA synchronous = NORMAL""")
27+
conn.execute("""PRAGMA temp_store = MEMORY""")
28+
1929
# Create table for PLC operations
2030
cursor.execute("""
2131
CREATE TABLE IF NOT EXISTS plc_operations (
22-
did TEXT,
23-
created_at TEXT,
24-
nullified BOOLEAN,
25-
cid TEXT,
26-
operation TEXT
32+
cid TEXT NOT NULL PRIMARY KEY ON CONFLICT REPLACE,
33+
did TEXT NOT NULL,
34+
created_at TEXT NOT NULL,
35+
nullified BOOLEAN NOT NULL,
36+
operation BLOB NOT NULL,
37+
pds_endpoint TEXT GENERATED ALWAYS AS (
38+
json_extract(operation, '$.services.atproto_pds.endpoint')
39+
) STORED,
40+
atproto_key TEXT GENERATED ALWAYS AS (
41+
json_extract(operation, '$.verificationMethods.atproto')
42+
) STORED,
43+
labeler_endpoint TEXT GENERATED ALWAYS AS (
44+
json_extract(operation, '$.services.atproto_labeler.endpoint')
45+
) STORED,
46+
atproto_label_key TEXT GENERATED ALWAYS AS (
47+
json_extract(operation, '$.verificationMethods.atproto_label')
48+
) STORED
2749
)
2850
""")
51+
52+
# Drop all views
53+
cursor.execute("""DROP VIEW IF EXISTS plc_labelers""")
54+
cursor.execute("""DROP VIEW IF EXISTS plc_pdses""")
55+
cursor.execute("""DROP VIEW IF EXISTS plc_keys""")
56+
cursor.execute("""DROP VIEW IF EXISTS plc_latest""")
57+
58+
# Create indexes
2959
cursor.execute("""
30-
CREATE INDEX IF NOT EXISTS did_index ON plc_operations (
31-
did
32-
)
60+
CREATE INDEX IF NOT EXISTS idx_plc_operations_did_created_at
61+
ON plc_operations (did, created_at DESC)
3362
""")
3463
cursor.execute("""
35-
CREATE INDEX IF NOT EXISTS created_at_index ON plc_operations (
36-
created_at ASC
37-
)
64+
CREATE INDEX IF NOT EXISTS idx_plc_operations_pds_endpoint
65+
ON plc_operations (pds_endpoint, created_at)
66+
WHERE pds_endpoint IS NOT NULL
3867
""")
3968
cursor.execute("""
40-
CREATE VIEW IF NOT EXISTS plc_latest AS
69+
CREATE INDEX IF NOT EXISTS idx_plc_operations_labeler_endpoint
70+
ON plc_operations (labeler_endpoint, created_at)
71+
WHERE labeler_endpoint IS NOT NULL
72+
""")
73+
74+
# Create views
75+
cursor.execute("""
76+
CREATE VIEW plc_latest AS
4177
SELECT *
4278
FROM plc_operations
4379
WHERE created_at = (
@@ -47,16 +83,45 @@ def create_database():
4783
)
4884
""")
4985
cursor.execute("""
50-
CREATE VIEW IF NOT EXISTS plc_keys AS
86+
CREATE VIEW plc_keys AS
87+
SELECT
88+
did,
89+
created_at,
90+
pds_endpoint,
91+
atproto_key AS pds_key,
92+
labeler_endpoint,
93+
atproto_label_key AS labeler_key
94+
FROM plc_latest
95+
""")
96+
cursor.execute("""
97+
CREATE VIEW plc_pdses AS
98+
SELECT
99+
MIN(created_at) AS first,
100+
MAX(created_at) AS last,
101+
count() AS accounts,
102+
pds_endpoint
103+
FROM plc_latest
104+
WHERE pds_endpoint IS NOT NULL
105+
GROUP BY pds_endpoint
106+
ORDER BY last
107+
""")
108+
cursor.execute("""
109+
CREATE VIEW plc_labelers AS
51110
SELECT
52-
did,
53-
created_at,
54-
json_extract(operation, '$.services.atproto_pds.endpoint') AS endpoint,
55-
json_extract(operation, '$.verificationMethods.atproto') AS key
111+
did,
112+
created_at,
113+
labeler_endpoint
56114
FROM plc_latest
115+
WHERE labeler_endpoint IS NOT NULL
116+
ORDER BY created_at
57117
""")
58118

59119
conn.commit()
120+
121+
# Vacuum & optimize
122+
cursor.execute("""PRAGMA incremental_vacuum""")
123+
cursor.execute("""PRAGMA optimize = 0x10002""")
124+
60125
return conn
61126

62127

@@ -83,15 +148,15 @@ def insert_operations(conn, operations):
83148
for op in operations:
84149
cursor.execute(
85150
"""
86-
INSERT INTO plc_operations (did, cid, nullified, created_at, operation)
151+
INSERT INTO plc_operations (cid, did, created_at, nullified, operation)
87152
VALUES (?, ?, ?, ?, ?)
88153
""",
89154
(
90-
op.get("did"),
91155
op.get("cid"),
92-
op.get("nullified"),
156+
op.get("did"),
93157
op.get("createdAt"),
94-
json.dumps(op.get("operation")),
158+
op.get("nullified"),
159+
json.dumps(op.get("operation"), separators=(",", ":")).encode("utf-8"),
95160
),
96161
)
97162

@@ -131,7 +196,7 @@ def main():
131196
after = latest_timestamp
132197

133198
total_processed = get_count(conn) or 0
134-
request_count = 0
199+
request_count = total_processed // 999
135200

136201
try:
137202
print("Starting PLC Directory API crawl...")
@@ -145,7 +210,8 @@ def main():
145210
break
146211

147212
insert_operations(conn, operations)
148-
total_processed += len(operations)
213+
prev_processed = total_processed
214+
total_processed = get_count(conn) or 0
149215

150216
# Get the last timestamp for the next request
151217
last_op = operations[-1]
@@ -156,6 +222,9 @@ def main():
156222
f"Request #{request_count}: Fetched {len(operations)}, "
157223
f"Total {total_processed}, Last timestamp: {after}"
158224
)
225+
ignored = len(operations) - (total_processed - prev_processed)
226+
if ignored != 1:
227+
print(f"IGNORED: {ignored}")
159228

160229
# Check if we got fewer records than requested (end of data)
161230
if len(operations) < COUNT_PER_REQUEST:

rsky-relay/src/crawler/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ impl Manager {
178178
fn get_cursor(&self, host: &str) -> Result<Option<Cursor>, ManagerError> {
179179
let mut stmt = self.conn.prepare_cached("SELECT * FROM hosts WHERE host = ?1")?;
180180
Ok(stmt
181-
.query_row((&host,), |row| Ok(row.get_unwrap::<_, u64>("cursor")))
181+
.query_one((&host,), |row| Ok(row.get_unwrap::<_, u64>("cursor")))
182182
.optional()?
183183
.map(Into::into))
184184
}

rsky-relay/src/validator/event.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use rsky_common::tid::TID;
1313

1414
use crate::types::Cursor;
1515

16+
pub type DidEndpoint = Option<Box<str>>;
17+
pub type DidKey = [u8; 35];
18+
1619
#[derive(Debug, Error)]
1720
pub enum ParseError {
1821
#[error("header error: {0}")]

rsky-relay/src/validator/manager.rs

Lines changed: 79 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -92,18 +92,26 @@ impl Manager {
9292
self.repos.insert(did, state);
9393
repos += 1;
9494
}
95-
let mut queue = 0;
95+
96+
let mut cursor = self.firehose.last_key_value()?.map(|(k, _)| k.into()).unwrap_or_default();
97+
let mut queue_drained = 0;
98+
let mut queue_pending = 0;
9699
for res in self.queue.keys() {
97100
let key = res?;
98101
#[expect(clippy::unwrap_used)]
99102
let key = std::str::from_utf8(&key).unwrap();
100103
#[expect(clippy::unwrap_used)]
101-
self.resolver.resolve(key.split('>').next().unwrap())?;
102-
queue += 1;
104+
let did = key.split('>').next().unwrap();
105+
if self.resolver.resolve(did)?.is_some() {
106+
self.scan_did(&mut cursor, did)?;
107+
queue_drained += 1;
108+
} else {
109+
queue_pending += 1;
110+
}
103111
}
104-
let mut seq = self.firehose.last_key_value()?.map(|(k, _)| k.into()).unwrap_or_default();
105-
tracing::info!(%hosts, %repos, %queue, %seq, "loaded state");
106-
while self.update(&mut seq).await? {}
112+
113+
tracing::info!(%hosts, %repos, %queue_drained, %queue_pending, %cursor, "loaded state");
114+
while self.update(&mut cursor).await? {}
107115
tracing::info!("shutting down validator");
108116
SHUTDOWN.store(true, Ordering::Relaxed);
109117
Ok(())
@@ -266,83 +274,87 @@ impl Manager {
266274
self.hosts.insert(host.clone(), (seq, time));
267275
}
268276

269-
let mut batch: Option<Batch> = None;
270277
for did in self.resolver.poll().await? {
271-
let Some((pds, key)) = self.resolver.resolve(&did)? else {
272-
continue;
273-
};
278+
self.scan_did(cursor, &did)?;
279+
}
274280

275-
for res in self.queue.prefix(&did) {
276-
let (k, input) = res?;
277-
batch.get_or_insert_with(|| DB.batch()).remove(&self.queue, k.clone());
278-
279-
#[expect(clippy::unwrap_used)]
280-
let host = std::str::from_utf8(&k).unwrap().split('>').nth(1).unwrap();
281-
let span = tracing::debug_span!("msg_read", %host, len = %input.len());
282-
let _enter = span.enter();
283-
284-
#[expect(clippy::unwrap_used)]
285-
let event = SubscribeReposEvent::parse(&input)?.unwrap(); // already parsed
286-
let type_ = event.type_();
287-
let seq = event.seq();
288-
let time = event.time();
289-
let did = event.did();
290-
let span = tracing::debug_span!("msg_data", type = %type_, %seq, %time, %did);
291-
let _enter = span.enter();
292-
293-
#[expect(clippy::unwrap_used)]
294-
let (commit, head) = event.commit()?.unwrap(); // already parsed
295-
let span =
296-
tracing::debug_span!("validate", rev = %commit.rev, data = %commit.data, %head);
297-
let _enter = span.enter();
298-
299-
if let Some(pds) = pds {
300-
if host != pds {
301-
tracing::debug!(%pds, "hostname pds mismatch");
302-
continue;
303-
}
281+
Ok(true)
282+
}
283+
284+
fn scan_did(&mut self, cursor: &mut Cursor, did: &str) -> Result<(), ManagerError> {
285+
let Some((pds, key)) = self.resolver.resolve(did)? else { unreachable!() };
286+
287+
let mut batch: Option<Batch> = None;
288+
for res in self.queue.prefix(&did) {
289+
let (k, input) = res?;
290+
batch.get_or_insert_with(|| DB.batch()).remove(&self.queue, k.clone());
291+
292+
#[expect(clippy::unwrap_used)]
293+
let host = std::str::from_utf8(&k).unwrap().split('>').nth(1).unwrap();
294+
let span = tracing::debug_span!("msg_read", %host, len = %input.len());
295+
let _enter = span.enter();
296+
297+
#[expect(clippy::unwrap_used)]
298+
let event = SubscribeReposEvent::parse(&input)?.unwrap(); // already parsed
299+
let type_ = event.type_();
300+
let seq = event.seq();
301+
let time = event.time();
302+
let did = event.did();
303+
let span = tracing::debug_span!("msg_data", type = %type_, %seq, %time, %did);
304+
let _enter = span.enter();
305+
306+
#[expect(clippy::unwrap_used)]
307+
let (commit, head) = event.commit()?.unwrap(); // already parsed
308+
let span =
309+
tracing::debug_span!("validate", rev = %commit.rev, data = %commit.data, %head);
310+
let _enter = span.enter();
311+
312+
if let Some(pds) = pds {
313+
if host != pds {
314+
tracing::debug!(%pds, "hostname pds mismatch");
315+
continue;
304316
}
317+
}
305318

306-
// verify signature
307-
match utils::verify_commit_sig(&commit, key) {
308-
Ok(valid) => {
309-
if !valid {
310-
tracing::debug!(?key, "signature mismatch");
311-
continue;
312-
}
313-
}
314-
Err(err) => {
315-
tracing::debug!(%err, ?key, "signature check error");
319+
// verify signature
320+
match utils::verify_commit_sig(&commit, key) {
321+
Ok(valid) => {
322+
if !valid {
323+
tracing::debug!(?key, "signature mismatch");
316324
continue;
317325
}
318326
}
327+
Err(err) => {
328+
tracing::debug!(%err, ?key, "signature check error");
329+
continue;
330+
}
331+
}
319332

320-
// verify commit message
321-
let rev = commit.rev;
322-
let data = commit.data;
323-
let entry = self.repos.entry(commit.did);
324-
if let SubscribeReposEvent::Commit(commit) = &event {
325-
// TODO: should still validate records existing in blocks, etc
326-
if let Entry::Occupied(prev) = &entry {
327-
let prev = prev.get();
328-
let span = tracing::debug_span!("previous", rev = %prev.rev, data = %prev.data, head = %prev.head);
329-
let _enter = span.enter();
330-
if !utils::verify_commit_event(commit, data, prev) {
331-
continue;
332-
}
333+
// verify commit message
334+
let rev = commit.rev;
335+
let data = commit.data;
336+
let entry = self.repos.entry(commit.did);
337+
if let SubscribeReposEvent::Commit(commit) = &event {
338+
// TODO: should still validate records existing in blocks, etc
339+
if let Entry::Occupied(prev) = &entry {
340+
let prev = prev.get();
341+
let span = tracing::debug_span!("previous", rev = %prev.rev, data = %prev.data, head = %prev.head);
342+
let _enter = span.enter();
343+
if !utils::verify_commit_event(commit, data, prev) {
344+
continue;
333345
}
334346
}
335-
336-
let msg = event.serialize(input.len(), cursor.next())?;
337-
self.firehose.insert(*cursor, msg)?;
338-
entry.insert(RepoState { rev, data, head });
339347
}
348+
349+
let msg = event.serialize(input.len(), cursor.next())?;
350+
self.firehose.insert(*cursor, msg)?;
351+
entry.insert(RepoState { rev, data, head });
340352
}
341353
if let Some(batch) = batch {
342354
batch.commit()?;
343355
}
344356

345-
Ok(true)
357+
Ok(())
346358
}
347359
}
348360

0 commit comments

Comments
 (0)