Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions rsky-pds/src/actor_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ impl ActorStore {
}
}

pub async fn get_repo_root(&self) -> Option<Cid> {
let storage_guard = self.storage.read().await;
storage_guard.get_root().await
}

// Transactors
// -------------------

Expand Down Expand Up @@ -85,6 +90,26 @@ impl ActorStore {
Ok(commit)
}

pub async fn process_import_repo(
&mut self,
commit: CommitData,
writes: Vec<PreparedWrite>,
) -> Result<()> {
{
let immutable_borrow = &self;
// & send to indexing
immutable_borrow
.index_writes(writes.clone(), &commit.rev)
.await?;
}
// persist the commit to repo storage
let storage_guard = self.storage.read().await;
storage_guard.apply_commit(commit.clone(), None).await?;
// process blobs
self.blob.process_write_blobs(writes).await?;
Ok(())
}

pub async fn process_writes(
&mut self,
writes: Vec<PreparedWrite>,
Expand Down
159 changes: 156 additions & 3 deletions rsky-pds/src/apis/com/atproto/repo/import_repo.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,157 @@
#[rocket::post("/xrpc/com.atproto.repo.importRepo")]
pub async fn import_repo() {
unimplemented!()
use crate::actor_store::aws::s3::S3BlobStore;
use crate::actor_store::ActorStore;
use crate::apis::ApiError;
use crate::auth_verifier::AccessFullImport;
use crate::db::DbConn;
use crate::repo::prepare::{
prepare_create, prepare_delete, prepare_update, PrepareCreateOpts, PrepareDeleteOpts,
PrepareUpdateOpts,
};
use aws_config::SdkConfig;
use futures::{stream, StreamExt};
use lexicon_cid::Cid;
use rocket::data::ToByteUnit;
use rocket::{Data, State};
use rsky_repo::block_map::BlockMap;
use rsky_repo::car::read_car_with_root;
use rsky_repo::parse::get_and_parse_record;
use rsky_repo::repo::Repo;
use rsky_repo::sync::consumer::{verify_diff, VerifyRepoInput};
use rsky_repo::types::{PreparedWrite, RecordWriteDescript, VerifiedDiff};

#[tracing::instrument(skip_all)]
#[rocket::post("/xrpc/com.atproto.repo.importRepo", data = "<blob>")]
pub async fn import_repo(
auth: AccessFullImport,
blob: Data<'_>,
s3_config: &State<SdkConfig>,
db: DbConn,
) -> Result<(), ApiError> {
let requester = auth.access.credentials.unwrap().did.unwrap();
let mut actor_store = ActorStore::new(
requester.clone(),
S3BlobStore::new(requester.clone(), s3_config),
db,
);

// Get current repo if it exists
let curr_root: Option<Cid> = actor_store.get_repo_root().await;
let curr_repo: Option<Repo> = match curr_root {
None => None,
Some(_root) => Some(Repo::load(actor_store.storage.clone(), curr_root).await?),
};

// Process imported car
let import_datastream = blob.open(100.megabytes());
let import_bytes = import_datastream.into_bytes().await.unwrap().value;

let car_with_root = match read_car_with_root(import_bytes).await {
Ok(res) => res,
Err(error) => {
return Err(ApiError::InvalidRequest(error.to_string()));
}
};

// Get verified difference from current repo and imported repo
let mut imported_blocks: BlockMap = car_with_root.blocks;
let imported_root: Cid = car_with_root.root;
let opts = VerifyRepoInput {
ensure_leaves: Some(false),
};

let diff: VerifiedDiff = match verify_diff(
curr_repo,
&mut imported_blocks,
imported_root,
None,
None,
Some(opts),
)
.await
{
Ok(res) => res,
Err(error) => {
tracing::error!("{:?}", error);
return Err(ApiError::RuntimeError);
}
};

let commit_data = diff.commit;
let prepared_writes: Vec<PreparedWrite> =
prepare_import_repo_writes(requester, diff.writes, &imported_blocks).await?;
match actor_store
.process_import_repo(commit_data, prepared_writes)
.await
{
Ok(_res) => {}
Err(error) => {
tracing::error!("Error importing repo\n{error}");
return Err(ApiError::RuntimeError);
}
}

Ok(())
}

/// Converts list of RecordWriteDescripts into a list of PreparedWrites
async fn prepare_import_repo_writes(
_did: String,
writes: Vec<RecordWriteDescript>,
blocks: &BlockMap,
) -> Result<Vec<PreparedWrite>, ApiError> {
match stream::iter(writes)
.then(|write| {
let did = _did.clone();
async move {
Ok::<PreparedWrite, anyhow::Error>(match write {
RecordWriteDescript::Create(write) => {
let parsed_record = get_and_parse_record(blocks, write.cid)?;
PreparedWrite::Create(
prepare_create(PrepareCreateOpts {
did: did.clone(),
collection: write.collection,
rkey: Some(write.rkey),
swap_cid: None,
record: parsed_record.record,
validate: Some(true),
})
.await?,
)
}
RecordWriteDescript::Update(write) => {
let parsed_record = get_and_parse_record(blocks, write.cid)?;
PreparedWrite::Update(
prepare_update(PrepareUpdateOpts {
did: did.clone(),
collection: write.collection,
rkey: write.rkey,
swap_cid: None,
record: parsed_record.record,
validate: Some(true),
})
.await?,
)
}
RecordWriteDescript::Delete(write) => {
PreparedWrite::Delete(prepare_delete(PrepareDeleteOpts {
did: did.clone(),
collection: write.collection,
rkey: write.rkey,
swap_cid: None,
})?)
}
})
}
})
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<anyhow::Result<Vec<PreparedWrite>, _>>()
{
Ok(res) => Ok(res),
Err(error) => {
tracing::error!("Error preparing import repo writes\n{error}");
Err(ApiError::RuntimeError)
}
}
}
23 changes: 22 additions & 1 deletion rsky-pds/src/auth_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,27 @@ pub async fn access_check<'r>(
}
}

pub struct AccessFullImport {
pub access: AccessOutput,
}

#[rocket::async_trait]
impl<'r> FromRequest<'r> for AccessFullImport {
type Error = AuthError;

async fn from_request(req: &'r Request<'_>) -> Outcome<Self, Self::Error> {
let opts = ValidateAccessTokenOpts {
check_takedown: Some(true),
check_deactivated: Some(false),
};
match access_check(req, vec![AuthScope::Access], Some(opts)).await {
Outcome::Success(access) => Outcome::Success(AccessFullImport { access }),
Outcome::Error(error) => Outcome::Error(error),
Outcome::Forward(_) => panic!("Outcome::Forward returned"),
}
}
}

pub struct AccessFull {
pub access: AccessOutput,
}
Expand Down Expand Up @@ -746,7 +767,7 @@ pub async fn validate_access_token<'r>(
let found: ActorAccount = match AccountManager::get_account(
&did,
Some(AvailabilityFlags {
include_deactivated: None,
include_deactivated: Some(true),
include_taken_down: Some(true),
}),
)
Expand Down