Skip to content

Commit 3f610d5

Browse files
Remove polling for object creation and deletion in S3 and GCS. (#5688)
After uploading an object to (or deleting one from) S3 or GCS[^azure], we check in a loop whether the object exists (or, for deletions, no longer exists), until we receive a positive response. This was done purportedly to work around the eventual consistency in the storage backend. However, there are some problems with this: * GCS, and many known S3 implementations (Amazon S3, [Cloudflare R2](https://developers.cloudflare.com/r2/reference/consistency/), GCS' S3 API, [MinIO](minio/minio#11389 (comment)), [Oracle Cloud](https://docs.oracle.com/en-us/iaas/Content/Object/Concepts/objectstorageoverview.htm#features), [Wasabi](https://docs.wasabi.com/docs/what-data-consistency-model-does-wasabi-employ), [Ceph](https://openmetal.io/resources/blog/ceph-replication-and-consistency-model-explained/#consistency), [Backblaze](https://www.backblaze.com/blog/design-thinking-b2-apis-the-hidden-costs-of-s3-compatibility/#:~:text=The%20B2%20architecture%20offers%20what%20one%20could%20consider%20%E2%80%9Cstrong%20consistency.%E2%80%9D)) are now strongly consistent, making this check add an unnecessary roundtrip. * [This paragraph](https://www.allthingsdistributed.com/2021/04/s3-strong-consistency.html#:~:text=For%20S3%2C%20we,copies%20in%202006.) makes me think that when Amazon S3 was eventually consistent, one client might have observed that the object exists, but another might not. Checking that the object is locally visible to the client that uploaded it is not very useful, given object storage's distributed nature. * https://github.com/Netflix/s3mper was made to work around S3's eventual consistency, but it relied on DynamoDB. I suspect it's not possible in general to add strong consistency to an eventually consistent interface, using only that interface. [^azure]: Not to Azure since the new SDK rewrite. --- TYPE: IMPROVEMENT DESC: Reduced latency when uploading objects on S3 and GCS
1 parent 735ba67 commit 3f610d5

File tree

4 files changed

+2
-145
lines changed

4 files changed

+2
-145
lines changed

tiledb/sm/filesystem/gcs.cc

Lines changed: 2 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -394,8 +394,6 @@ void GCS::remove_file(const URI& uri) const {
394394
"Delete object failed on: " + uri.to_string() + " (" +
395395
status.message() + ")");
396396
}
397-
398-
throw_if_not_ok(wait_for_object_to_be_deleted(bucket_name, object_path));
399397
}
400398

401399
void GCS::remove_dir(const URI& uri) const {
@@ -744,43 +742,7 @@ Status GCS::copy_object(const URI& old_uri, const URI& new_uri) const {
744742
status.message() + ")");
745743
}
746744

747-
return wait_for_object_to_propagate(new_bucket_name, new_object_path);
748-
}
749-
750-
Status GCS::wait_for_object_to_propagate(
751-
const std::string& bucket_name, const std::string& object_path) const {
752-
RETURN_NOT_OK(init_client());
753-
754-
unsigned attempts = 0;
755-
while (attempts++ < constants::gcs_max_attempts) {
756-
if (this->is_object(bucket_name, object_path)) {
757-
return Status::Ok();
758-
}
759-
760-
std::this_thread::sleep_for(
761-
std::chrono::milliseconds(constants::gcs_attempt_sleep_ms));
762-
}
763-
764-
throw GCSException(
765-
"Timed out waiting on object to propogate: " + object_path);
766-
}
767-
768-
Status GCS::wait_for_object_to_be_deleted(
769-
const std::string& bucket_name, const std::string& object_path) const {
770-
RETURN_NOT_OK(init_client());
771-
772-
unsigned attempts = 0;
773-
while (attempts++ < constants::gcs_max_attempts) {
774-
if (!this->is_object(bucket_name, object_path)) {
775-
return Status::Ok();
776-
}
777-
778-
std::this_thread::sleep_for(
779-
std::chrono::milliseconds(constants::gcs_attempt_sleep_ms));
780-
}
781-
782-
throw GCSException(
783-
"Timed out waiting on object to be deleted: " + object_path);
745+
return Status::Ok();
784746
}
785747

