Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions rsky-pds/src/apis/com/atproto/admin/update_subject_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::admin::{Subject, SubjectStatus, UpdateSubjectStatusOutput};
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

async fn inner_update_subject_status(
Expand All @@ -31,20 +32,16 @@ async fn inner_update_subject_status(
AccountManager::takedown_account(&subject.did, takedown.clone()).await?;
}
Subject::StrongRef(subject) => {
let uri_without_prefix = subject.uri.replace("at://", "");
let parts = uri_without_prefix.split("/").collect::<Vec<&str>>();
if let (Some(uri_hostname), Some(_), Some(_)) =
(parts.get(0), parts.get(1), parts.get(2))
{
let actor_store = ActorStore::new(
uri_hostname.to_string(),
S3BlobStore::new(uri_hostname.to_string(), s3_config),
);
actor_store
.record
.update_record_takedown_status(&subject.uri, takedown.clone())
.await?;
}
let subject_at_uri: AtUri = subject.uri.clone().try_into()?;
let actor_store = ActorStore::new(
subject_at_uri.get_hostname().to_string(),
S3BlobStore::new(subject_at_uri.get_hostname().to_string(), s3_config),
);
actor_store
.record
.update_record_takedown_status(&subject_at_uri, takedown.clone())
.await?;

}
Subject::RepoBlobRef(subject) => {
let actor_store = ActorStore::new(
Expand Down
2 changes: 1 addition & 1 deletion rsky-pds/src/apis/com/atproto/repo/apply_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn inner_apply_writes(
collection: write.collection,
rkey: write.rkey,
swap_cid: None,
}))
})?)
}
})
})
Expand Down
33 changes: 13 additions & 20 deletions rsky-pds/src/apis/com/atproto/repo/create_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::{CreateRecordInput, CreateRecordOutput};
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

async fn inner_create_record(
Expand Down Expand Up @@ -64,34 +65,26 @@ async fn inner_create_record(

let mut actor_store =
ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config));
let backlink_conflicts: Vec<String> = match validate {
let backlink_conflicts: Vec<AtUri> = match validate {
Some(true) => {
let write_at_uri: AtUri = write.uri.clone().try_into()?;
actor_store
.record
.get_backlink_conflicts(&write.uri, &write.record)
.get_backlink_conflicts(&write_at_uri, &write.record)
.await?
}
_ => Vec::new(),
};

// @TODO: Use ATUri
let backlink_deletions: Vec<PreparedDelete> = backlink_conflicts
.into_iter()
.map(|uri| {
let uri_without_prefix = uri.replace("at://", "");
let parts = uri_without_prefix.split("/").collect::<Vec<&str>>();
if let (Some(uri_hostname), Some(uri_collection), Some(uri_rkey)) =
(parts.get(0), parts.get(1), parts.get(2))
{
Ok(prepare_delete(PrepareDeleteOpts {
did: uri_hostname.to_string(),
collection: uri_collection.to_string(),
rkey: uri_rkey.to_string(),
swap_cid: None,
}))
} else {
bail!("Issue parsing backlink `{uri}`")
}
.iter()
.map(|at_uri| {
prepare_delete(PrepareDeleteOpts {
did: at_uri.get_hostname().to_string(),
collection: at_uri.get_collection(),
rkey: at_uri.get_rkey(),
swap_cid: None,
})
})
.collect::<Result<Vec<PreparedDelete>>>()?;
let mut writes: Vec<PreparedWrite> = vec![PreparedWrite::Create(write.clone())];
Expand All @@ -108,7 +101,7 @@ async fn inner_create_record(
AccountManager::update_repo_root(did, commit.cid, commit.rev)?;

Ok(CreateRecordOutput {
uri: write.uri,
uri: write.uri.clone(),
cid: write.cid.to_string(),
})
} else {
Expand Down
7 changes: 4 additions & 3 deletions rsky-pds/src/apis/com/atproto/repo/delete_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::DeleteRecordInput;
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

async fn inner_delete_record(
Expand Down Expand Up @@ -60,13 +61,13 @@ async fn inner_delete_record(
collection,
rkey,
swap_cid: swap_record_cid,
});
})?;
let mut actor_store =
ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config));

