Skip to content

Commit 97b67f8

Browse files
committed
Fixed multiple calls to listener.
Added simplest bounded delete. Added test for bounded delete. Added test to check if TileDeleted events are received when a layer is deleted. There is a race in this test, so it can pass even though tileDeleted is sent.
1 parent ab44cdd commit 97b67f8

File tree

6 files changed

+107
-24
lines changed

6 files changed

+107
-24
lines changed

geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3BlobStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,14 @@ public Bounds(long[] bound) {
486486
maxY = Math.max(bound[1], bound[3]);
487487
}
488488

489+
public long getMinX() {
490+
return minX;
491+
}
492+
493+
public long getMaxX() {
494+
return maxX;
495+
}
496+
489497
static Optional<Bounds> createBounds(String prefix) {
490498
Matcher matcher = boundsRegex.matcher(prefix);
491499
if (!matcher.matches()) {

geowebcache/s3storage/src/main/java/org/geowebcache/s3/S3Ops.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Map;
3636
import java.util.Map.Entry;
3737
import java.util.Objects;
38+
import java.util.Optional;
3839
import java.util.Properties;
3940
import java.util.concurrent.Callable;
4041
import java.util.concurrent.ConcurrentHashMap;
@@ -54,9 +55,10 @@
5455
import org.geowebcache.locks.NoOpLockProvider;
5556
import org.geowebcache.s3.S3BlobStore.Bounds;
5657
import org.geowebcache.s3.streams.BatchingIterator;
58+
import org.geowebcache.s3.streams.BoundedS3KeySupplier;
5759
import org.geowebcache.s3.streams.DeleteBatchesOfS3Objects;
58-
import org.geowebcache.s3.streams.S3ObjectForPrefixSupplier;
5960
import org.geowebcache.s3.streams.TileDeletionListenerNotifier;
61+
import org.geowebcache.s3.streams.UnboundedS3KeySupplier;
6062
import org.geowebcache.storage.BlobStoreListenerList;
6163
import org.geowebcache.storage.StorageException;
6264
import org.geowebcache.util.TMSKeyBuilder;
@@ -426,7 +428,7 @@ private long deleteBatchesOfTilesAndInformListeners() {
426428
possibleBounds.isPresent() ? tileDeletionListenerNotifier : NO_OPERATION_POST_PROCESSOR;
427429

428430
return BatchingIterator.batchedStreamOf(
429-
createS3ObjectStream()
431+
createS3ObjectStream(possibleBounds)
430432
.takeWhile(Objects::nonNull)
431433
.takeWhile(o -> !Thread.currentThread().isInterrupted())
432434
.filter(timeStampFilter),
@@ -437,8 +439,7 @@ private long deleteBatchesOfTilesAndInformListeners() {
437439
.sum();
438440
}
439441

440-
private Stream<S3ObjectSummary> createS3ObjectStream() {
441-
var possibleBounds = Bounds.createBounds(prefix);
442+
private Stream<S3ObjectSummary> createS3ObjectStream(Optional<Bounds> possibleBounds) {
442443
if (possibleBounds.isPresent()) {
443444
String prefixWithoutBounds = prefixWithoutBounds(prefix);
444445
return boundedStreamOfS3Objects(prefixWithoutBounds, possibleBounds.get());
@@ -449,15 +450,16 @@ private Stream<S3ObjectSummary> createS3ObjectStream() {
449450

450451
private Stream<S3ObjectSummary> unboundedStreamOfS3Objects(String prefix) {
451452
S3Objects s3Objects = S3Objects.withPrefix(conn, bucketName, prefix).withBatchSize(BATCH_SIZE);
452-
S3ObjectForPrefixSupplier supplier = new S3ObjectForPrefixSupplier(prefix, bucketName, s3Objects, logger);
453+
UnboundedS3KeySupplier supplier = new UnboundedS3KeySupplier(prefix, bucketName, s3Objects, logger);
453454
return Stream.generate(supplier).takeWhile(Objects::nonNull);
454455
}
455456

456457
private Stream<S3ObjectSummary> boundedStreamOfS3Objects(String prefixWithoutBounds, Bounds bounds) {
457-
S3Objects s3Objects =
458-
S3Objects.withPrefix(conn, bucketName, prefixWithoutBounds).withBatchSize(BATCH_SIZE);
459-
S3ObjectForPrefixSupplier supplier = new S3ObjectForPrefixSupplier(prefix, bucketName, s3Objects, logger);
460-
return Stream.generate(supplier).takeWhile(Objects::nonNull).filter(bounds::predicate);
458+
BoundedS3KeySupplier supplier =
459+
new BoundedS3KeySupplier(prefixWithoutBounds, logger, conn, bounds, bucketName, BATCH_SIZE);
460+
return Stream.generate(supplier)
461+
.takeWhile(Objects::nonNull)
462+
.filter(bounds::predicate); // Filter Y bounds as X is taken care of by the supplier
461463
}
462464

463465
private void checkInterrupted() throws InterruptedException {
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package org.geowebcache.s3.streams;
2+
3+
import static java.lang.String.format;
4+
5+
import com.amazonaws.services.s3.AmazonS3;
6+
import com.amazonaws.services.s3.iterable.S3Objects;
7+
import com.amazonaws.services.s3.model.S3ObjectSummary;
8+
9+
import java.util.Iterator;
10+
import java.util.function.Supplier;
11+
import java.util.logging.Logger;
12+
13+
import org.geowebcache.s3.S3BlobStore.Bounds;
14+
15+
/**
16+
* Similar to the UnboundedS3KeySupplier it retrieves keys from S3. It is slightly more optimised as it respects the x
17+
* bounds only fetching objects that are from the range of x bounds S3ObjectPathsForPrefixSupplier This class will
18+
* interact with the AmazonS3 connection to retrieve all the objects with prefix and bucket provided <br>
19+
* It will return these lazily one by one as the get methods is called
20+
*/
21+
public class BoundedS3KeySupplier implements Supplier<S3ObjectSummary> {
22+
private final String prefixWithoutBounds;
23+
private final Logger logger;
24+
private final AmazonS3 conn;
25+
private final Bounds bounds;
26+
private final String bucket;
27+
private final int batch;
28+
29+
public BoundedS3KeySupplier(
30+
String prefixWithoutBounds, Logger logger, AmazonS3 conn, Bounds bounds, String bucket, int batch) {
31+
this.prefixWithoutBounds = prefixWithoutBounds;
32+
this.logger = logger;
33+
this.conn = conn;
34+
this.bounds = bounds;
35+
this.nextX = bounds.getMinX();
36+
this.bucket = bucket;
37+
this.batch = batch;
38+
}
39+
40+
private Iterator<S3ObjectSummary> iterator;
41+
private long nextX;
42+
private long count = 0;
43+
44+
@Override
45+
public S3ObjectSummary get() {
46+
return next();
47+
}
48+
49+
private synchronized S3ObjectSummary next() {
50+
boolean hasNext = false;
51+
do {
52+
hasNext = iterator != null && iterator.hasNext();
53+
if (!hasNext) {
54+
iterator = null;
55+
}
56+
57+
if (iterator == null && nextX <= bounds.getMaxX()) {
58+
String prefixWithX = format("%s%d/", prefixWithoutBounds, nextX);
59+
S3Objects s3Objects =
60+
S3Objects.withPrefix(conn, bucket, prefixWithX).withBatchSize(batch);
61+
iterator = s3Objects.iterator();
62+
hasNext = iterator.hasNext();
63+
nextX++;
64+
}
65+
} while (!hasNext && nextX <= bounds.getMaxX()); // It is exhausted if
66+
67+
if (hasNext) {
68+
count++;
69+
S3ObjectSummary summary = iterator.next();
70+
logger.fine(format("%s: %s", summary.getKey(), bounds));
71+
return summary;
72+
} else {
73+
logger.info(String.format(
74+
"Exhausted objects with prefix: %s supplied %d", prefixWithoutBounds + bounds, count));
75+
return null;
76+
}
77+
}
78+
}

geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/DeleteBatchesOfS3Objects.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.logging.Logger;
2626
import java.util.stream.Collectors;
2727

28+
/** @param <T> The type of the data object used to track abstract the s3 class */
2829
public class DeleteBatchesOfS3Objects<T> implements Function<List<T>, List<T>> {
2930
private final String bucket;
3031
private final AmazonS3 conn;

geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/S3ObjectForPrefixSupplier.java renamed to geowebcache/s3storage/src/main/java/org/geowebcache/s3/streams/UnboundedS3KeySupplier.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@
2020
import java.util.logging.Logger;
2121

2222
/**
23-
* S3ObjectPathsForPrefixSupplier This class will interact with the AmazonS3 connection to retrieve all the objects with
24-
* prefix and bucket provided <br>
23+
* UnboundedS3KeySupplier This class will interact with the AmazonS3 connection to retrieve all the objects with prefix
24+
* and bucket provided <br>
2525
* It will return these lazily one by one as the get methods is called
2626
*/
27-
public class S3ObjectForPrefixSupplier implements Supplier<S3ObjectSummary> {
27+
public class UnboundedS3KeySupplier implements Supplier<S3ObjectSummary> {
2828
private final String prefix;
2929
private long count = 0;
3030
private final Logger logger;
3131
private final S3Objects s3Objects;
3232

3333
private Iterator<S3ObjectSummary> iterator;
3434

35-
public S3ObjectForPrefixSupplier(String prefix, String bucket, S3Objects s3Objects, Logger logger) {
35+
public UnboundedS3KeySupplier(String prefix, String bucket, S3Objects s3Objects, Logger logger) {
3636
checkNotNull(prefix, "prefix must not be null");
3737
checkNotNull(bucket, "bucket must not be null");
3838
checkNotNull(s3Objects, "s3Objects must not be null");

geowebcache/s3storage/src/test/java/org/geowebcache/s3/AbstractS3BlobStoreIntegrationTest.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,11 @@
1717
import static org.junit.Assert.assertFalse;
1818
import static org.junit.Assert.assertNotNull;
1919
import static org.junit.Assert.assertTrue;
20-
import static org.mockito.ArgumentMatchers.any;
2120
import static org.mockito.ArgumentMatchers.anyInt;
2221
import static org.mockito.ArgumentMatchers.anyLong;
2322
import static org.mockito.ArgumentMatchers.anyString;
2423
import static org.mockito.ArgumentMatchers.eq;
2524
import static org.mockito.ArgumentMatchers.isNull;
26-
import static org.mockito.Mockito.doNothing;
2725
import static org.mockito.Mockito.mock;
2826
import static org.mockito.Mockito.times;
2927
import static org.mockito.Mockito.verify;
@@ -432,12 +430,8 @@ public void testBoundedLayerDeletion() throws StorageException, MimeException {
432430

433431
int level = 3;
434432
seed(level, level);
435-
BlobStoreListener listener = mock(BlobStoreListener.class);
436-
blobStore.addListener(listener);
437-
doNothing()
438-
.when(listener)
439-
.tileDeleted(
440-
anyString(), anyString(), anyString(), isNull(), anyLong(), anyLong(), anyInt(), anyLong());
433+
FakeListener fakeListener = new FakeListener();
434+
blobStore.addListener(fakeListener);
441435

442436
long[][] rangeBounds = {{2, 2, 3, 3, level}};
443437

@@ -455,9 +449,9 @@ public void testBoundedLayerDeletion() throws StorageException, MimeException {
455449
}
456450

457451
int wantedNumberOfInvocations =
458-
(int) ((rangeBounds[0][2] - rangeBounds[0][0] + 1) * (rangeBounds[0][level] - rangeBounds[0][1] + 1));
459-
Awaitility.await().untilAsserted(() -> verify(listener, times(wantedNumberOfInvocations))
460-
.tileDeleted(anyString(), anyString(), anyString(), any(), anyLong(), anyLong(), anyInt(), anyLong()));
452+
(int) ((rangeBounds[0][2] - rangeBounds[0][0] + 1) * (rangeBounds[0][3] - rangeBounds[0][1] + 1));
453+
Awaitility.await().untilAsserted(() -> assertEquals(wantedNumberOfInvocations, fakeListener.tileDeleted));
454+
assertEquals(wantedNumberOfInvocations, fakeListener.total());
461455
}
462456

463457
private TileRange tileRange(

0 commit comments

Comments
 (0)