Skip to content

Remove unnecessary tests that are flaky #6350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 16, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void differentNumberOfStrings_shouldCompleteSuccessfully(int numberOfStrings) th
assertThat(numRequestsInFlightSampling).contains(MAX_CONCURRENT_EXECUTIONS);
}
disposable.dispose();
numRequestsInFlightSampling.forEach(maxConcurrency -> assertThat(maxConcurrency).isLessThanOrEqualTo(MAX_CONCURRENT_EXECUTIONS));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,114 +526,6 @@ void downloadDirectory_withListObjectsRequestTransformer_transformerThrows_fails

assertThatThrownBy(downloadDirectory.completionFuture()::join).hasCause(exception);
}
@Test
void downloadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception {
// Create many S3 objects to test concurrency
int totalFiles = 50;
String[] keys = new String[totalFiles];
for (int i = 0; i < totalFiles; i++) {
keys[i] = "file" + i + ".txt";
}
stubSuccessfulListObjects(listObjectsHelper, keys);

// Configuration with expected concurrency limit
int configuredMaxConcurrency = 5;

// Track concurrent operations
AtomicInteger currentlyActive = new AtomicInteger(0);
AtomicInteger peakConcurrency = new AtomicInteger(0);

// Use a Phaser to coordinate phases
Phaser phaser = new Phaser(1); // Start with test thread

// Mock the single download function to track concurrent operations
when(singleDownloadFunction.apply(any())).thenAnswer(invocation -> {
phaser.register(); // Each operation registers

CompletableFuture<CompletedFileDownload> future = new CompletableFuture<>();

CompletableFuture.runAsync(() -> {
try {
// Track entry
int current = currentlyActive.incrementAndGet();
peakConcurrency.updateAndGet(max -> Math.max(max, current));

// Wait at barrier - this forces all operations to be active simultaneously
phaser.arriveAndAwaitAdvance();

// Complete
future.complete(CompletedFileDownload.builder()
.response(GetObjectResponse.builder().eTag("test").build())
.build());

} catch (Exception e) {
future.completeExceptionally(e);
} finally {
currentlyActive.decrementAndGet();
phaser.arriveAndDeregister();
}
});

return new DefaultFileDownload(future,
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
.transferredBytes(0L)
.build()),
() -> DownloadFileRequest.builder()
.getObjectRequest(GetObjectRequest.builder().build())
.destination(Paths.get("."))
.build(),
null);
});

// Configure with our expected limit
// To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is
// configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder()
.transferDirectoryMaxConcurrency(configuredMaxConcurrency)
.build();

DownloadDirectoryHelper customHelper = new DownloadDirectoryHelper(
customConfig, listObjectsHelper, singleDownloadFunction);

// Start download asynchronously
CompletableFuture<Void> downloadTask = CompletableFuture.runAsync(() -> {
DirectoryDownload download = customHelper.downloadDirectory(
DownloadDirectoryRequest.builder()
.destination(directory)
.bucket("bucket")
.build()
);
download.completionFuture().join();
});

// Wait for operations to register (but not complete the phase)
// We wait for more than the configured limit to ensure we'd catch violations
Duration maxWait = Duration.ofSeconds(5);
long deadline = System.nanoTime() + maxWait.toNanos();
int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread
while (current < configuredMaxConcurrency) {
if (System.nanoTime() >= deadline) {
throw new AssertionError(
"Timed out waiting for registrations: current=" + current +
", configuredMaxConcurrency=" + configuredMaxConcurrency);
}
LockSupport.parkNanos(10_000_000L); // ~10 ms
current = phaser.getRegisteredParties() - 1;
}

// Check peak BEFORE releasing the phase
int observedPeak = peakConcurrency.get();
assertThat(observedPeak)
.as("Implementation allowed %d concurrent operations but was configured for %d",
observedPeak, configuredMaxConcurrency)
.isEqualTo(configuredMaxConcurrency);

// Release the phase to let operations complete
phaser.arriveAndDeregister();

// Complete the test
downloadTask.get(2, TimeUnit.SECONDS);
}