let write_at_uri: AtUri = write.uri.clone().try_into()?;
let record = actor_store
.record
.get_record(&write.uri, None, Some(true))
.get_record(&write_at_uri, None, Some(true))
.await?;
let commit = match record {
None => return Ok(()), // No-op if record already doesn't exist
Expand Down
7 changes: 4 additions & 3 deletions rsky-pds/src/apis/com/atproto/repo/get_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use crate::account_manager::AccountManager;
use crate::models::{ErrorCode, ErrorMessageResponse};
use crate::pipethrough::{pipethrough, OverrideOpts, ProxyRequest};
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::{make_aturi, ActorStore};
use crate::repo::ActorStore;
use anyhow::{bail, Result};
use aws_config::SdkConfig;
use rocket::http::Status;
use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::GetRecordOutput;
use rsky_syntax::aturi::AtUri;

async fn inner_get_record(
repo: String,
Expand All @@ -23,14 +24,14 @@ async fn inner_get_record(

// fetch from pds if available, if not then fetch from appview
if let Some(did) = did {
let uri = make_aturi(did.clone(), Some(collection), Some(rkey));
let uri = AtUri::make(did.clone(), Some(collection), Some(rkey))?;

let mut actor_store =
ActorStore::new(did.clone(), S3BlobStore::new(did.clone(), s3_config));

match actor_store.record.get_record(&uri, cid, None).await {
Ok(Some(record)) if record.takedown_ref.is_none() => Ok(GetRecordOutput {
uri,
uri: uri.to_string(),
cid: Some(record.cid),
value: serde_json::to_value(record.value)?,
}),
Expand Down
12 changes: 3 additions & 9 deletions rsky-pds/src/apis/com/atproto/repo/list_records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::{ListRecordsOutput, Record};
use rsky_syntax::aturi::AtUri;

#[allow(non_snake_case)]
async fn inner_list_records(
Expand Down Expand Up @@ -58,17 +59,10 @@ async fn inner_list_records(
.collect::<Result<Vec<Record>>>()?;

let last_record = records.last();
// @TODO: Use ATUri
let cursor: Option<String>;
if let Some(last_record) = last_record {
let last_uri = last_record.clone().uri;
let last_uri_without_prefix = last_uri.replace("at://", "");
let parts = last_uri_without_prefix.split("/").collect::<Vec<&str>>();
if let (Some(_), Some(_), Some(uri_rkey)) = (parts.get(0), parts.get(1), parts.get(2)) {
cursor = Some(uri_rkey.to_string());
} else {
cursor = None;
}
let last_at_uri: AtUri = last_record.uri.clone().try_into()?;
cursor = Some(last_at_uri.get_rkey());
} else {
cursor = None;
}
Expand Down
6 changes: 3 additions & 3 deletions rsky-pds/src/apis/com/atproto/repo/put_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::models::{ErrorCode, ErrorMessageResponse};
use crate::repo::aws::s3::S3BlobStore;
use crate::repo::types::{CommitData, PreparedWrite};
use crate::repo::{
make_aturi, prepare_create, prepare_update, ActorStore, PrepareCreateOpts, PrepareUpdateOpts,
prepare_create, prepare_update, ActorStore, PrepareCreateOpts, PrepareUpdateOpts,
};
use crate::SharedSequencer;
use anyhow::{bail, Result};
Expand All @@ -16,6 +16,7 @@ use rocket::response::status;
use rocket::serde::json::Json;
use rocket::State;
use rsky_lexicon::com::atproto::repo::{PutRecordInput, PutRecordOutput};
use rsky_syntax::aturi::AtUri;
use std::str::FromStr;

async fn inner_put_record(
Expand Down Expand Up @@ -49,8 +50,7 @@ async fn inner_put_record(
if did != auth.access.credentials.unwrap().did.unwrap() {
bail!("AuthRequiredError")
}
// @TODO: Use ATUri
let uri = make_aturi(did.clone(), Some(collection.clone()), Some(rkey.clone()));
let uri = AtUri::make(did.clone(), Some(collection.clone()), Some(rkey.clone()))?;
let swap_commit_cid = match swap_commit {
Some(swap_commit) => Some(Cid::from_str(&swap_commit)?),
None => None,
Expand Down
Loading
Loading