Skip to content

Commit 83e446d

Browse files
axl8713 authored and github-actions[bot] committed
Refactored tile range deletion to be fully streamed.
1 parent 5a1dbc1 commit 83e446d

File tree

2 files changed

+47
-36
lines changed

2 files changed

+47
-36
lines changed

geowebcache/azureblob/src/main/java/org/geowebcache/azure/AzureBlobStore.java

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,19 @@
1515

1616
import static com.google.common.base.Preconditions.checkNotNull;
1717
import static java.util.Objects.isNull;
18+
import static org.geowebcache.azure.DeleteManager.PAGE_SIZE;
1819

1920
import com.azure.core.util.BinaryData;
2021
import com.azure.storage.blob.models.BlobDownloadContentResponse;
2122
import com.azure.storage.blob.models.BlobItem;
2223
import com.azure.storage.blob.models.BlobProperties;
2324
import com.azure.storage.blob.models.BlobStorageException;
2425
import com.azure.storage.blob.specialized.BlockBlobClient;
25-
import com.google.common.collect.Iterators;
2626
import java.io.IOException;
2727
import java.io.InputStream;
2828
import java.io.UncheckedIOException;
2929
import java.time.OffsetDateTime;
30+
import java.util.ArrayList;
3031
import java.util.Iterator;
3132
import java.util.List;
3233
import java.util.Map;
@@ -198,26 +199,18 @@ public boolean delete(TileRange tileRange) throws StorageException {
198199
if (listeners.isEmpty()) {
199200
// if there are no listeners, don't bother requesting every tile
200201
// metadata to notify the listeners
201-
List<String> keysToDelete = blobsToDelete.map(BlobItem::getName).collect(Collectors.toList());
202-
203-
// split the iteration in parts to avoid memory accumulation
204-
Iterator<List<String>> partition = Iterators.partition(keysToDelete.iterator(), DeleteManager.PAGE_SIZE);
205-
206-
while (partition.hasNext() && !shutDown) {
207-
deleteManager.deleteParallel(partition.next());
202+
if (!shutDown) {
203+
deleteManager.deleteStreamed(blobsToDelete);
208204
}
209-
210205
} else {
211206
// if we need to gather info, we'll end up just calling "delete" on each tile
212207
// this is run here instead of inside the delete manager as we need high level info
213208
// about tiles, e.g., TileObject, to inform the listeners
214-
List<Callable<?>> tilesDeletions = blobsToDelete
215-
.map(blobItem -> {
216-
TileObject tile = createTileObject(blobItem, tileRange);
217-
tile.setParametersId(tileRange.getParametersId());
218-
return (Callable<Object>) () -> delete(tile);
219-
})
220-
.collect(Collectors.toList());
209+
Stream<Callable<?>> tilesDeletions = blobsToDelete.map(blobItem -> {
210+
TileObject tile = createTileObject(blobItem, tileRange);
211+
tile.setParametersId(tileRange.getParametersId());
212+
return () -> delete(tile);
213+
});
221214

222215
executeParallelDeletions(tilesDeletions);
223216
}
@@ -278,13 +271,18 @@ private long[] extractTileIndex(BlobItem blobItem) {
278271
};
279272
}
280273

281-
private void executeParallelDeletions(List<Callable<?>> tilesDeletions) throws StorageException {
282-
Iterator<List<Callable<?>>> tilesDeletionsPartitions =
283-
Iterators.partition(tilesDeletions.iterator(), DeleteManager.PAGE_SIZE);
274+
private void executeParallelDeletions(Stream<Callable<?>> tilesDeletions) throws StorageException {
275+
Iterator<Callable<?>> tilesDeletionsIterator = tilesDeletions.iterator();
276+
277+
while (tilesDeletionsIterator.hasNext() && !shutDown) {
278+
279+
// once a page of callables is ready, run them in parallel on the delete manager
280+
List<Callable<?>> callables = new ArrayList<>(PAGE_SIZE);
281+
for (int i = 0; i < PAGE_SIZE && tilesDeletionsIterator.hasNext(); i++) {
282+
callables.add(tilesDeletionsIterator.next());
283+
}
284284

285-
// once a page of callables is ready, run them in parallel on the delete manager
286-
while (tilesDeletionsPartitions.hasNext() && !shutDown) {
287-
deleteManager.executeParallel(tilesDeletionsPartitions.next());
285+
deleteManager.executeParallel(callables);
288286
}
289287
}
290288

geowebcache/azureblob/src/main/java/org/geowebcache/azure/DeleteManager.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.OffsetDateTime;
2727
import java.util.ArrayList;
2828
import java.util.HashSet;
29+
import java.util.Iterator;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Properties;
@@ -39,6 +40,7 @@
3940
import java.util.logging.Level;
4041
import java.util.logging.Logger;
4142
import java.util.stream.Collectors;
43+
import java.util.stream.Stream;
4244
import org.geotools.util.logging.Logging;
4345
import org.geowebcache.GeoWebCacheException;
4446
import org.geowebcache.locks.LockProvider;
@@ -111,12 +113,12 @@ public void executeParallel(List<Callable<?>> callables) throws StorageException
111113
}
112114
}
113115

114-
/** Executes the removal of the specified keys in a parallel fashion, returning the number of removed keys */
115-
public Long deleteParallel(List<String> keys) throws StorageException {
116+
/** Executes the removal of the specified blobs in a streamed fashion, returning the number of removed keys */
117+
public Long deleteStreamed(Stream<BlobItem> blobs) throws StorageException {
116118
try {
117-
return new KeysBulkDelete(keys).call();
119+
return new KeysBulkDelete(blobs.map(BlobItem::getName)).call();
118120
} catch (Exception e) {
119-
throw new StorageException("Failed to submit parallel keys execution", e);
121+
throw new StorageException("Failed to submit keys deletions", e);
120122
}
121123
}
122124

@@ -305,29 +307,40 @@ private void clearPendingBulkDelete(final String prefix, final long timestamp) t
305307

306308
public class KeysBulkDelete implements Callable<Long> {
307309

308-
private final List<String> keys;
310+
private final Stream<String> keyStream;
309311

310-
public KeysBulkDelete(List<String> keys) {
311-
this.keys = keys;
312+
public KeysBulkDelete(Stream<String> keyStream) {
313+
this.keyStream = keyStream;
312314
}
313315

314316
@Override
315317
public Long call() throws Exception {
316318
long count = 0L;
317319
try {
318320
checkInterrupted();
319-
if (LOG.isLoggable(Level.FINER)) {
320-
LOG.finer("Running delete delete on list of items on '%s':%s ... (only the first 100 items listed)"
321-
.formatted(client.getContainerName(), keys.subList(0, Math.min(keys.size(), 100))));
322-
}
323321

324322
BlobContainerClient container = client.getContainer();
325323
BlobBatchClient batch = client.getBatch();
326324

327-
for (int i = 0; i < keys.size(); i += PAGE_SIZE) {
328-
count += deleteItems(container, batch, keys.subList(i, Math.min(i + PAGE_SIZE, keys.size())));
329-
}
325+
Iterator<String> keysIterator = keyStream.iterator();
326+
327+
while (keysIterator.hasNext()) {
328+
329+
List<String> keys = new ArrayList<>(PAGE_SIZE);
330+
for (int i = 0; i < PAGE_SIZE && keysIterator.hasNext(); i++) {
331+
keys.add(keysIterator.next());
332+
}
333+
334+
if (LOG.isLoggable(Level.FINER)) {
335+
LOG.finer(
336+
"Running delete delete on list of items on '%s':%s ... (only the first 100 items listed)"
337+
.formatted(
338+
client.getContainerName(),
339+
keys.subList(0, Math.min(keys.size(), 100))));
340+
}
330341

342+
count += deleteItems(container, batch, keys);
343+
}
331344
} catch (InterruptedException | IllegalStateException e) {
332345
LOG.log(Level.INFO, "Azure bulk delete aborted", e);
333346
throw e;

0 commit comments

Comments
 (0)