786748
Status GCS::wait_for_bucket_to_propagate(const std::string& bucket_name) const {
@@ -1195,23 +1157,8 @@ void GCS::flush(const URI& uri, bool) {
11951157
std::string object_path;
11961158
throw_if_not_ok(parse_gcs_uri(uri, &bucket_name, &object_path));
11971159

1198-
// Wait for the last written part to propogate to ensure all parts
1199-
// are available for composition into a single object.
1200-
std::string last_part_path = part_paths.back();
1201-
const Status st = wait_for_object_to_propagate(bucket_name, last_part_path);
1202-
state->update_st(st);
12031160
state_lck.unlock();
12041161

1205-
if (!st.ok()) {
1206-
// Delete all outstanding part objects.
1207-
delete_parts(bucket_name, part_paths);
1208-
1209-
// Release all instance state associated with this part list
1210-
// transactions.
1211-
finish_multi_part_upload(uri);
1212-
return;
1213-
}
1214-
12151162
// Build a list of objects to compose.
12161163
std::vector<google::cloud::storage::ComposeSourceObject> source_objects;
12171164
source_objects.reserve(part_paths.size());
@@ -1252,8 +1199,6 @@ void GCS::flush(const URI& uri, bool) {
12521199
"Compse object failed on: " + uri.to_string() + " (" +
12531200
status.message() + ")");
12541201
}
1255-
1256-
throw_if_not_ok(wait_for_object_to_propagate(bucket_name, object_path));
12571202
}
12581203

12591204
void GCS::delete_parts(
@@ -1334,7 +1279,7 @@ Status GCS::flush_object_direct(const URI& uri) {
13341279
")");
13351280
}
13361281

1337-
return wait_for_object_to_propagate(bucket_name, object_path);
1282+
return Status::Ok();
13381283
}
13391284

