Skip to content

Commit ab44cdd

Browse files
committed
Fixed multiple calls to listener.
Added simplest bounded delete. Added test for bounded delete. Added test to check if TileDeleted events are received when a layer is deleted. There is a race in this test, so it can pass even though tileDeleted is sent.
1 parent e0d205a commit ab44cdd

File tree

8 files changed

+301
-65
lines changed

8 files changed

+301
-65
lines changed

geowebcache/core/src/main/java/org/geowebcache/util/TMSKeyBuilder.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -127,18 +127,6 @@ public String forTile(TileObject obj) {
127127
return key;
128128
}
129129

130-
public static String buildParametersId(TileObject obj) {
131-
String parametersId;
132-
Map<String, String> parameters = obj.getParameters();
133-
parametersId = ParametersUtils.getId(parameters);
134-
if (parametersId == null) {
135-
parametersId = "default";
136-
} else {
137-
obj.setParametersId(parametersId);
138-
}
139-
return parametersId;
140-
}
141-
142130
public String forLocation(String prefix, long[] loc, MimeType mime) {
143131
Long x = loc[0];
144132
Long y = loc[1];
@@ -209,13 +197,6 @@ public String coordinatesPrefix(TileRange obj, boolean endWithSlash) {
209197
String gridset = obj.getGridSetId();
210198
MimeType mimeType = obj.getMimeType();
211199

212-
String parametersId = parametersFromTileRange(obj);
213-
String shortFormat = mimeType.getFileExtension(); // png, png8, png24, etc
214-
215-
return join(endWithSlash, prefix, layer, gridset, shortFormat, parametersId);
216-
}
217-
218-
private static String parametersFromTileRange(TileRange obj) {
219200
String parametersId = obj.getParametersId();
220201
if (parametersId == null) {
221202
Map<String, String> parameters = obj.getParameters();
@@ -226,7 +207,10 @@ private static String parametersFromTileRange(TileRange obj) {
226207
obj.setParametersId(parametersId);
227208
}
228209
}
229-
return parametersId;
210+
String shortFormat = mimeType.getFileExtension(); // png, png8, png24, etc
211+
212+
String key = join(endWithSlash, prefix, layer, gridset, shortFormat, parametersId);
213+
return key;
230214
}
231215

232216
public String pendingDeletes() {
@@ -248,6 +232,20 @@ private static String join(boolean closing, Object... elements) {
248232
return joiner.toString();
249233
}
250234

235+
private static String parametersFromTileRange(TileRange obj) {
236+
String parametersId = obj.getParametersId();
237+
if (parametersId == null) {
238+
Map<String, String> parameters = obj.getParameters();
239+
parametersId = ParametersUtils.getId(parameters);
240+
if (parametersId == null) {
241+
parametersId = "default";
242+
} else {
243+
obj.setParametersId(parametersId);
244+
}
245+
}
246+
return parametersId;
247+
}
248+
251249
public String forZoomLevel(TileRange tileRange, int level) {
252250
String layerId = layerId(tileRange.getLayerName());
253251
String gridsetId = tileRange.getGridSetId();

geowebcache/s3storage/Readme.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
Tidy up aws after working with tests
2+
===
3+
4+
```
5+
aws s3 ls s3://<bucket>/ | grep tmp_ | awk '{print $2}' | while read obj; do
6+
echo "Object: $obj"
7+
aws s3 rm s3://gwc-s3-test/$obj --recursive
8+
done
9+
</code>
10+
```
11+
12+
Replace the `<bucket>` with the value configured in your system.
13+
This will delete all the temporary object that have been created
14+
15+
16+
Config file
17+
====
18+
Add a `.gwc_s3_tests.properties` to your home directory to get the integration tests to run.
19+
20+
```
21+
cat .gwc_s3_tests.properties
22+
```
23+
_contents of file_
24+
25+
```
26+
bucket=gwc-s3-test
27+
secretKey=lxL*****************************
28+
accessKey=AK***************```
29+
30+
```

geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStore.java

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static com.google.common.base.Preconditions.checkArgument;
1717
import static com.google.common.base.Preconditions.checkNotNull;
18+
import static java.lang.String.format;
1819
import static java.util.Objects.isNull;
1920

2021
import com.amazonaws.AmazonServiceException;
@@ -44,6 +45,8 @@
4445
import java.util.Set;
4546
import java.util.logging.Level;
4647
import java.util.logging.Logger;
48+
import java.util.regex.Matcher;
49+
import java.util.regex.Pattern;
4750
import java.util.stream.Collectors;
4851
import java.util.stream.IntStream;
4952
import javax.annotation.Nullable;
@@ -56,6 +59,7 @@
5659
import org.geowebcache.locks.LockProvider;
5760
import org.geowebcache.mime.MimeException;
5861
import org.geowebcache.mime.MimeType;
62+
import org.geowebcache.s3.streams.TileDeletionListenerNotifier;
5963
import org.geowebcache.storage.BlobStore;
6064
import org.geowebcache.storage.BlobStoreListener;
6165
import org.geowebcache.storage.BlobStoreListenerList;
@@ -184,7 +188,6 @@ public boolean removeListener(BlobStoreListener listener) {
184188

185189
@Override
186190
public void put(TileObject obj) throws StorageException {
187-
TMSKeyBuilder.buildParametersId(obj);
188191
final Resource blob = obj.getBlob();
189192
checkNotNull(blob);
190193
checkNotNull(obj.getBlobFormat());
@@ -295,7 +298,9 @@ public boolean delete(final TileRange tileRange) throws StorageException {
295298
}
296299

297300
private String scheduleDeleteForZoomLevel(TileRange tileRange, int level) {
298-
String prefix = keyBuilder.forZoomLevel(tileRange, level);
301+
String zoomPath = keyBuilder.forZoomLevel(tileRange, level);
302+
Bounds bounds = new Bounds(tileRange.rangeBounds(level));
303+
String prefix = format("%s?%s", zoomPath, bounds);
299304
try {
300305
s3Ops.scheduleAsyncDelete(prefix);
301306
return prefix;
@@ -468,4 +473,55 @@ public Map<String, Optional<Map<String, String>>> getParametersMapping(String la
468473
.map(props -> (Map<String, String>) (Map<?, ?>) props)
469474
.collect(Collectors.toMap(ParametersUtils::getId, Optional::of));
470475
}
476+
477+
public static class Bounds {
478+
private static final Pattern boundsRegex =
479+
Pattern.compile("^(?<prefix>.*/)\\?bounds=(?<minx>\\d+),(?<miny>\\d+),(?<maxx>\\d+),(?<maxy>\\d+)$");
480+
private final long minX, minY, maxX, maxY;
481+
482+
public Bounds(long[] bound) {
483+
minX = Math.min(bound[0], bound[2]);
484+
minY = Math.min(bound[1], bound[3]);
485+
maxX = Math.max(bound[0], bound[2]);
486+
maxY = Math.max(bound[1], bound[3]);
487+
}
488+
489+
static Optional<Bounds> createBounds(String prefix) {
490+
Matcher matcher = boundsRegex.matcher(prefix);
491+
if (!matcher.matches()) {
492+
return Optional.empty();
493+
}
494+
495+
Bounds bounds = new Bounds(new long[] {
496+
Long.parseLong(matcher.group("minx")),
497+
Long.parseLong(matcher.group("miny")),
498+
Long.parseLong(matcher.group("maxx")),
499+
Long.parseLong(matcher.group("maxy"))
500+
});
501+
return Optional.of(bounds);
502+
}
503+
504+
static String prefixWithoutBounds(String prefix) {
505+
Matcher matcher = boundsRegex.matcher(prefix);
506+
if (matcher.matches()) {
507+
return matcher.group("prefix");
508+
}
509+
return prefix;
510+
}
511+
512+
@Override
513+
public String toString() {
514+
return format("bounds=%d,%d,%d,%d", minX, minY, maxX, maxY);
515+
}
516+
517+
public boolean predicate(S3ObjectSummary s3ObjectSummary) {
518+
var matcher = TileDeletionListenerNotifier.keyRegex.matcher(s3ObjectSummary.getKey());
519+
if (!matcher.matches()) {
520+
return false;
521+
}
522+
long x = Long.parseLong(matcher.group(TileDeletionListenerNotifier.X_GROUP_POS));
523+
long y = Long.parseLong(matcher.group(TileDeletionListenerNotifier.Y_GROUP_POS));
524+
return x >= minX && x <= maxX && y >= minY && y <= maxY;
525+
}
526+
}
471527
}

geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3Ops.java

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
*/
1414
package org.geowebcache.s3;
1515

16+
import static org.geowebcache.s3.S3BlobStore.Bounds.prefixWithoutBounds;
17+
1618
import com.amazonaws.services.s3.AmazonS3;
1719
import com.amazonaws.services.s3.AmazonS3Client;
1820
import com.amazonaws.services.s3.iterable.S3Objects;
@@ -39,8 +41,8 @@
3941
import java.util.concurrent.ExecutorService;
4042
import java.util.concurrent.Executors;
4143
import java.util.concurrent.ThreadFactory;
44+
import java.util.function.Consumer;
4245
import java.util.function.Predicate;
43-
import java.util.function.Supplier;
4446
import java.util.logging.Logger;
4547
import java.util.stream.Stream;
4648
import java.util.stream.StreamSupport;
@@ -50,16 +52,18 @@
5052
import org.geowebcache.locks.LockProvider;
5153
import org.geowebcache.locks.LockProvider.Lock;
5254
import org.geowebcache.locks.NoOpLockProvider;
55+
import org.geowebcache.s3.S3BlobStore.Bounds;
5356
import org.geowebcache.s3.streams.BatchingIterator;
5457
import org.geowebcache.s3.streams.DeleteBatchesOfS3Objects;
5558
import org.geowebcache.s3.streams.S3ObjectForPrefixSupplier;
56-
import org.geowebcache.s3.streams.TileListenerNotifier;
59+
import org.geowebcache.s3.streams.TileDeletionListenerNotifier;
5760
import org.geowebcache.storage.BlobStoreListenerList;
5861
import org.geowebcache.storage.StorageException;
5962
import org.geowebcache.util.TMSKeyBuilder;
6063

6164
class S3Ops {
6265
private static final int BATCH_SIZE = 1000;
66+
public static final Consumer<List<S3ObjectSummary>> NO_OPERATION_POST_PROCESSOR = list -> {};
6367
private final AmazonS3Client conn;
6468

6569
private final String bucketName;
@@ -205,8 +209,10 @@ private synchronized boolean asyncDelete(final String prefix, final long timesta
205209
return false;
206210
}
207211

208-
TileListenerNotifier tileListenerNotifier = new TileListenerNotifier(listeners, keyBuilder, S3BlobStore.log);
209-
BulkDelete task = new BulkDelete(conn, bucketName, prefix, timestamp, S3BlobStore.log, tileListenerNotifier);
212+
TileDeletionListenerNotifier tileDeletionListenerNotifier =
213+
new TileDeletionListenerNotifier(listeners, keyBuilder, S3BlobStore.log);
214+
BulkDelete task =
215+
new BulkDelete(conn, bucketName, prefix, timestamp, S3BlobStore.log, tileDeletionListenerNotifier);
210216
deleteExecutorService.submit(task);
211217
pendingDeletesKeyTime.put(prefix, timestamp);
212218

@@ -302,10 +308,15 @@ public byte[] getBytes(String key) throws StorageException {
302308

303309
/** Simply checks if there are objects starting with {@code prefix} */
304310
public boolean prefixExists(String prefix) {
305-
boolean hasNext = S3Objects.withPrefix(conn, bucketName, prefix)
311+
String prefixWithoutBounds = prefixWithoutBounds(prefix);
312+
boolean hasNext = S3Objects.withPrefix(conn, bucketName, prefixWithoutBounds)
306313
.withBatchSize(1)
307314
.iterator()
308315
.hasNext();
316+
317+
if (!hasNext) {
318+
S3BlobStore.log.info("No prefix exists for " + prefixWithoutBounds);
319+
}
309320
return hasNext;
310321
}
311322

@@ -361,21 +372,21 @@ public class BulkDelete implements Callable<Long> {
361372

362373
private final String bucketName;
363374
private final Logger logger;
364-
private final TileListenerNotifier tileListenerNotifier;
375+
private final TileDeletionListenerNotifier tileDeletionListenerNotifier;
365376

366377
public BulkDelete(
367378
final AmazonS3 conn,
368379
final String bucketName,
369380
final String prefix,
370381
final long timestamp,
371382
final Logger logger,
372-
TileListenerNotifier tileListenerNotifier) {
383+
TileDeletionListenerNotifier tileDeletionListenerNotifier) {
373384
this.conn = conn;
374385
this.bucketName = bucketName;
375386
this.prefix = prefix;
376387
this.timestamp = timestamp;
377388
this.logger = logger;
378-
this.tileListenerNotifier = tileListenerNotifier;
389+
this.tileDeletionListenerNotifier = tileDeletionListenerNotifier;
379390
}
380391

381392
@Override
@@ -393,6 +404,9 @@ public Long call() throws Exception {
393404
checkInterrupted();
394405
clearPendingBulkDelete(prefix, timestamp);
395406
return tilesDeleted;
407+
} catch (RuntimeException e) {
408+
S3BlobStore.log.severe("Aborted bulk delete " + e.getMessage());
409+
throw e;
396410
} finally {
397411
try {
398412
lock.release();
@@ -404,25 +418,48 @@ public Long call() throws Exception {
404418
}
405419

406420
private long deleteBatchesOfTilesAndInformListeners() {
421+
var possibleBounds = Bounds.createBounds(prefix);
407422
DeleteBatchesOfS3Objects<S3ObjectSummary> deleteBatchesOfS3Objects =
408423
new DeleteBatchesOfS3Objects<>(bucketName, conn, S3ObjectSummary::getKey, logger);
409-
S3Objects s3Objects = S3Objects.withPrefix(conn, bucketName, prefix).withBatchSize(BATCH_SIZE);
410-
Supplier<S3ObjectSummary> s3SummaryObjectSupplier =
411-
new S3ObjectForPrefixSupplier(prefix, bucketName, s3Objects, logger);
412424
Predicate<S3ObjectSummary> timeStampFilter = new TimeStampFilter(timestamp);
425+
Consumer<List<S3ObjectSummary>> batchPostProcessor =
426+
possibleBounds.isPresent() ? tileDeletionListenerNotifier : NO_OPERATION_POST_PROCESSOR;
413427

414428
return BatchingIterator.batchedStreamOf(
415-
Stream.generate(s3SummaryObjectSupplier)
429+
createS3ObjectStream()
416430
.takeWhile(Objects::nonNull)
417431
.takeWhile(o -> !Thread.currentThread().isInterrupted())
418432
.filter(timeStampFilter),
419433
BATCH_SIZE)
420434
.map(deleteBatchesOfS3Objects)
421-
.peek(tileListenerNotifier)
435+
.peek(batchPostProcessor)
422436
.mapToLong(List::size)
423437
.sum();
424438
}
425439

440+
private Stream<S3ObjectSummary> createS3ObjectStream() {
441+
var possibleBounds = Bounds.createBounds(prefix);
442+
if (possibleBounds.isPresent()) {
443+
String prefixWithoutBounds = prefixWithoutBounds(prefix);
444+
return boundedStreamOfS3Objects(prefixWithoutBounds, possibleBounds.get());
445+
} else {
446+
return unboundedStreamOfS3Objects(prefix);
447+
}
448+
}
449+
450+
private Stream<S3ObjectSummary> unboundedStreamOfS3Objects(String prefix) {
451+
S3Objects s3Objects = S3Objects.withPrefix(conn, bucketName, prefix).withBatchSize(BATCH_SIZE);
452+
S3ObjectForPrefixSupplier supplier = new S3ObjectForPrefixSupplier(prefix, bucketName, s3Objects, logger);
453+
return Stream.generate(supplier).takeWhile(Objects::nonNull);
454+
}
455+
456+
private Stream<S3ObjectSummary> boundedStreamOfS3Objects(String prefixWithoutBounds, Bounds bounds) {
457+
S3Objects s3Objects =
458+
S3Objects.withPrefix(conn, bucketName, prefixWithoutBounds).withBatchSize(BATCH_SIZE);
459+
S3ObjectForPrefixSupplier supplier = new S3ObjectForPrefixSupplier(prefix, bucketName, s3Objects, logger);
460+
return Stream.generate(supplier).takeWhile(Objects::nonNull).filter(bounds::predicate);
461+
}
462+
426463
private void checkInterrupted() throws InterruptedException {
427464
if (Thread.interrupted()) {
428465
S3BlobStore.log.info(String.format(

geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/BatchingIterator.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.stream.Stream;
2121
import java.util.stream.StreamSupport;
2222

23-
/** An iterator which returns batches of items taken from another iterator */
2423
public class BatchingIterator<T> implements Iterator<List<T>> {
2524
/**
2625
* Given a stream, convert it to a stream of batches no greater than the batchSize.
@@ -39,29 +38,24 @@ private static <T> Stream<T> asStream(Iterator<T> iterator) {
3938
}
4039

4140
private final int batchSize;
42-
private List<T> currentBatch;
4341
private final Iterator<T> sourceIterator;
4442

45-
public BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
43+
private BatchingIterator(Iterator<T> sourceIterator, int batchSize) {
4644
this.batchSize = batchSize;
4745
this.sourceIterator = sourceIterator;
4846
}
4947

5048
@Override
5149
public boolean hasNext() {
52-
prepareNextBatch();
53-
return currentBatch != null && !currentBatch.isEmpty();
50+
return sourceIterator.hasNext();
5451
}
5552

5653
@Override
5754
public List<T> next() {
58-
return currentBatch;
59-
}
60-
61-
private void prepareNextBatch() {
62-
currentBatch = new ArrayList<>(batchSize);
55+
List<T> currentBatch = new ArrayList<>(batchSize);
6356
while (sourceIterator.hasNext() && currentBatch.size() < batchSize) {
6457
currentBatch.add(sourceIterator.next());
6558
}
59+
return currentBatch;
6660
}
6761
}

0 commit comments

Comments
 (0)