|
15 | 15 |
|
16 | 16 | import static com.google.common.base.Preconditions.checkNotNull; |
17 | 17 | import static java.util.Objects.isNull; |
| 18 | +import static org.geowebcache.azure.DeleteManager.PAGE_SIZE; |
18 | 19 |
|
19 | 20 | import com.azure.core.util.BinaryData; |
20 | 21 | import com.azure.storage.blob.models.BlobDownloadContentResponse; |
21 | 22 | import com.azure.storage.blob.models.BlobItem; |
22 | 23 | import com.azure.storage.blob.models.BlobProperties; |
23 | 24 | import com.azure.storage.blob.models.BlobStorageException; |
24 | 25 | import com.azure.storage.blob.specialized.BlockBlobClient; |
25 | | -import com.google.common.collect.AbstractIterator; |
26 | | -import com.google.common.collect.Iterators; |
27 | 26 | import java.io.IOException; |
28 | 27 | import java.io.InputStream; |
29 | 28 | import java.io.UncheckedIOException; |
30 | 29 | import java.time.OffsetDateTime; |
| 30 | +import java.util.ArrayList; |
31 | 31 | import java.util.Iterator; |
32 | 32 | import java.util.List; |
33 | 33 | import java.util.Map; |
34 | 34 | import java.util.Optional; |
35 | 35 | import java.util.Properties; |
36 | 36 | import java.util.concurrent.Callable; |
37 | 37 | import java.util.logging.Logger; |
| 38 | +import java.util.regex.Matcher; |
| 39 | +import java.util.regex.Pattern; |
38 | 40 | import java.util.stream.Collectors; |
| 41 | +import java.util.stream.IntStream; |
39 | 42 | import java.util.stream.Stream; |
40 | 43 | import javax.annotation.Nullable; |
41 | 44 | import org.geotools.util.logging.Logging; |
|
53 | 56 | import org.geowebcache.storage.StorageException; |
54 | 57 | import org.geowebcache.storage.TileObject; |
55 | 58 | import org.geowebcache.storage.TileRange; |
56 | | -import org.geowebcache.storage.TileRangeIterator; |
57 | 59 | import org.geowebcache.util.TMSKeyBuilder; |
58 | 60 | import org.springframework.http.HttpStatus; |
59 | 61 |
|
60 | 62 | public class AzureBlobStore implements BlobStore { |
61 | 63 |
|
62 | | - static Logger log = Logging.getLogger(AzureBlobStore.class.getName()); |
| 64 | + private static final Logger LOG = Logging.getLogger(AzureBlobStore.class.getName()); |
| 65 | + |
| 66 | + private static final Pattern TILE_BLOB_NAME_REGEXP = Pattern.compile("(?<z>\\d+)/(?<x>\\d+)/(?<y>\\d+)\\.\\w+$"); |
63 | 67 |
|
64 | 68 | private final TMSKeyBuilder keyBuilder; |
65 | 69 | private final BlobStoreListenerList listeners = new BlobStoreListenerList(); |
@@ -190,58 +194,98 @@ public boolean delete(TileRange tileRange) throws StorageException { |
190 | 194 | return false; |
191 | 195 | } |
192 | 196 |
|
193 | | - // open an iterator oer tile locations, to avoid memory accumulation |
194 | | - final Iterator<long[]> tileLocations = new AbstractIterator<>() { |
195 | | - |
196 | | - // TileRange iterator with 1x1 meta tiling factor |
197 | | - private TileRangeIterator trIter = new TileRangeIterator(tileRange, new int[] {1, 1}); |
198 | | - |
199 | | - @Override |
200 | | - protected long[] computeNext() { |
201 | | - long[] gridLoc = trIter.nextMetaGridLocation(new long[3]); |
202 | | - return gridLoc == null ? endOfData() : gridLoc; |
203 | | - } |
204 | | - }; |
| 197 | + Stream<BlobItem> blobsToDelete = findTileBlobsToDelete(tileRange, coordsPrefix); |
205 | 198 |
|
206 | | - // if no listeners, we don't need to gather extra tile info, use a dedicated fast path |
207 | 199 | if (listeners.isEmpty()) { |
208 | 200 | // if there are no listeners, don't bother requesting every tile |
209 | 201 | // metadata to notify the listeners |
210 | | - Iterator<String> keysIterator = Iterators.transform( |
211 | | - tileLocations, tl -> keyBuilder.forLocation(coordsPrefix, tl, tileRange.getMimeType())); |
212 | | - // split the iteration in parts to avoid memory accumulation |
213 | | - Iterator<List<String>> partition = Iterators.partition(keysIterator, DeleteManager.PAGE_SIZE); |
214 | | - |
215 | | - while (partition.hasNext() && !shutDown) { |
216 | | - List<String> locations = partition.next(); |
217 | | - deleteManager.deleteParallel(locations); |
| 202 | + if (!shutDown) { |
| 203 | + deleteManager.deleteStreamed(blobsToDelete); |
218 | 204 | } |
219 | | - |
220 | 205 | } else { |
221 | 206 | // if we need to gather info, we'll end up just calling "delete" on each tile |
222 | 207 | // this is run here instead of inside the delete manager as we need high level info |
223 | 208 | // about tiles, e.g., TileObject, to inform the listeners |
224 | | - String layerName = tileRange.getLayerName(); |
225 | | - String gridSetId = tileRange.getGridSetId(); |
226 | | - String format = tileRange.getMimeType().getFormat(); |
227 | | - Map<String, String> parameters = tileRange.getParameters(); |
228 | | - |
229 | | - Iterator<Callable<?>> tilesIterator = Iterators.transform(tileLocations, xyz -> { |
230 | | - TileObject tile = TileObject.createQueryTileObject(layerName, xyz, gridSetId, format, parameters); |
| 209 | + Stream<Callable<?>> tilesDeletions = blobsToDelete.map(blobItem -> { |
| 210 | + TileObject tile = createTileObject(blobItem, tileRange); |
231 | 211 | tile.setParametersId(tileRange.getParametersId()); |
232 | | - return (Callable<Object>) () -> delete(tile); |
| 212 | + return () -> delete(tile); |
233 | 213 | }); |
234 | | - Iterator<List<Callable<?>>> partition = Iterators.partition(tilesIterator, DeleteManager.PAGE_SIZE); |
235 | 214 |
|
236 | | - // once a page of callables is ready, run them in parallel on the delete manager |
237 | | - while (partition.hasNext() && !shutDown) { |
238 | | - deleteManager.executeParallel(partition.next()); |
239 | | - } |
| 215 | + executeParallelDeletions(tilesDeletions); |
240 | 216 | } |
241 | 217 |
|
242 | 218 | return true; |
243 | 219 | } |
244 | 220 |
|
| 221 | + private Stream<BlobItem> findTileBlobsToDelete(TileRange tileRange, String coordsPrefix) { |
| 222 | + return IntStream.rangeClosed(tileRange.getZoomStart(), tileRange.getZoomStop()) |
| 223 | + .boxed() |
| 224 | + .flatMap(zoom -> { |
| 225 | + String zoomPrefix = coordsPrefix + "/" + zoom; |
| 226 | + |
| 227 | + if (!client.prefixExists(zoomPrefix)) { |
| 228 | + // empty level, skipping |
| 229 | + return Stream.empty(); |
| 230 | + } |
| 231 | + |
| 232 | + long[] rangeBoundsAtZoom = tileRange.rangeBounds(zoom); |
| 233 | + |
| 234 | + return client.listBlobs(zoomPrefix) |
| 235 | + .filter(tb -> |
| 236 | + TILE_BLOB_NAME_REGEXP.matcher(tb.getName()).find()) |
| 237 | + .filter(tb -> isTileBlobInBounds(tb, rangeBoundsAtZoom)); |
| 238 | + }); |
| 239 | + } |
| 240 | + |
| 241 | + private boolean isTileBlobInBounds(BlobItem tileBlob, long[] bounds) { |
| 242 | + long minX = bounds[0]; |
| 243 | + long minY = bounds[1]; |
| 244 | + long maxX = bounds[2]; |
| 245 | + long maxY = bounds[3]; |
| 246 | + |
| 247 | + long[] index = extractTileIndex(tileBlob); |
| 248 | + long tileX = index[0]; |
| 249 | + long tileY = index[1]; |
| 250 | + |
| 251 | + return tileX >= minX && tileX <= maxX && tileY >= minY && tileY <= maxY; |
| 252 | + } |
| 253 | + |
| 254 | + private TileObject createTileObject(BlobItem blobItem, TileRange tileRange) { |
| 255 | + String layerName = tileRange.getLayerName(); |
| 256 | + String gridSetId = tileRange.getGridSetId(); |
| 257 | + String format = tileRange.getMimeType().getFormat(); |
| 258 | + Map<String, String> parameters = tileRange.getParameters(); |
| 259 | + return TileObject.createQueryTileObject(layerName, extractTileIndex(blobItem), gridSetId, format, parameters); |
| 260 | + } |
| 261 | + |
| 262 | + private long[] extractTileIndex(BlobItem blobItem) { |
| 263 | + Matcher matcher = TILE_BLOB_NAME_REGEXP.matcher(blobItem.getName()); |
| 264 | + |
| 265 | + if (!matcher.find()) { |
| 266 | + throw new IllegalArgumentException("Invalid tile blob name"); |
| 267 | + } |
| 268 | + |
| 269 | + return new long[] { |
| 270 | + Long.parseLong(matcher.group("x")), Long.parseLong(matcher.group("y")), Long.parseLong(matcher.group("z")) |
| 271 | + }; |
| 272 | + } |
| 273 | + |
| 274 | + private void executeParallelDeletions(Stream<Callable<?>> tilesDeletions) throws StorageException { |
| 275 | + Iterator<Callable<?>> tilesDeletionsIterator = tilesDeletions.iterator(); |
| 276 | + |
| 277 | + while (tilesDeletionsIterator.hasNext() && !shutDown) { |
| 278 | + |
| 279 | + // once a page of callables is ready, run them in parallel on the delete manager |
| 280 | + List<Callable<?>> callables = new ArrayList<>(PAGE_SIZE); |
| 281 | + for (int i = 0; i < PAGE_SIZE && tilesDeletionsIterator.hasNext(); i++) { |
| 282 | + callables.add(tilesDeletionsIterator.next()); |
| 283 | + } |
| 284 | + |
| 285 | + deleteManager.executeParallel(callables); |
| 286 | + } |
| 287 | + } |
| 288 | + |
245 | 289 | @Override |
246 | 290 | public boolean get(TileObject obj) throws StorageException { |
247 | 291 | final String key = keyBuilder.forTile(obj); |
@@ -373,7 +417,7 @@ public boolean rename(String oldLayerName, String newLayerName) throws StorageEx |
373 | 417 | // revisit: this seems to hold true only for GeoServerTileLayer, "standalone" TileLayers |
374 | 418 | // return getName() from getId(), as in AbstractTileLayer. Unfortunately the only option |
375 | 419 | // for non-GeoServerTileLayers would be copy and delete. Expensive. |
376 | | - log.fine("No need to rename layers, AzureBlobStore uses layer id as key root"); |
| 420 | + LOG.fine("No need to rename layers, AzureBlobStore uses layer id as key root"); |
377 | 421 | if (client.prefixExists(oldLayerName)) { |
378 | 422 | listeners.sendLayerRenamed(oldLayerName, newLayerName); |
379 | 423 | } |
|
0 commit comments