GcsFileSystem.java

@@ -57,15 +57,16 @@
 import static com.google.cloud.storage.Storage.SignUrlOption.withExtHeaders;
 import static com.google.cloud.storage.Storage.SignUrlOption.withV4Signature;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
-import static com.google.common.collect.Iterables.partition;
 import static io.airlift.concurrent.MoreFutures.getFutureValue;
 import static io.trino.filesystem.gcs.GcsUtils.encodedKey;
 import static io.trino.filesystem.gcs.GcsUtils.getBlob;
 import static io.trino.filesystem.gcs.GcsUtils.handleGcsException;
 import static io.trino.filesystem.gcs.GcsUtils.keySha256Checksum;
 import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.stream.Gatherers.windowFixed;

 public class GcsFileSystem
         implements TrinoFileSystem
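For context on the API every file in this change adopts: Gatherers.windowFixed(n) groups the elements of a stream, in encounter order, into unmodifiable Lists of size n, with a smaller final window for any leftovers. A minimal, self-contained sketch, assuming Java 24+ where Stream::gather and Gatherers are final API; this demo is not part of the PR:

import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class WindowFixedDemo
{
    public static void main(String[] args)
    {
        // windowFixed(2) batches five elements into windows of two,
        // leaving the odd element in a final, shorter window.
        List<List<Integer>> windows = Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.windowFixed(2))
                .toList();
        System.out.println(windows); // prints [[1, 2], [3, 4], [5]]
    }
}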
@@ -164,16 +165,19 @@ public void deleteFile(Location location)
     public void deleteFiles(Collection<Location> locations)
             throws IOException
     {
-        List<ListenableFuture<?>> batchFutures = new ArrayList<>();
+        List<ListenableFuture<?>> batchFutures = locations.stream()
+                .gather(windowFixed(batchSize))
+                .map(locationBatch -> {
+                    StorageBatch batch = storage.batch();
+                    for (Location location : locationBatch) {
+                        GcsLocation gcsLocation = new GcsLocation(location);
+                        batch.delete(BlobId.of(gcsLocation.bucket(), gcsLocation.path()));
+                    }
+                    return executorService.submit(batch::submit);
+                })
+                .collect(toImmutableList());

         try {
-            for (List<Location> locationBatch : partition(locations, batchSize)) {
-                StorageBatch batch = storage.batch();
-                for (Location location : locationBatch) {
-                    GcsLocation gcsLocation = new GcsLocation(location);
-                    batch.delete(BlobId.of(gcsLocation.bucket(), gcsLocation.path()));
-                }
-                batchFutures.add(executorService.submit(batch::submit));
-            }
             getFutureValue(Futures.allAsList(batchFutures));
         }
         catch (RuntimeException e) {

Review comment (Member), on the new batch-building pipeline:

Why did it move outside of the try/catch block?
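The question points at a real behavioral difference: the new pipeline builds and submits all batches eagerly, before the try block is entered, so a RuntimeException thrown while a batch is being assembled is no longer routed through the catch block that calls handleGcsException. A minimal sketch of the two shapes, where the hypothetical buildBatches() stands in for the stream pipeline and the IOException translation stands in for handleGcsException:

import java.io.IOException;
import java.util.List;

public class EagerBatchingSketch
{
    // Hypothetical stand-in for the batch-building pipeline; assume it can
    // throw, for example on an invalid location. Not Trino code.
    static List<Runnable> buildBatches()
    {
        throw new IllegalStateException("invalid location");
    }

    // Old shape: batches are built inside the try, so the failure is
    // translated into an IOException before it reaches the caller.
    static void deleteFilesBefore()
            throws IOException
    {
        try {
            List<Runnable> batches = buildBatches();
            batches.forEach(Runnable::run);
        }
        catch (RuntimeException e) {
            throw new IOException("deleteFiles failed", e);
        }
    }

    // New shape: batches are built before the try, so the same failure
    // now escapes as a raw IllegalStateException.
    static void deleteFilesAfter()
            throws IOException
    {
        List<Runnable> batches = buildBatches();
        try {
            batches.forEach(Runnable::run);
        }
        catch (RuntimeException e) {
            throw new IOException("deleteFiles failed", e);
        }
    }

    public static void main(String[] args)
            throws IOException
    {
        try {
            deleteFilesBefore();
        }
        catch (IOException e) {
            System.out.println("old shape: translated to " + e);
        }
        try {
            deleteFilesAfter();
        }
        catch (RuntimeException e) {
            System.out.println("new shape: escapes as " + e);
        }
    }
}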
@@ -186,16 +190,19 @@ public void deleteDirectory(Location location)
             throws IOException
     {
         GcsLocation gcsLocation = new GcsLocation(normalizeToDirectory(location));

+        List<ListenableFuture<?>> batchFutures = getPage(gcsLocation).streamAll()
+                .gather(windowFixed(batchSize))
+                .map(blobBatch -> {
+                    StorageBatch batch = storage.batch();
+                    for (Blob blob : blobBatch) {
+                        batch.delete(blob.getBlobId());
+                    }
+                    return executorService.submit(batch::submit);
+                })
+                .collect(toImmutableList());
+
         try {
-            List<ListenableFuture<?>> batchFutures = new ArrayList<>();
-
-            for (List<Blob> blobBatch : partition(getPage(gcsLocation).iterateAll(), batchSize)) {
-                StorageBatch batch = storage.batch();
-                for (Blob blob : blobBatch) {
-                    batch.delete(blob.getBlobId());
-                }
-                batchFutures.add(executorService.submit(batch::submit));
-            }
             getFutureValue(Futures.allAsList(batchFutures));
         }
         catch (RuntimeException e) {

Review comment (Member):

Same here -- the code moved outside of the try block.
S3FileSystem.java

@@ -57,14 +57,14 @@

 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
-import static com.google.common.collect.Iterables.partition;
 import static com.google.common.collect.Multimaps.toMultimap;
 import static io.trino.filesystem.s3.S3FileSystemConfig.S3SseType.NONE;
 import static io.trino.filesystem.s3.S3SseCUtils.encoded;
 import static io.trino.filesystem.s3.S3SseCUtils.md5Checksum;
 import static io.trino.filesystem.s3.S3SseRequestConfigurator.setEncryptionSettings;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.toMap;
+import static java.util.stream.Gatherers.windowFixed;

 final class S3FileSystem
         implements TrinoFileSystem
@@ -190,34 +190,33 @@ private void deleteObjects(Collection<Location> locations)
             String bucket = entry.getKey();
             Collection<String> allKeys = entry.getValue();

-            for (List<String> keys : partition(allKeys, DELETE_BATCH_SIZE)) {
-                List<ObjectIdentifier> objects = keys.stream()
+            try {
+                allKeys.stream()
                         .map(key -> ObjectIdentifier.builder().key(key).build())
-                        .toList();
-
-                DeleteObjectsRequest request = DeleteObjectsRequest.builder()
-                        .overrideConfiguration(context::applyCredentialProviderOverride)
-                        .requestPayer(requestPayer)
-                        .bucket(bucket)
-                        .delete(builder -> builder.objects(objects).quiet(true))
-                        .build();
-
-                try {
-                    DeleteObjectsResponse response = client.deleteObjects(request);
-                    for (S3Error error : response.errors()) {
-                        String filePath = "s3://%s/%s".formatted(bucket, error.key());
-                        if (error.message() == null) {
-                            // If the error message is null, we just use the error code
-                            failures.put(filePath, error.code());
-                        }
-                        else {
-                            failures.put(filePath, "%s (%s)".formatted(error.message(), error.code()));
-                        }
-                    }
-                }
-                catch (SdkException e) {
-                    throw new TrinoFileSystemException("Error while batch deleting files", e);
-                }
+                        .gather(windowFixed(DELETE_BATCH_SIZE))
+                        .forEach(keys -> {
+                            DeleteObjectsRequest request = DeleteObjectsRequest.builder()
+                                    .overrideConfiguration(context::applyCredentialProviderOverride)
+                                    .requestPayer(requestPayer)
+                                    .bucket(bucket)
+                                    .delete(builder -> builder.objects(keys).quiet(true))
+                                    .build();
+
+                            DeleteObjectsResponse response = client.deleteObjects(request);
+                            for (S3Error error : response.errors()) {
+                                String filePath = "s3://%s/%s".formatted(bucket, error.key());
+                                if (error.message() == null) {
+                                    // If the error message is null, we just use the error code
+                                    failures.put(filePath, error.code());
+                                }
+                                else {
+                                    failures.put(filePath, "%s (%s)".formatted(error.message(), error.code()));
+                                }
+                            }
+                        });
+            }
+            catch (SdkException e) {
+                throw new TrinoFileSystemException("Error while batch deleting files", e);
+            }
         }
TrinoS3FileSystem.java (legacy Hadoop S3 file system; the class declaration is outside this excerpt, so the file name is inferred from the imports)
@@ -81,7 +81,6 @@
 import com.google.common.base.Splitter;
 import com.google.common.collect.AbstractSequentialIterator;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.io.Closer;
 import com.google.common.net.MediaType;
@@ -185,6 +184,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Gatherers.windowFixed;
 import static org.apache.hadoop.fs.FSExceptionMessages.CANNOT_SEEK_PAST_EOF;
 import static org.apache.hadoop.fs.FSExceptionMessages.NEGATIVE_SEEK;
 import static org.apache.hadoop.fs.FSExceptionMessages.STREAM_IS_CLOSED;
@@ -775,10 +775,9 @@ public void deleteFiles(Collection<Path> paths)
             throws IOException
     {
         try {
-            Iterable<List<Path>> partitions = Iterables.partition(paths, DELETE_BATCH_SIZE);
-            for (List<Path> currentBatch : partitions) {
-                deletePaths(currentBatch);
-            }
+            paths.stream()
+                    .gather(windowFixed(DELETE_BATCH_SIZE))
+                    .forEach(this::deletePaths);
         }
         catch (MultiObjectDeleteException e) {
             String errors = e.getErrors().stream()

Review comment (Member), on the .gather(windowFixed(DELETE_BATCH_SIZE)) line:

windowFixed involves unnecessary data copying that {Iterables,Lists}.partition does not, so it is inferior on the performance front.

It is also not superior on readability: {Iterables,Lists}.partition describes very concisely what it does.

We should use .gather(windowFixed(...)) when we already have a stream, but I don't see why we would want to use it on lists.
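The copying claim is easy to demonstrate: Guava's Lists.partition returns lazy sublist views over the backing list, while windowFixed materializes each window as a fresh unmodifiable List. A minimal sketch of the difference, assuming Java 24+ with Guava on the classpath; not part of the PR:

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

public class PartitionVsWindowFixed
{
    public static void main(String[] args)
    {
        List<Integer> items = new ArrayList<>(IntStream.range(0, 6).boxed().toList());

        // Lists.partition returns views: no element copying, and later writes
        // to the backing list show through when the partitions are read.
        List<List<Integer>> views = Lists.partition(items, 4);
        items.set(0, 99);
        System.out.println(views); // prints [[99, 1, 2, 3], [4, 5]]

        // windowFixed copies each batch into a new unmodifiable List, which is
        // extra allocation and copying when the input is already a List.
        List<List<Integer>> windows = items.stream()
                .gather(Gatherers.windowFixed(4))
                .toList();
        items.set(0, 0);
        System.out.println(windows); // still prints [[99, 1, 2, 3], [4, 5]]
    }
}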
ForwardingFileIo.java

@@ -15,7 +15,6 @@

 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Streams;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
@@ -43,6 +42,7 @@
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
 import static java.util.Objects.requireNonNull;
 import static java.util.stream.Collectors.joining;
+import static java.util.stream.Gatherers.windowFixed;

 public class ForwardingFileIo
         implements SupportsBulkOperations
@@ -118,7 +118,8 @@ public void deleteFile(OutputFile file)
     public void deleteFiles(Iterable<String> pathsToDelete)
             throws BulkDeletionFailureException
     {
-        List<Callable<Void>> tasks = Streams.stream(Iterables.partition(pathsToDelete, DELETE_BATCH_SIZE))
+        List<Callable<Void>> tasks = Streams.stream(pathsToDelete)
+                .gather(windowFixed(DELETE_BATCH_SIZE))
                 .map(batch -> (Callable<Void>) () -> {
                     deleteBatch(batch);
                     return null;
PinotSplitManager.java

@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.pinot;

-import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import io.airlift.log.Logger;
 import io.trino.plugin.pinot.client.PinotClient;
@@ -46,6 +45,7 @@
 import static io.trino.spi.ErrorType.USER_ERROR;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
+import static java.util.stream.Gatherers.windowFixed;

 public class PinotSplitManager
         implements ConnectorSplitManager
@@ -107,9 +107,10 @@ protected void generateSegmentSplits(
         hostToSegmentsMap.forEach((host, segments) -> {
             int numSegmentsInThisSplit = Math.min(segments.size(), segmentsPerSplitConfigured);
             // segments is already shuffled
-            Iterables.partition(segments, numSegmentsInThisSplit).forEach(
-                    segmentsForThisSplit -> splits.add(
-                            createSegmentSplit(tableNameSuffix, segmentsForThisSplit, host, timePredicate)));
+            segments.stream()
+                    .gather(windowFixed(numSegmentsInThisSplit))
+                    .map(segment -> createSegmentSplit(tableNameSuffix, segment, host, timePredicate))
+                    .forEach(splits::add);
         });
     }
 }