private static DefaultFileDownload completedDownload() {
return new DefaultFileDownload(CompletableFuture.completedFuture(CompletedFileDownload.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,115 +462,6 @@ public void uploadDirectory_requestTransformFunctionThrows_failsUpload() {
assertThatThrownBy(uploadFuture::join).getCause().hasCause(exception);
}

@Test
void uploadDirectory_customMaxConcurrency_shouldLimitConcurrentOperations() throws Exception {
// Create many files to test concurrency
Path testDir = jimfs.getPath("concurrencyTest");
Files.createDirectory(testDir);
int totalFiles = 50;
for (int i = 0; i < totalFiles; i++) {
Files.createFile(jimfs.getPath("concurrencyTest/file" + i + ".txt"));
}

// Configuration with expected concurrency limit
int configuredMaxConcurrency = 5;

// Track concurrent operations
AtomicInteger currentlyActive = new AtomicInteger(0);
AtomicInteger peakConcurrency = new AtomicInteger(0);

// Use a Phaser to coordinate phases
Phaser phaser = new Phaser(1); // Start with test thread

// Mock the single upload function to track concurrent operations
when(singleUploadFunction.apply(any())).thenAnswer(invocation -> {
phaser.register(); // Each operation registers

CompletableFuture<CompletedFileUpload> future = new CompletableFuture<>();

CompletableFuture.runAsync(() -> {
try {
// Track entry
int current = currentlyActive.incrementAndGet();
peakConcurrency.updateAndGet(max -> Math.max(max, current));

// Wait at barrier - this forces all operations to be active simultaneously
phaser.arriveAndAwaitAdvance();

// Complete
future.complete(CompletedFileUpload.builder()
.response(PutObjectResponse.builder().eTag("test").build())
.build());

} catch (Exception e) {
future.completeExceptionally(e);
} finally {
currentlyActive.decrementAndGet();
phaser.arriveAndDeregister();
}
});

return new DefaultFileUpload(future,
new DefaultTransferProgress(DefaultTransferProgressSnapshot.builder()
.transferredBytes(0L)
.build()),
new PauseObservable(),
UploadFileRequest.builder()
.putObjectRequest(p -> p.key("key").bucket("bucket"))
.source(Paths.get("test.txt"))
.build());
});

// Configure with our expected limit
// To verify test works as intended, verify test failure when transferDirectoryMaxConcurrency is
// configuredMaxConcurrency + 1 or configuredMaxConcurrency - 1
TransferManagerConfiguration customConfig = TransferManagerConfiguration.builder()
.transferDirectoryMaxConcurrency(configuredMaxConcurrency)
.build();

UploadDirectoryHelper customHelper = new UploadDirectoryHelper(customConfig, singleUploadFunction);

// Start upload asynchronously
CompletableFuture<Void> uploadTask = CompletableFuture.runAsync(() -> {
DirectoryUpload upload = customHelper.uploadDirectory(
UploadDirectoryRequest.builder()
.source(testDir)
.bucket("bucket")
.build()
);
upload.completionFuture().join();
});

// Wait for operations to register (but not complete the phase)
// We wait for more than the configured limit to ensure we'd catch violations
Duration maxWait = Duration.ofSeconds(5);
long deadline = System.nanoTime() + maxWait.toNanos();
int current = phaser.getRegisteredParties() - 1; // Subtract 1 for main thread
while (current < configuredMaxConcurrency) {
if (System.nanoTime() >= deadline) {
throw new AssertionError(
"Timed out waiting for registrations: current=" + current +
", configuredMaxConcurrency=" + configuredMaxConcurrency);
}
LockSupport.parkNanos(10_000_000L); // ~10 ms
current = phaser.getRegisteredParties() - 1;
}

// Check peak BEFORE releasing the phase
int observedPeak = peakConcurrency.get();
assertThat(observedPeak)
.as("Implementation allowed %d concurrent operations but was configured for %d",
observedPeak, configuredMaxConcurrency)
.isLessThanOrEqualTo(configuredMaxConcurrency);

// Release the phase to let operations complete
phaser.arriveAndDeregister();

// Complete the test
uploadTask.get(2, TimeUnit.SECONDS);
}


private DefaultFileUpload completedUpload() {
return new DefaultFileUpload(CompletableFuture.completedFuture(CompletedFileUpload.builder()
.response(PutObjectResponse.builder().build())
Expand Down
Loading