13401285
uint64_t GCS::read(

tiledb/sm/filesystem/gcs.h

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -651,28 +651,6 @@ class GCS : public FilesystemBase {
651651
*/
652652
Status copy_object(const URI& old_uri, const URI& new_uri) const;
653653

654-
/**
655-
* Waits for a object with `bucket_name` and `object_path`
656-
* to exist on GCS.
657-
*
658-
* @param bucket_name The object's bucket name.
659-
* @param object_path The object's path
660-
* @return Status
661-
*/
662-
Status wait_for_object_to_propagate(
663-
const std::string& bucket_name, const std::string& object_path) const;
664-
665-
/**
666-
* Waits for a object with `bucket_name` and `object_path`
667-
* to not exist on GCS.
668-
*
669-
* @param bucket_name The object's bucket name.
670-
* @param object_path The object's path
671-
* @return Status
672-
*/
673-
Status wait_for_object_to_be_deleted(
674-
const std::string& bucket_name, const std::string& object_path) const;
675-
676654
/**
677655
* Waits for a bucket with `bucket_name`
678656
* to exist on GCS.

tiledb/sm/filesystem/s3.cc

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -612,9 +612,6 @@ void S3::touch(const URI& uri) const {
612612
std::string("Cannot touch object '") + uri.c_str() +
613613
outcome_error_message(put_object_outcome));
614614
}
615-
616-
throw_if_not_ok(wait_for_object_to_propagate(
617-
put_object_request.GetBucket(), put_object_request.GetKey()));
618615
}
619616

620617
void S3::write(
@@ -699,9 +696,6 @@ void S3::remove_file(const URI& uri) const {
699696
std::string("Failed to delete S3 object '") + uri.c_str() +
700697
outcome_error_message(delete_object_outcome));
701698
}
702-
703-
throw_if_not_ok(wait_for_object_to_be_deleted(
704-
delete_object_request.GetBucket(), delete_object_request.GetKey()));
705699
}
706700

707701
std::vector<directory_entry> S3::ls_with_sizes(const URI& parent) const {
@@ -832,8 +826,6 @@ void S3::flush(const URI& uri, bool finalize) {
832826
// It is safe to unlock the state here
833827
state_lck.unlock();
834828

835-
throw_if_not_ok(wait_for_object_to_propagate(move(bucket), move(key)));
836-
837829
throw_if_not_ok(finish_flush_object(std::move(outcome), uri, buff, false));
838830
} else {
839831
Aws::S3::Model::AbortMultipartUploadRequest abort_request =
@@ -917,8 +909,6 @@ void S3::finalize_and_flush_object(const URI& uri) {
917909
std::string("Failed to flush S3 object ") + uri.c_str() +
918910
outcome_error_message(outcome)};
919911
}
920-
921-
throw_if_not_ok(wait_for_object_to_propagate(state.bucket, state.key));
922912
} else {
923913
Aws::S3::Model::AbortMultipartUploadRequest abort_request =
924914
make_multipart_abort_request(state);
@@ -1567,9 +1557,6 @@ Status S3::copy_object(const URI& old_uri, const URI& new_uri) const {
15671557
new_uri.c_str() + outcome_error_message(copy_object_outcome)));
15681558
}
15691559

1570-
throw_if_not_ok(wait_for_object_to_propagate(
1571-
copy_object_request.GetBucket(), copy_object_request.GetKey()));
1572-
15731560
return Status::Ok();
15741561
}
15751562

@@ -1660,48 +1647,6 @@ Status S3::initiate_multipart_request(
16601647
return Status::Ok();
16611648
}
16621649

1663-
Status S3::wait_for_object_to_propagate(
1664-
const Aws::String& bucket_name, const Aws::String& object_key) const {
1665-
throw_if_not_ok(init_client());
1666-
1667-
unsigned attempts_cnt = 0;
1668-
while (attempts_cnt++ < constants::s3_max_attempts) {
1669-
bool exists;
1670-
RETURN_NOT_OK(is_object(bucket_name, object_key, &exists));
1671-
if (exists) {
1672-
return Status::Ok();
1673-
}
1674-
1675-
std::this_thread::sleep_for(
1676-
std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1677-
}
1678-
1679-
return LOG_STATUS(Status_S3Error(
1680-
"Failed waiting for object " +
1681-
std::string(object_key.c_str(), object_key.size()) + " to be created."));
1682-
}
1683-
1684-
Status S3::wait_for_object_to_be_deleted(
1685-
const Aws::String& bucket_name, const Aws::String& object_key) const {
1686-
throw_if_not_ok(init_client());
1687-
1688-
unsigned attempts_cnt = 0;
1689-
while (attempts_cnt++ < constants::s3_max_attempts) {
1690-
bool exists;
1691-
RETURN_NOT_OK(is_object(bucket_name, object_key, &exists));
1692-
if (!exists) {
1693-
return Status::Ok();
1694-
}
1695-
1696-
std::this_thread::sleep_for(
1697-
std::chrono::milliseconds(constants::s3_attempt_sleep_ms));
1698-
}
1699-
1700-
return LOG_STATUS(Status_S3Error(
1701-
"Failed waiting for object " +
1702-
std::string(object_key.c_str(), object_key.size()) + " to be deleted."));
1703-
}
1704-
17051650
Status S3::wait_for_bucket_to_be_created(const URI& bucket_uri) const {
17061651
throw_if_not_ok(init_client());
17071652

@@ -1821,9 +1766,6 @@ void S3::write_direct(const URI& uri, const void* buffer, uint64_t length) {
18211766
std::string("Cannot write object '") + uri.c_str() +
18221767
outcome_error_message(put_object_outcome));
18231768
}
1824-
1825-
throw_if_not_ok(wait_for_object_to_propagate(
1826-
put_object_request.GetBucket(), put_object_request.GetKey()));
18271769
}
18281770

18291771
Status S3::write_multipart(

tiledb/sm/filesystem/s3.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1436,14 +1436,6 @@ class S3 : public FilesystemBase {
14361436
Status initiate_multipart_request(
14371437
Aws::Http::URI aws_uri, MultiPartUploadState* state);
14381438

1439-
/** Waits for the input object to be propagated. */
1440-
Status wait_for_object_to_propagate(
1441-
const Aws::String& bucketName, const Aws::String& objectKey) const;
1442-
1443-
/** Waits for the input object to be deleted. */
1444-
Status wait_for_object_to_be_deleted(
1445-
const Aws::String& bucketName, const Aws::String& objectKey) const;
1446-
14471439
/** Waits for the bucket to be created. */
14481440
Status wait_for_bucket_to_be_created(const URI& bucket_uri) const;
14491441

0 commit comments

Comments
 (0)