Skip to content

Commit 2ef8e05

Browse files
committed
Fix
1 parent eaee9cc commit 2ef8e05

File tree

6 files changed

+73
-34
lines changed

6 files changed

+73
-34
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ subprojects {
4141
db2DriverVersion = '12.1.3.0'
4242
mariadDbDriverVersion = '3.5.6'
4343
alloyDbJdbcConnectorVersion = '1.2.8'
44-
alloyDbJdbcConnectorVersion = '1.2.7'
4544
googleCloudStorageVersion = '2.60.0'
4645
picocliVersion = '4.7.7'
4746
commonsTextVersion = '1.14.0'

core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapper.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,10 @@ public interface ObjectStorageWrapper {
6666
void delete(String key, String version) throws ObjectStorageWrapperException;
6767

6868
/**
69-
* Delete objects with the specified prefix from the storage.
69+
* Delete objects with the specified prefix from the storage. <br>
70+
* <br>
71+
* <strong>Attention:</strong> This method does not guarantee atomicity and assume to be used
72+
* where concurrent operations do not occur.
7073
*
7174
* @param prefix the prefix of the objects to delete
7275
* @throws ObjectStorageWrapperException if an error occurs
Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
package com.scalar.db.storage.objectstorage.cloudstorage;
22

33
import com.scalar.db.storage.objectstorage.ObjectStorageProvider;
4-
import com.scalar.db.storage.objectstorage.s3.S3Config;
54

65
public class CloudStorageProvider implements ObjectStorageProvider {
76

87
@Override
98
public String getName() {
10-
return S3Config.STORAGE_NAME;
9+
return CloudStorageConfig.STORAGE_NAME;
1110
}
1211
}

core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.scalar.db.storage.objectstorage.cloudstorage;
22

3+
import com.google.api.gax.paging.Page;
34
import com.google.auth.oauth2.ServiceAccountCredentials;
4-
import com.google.cloud.BatchResult;
55
import com.google.cloud.WriteChannel;
66
import com.google.cloud.storage.Blob;
77
import com.google.cloud.storage.BlobId;
@@ -37,6 +37,10 @@ public class CloudStorageWrapper implements ObjectStorageWrapper {
3737

3838
public CloudStorageWrapper(CloudStorageConfig config) {
3939
ServiceAccountCredentials credentials;
40+
if (config.getPassword() == null) {
41+
throw new RuntimeException(
42+
"Service account credentials are not provided in the password field");
43+
}
4044
try (ByteArrayInputStream keyStream =
4145
new ByteArrayInputStream(config.getPassword().getBytes(StandardCharsets.UTF_8))) {
4246
credentials = ServiceAccountCredentials.fromStream(keyStream);
@@ -209,34 +213,24 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
209213
@Override
210214
public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException {
211215
try {
212-
Iterable<Blob> blobs =
213-
storage.list(bucket, Storage.BlobListOption.prefix(prefix)).iterateAll();
214-
List<BlobId> blobIds = new ArrayList<>();
215-
for (Blob blob : blobs) {
216-
blobIds.add(BlobId.of(bucket, blob.getName()));
217-
}
218-
// Delete objects in batches
219-
for (int i = 0; i < blobIds.size(); i += BATCH_DELETE_SIZE_LIMIT) {
220-
int endIndex = Math.min(i + BATCH_DELETE_SIZE_LIMIT, blobIds.size());
221-
List<BlobId> batch = blobIds.subList(i, endIndex);
222-
StorageBatch storageBatch = storage.batch();
223-
for (BlobId blobId : batch) {
224-
storageBatch
225-
.delete(blobId)
226-
.notify(
227-
new BatchResult.Callback<Boolean, StorageException>() {
228-
@Override
229-
public void success(Boolean result) {}
230-
231-
@Override
232-
public void error(StorageException e) {
233-
if (e.getCode() != CloudStorageErrorCode.NOT_FOUND.get()) {
234-
throw e;
235-
}
236-
}
237-
});
216+
Page<Blob> page = storage.list(bucket, Storage.BlobListOption.prefix(prefix));
217+
while (page != null) {
218+
// Collect BlobIds to delete
219+
List<BlobId> blobIds = new ArrayList<>();
220+
for (Blob blob : page.getValues()) {
221+
blobIds.add(BlobId.of(bucket, blob.getName()));
222+
}
223+
// Delete objects in batches
224+
for (int i = 0; i < blobIds.size(); i += BATCH_DELETE_SIZE_LIMIT) {
225+
int endIndex = Math.min(i + BATCH_DELETE_SIZE_LIMIT, blobIds.size());
226+
List<BlobId> batch = blobIds.subList(i, endIndex);
227+
StorageBatch storageBatch = storage.batch();
228+
for (BlobId blobId : batch) {
229+
storageBatch.delete(blobId);
230+
}
231+
storageBatch.submit();
238232
}
239-
storageBatch.submit();
233+
page = page.getNextPage();
240234
}
241235
} catch (Exception e) {
242236
throw new ObjectStorageWrapperException(

core/src/main/resources/META-INF/services/com.scalar.db.api.DistributedStorageProvider

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ com.scalar.db.storage.cosmos.CosmosProvider
33
com.scalar.db.storage.dynamo.DynamoProvider
44
com.scalar.db.storage.jdbc.JdbcProvider
55
com.scalar.db.storage.objectstorage.blobstorage.BlobStorageProvider
6+
com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageProvider
67
com.scalar.db.storage.objectstorage.s3.S3Provider
78
com.scalar.db.storage.multistorage.MultiStorageProvider

core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,8 @@ public void deleteByPrefix_PrefixGiven_ShouldDeleteAllObjectsWithThePrefix() thr
345345
@SuppressWarnings("unchecked")
346346
Page<Blob> page = mock(Page.class);
347347
when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page);
348-
when(page.iterateAll()).thenReturn(Arrays.asList(blob1, blob2, blob3));
348+
when(page.getValues()).thenReturn(Arrays.asList(blob1, blob2, blob3));
349+
when(page.getNextPage()).thenReturn(null);
349350

350351
StorageBatch batch = mock(StorageBatch.class);
351352
when(storage.batch()).thenReturn(batch);
@@ -361,13 +362,55 @@ public void deleteByPrefix_PrefixGiven_ShouldDeleteAllObjectsWithThePrefix() thr
361362
verify(batch).submit();
362363
}
363364

365+
@Test
366+
public void deleteByPrefix_MultiplePagesGiven_ShouldDeleteAllObjectsAcrossPages()
367+
throws Exception {
368+
// Arrange
369+
String objectKey1 = ANY_PREFIX + "test1/object1";
370+
String objectKey2 = ANY_PREFIX + "test1/object2";
371+
String objectKey3 = ANY_PREFIX + "test2/object3";
372+
373+
Blob blob1 = mock(Blob.class);
374+
Blob blob2 = mock(Blob.class);
375+
Blob blob3 = mock(Blob.class);
376+
when(blob1.getName()).thenReturn(objectKey1);
377+
when(blob2.getName()).thenReturn(objectKey2);
378+
when(blob3.getName()).thenReturn(objectKey3);
379+
380+
// Mock paginated responses with 2 pages
381+
@SuppressWarnings("unchecked")
382+
Page<Blob> page1 = mock(Page.class);
383+
@SuppressWarnings("unchecked")
384+
Page<Blob> page2 = mock(Page.class);
385+
386+
when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page1);
387+
when(page1.getValues()).thenReturn(Arrays.asList(blob1, blob2));
388+
when(page1.getNextPage()).thenReturn(page2);
389+
when(page2.getValues()).thenReturn(Collections.singletonList(blob3));
390+
when(page2.getNextPage()).thenReturn(null);
391+
392+
StorageBatch batch = mock(StorageBatch.class);
393+
when(storage.batch()).thenReturn(batch);
394+
@SuppressWarnings("unchecked")
395+
StorageBatchResult<Boolean> batchResult = mock(StorageBatchResult.class);
396+
doReturn(batchResult).when(batch).delete(any(BlobId.class));
397+
398+
// Act
399+
wrapper.deleteByPrefix(ANY_PREFIX);
400+
401+
// Assert
402+
verify(storage, org.mockito.Mockito.times(2)).batch();
403+
verify(batch, org.mockito.Mockito.times(2)).submit();
404+
}
405+
364406
@Test
365407
public void deleteByPrefix_NoObjectsWithPrefix_ShouldDoNothing() throws Exception {
366408
// Arrange
367409
@SuppressWarnings("unchecked")
368410
Page<Blob> page = mock(Page.class);
369411
when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page);
370-
when(page.iterateAll()).thenReturn(Collections.emptyList());
412+
when(page.getValues()).thenReturn(Collections.emptyList());
413+
when(page.getNextPage()).thenReturn(null);
371414

372415
// Act
373416
wrapper.deleteByPrefix(ANY_PREFIX);

0 commit comments

Comments
 (0)