diff --git a/rsky-pds/src/apis/com/atproto/admin/update_subject_status.rs b/rsky-pds/src/apis/com/atproto/admin/update_subject_status.rs index 6de9c72d..d9c5aad5 100644 --- a/rsky-pds/src/apis/com/atproto/admin/update_subject_status.rs +++ b/rsky-pds/src/apis/com/atproto/admin/update_subject_status.rs @@ -12,6 +12,7 @@ use rocket::response::status; use rocket::serde::json::Json; use rocket::State; use rsky_lexicon::com::atproto::admin::{Subject, SubjectStatus, UpdateSubjectStatusOutput}; +use rsky_syntax::aturi::AtUri; use std::str::FromStr; async fn inner_update_subject_status( @@ -31,20 +32,16 @@ async fn inner_update_subject_status( AccountManager::takedown_account(&subject.did, takedown.clone()).await?; } Subject::StrongRef(subject) => { - let uri_without_prefix = subject.uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - if let (Some(uri_hostname), Some(_), Some(_)) = - (parts.get(0), parts.get(1), parts.get(2)) - { - let actor_store = ActorStore::new( - uri_hostname.to_string(), - S3BlobStore::new(uri_hostname.to_string(), s3_config), - ); - actor_store - .record - .update_record_takedown_status(&subject.uri, takedown.clone()) - .await?; - } + let subject_at_uri: AtUri = subject.uri.clone().try_into()?; + let actor_store = ActorStore::new( + subject_at_uri.get_hostname().to_string(), + S3BlobStore::new(subject_at_uri.get_hostname().to_string(), s3_config), + ); + actor_store + .record + .update_record_takedown_status(&subject_at_uri, takedown.clone()) + .await?; + } Subject::RepoBlobRef(subject) => { let actor_store = ActorStore::new( diff --git a/rsky-pds/src/apis/com/atproto/repo/apply_writes.rs b/rsky-pds/src/apis/com/atproto/repo/apply_writes.rs index 82cbce6d..07c2d760 100644 --- a/rsky-pds/src/apis/com/atproto/repo/apply_writes.rs +++ b/rsky-pds/src/apis/com/atproto/repo/apply_writes.rs @@ -86,7 +86,7 @@ async fn inner_apply_writes( collection: write.collection, rkey: write.rkey, swap_cid: None, - })) + })?) } }) }) diff --git a/rsky-pds/src/apis/com/atproto/repo/create_record.rs b/rsky-pds/src/apis/com/atproto/repo/create_record.rs index 59f8bfcd..8e7f353e 100644 --- a/rsky-pds/src/apis/com/atproto/repo/create_record.rs +++ b/rsky-pds/src/apis/com/atproto/repo/create_record.rs @@ -16,6 +16,7 @@ use rocket::response::status; use rocket::serde::json::Json; use rocket::State; use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput}; +use rsky_syntax::aturi::AtUri; use std::str::FromStr; async fn inner_create_record( @@ -64,34 +65,26 @@ async fn inner_create_record( let mut actor_store = ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config)); - let backlink_conflicts: Vec = match validate { + let backlink_conflicts: Vec = match validate { Some(true) => { + let write_at_uri: AtUri = write.uri.clone().try_into()?; actor_store .record - .get_backlink_conflicts(&write.uri, &write.record) + .get_backlink_conflicts(&write_at_uri, &write.record) .await? } _ => Vec::new(), }; - // @TODO: Use ATUri let backlink_deletions: Vec = backlink_conflicts - .into_iter() - .map(|uri| { - let uri_without_prefix = uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - if let (Some(uri_hostname), Some(uri_collection), Some(uri_rkey)) = - (parts.get(0), parts.get(1), parts.get(2)) - { - Ok(prepare_delete(PrepareDeleteOpts { - did: uri_hostname.to_string(), - collection: uri_collection.to_string(), - rkey: uri_rkey.to_string(), - swap_cid: None, - })) - } else { - bail!("Issue parsing backlink `{uri}`") - } + .iter() + .map(|at_uri| { + prepare_delete(PrepareDeleteOpts { + did: at_uri.get_hostname().to_string(), + collection: at_uri.get_collection(), + rkey: at_uri.get_rkey(), + swap_cid: None, + }) }) .collect::>>()?; let mut writes: Vec = vec![PreparedWrite::Create(write.clone())]; @@ -108,7 +101,7 @@ async fn inner_create_record( AccountManager::update_repo_root(did, commit.cid, commit.rev)?; Ok(CreateRecordOutput { - uri: write.uri, + uri: write.uri.clone(), cid: write.cid.to_string(), }) } else { diff --git a/rsky-pds/src/apis/com/atproto/repo/delete_record.rs b/rsky-pds/src/apis/com/atproto/repo/delete_record.rs index 0e5ec738..e3a0c2fe 100644 --- a/rsky-pds/src/apis/com/atproto/repo/delete_record.rs +++ b/rsky-pds/src/apis/com/atproto/repo/delete_record.rs @@ -14,6 +14,7 @@ use rocket::response::status; use rocket::serde::json::Json; use rocket::State; use rsky_lexicon::com::atproto::repo::DeleteRecordInput; +use rsky_syntax::aturi::AtUri; use std::str::FromStr; async fn inner_delete_record( @@ -60,13 +61,13 @@ async fn inner_delete_record( collection, rkey, swap_cid: swap_record_cid, - }); + })?; let mut actor_store = ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config)); - + let write_at_uri: AtUri = write.uri.clone().try_into()?; let record = actor_store .record - .get_record(&write.uri, None, Some(true)) + .get_record(&write_at_uri, None, Some(true)) .await?; let commit = match record { None => return Ok(()), // No-op if record already doesn't exist diff --git a/rsky-pds/src/apis/com/atproto/repo/get_record.rs b/rsky-pds/src/apis/com/atproto/repo/get_record.rs index d0364091..e992b3b2 100644 --- a/rsky-pds/src/apis/com/atproto/repo/get_record.rs +++ b/rsky-pds/src/apis/com/atproto/repo/get_record.rs @@ -2,7 +2,7 @@ use crate::account_manager::AccountManager; use crate::models::{ErrorCode, ErrorMessageResponse}; use crate::pipethrough::{pipethrough, OverrideOpts, ProxyRequest}; use crate::repo::aws::s3::S3BlobStore; -use crate::repo::{make_aturi, ActorStore}; +use crate::repo::ActorStore; use anyhow::{bail, Result}; use aws_config::SdkConfig; use rocket::http::Status; @@ -10,6 +10,7 @@ use rocket::response::status; use rocket::serde::json::Json; use rocket::State; use rsky_lexicon::com::atproto::repo::GetRecordOutput; +use rsky_syntax::aturi::AtUri; async fn inner_get_record( repo: String, @@ -23,14 +24,14 @@ async fn inner_get_record( // fetch from pds if available, if not then fetch from appview if let Some(did) = did { - let uri = make_aturi(did.clone(), Some(collection), Some(rkey)); + let uri = AtUri::make(did.clone(), Some(collection), Some(rkey))?; let mut actor_store = ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config)); match actor_store.record.get_record(&uri, cid, None).await { Ok(Some(record)) if record.takedown_ref.is_none() => Ok(GetRecordOutput { - uri, + uri: uri.to_string(), cid: Some(record.cid), value: serde_json::to_value(record.value)?, }), diff --git a/rsky-pds/src/apis/com/atproto/repo/list_records.rs b/rsky-pds/src/apis/com/atproto/repo/list_records.rs index 57b1a415..b8ca8497 100644 --- a/rsky-pds/src/apis/com/atproto/repo/list_records.rs +++ b/rsky-pds/src/apis/com/atproto/repo/list_records.rs @@ -9,6 +9,7 @@ use rocket::response::status; use rocket::serde::json::Json; use rocket::State; use rsky_lexicon::com::atproto::repo::{ListRecordsOutput, Record}; +use rsky_syntax::aturi::AtUri; #[allow(non_snake_case)] async fn inner_list_records( @@ -58,17 +59,10 @@ async fn inner_list_records( .collect::>>()?; let last_record = records.last(); - // @TODO: Use ATUri let cursor: Option; if let Some(last_record) = last_record { - let last_uri = last_record.clone().uri; - let last_uri_without_prefix = last_uri.replace("at://", ""); - let parts = last_uri_without_prefix.split("/").collect::>(); - if let (Some(_), Some(_), Some(uri_rkey)) = (parts.get(0), parts.get(1), parts.get(2)) { - cursor = Some(uri_rkey.to_string()); - } else { - cursor = None; - } + let last_at_uri: AtUri = last_record.uri.clone().try_into()?; + cursor = Some(last_at_uri.get_rkey()); } else { cursor = None; } diff --git a/rsky-pds/src/apis/com/atproto/repo/put_record.rs b/rsky-pds/src/apis/com/atproto/repo/put_record.rs index dfe3c2be..e8cc62fd 100644 --- a/rsky-pds/src/apis/com/atproto/repo/put_record.rs +++ b/rsky-pds/src/apis/com/atproto/repo/put_record.rs @@ -5,7 +5,7 @@ use crate::models::{ErrorCode, ErrorMessageResponse}; use crate::repo::aws::s3::S3BlobStore; use crate::repo::types::{CommitData, PreparedWrite}; use crate::repo::{ - make_aturi, prepare_create, prepare_update, ActorStore, PrepareCreateOpts, PrepareUpdateOpts, + prepare_create, prepare_update, ActorStore, PrepareCreateOpts, PrepareUpdateOpts, }; use crate::SharedSequencer; use anyhow::{bail, Result}; @@ -16,6 +16,7 @@ use rocket::response::status; use rocket::serde::json::Json; use rocket::State; use rsky_lexicon::com::atproto::repo::{PutRecordInput, PutRecordOutput}; +use rsky_syntax::aturi::AtUri; use std::str::FromStr; async fn inner_put_record( @@ -49,8 +50,7 @@ async fn inner_put_record( if did != auth.access.credentials.unwrap().did.unwrap() { bail!("AuthRequiredError") } - // @TODO: Use ATUri - let uri = make_aturi(did.clone(), Some(collection.clone()), Some(rkey.clone())); + let uri = AtUri::make(did.clone(), Some(collection.clone()), Some(rkey.clone()))?; let swap_commit_cid = match swap_commit { Some(swap_commit) => Some(Cid::from_str(&swap_commit)?), None => None, diff --git a/rsky-pds/src/repo/mod.rs b/rsky-pds/src/repo/mod.rs index 7257ac76..4e4f5abc 100644 --- a/rsky-pds/src/repo/mod.rs +++ b/rsky-pds/src/repo/mod.rs @@ -32,6 +32,7 @@ use lexicon_cid::Cid; use libipld::cbor::DagCborCodec; use libipld::Ipld as VendorIpld; use libipld::{Block, DefaultParams}; +use rsky_syntax::aturi::AtUri; use secp256k1::{Keypair, Secp256k1, SecretKey}; use serde::Serialize; use serde_cbor::Value as CborValue; @@ -110,7 +111,6 @@ impl ActorStore { // Transactors // ------------------- - // @TODO: Update to use AtUri pub async fn create_repo( &mut self, keypair: Keypair, @@ -120,19 +120,15 @@ impl ActorStore { .clone() .into_iter() .map(|prepare| { - let uri_without_prefix = prepare.uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - let collection = *parts.get(0).unwrap_or(&""); - let rkey = *parts.get(1).unwrap_or(&""); - - RecordCreateOrUpdateOp { + let at_uri: AtUri = prepare.uri.try_into()?; + Ok(RecordCreateOrUpdateOp { action: WriteOpAction::Create, - collection: collection.to_owned(), - rkey: rkey.to_owned(), + collection: at_uri.get_collection(), + rkey: at_uri.get_rkey(), record: prepare.record, - } + }) }) - .collect::>(); + .collect::>>()?; let commit = Repo::format_init_commit( self.storage.clone(), self.did.clone(), @@ -184,22 +180,27 @@ impl ActorStore { } self.storage.cache_rev(current_root.rev).await?; let mut new_record_cids: Vec = vec![]; - let mut delete_and_update_uris: Vec = vec![]; + let mut delete_and_update_uris= vec![]; for write in &writes { match write.clone() { PreparedWrite::Create(c) => new_record_cids.push(c.cid), PreparedWrite::Update(u) => { new_record_cids.push(u.cid); - delete_and_update_uris.push(u.uri); + let u_at_uri: AtUri = u.uri.try_into()?; + delete_and_update_uris.push(u_at_uri); } - PreparedWrite::Delete(d) => delete_and_update_uris.push(d.uri), + PreparedWrite::Delete(d) => { + let d_at_uri: AtUri = d.uri.try_into()?; + delete_and_update_uris.push(d_at_uri) + }, } if write.swap_cid().is_none() { continue; } + let write_at_uri: &AtUri = &write.uri().try_into()?; let record = self .record - .get_record(write.uri(), None, Some(true)) + .get_record(write_at_uri, None, Some(true)) .await?; let current_record = match record { Some(record) => Some(Cid::from_str(&record.cid)?), @@ -233,8 +234,8 @@ impl ActorStore { let mut repo = Repo::load(&mut self.storage, Some(current_root.cid)).await?; let write_ops: Vec = writes .into_iter() - .map(|write| write_to_op(write)) - .collect::>(); + .map(write_to_op) + .collect::>>()?; // @TODO: Use repo signing key global config let secp = Secp256k1::new(); let repo_private_key = env::var("PDS_REPO_SIGNING_KEY_K256_PRIVATE_KEY_HEX").unwrap(); @@ -274,9 +275,10 @@ impl ActorStore { .then(|write| async move { Ok::<(), anyhow::Error>(match write { PreparedWrite::Create(write) => { + let write_at_uri: AtUri = write.uri.try_into()?; self.record .index_record( - write.uri, + write_at_uri.clone(), write.cid, Some(write.record), Some(write.action), @@ -286,9 +288,10 @@ impl ActorStore { .await? } PreparedWrite::Update(write) => { + let write_at_uri: AtUri = write.uri.try_into()?; self.record .index_record( - write.uri, + write_at_uri.clone(), write.cid, Some(write.record), Some(write.action), @@ -297,7 +300,10 @@ impl ActorStore { ) .await? } - PreparedWrite::Delete(write) => self.record.delete_record(write.uri).await?, + PreparedWrite::Delete(write) => { + let write_at_uri: AtUri = write.uri.try_into()?; + self.record.delete_record(&write_at_uri).await? + }, }) }) .collect::>() @@ -330,11 +336,10 @@ impl ActorStore { Ok(()) } - // @TODO: Use AtUri pub async fn get_duplicate_record_cids( &self, cids: Vec, - touched_uris: Vec, + touched_uris: Vec, ) -> Result> { if touched_uris.len() == 0 || cids.len() == 0 { return Ok(vec![]); @@ -343,10 +348,11 @@ impl ActorStore { let conn = &mut establish_connection()?; let cid_strs: Vec = cids.into_iter().map(|c| c.to_string()).collect(); + let touched_uri_strs: Vec = touched_uris.iter().map(|t| t.to_string()).collect(); let res: Vec = RecordSchema::record .filter(RecordSchema::did.eq(&self.did)) .filter(RecordSchema::cid.eq_any(cid_strs)) - .filter(RecordSchema::uri.ne_all(touched_uris)) + .filter(RecordSchema::uri.ne_all(touched_uri_strs)) .select(RecordSchema::cid) .get_results(conn)?; Ok(res @@ -811,21 +817,6 @@ pub fn set_collection_name( Ok(record) } -pub fn make_aturi( - handle_or_did: String, - collection: Option, - rkey: Option, -) -> String { - let mut str = format!("at://{handle_or_did}"); - if let Some(collection) = collection { - str = format!("{str}/{collection}"); - } - if let Some(rkey) = rkey { - str = format!("{str}/{rkey}"); - } - str -} - pub async fn cid_for_safe_record(record: RepoRecord) -> Result { let block = data_to_cbor_block(&lex_to_ipld(Lex::Map(record)))?; // Confirm whether Block properly transforms between lex and cbor @@ -852,9 +843,10 @@ pub async fn prepare_create(opts: PrepareCreateOpts) -> Result Result Result PreparedDelete { +pub fn prepare_delete(opts: PrepareDeleteOpts) -> Result { let PrepareDeleteOpts { did, collection, rkey, swap_cid, } = opts; - PreparedDelete { + let uri = AtUri::make(did, Some(collection), Some(rkey))?; + Ok(PreparedDelete { action: WriteOpAction::Delete, - uri: make_aturi(did, Some(collection), Some(rkey)), + uri: uri.to_string(), swap_cid, - } + }) } lazy_static! { diff --git a/rsky-pds/src/repo/record/mod.rs b/rsky-pds/src/repo/record/mod.rs index 7c272a9c..8a822d18 100644 --- a/rsky-pds/src/repo/record/mod.rs +++ b/rsky-pds/src/repo/record/mod.rs @@ -9,6 +9,7 @@ use diesel::*; use futures::stream::{self, StreamExt}; use lexicon_cid::Cid; use rsky_lexicon::com::atproto::admin::StatusAttr; +use rsky_syntax::aturi::AtUri; use serde_json::Value as JsonValue; use std::env; use std::str::FromStr; @@ -33,7 +34,7 @@ pub struct RecordsForCollection { // @NOTE in the future this can be replaced with a more generic routine that pulls backlinks based on lex docs. // For now, we just want to ensure we're tracking links from follows, blocks, likes, and reposts. -pub fn get_backlinks(uri: &String, record: &RepoRecord) -> Result> { +pub fn get_backlinks(uri: &AtUri, record: &RepoRecord) -> Result> { if let Some(Lex::Ipld(Ipld::Json(JsonValue::String(record_type)))) = record.get("$type") { if record_type == Ids::AppBskyGraphFollow.as_str() || record_type == Ids::AppBskyGraphBlock.as_str() @@ -41,7 +42,7 @@ pub fn get_backlinks(uri: &String, record: &RepoRecord) -> Result Result, include_soft_deleted: Option, ) -> Result> { @@ -178,7 +179,7 @@ impl RecordReader { let mut builder = RecordSchema::record .inner_join(RepoBlockSchema::repo_block.on(RepoBlockSchema::cid.eq(RecordSchema::cid))) .select((models::Record::as_select(), models::RepoBlock::as_select())) - .filter(RecordSchema::uri.eq(uri)) + .filter(RecordSchema::uri.eq(uri.to_string())) .into_boxed(); if !include_soft_deleted { builder = builder.filter(RecordSchema::takedownRef.is_null()); @@ -290,24 +291,17 @@ impl RecordReader { Ok(res) } - // @TODO: Update to use AtUri pub async fn get_backlink_conflicts( &self, - uri: &String, + uri: &AtUri, record: &RepoRecord, - ) -> Result> { + ) -> Result> { let record_backlinks = get_backlinks(uri, record)?; - let collection = uri - .split("/") - .collect::>() - .into_iter() - .nth(0) - .unwrap_or(""); let conflicts: Vec> = stream::iter(record_backlinks) .then(|backlink| async move { Ok::, anyhow::Error>( self.get_record_backlinks( - collection.to_owned(), + uri.get_collection(), backlink.path, backlink.link_to, ) @@ -319,17 +313,17 @@ impl RecordReader { .into_iter() .collect::, _>>()?; Ok(conflicts - .into_iter() - .flatten() - .map(|record| { - format!( - "at://{0}/{1}/{2}", - env::var("PDS_HOSTNAME").unwrap_or("localhost".to_owned()), - collection, - record.rkey - ) - }) - .collect()) + .into_iter() + .flatten() + .filter_map(|record| { + AtUri::make( + env::var("PDS_HOSTNAME").unwrap_or("localhost".to_owned()), + Some(String::from(uri.get_collection())), + Some(record.rkey), + ) + .ok() + }) + .collect::>()) } // Transactors @@ -337,7 +331,7 @@ impl RecordReader { pub async fn index_record( &self, - uri: String, // @TODO: Use AtUri + uri: AtUri, cid: Cid, record: Option, action: Option, // Create or update with a default of create @@ -345,86 +339,83 @@ impl RecordReader { timestamp: Option, ) -> Result<()> { println!("@LOG DEBUG RecordReader::index_record, indexing record {uri}"); + let collection = uri.get_collection(); + let rkey = uri.get_rkey(); + let hostname = uri.get_hostname().to_string(); let action = action.unwrap_or(WriteOpAction::Create); - let uri_without_prefix = uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - match (parts.get(0), parts.get(1), parts.get(2)) { - (Some(hostname), Some(collection), Some(rkey)) => { - let indexed_at = timestamp.unwrap_or_else(|| common::now()); - let row = Record { - did: self.did.clone(), - uri: uri.clone(), - cid: cid.to_string(), - collection: collection.to_string(), - rkey: rkey.to_string(), - repo_rev: Some(repo_rev.clone()), - indexed_at: indexed_at.clone(), - takedown_ref: None, - }; - - if !hostname.starts_with("did:") { - bail!("Expected indexed URI to contain DID") - } else if collection.len() < 1 { - bail!("Expected indexed URI to contain a collection") - } else if rkey.len() < 1 { - bail!("Expected indexed URI to contain a record key") - } + let indexed_at = timestamp.unwrap_or_else(|| common::now()); + let row = Record { + did: self.did.clone(), + uri: uri.to_string(), + cid: cid.to_string(), + collection: collection.clone(), + rkey: rkey.to_string(), + repo_rev: Some(repo_rev.clone()), + indexed_at: indexed_at.clone(), + takedown_ref: None, + }; - use crate::schema::pds::record::dsl as RecordSchema; - let conn = &mut establish_connection()?; - - // Track current version of record - insert_into(RecordSchema::record) - .values(row) - .on_conflict(RecordSchema::uri) - .do_update() - .set(( - RecordSchema::cid.eq(cid.to_string()), - RecordSchema::repoRev.eq(&repo_rev), - RecordSchema::indexedAt.eq(&indexed_at), - )) - .execute(conn)?; - - if let Some(record) = record { - // Maintain backlinks - let backlinks = get_backlinks(&uri, &record)?; - if let WriteOpAction::Update = action { - // On update just recreate backlinks from scratch for the record, so we can clear out - // the old ones. E.g. for weird cases like updating a follow to be for a different did. - self.remove_backlinks_by_uri(&uri).await?; - } - self.add_backlinks(backlinks).await?; - } - println!("@LOG DEBUG RecordReader::index_record, indexed record {uri}"); - Ok(()) + if !hostname.starts_with("did:") { + bail!("Expected indexed URI to contain DID") + } else if collection.len() < 1 { + bail!("Expected indexed URI to contain a collection") + } else if rkey.len() < 1 { + bail!("Expected indexed URI to contain a record key") + } + + use crate::schema::pds::record::dsl as RecordSchema; + let conn = &mut establish_connection()?; + + // Track current version of record + insert_into(RecordSchema::record) + .values(row) + .on_conflict(RecordSchema::uri) + .do_update() + .set(( + RecordSchema::cid.eq(cid.to_string()), + RecordSchema::repoRev.eq(&repo_rev), + RecordSchema::indexedAt.eq(&indexed_at), + )) + .execute(conn)?; + + if let Some(record) = record { + // Maintain backlinks + let backlinks = get_backlinks(&uri, &record)?; + if let WriteOpAction::Update = action { + // On update just recreate backlinks from scratch for the record, so we can clear out + // the old ones. E.g. for weird cases like updating a follow to be for a different did. + self.remove_backlinks_by_uri(&uri).await?; } - _ => bail!("Issue parsing uri: {uri}"), + self.add_backlinks(backlinks).await?; } + println!("@LOG DEBUG RecordReader::index_record, indexed record {uri}"); + Ok(()) + } pub async fn delete_record( &self, - uri: String, // @TODO: Use AtUri + uri: &AtUri, ) -> Result<()> { println!("@LOG DEBUG RecordReader::delete_record, deleting indexed record {uri}"); use crate::schema::pds::backlink::dsl as BacklinkSchema; use crate::schema::pds::record::dsl as RecordSchema; let conn = &mut establish_connection()?; delete(RecordSchema::record) - .filter(RecordSchema::uri.eq(&uri)) + .filter(RecordSchema::uri.eq(uri.to_string())) .execute(conn)?; delete(BacklinkSchema::backlink) - .filter(BacklinkSchema::uri.eq(&uri)) + .filter(BacklinkSchema::uri.eq(uri.to_string())) .execute(conn)?; println!("@LOG DEBUG RecordReader::delete_record, deleted indexed record {uri}"); Ok(()) } - pub async fn remove_backlinks_by_uri(&self, uri: &String) -> Result<()> { + pub async fn remove_backlinks_by_uri(&self, uri: &AtUri) -> Result<()> { use crate::schema::pds::backlink::dsl as BacklinkSchema; let conn = &mut establish_connection()?; delete(BacklinkSchema::backlink) - .filter(BacklinkSchema::uri.eq(uri)) + .filter(BacklinkSchema::uri.eq(uri.to_string())) .execute(conn)?; Ok(()) } @@ -445,7 +436,7 @@ impl RecordReader { pub async fn update_record_takedown_status( &self, - uri: &String, // @TODO: Use AtUri + uri: &AtUri, takedown: StatusAttr, ) -> Result<()> { use crate::schema::pds::record::dsl as RecordSchema; @@ -458,9 +449,9 @@ impl RecordReader { }, false => None, }; - + let uri_string = uri.to_string(); update(RecordSchema::record) - .filter(RecordSchema::uri.eq(uri)) + .filter(RecordSchema::uri.eq(uri_string)) .set(RecordSchema::takedownRef.eq(takedown_ref)) .execute(conn)?; diff --git a/rsky-pds/src/repo/types.rs b/rsky-pds/src/repo/types.rs index 566642dd..dfc8f693 100644 --- a/rsky-pds/src/repo/types.rs +++ b/rsky-pds/src/repo/types.rs @@ -4,6 +4,7 @@ use crate::repo::cid_set::CidSet; use crate::storage::Ipld; use anyhow::{bail, Result}; use lexicon_cid::Cid; +use rsky_syntax::aturi::AtUri; use std::collections::BTreeMap; // Repo nodes @@ -178,48 +179,42 @@ pub enum RecordWriteOp { Delete(RecordDeleteOp), } -// @TODO: Use AtUri -pub fn create_write_to_op(write: PreparedCreateOrUpdate) -> RecordWriteOp { - let uri_without_prefix = write.uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - RecordWriteOp::Create { +pub fn create_write_to_op(write: PreparedCreateOrUpdate) -> Result { + let write_at_uri: AtUri = write.uri.try_into()?; + Ok(RecordWriteOp::Create { 0: RecordCreateOrUpdateOp { action: WriteOpAction::Create, - collection: parts[1].to_string(), - rkey: parts[2].to_string(), + collection: write_at_uri.get_collection(), + rkey: write_at_uri.get_rkey(), record: write.record, }, - } + }) } -// @TODO: Use AtUri -pub fn update_write_to_op(write: PreparedCreateOrUpdate) -> RecordWriteOp { - let uri_without_prefix = write.uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - RecordWriteOp::Update { +pub fn update_write_to_op(write: PreparedCreateOrUpdate) -> Result { + let write_at_uri: AtUri = write.uri.try_into()?; + Ok(RecordWriteOp::Update { 0: RecordCreateOrUpdateOp { action: WriteOpAction::Update, - collection: parts[1].to_string(), - rkey: parts[2].to_string(), + collection: write_at_uri.get_collection(), + rkey: write_at_uri.get_rkey(), record: write.record, }, - } + }) } -// @TODO: Use AtUri -pub fn delete_write_to_op(write: PreparedDelete) -> RecordWriteOp { - let uri_without_prefix = write.uri.replace("at://", ""); - let parts = uri_without_prefix.split("/").collect::>(); - RecordWriteOp::Delete { +pub fn delete_write_to_op(write: PreparedDelete) -> Result { + let write_at_uri: AtUri = write.uri.try_into()?; + Ok(RecordWriteOp::Delete { 0: RecordDeleteOp { action: WriteOpAction::Delete, - collection: parts[1].to_string(), - rkey: parts[2].to_string(), + collection: write_at_uri.get_collection(), + rkey: write_at_uri.get_rkey(), }, - } + }) } -pub fn write_to_op(write: PreparedWrite) -> RecordWriteOp { +pub fn write_to_op(write: PreparedWrite) -> Result { match write { PreparedWrite::Create(c) => create_write_to_op(c), PreparedWrite::Update(u) => update_write_to_op(u), diff --git a/rsky-syntax/src/aturi.rs b/rsky-syntax/src/aturi.rs index edb79321..b5218ab6 100644 --- a/rsky-syntax/src/aturi.rs +++ b/rsky-syntax/src/aturi.rs @@ -264,6 +264,30 @@ impl Display for AtUri { } } +impl TryFrom<&str> for AtUri { + type Error = anyhow::Error; + + fn try_from(value: &str) -> Result { + AtUri::new(value.to_string(), None) + } +} + +impl TryFrom for AtUri { + type Error = anyhow::Error; + + fn try_from(value: String) -> Result { + AtUri::new(value, None) + } +} + +impl TryFrom<&String> for AtUri { + type Error = anyhow::Error; + + fn try_from(value: &String) -> Result { + AtUri::new(value.to_string(), None) + } +} + #[cfg(test)] mod tests { use super::*; @@ -453,4 +477,222 @@ mod tests { } } } + + #[test] + fn test_valid_str_conversion() { + let valid_cases = vec![ + "did:plc:44ybard66vv44zksje25o7dz/app.bsky.feed.post/3jwdwj2ctlk26", + "at://foo.com/com.example.foo/123", + "bnewbold.bsky.team/app.bsky.feed.post/3jwdwj2ctlk26", + ]; + + for case in valid_cases { + let result: Result = case.try_into(); + assert!(result.is_ok(), "Failed to parse valid URI: {}", case); + + let uri = result.unwrap(); + assert_eq!(uri.to_string(), format!("at://{}", case.trim_start_matches("at://"))); + } + } + + #[test] + fn test_valid_string_conversion() { + let valid_cases = vec![ + String::from("did:plc:44ybard66vv44zksje25o7dz/app.bsky.feed.post/3jwdwj2ctlk26"), + String::from("at://foo.com/com.example.foo/123"), + String::from("bnewbold.bsky.team/app.bsky.feed.post/3jwdwj2ctlk26"), + ]; + + for case in valid_cases { + let result: Result = case.clone().try_into(); + assert!(result.is_ok(), "Failed to parse valid URI: {}", case); + + let uri = result.unwrap(); + assert_eq!(uri.to_string(), format!("at://{}", case.trim_start_matches("at://"))); + } + } + + #[test] + fn test_invalid_str_conversion() { + let invalid_cases = vec![ + "", // Empty string + // @TODO implement AtUri Validation + // "invalid/uri/format", // Missing host + // "http://not-at-protocol", // Wrong protocol + // "at://", // Missing everything after protocol + // "at://@invalid-chars@", // Invalid characters + // "at://host/collection/rkey/extra", // Too many path segments + ]; + + for case in invalid_cases { + let result: Result = case.try_into(); + assert!(result.is_err(), "Unexpectedly parsed invalid URI: {}", case); + } + } + + #[test] + fn test_invalid_string_conversion() { + let invalid_cases = vec![ + String::from(""), + // @TODO implement AtUri Validation + // String::from("invalid/uri/format"), + // String::from("http://not-at-protocol"), + // String::from("at://"), + // String::from("at://@invalid-chars@"), + // String::from("at://host/collection/rkey/extra"), + ]; + + for case in invalid_cases { + let result: Result = case.clone().try_into(); + assert!(result.is_err(), "Unexpectedly parsed invalid URI: {}", case); + } + } + + #[test] + fn test_conversion_with_query_params() { + let uri_str = "at://host.com/collection/123?key=value"; + let result: Result = uri_str.try_into(); + assert!(result.is_ok()); + let uri = result.unwrap(); + assert_eq!(uri.host, "host.com"); + assert_eq!(uri.get_collection(), "collection"); + assert_eq!(uri.get_rkey(), "123"); + assert_eq!(uri.search_params, vec![("key".to_string(), "value".to_string())]); + } + + #[test] + fn test_conversion_with_hash() { + let uri_str = "at://host.com/collection/123#fragment"; + let result: Result = uri_str.try_into(); + assert!(result.is_ok()); + let uri = result.unwrap(); + assert_eq!(uri.host, "host.com"); + assert_eq!(uri.get_collection(), "collection"); + assert_eq!(uri.get_rkey(), "123"); + assert_eq!(uri.hash, "#fragment"); + } + + #[test] + fn test_conversion_full_uri() { + let uri_str = "at://host.com/collection/123?key=value#fragment"; + let result: Result = uri_str.try_into(); + assert!(result.is_ok()); + let uri = result.unwrap(); + assert_eq!(uri.host, "host.com"); + assert_eq!(uri.get_collection(), "collection"); + assert_eq!(uri.get_rkey(), "123"); + assert_eq!(uri.search_params, vec![("key".to_string(), "value".to_string())]); + assert_eq!(uri.hash, "#fragment"); + } + + #[test] +fn test_uri_modifications() -> Result<()> { + // Start with basic URI + let mut uri = AtUri::new("at://foo.com".to_string(), None)?; + assert_eq!(uri.to_string(), "at://foo.com/"); + + // Test host modifications + uri.set_hostname("bar.com".to_string()); + assert_eq!(uri.to_string(), "at://bar.com/"); + uri.set_hostname("did:web:localhost%3A1234".to_string()); + assert_eq!(uri.to_string(), "at://did:web:localhost%3A1234/"); + uri.set_hostname("foo.com".to_string()); + assert_eq!(uri.to_string(), "at://foo.com/"); + + // Test pathname modifications + uri.pathname = "/".to_string(); + assert_eq!(uri.to_string(), "at://foo.com/"); + uri.pathname = "/foo".to_string(); + assert_eq!(uri.to_string(), "at://foo.com/foo"); + uri.pathname = "foo".to_string(); + assert_eq!(uri.to_string(), "at://foo.com/foo"); + + // Test collection and rkey modifications + uri.set_collection("com.example.foo".to_string()); + uri.set_rkey("123".to_string()); + assert_eq!(uri.to_string(), "at://foo.com/com.example.foo/123"); + uri.set_rkey("124".to_string()); + assert_eq!(uri.to_string(), "at://foo.com/com.example.foo/124"); + uri.set_collection("com.other.foo".to_string()); + assert_eq!(uri.to_string(), "at://foo.com/com.other.foo/124"); + uri.pathname = "".to_string(); + uri.set_rkey("123".to_string()); + assert_eq!(uri.to_string(), "at://foo.com/123"); + uri.pathname = "foo".to_string(); + + // Test search parameter modifications + uri.set_search("?foo=bar".to_string())?; + assert_eq!(uri.to_string(), "at://foo.com/foo?foo=bar"); + uri.search_params = vec![ + ("foo".to_string(), "bar".to_string()), + ("baz".to_string(), "buux".to_string()) + ]; + assert_eq!(uri.to_string(), "at://foo.com/foo?foo=bar&baz=buux"); + + // Test hash modifications + // @TODO should set # automatically if not set to conform with typescript + // see https://github.com/bluesky-social/atproto/blob/688ff0/packages/syntax/tests/aturi.test.ts#L314 + // uri.hash = "#hash".to_string(); + // assert_eq!(uri.to_string(), "at://foo.com/foo?foo=bar&baz=buux#hash"); + // uri.hash = "hash".to_string(); // Should automatically add # when missing + // assert_eq!(uri.to_string(), "at://foo.com/foo?foo=bar&baz=buux#hash"); + + Ok(()) +} + +#[test] +fn test_relative_uris() -> Result<()> { + // Define test cases as tuples of (input, expected_pathname, expected_search, expected_hash) + let test_cases = vec![ + ("", "", "", ""), + ("/", "/", "", ""), + ("/foo", "/foo", "", ""), + ("/foo/", "/foo/", "", ""), + ("/foo/bar", "/foo/bar", "", ""), + ("?foo=bar", "", "foo=bar", ""), + ("?foo=bar&baz=buux", "", "foo=bar&baz=buux", ""), + ("/?foo=bar", "/", "foo=bar", ""), + ("/foo?foo=bar", "/foo", "foo=bar", ""), + ("/foo/?foo=bar", "/foo/", "foo=bar", ""), + ("#hash", "", "", "#hash"), + ("/#hash", "/", "", "#hash"), + ("/foo#hash", "/foo", "", "#hash"), + ("/foo/#hash", "/foo/", "", "#hash"), + ("?foo=bar#hash", "", "foo=bar", "#hash"), + ]; + + // Define base URIs to test against + let base_uris = vec![ + "did:web:localhost%3A1234", + "at://did:web:localhost%3A1234", + "at://did:web:localhost%3A1234/foo/bar?foo=bar&baz=buux#hash", + ]; + + for base in base_uris { + let base_uri = AtUri::new(base.to_string(), None)?; + + for (relative, exp_path, exp_search, exp_hash) in test_cases.iter() { + let uri = AtUri::new(relative.to_string(), Some(base.to_string()))?; + + // Verify the components match expectations + assert_eq!(uri.get_protocol(), "at:".to_string()); + assert_eq!(uri.host, base_uri.host); + assert_eq!(uri.get_hostname(), base_uri.get_hostname()); + assert_eq!(uri.get_origin(), base_uri.get_origin()); + assert_eq!(uri.pathname, exp_path.to_string()); + + // Compare search params + if exp_search.is_empty() { + assert!(uri.get_search()?.is_none() || uri.get_search()?.unwrap().is_empty()); + } else { + assert_eq!(uri.get_search()?.unwrap(), exp_search.to_string()); + } + + // Compare hash + assert_eq!(uri.hash, exp_hash.to_string()); + } + } + + Ok(()) +} }