Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import java.util.Locale;

import static org.elasticsearch.repositories.s3.S3Service.REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING;
import static org.elasticsearch.repositories.s3.S3Service.REPOSITORY_S3_CAS_TTL_SETTING;

@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482
public class RepositoryS3MinioBasicCredentialsRestIT extends AbstractRepositoryS3RestTestCase {
Expand All @@ -39,6 +42,10 @@ public class RepositoryS3MinioBasicCredentialsRestIT extends AbstractRepositoryS
.keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY)
.keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY)
.setting("s3.client." + CLIENT + ".endpoint", minioFixture::getAddress)
// Skip listing of pre-existing uploads during a CAS because MinIO sometimes leaks them; also reduce the delay before proceeding
// TODO do not set these if running a MinIO version in which https://github.com/minio/minio/issues/21189 is fixed
.setting(REPOSITORY_S3_CAS_TTL_SETTING.getKey(), "-1")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting this to -1 (or 0) even without the production code changes would have had pretty much the desired effect anyway because it would make ES consider all the existing uploads as stale, letting it proceed with the CAS. The production code changes just make this behaviour a little more precise, avoiding questions of clock skew and suppressing warnings about the stale uploads etc.

.setting(REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.getKey(), "100ms")
.build();

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,11 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
* @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
*/
private boolean hasPreexistingUploads() {
final var timeToLiveMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
if (timeToLiveMillis < 0) {
return false; // proceed always
}

final var uploads = listMultipartUploads();
logUploads("preexisting uploads", uploads);

Expand All @@ -850,11 +855,7 @@ private boolean hasPreexistingUploads() {
return false;
}

final var expiryDate = Date.from(
Instant.ofEpochMilli(
blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis()
)
);
final var expiryDate = Date.from(Instant.ofEpochMilli(blobStore.getThreadPool().absoluteTimeInMillis() - timeToLiveMillis));
if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) {
logger.trace("[{}] fresh preexisting uploads vs {}", blobKey, expiryDate);
return true;
Expand Down Expand Up @@ -929,7 +930,7 @@ private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipar
final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
if (0 <= expectedAgeRangeMillis && (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis)) {
logger.warn(
"""
compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
Expand Down