Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

import java.util.Locale;

import static org.elasticsearch.repositories.s3.S3Service.REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING;
import static org.elasticsearch.repositories.s3.S3Service.REPOSITORY_S3_CAS_TTL_SETTING;

@ThreadLeakFilters(filters = { TestContainersThreadFilter.class })
@ThreadLeakScope(ThreadLeakScope.Scope.NONE) // https://github.com/elastic/elasticsearch/issues/102482
public class RepositoryS3MinioBasicCredentialsRestIT extends AbstractRepositoryS3RestTestCase {
Expand All @@ -39,6 +42,10 @@ public class RepositoryS3MinioBasicCredentialsRestIT extends AbstractRepositoryS
.keystore("s3.client." + CLIENT + ".access_key", ACCESS_KEY)
.keystore("s3.client." + CLIENT + ".secret_key", SECRET_KEY)
.setting("s3.client." + CLIENT + ".endpoint", minioFixture::getAddress)
// Skip listing of pre-existing uploads during a CAS because MinIO sometimes leaks them; also reduce the delay before proceeding
// TODO do not set these if running a MinIO version in which https://github.com/minio/minio/issues/21189 is fixed
.setting(REPOSITORY_S3_CAS_TTL_SETTING.getKey(), "-1")
.setting(REPOSITORY_S3_CAS_ANTI_CONTENTION_DELAY_SETTING.getKey(), "100ms")
.build();

@ClassRule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,11 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
* @return {@code true} if there are already ongoing uploads, so we should not proceed with the operation
*/
private boolean hasPreexistingUploads() {
final var timeToLiveMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
if (timeToLiveMillis < 0) {
return false; // proceed always
}

final var uploads = listMultipartUploads();
logUploads("preexisting uploads", uploads);

Expand All @@ -709,11 +714,7 @@ private boolean hasPreexistingUploads() {
return false;
}

final var expiryDate = Date.from(
Instant.ofEpochMilli(
blobStore.getThreadPool().absoluteTimeInMillis() - blobStore.getCompareAndExchangeTimeToLive().millis()
)
);
final var expiryDate = Date.from(Instant.ofEpochMilli(blobStore.getThreadPool().absoluteTimeInMillis() - timeToLiveMillis));
if (uploads.stream().anyMatch(upload -> upload.getInitiated().after(expiryDate))) {
logger.trace("[{}] fresh preexisting uploads vs {}", blobKey, expiryDate);
return true;
Expand Down Expand Up @@ -788,7 +789,7 @@ private int getUploadIndex(String targetUploadId, List<MultipartUpload> multipar
final var currentTimeMillis = blobStore.getThreadPool().absoluteTimeInMillis();
final var ageMillis = currentTimeMillis - multipartUpload.getInitiated().toInstant().toEpochMilli();
final var expectedAgeRangeMillis = blobStore.getCompareAndExchangeTimeToLive().millis();
if (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis) {
if (0 <= expectedAgeRangeMillis && (ageMillis < -expectedAgeRangeMillis || ageMillis > expectedAgeRangeMillis)) {
logger.warn(
"""
compare-and-exchange of blob [{}:{}] was initiated at [{}={}] \
Expand Down