From 50ec61fdd5bc328b28d0220c9f6b8f51d7288d22 Mon Sep 17 00:00:00 2001 From: Kodai Doki <52027276+KodaiD@users.noreply.github.com> Date: Wed, 19 Nov 2025 19:22:37 +0900 Subject: [PATCH 1/2] Add support for Cloud Storage (#3179) --- .../object-storage-adapter-check.yaml | 59 ++- build.gradle | 1 + core/build.gradle | 15 +- .../objectstorage/ObjectStorageEnv.java | 6 + .../ObjectStorageWrapperIntegrationTest.java | 23 +- ...rapperLargeObjectWriteIntegrationTest.java | 14 +- .../java/com/scalar/db/common/CoreError.java | 12 + .../storage/objectstorage/ObjectStorage.java | 6 +- .../objectstorage/ObjectStorageAdmin.java | 9 +- .../objectstorage/ObjectStorageConfig.java | 7 - .../objectstorage/ObjectStorageUtils.java | 3 + .../objectstorage/ObjectStorageWrapper.java | 7 +- .../ObjectStorageWrapperException.java | 4 + .../ObjectStorageWrapperFactory.java | 6 + .../PreconditionFailedException.java | 4 + .../blobstorage/BlobStorageConfig.java | 9 +- .../cloudstorage/CloudStorageConfig.java | 98 ++++ .../cloudstorage/CloudStorageErrorCode.java | 16 + .../cloudstorage/CloudStorageProvider.java | 11 + .../cloudstorage/CloudStorageWrapper.java | 221 +++++++++ .../db/storage/objectstorage/s3/S3Config.java | 11 +- .../storage/objectstorage/s3/S3Wrapper.java | 10 +- ...m.scalar.db.api.DistributedStorageProvider | 1 + .../cloudstorage/CloudStorageConfigTest.java | 88 ++++ .../cloudstorage/CloudStorageWrapperTest.java | 435 ++++++++++++++++++ .../objectstorage/s3/S3WrapperTest.java | 10 +- 26 files changed, 1041 insertions(+), 45 deletions(-) create mode 100644 core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java create mode 100644 core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageErrorCode.java create mode 100644 core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageProvider.java create mode 100644 core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java create mode 100644 core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java create mode 100644 core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java diff --git a/.github/workflows/object-storage-adapter-check.yaml b/.github/workflows/object-storage-adapter-check.yaml index 7ef72d8ca9..a757e7c39c 100644 --- a/.github/workflows/object-storage-adapter-check.yaml +++ b/.github/workflows/object-storage-adapter-check.yaml @@ -44,6 +44,9 @@ env: AWS_SECRET_ACCESS_KEY: ${{ secrets.S3_SECRET_ACCESS_KEY }} S3_REGION: ap-northeast-1 S3_BUCKET_NAME: scalardb-test-bucket + CLOUD_STORAGE_PROJECT_ID: ${{ secrets.CLOUD_STORAGE_PROJECT_ID }} + CLOUD_STORAGE_SERVICE_ACCOUNT_KEY: ${{ secrets.CLOUD_STORAGE_SERVICE_ACCOUNT_KEY }} + CLOUD_STORAGE_BUCKET_NAME: scalardb-test-bucket jobs: integration-test-s3: @@ -98,5 +101,59 @@ jobs: if: always() uses: actions/upload-artifact@v5 with: - name: cassandra_3.0_integration_test_reports_${{ matrix.mode.label }} + name: s3_integration_test_reports_${{ matrix.mode.label }} + path: core/build/reports/tests/integrationTestObjectStorage + integration-test-cloud-storage: + name: Cloud Storage integration test (${{ matrix.mode.label }}) + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + mode: + - label: default + group_commit_enabled: false + - label: with_group_commit + group_commit_enabled: true + + steps: + - uses: actions/checkout@v5 + + - name: Set up JDK ${{ env.JAVA_VERSION }} (${{ env.JAVA_VENDOR }}) + uses: actions/setup-java@v5 + with: + java-version: ${{ env.JAVA_VERSION }} + distribution: ${{ env.JAVA_VENDOR }} + + - name: Set up JDK ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} (${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }}) to run integration test + uses: actions/setup-java@v5 + if: ${{ env.SET_UP_INT_TEST_RUNTIME_NON_ORACLE_JDK == 'true'}} + with: + java-version: ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} + distribution: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR }} + + - name: Login to Oracle container registry + uses: docker/login-action@v3 + if: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle' }} + with: + registry: container-registry.oracle.com + username: ${{ secrets.OCR_USERNAME }} + password: ${{ secrets.OCR_TOKEN }} + + - name: Set up JDK ${{ env.INT_TEST_JAVA_RUNTIME_VERSION }} (oracle) to run the integration test + if: ${{ env.INT_TEST_JAVA_RUNTIME_VENDOR == 'oracle' }} + run: | + container_id=$(docker create "container-registry.oracle.com/java/jdk:${{ env.INT_TEST_JAVA_RUNTIME_VERSION }}") + docker cp -L "$container_id:/usr/java/default" /usr/lib/jvm/oracle-jdk && docker rm "$container_id" + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v5 + + - name: Execute Gradle 'integrationTestObjectStorage' task + run: ./gradlew integrationTestObjectStorage -Dscalardb.object_storage.storage=cloud-storage -Dscalardb.object_storage.endpoint=scalardb-test-bucket -Dscalardb.object_storage.username=${{ env.CLOUD_STORAGE_PROJECT_ID }} -Dscalardb.object_storage.password=${{ env.CLOUD_STORAGE_SERVICE_ACCOUNT_KEY }} ${{ matrix.mode.group_commit_enabled && env.INT_TEST_GRADLE_OPTIONS_FOR_GROUP_COMMIT || '' }} + + - name: Upload Gradle test reports + if: always() + uses: actions/upload-artifact@v5 + with: + name: cloud_storage_integration_test_reports_${{ matrix.mode.label }} path: core/build/reports/tests/integrationTestObjectStorage diff --git a/build.gradle b/build.gradle index 387696118c..db4ff7fb31 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ subprojects { db2DriverVersion = '12.1.3.0' mariadDbDriverVersion = '3.5.6' alloyDbJdbcConnectorVersion = '1.2.8' + googleCloudStorageVersion = '2.60.0' picocliVersion = '4.7.7' commonsTextVersion = '1.14.0' junitVersion = '5.14.1' diff --git a/core/build.gradle b/core/build.gradle index 576ca041d5..0bd71ae5f5 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -193,6 +193,9 @@ dependencies { implementation("com.google.cloud:alloydb-jdbc-connector:${alloyDbJdbcConnectorVersion}") { exclude group: 'org.slf4j', module: 'slf4j-api' } + implementation("com.google.cloud:google-cloud-storage:${googleCloudStorageVersion}") { + exclude group: 'org.slf4j', module: 'slf4j-api' + } implementation "org.apache.commons:commons-text:${commonsTextVersion}" testImplementation platform("org.junit:junit-bom:${junitVersion}") testImplementation 'org.junit.jupiter:junit-jupiter' @@ -226,7 +229,7 @@ task integrationTestCassandra(type: Test) { classpath = sourceSets.integrationTestCassandra.runtimeClasspath outputs.upToDateWhen { false } // ensures integration tests are run every time when called options { - systemProperties(System.getProperties().findAll{it.key.toString().startsWith("scalardb")}) + systemProperties(System.getProperties().findAll { it.key.toString().startsWith("scalardb") }) } } @@ -237,7 +240,7 @@ task integrationTestCosmos(type: Test) { classpath = sourceSets.integrationTestCosmos.runtimeClasspath outputs.upToDateWhen { false } // ensures integration tests are run every time when called options { - systemProperties(System.getProperties().findAll{it.key.toString().startsWith("scalardb")}) + systemProperties(System.getProperties().findAll { it.key.toString().startsWith("scalardb") }) } jvmArgs '-XX:MaxDirectMemorySize=4g', '-Xmx6g', // INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Initializing DocumentClient [3] with serviceEndpoint [https://localhost:8081/], ... @@ -255,7 +258,7 @@ task integrationTestDynamo(type: Test) { classpath = sourceSets.integrationTestDynamo.runtimeClasspath outputs.upToDateWhen { false } // ensures integration tests are run every time when called options { - systemProperties(System.getProperties().findAll{it.key.toString().startsWith("scalardb")}) + systemProperties(System.getProperties().findAll { it.key.toString().startsWith("scalardb") }) } maxParallelForks = 10 } @@ -267,7 +270,7 @@ task integrationTestJdbc(type: Test) { classpath = sourceSets.integrationTestJdbc.runtimeClasspath outputs.upToDateWhen { false } // ensures integration tests are run every time when called options { - systemProperties(System.getProperties().findAll{it.key.toString().startsWith("scalardb")}) + systemProperties(System.getProperties().findAll { it.key.toString().startsWith("scalardb") }) } maxHeapSize = "4g" } @@ -279,7 +282,7 @@ task integrationTestObjectStorage(type: Test) { classpath = sourceSets.integrationTestObjectStorage.runtimeClasspath outputs.upToDateWhen { false } // ensures integration tests are run every time when called options { - systemProperties(System.getProperties().findAll{it.key.toString().startsWith("scalardb")}) + systemProperties(System.getProperties().findAll { it.key.toString().startsWith("scalardb") }) } } @@ -290,7 +293,7 @@ task integrationTestMultiStorage(type: Test) { classpath = sourceSets.integrationTestMultiStorage.runtimeClasspath outputs.upToDateWhen { false } // ensures integration tests are run every time when called options { - systemProperties(System.getProperties().findAll{it.key.toString().startsWith("scalardb")}) + systemProperties(System.getProperties().findAll { it.key.toString().startsWith("scalardb") }) } } diff --git a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java index fb54ad01ca..85e3c8be21 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java +++ b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java @@ -2,6 +2,7 @@ import com.scalar.db.config.DatabaseConfig; import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig; +import com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageConfig; import com.scalar.db.storage.objectstorage.s3.S3Config; import java.util.Collections; import java.util.Map; @@ -76,6 +77,11 @@ public static boolean isBlobStorage() { .equals(BlobStorageConfig.STORAGE_NAME); } + public static boolean isCloudStorage() { + return System.getProperty(PROP_OBJECT_STORAGE_STORAGE, DEFAULT_OBJECT_STORAGE_STORAGE) + .equals(CloudStorageConfig.STORAGE_NAME); + } + public static boolean isS3() { return System.getProperty(PROP_OBJECT_STORAGE_STORAGE, DEFAULT_OBJECT_STORAGE_STORAGE) .equals(S3Config.STORAGE_NAME); diff --git a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperIntegrationTest.java index 57ffecd24b..567f4555f7 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperIntegrationTest.java @@ -28,8 +28,11 @@ public class ObjectStorageWrapperIntegrationTest { private static final String TEST_OBJECT2 = "test-object2"; private static final String TEST_OBJECT3 = "test-object3"; private static final int BLOB_STORAGE_LIST_MAX_KEYS = 5000; + private static final int CLOUD_STORAGE_LIST_MAX_KEYS = 1000; + private static final int S3_LIST_MAX_KEYS = 1000; private ObjectStorageWrapper wrapper; + private int listMaxKeys; @BeforeAll public void beforeAll() throws ObjectStorageWrapperException { @@ -38,6 +41,16 @@ public void beforeAll() throws ObjectStorageWrapperException { ObjectStorageUtils.getObjectStorageConfig(new DatabaseConfig(properties)); wrapper = ObjectStorageWrapperFactory.create(objectStorageConfig); createObjects(); + + if (ObjectStorageEnv.isBlobStorage()) { + listMaxKeys = BLOB_STORAGE_LIST_MAX_KEYS; + } else if (ObjectStorageEnv.isCloudStorage()) { + listMaxKeys = CLOUD_STORAGE_LIST_MAX_KEYS; + } else if (ObjectStorageEnv.isS3()) { + listMaxKeys = S3_LIST_MAX_KEYS; + } else { + throw new AssertionError(); + } } @AfterAll @@ -152,14 +165,14 @@ public void update_NonExistingObjectKeyGiven_ShouldThrowPreconditionFailedExcept String objectKey = "non-existing-key"; // Act Assert - assertThatCode(() -> wrapper.update(objectKey, "some-object", "some-version")) + assertThatCode(() -> wrapper.update(objectKey, "some-object", "123456789")) .isInstanceOf(PreconditionFailedException.class); } @Test public void update_WrongVersionGiven_ShouldThrowPreconditionFailedException() { // Arrange - String wrongVersion = "wrong-version"; + String wrongVersion = "123456789"; // Act Assert assertThatCode(() -> wrapper.update(TEST_KEY2, "another-object", wrongVersion)) @@ -219,7 +232,7 @@ public void delete_ExistingObjectKeyWithWrongVersionGiven_ShouldThrowPreconditio // Arrange Optional response1 = wrapper.get(TEST_KEY1); assertThat(response1.isPresent()).isTrue(); - String wrongVersion = "wrong-version"; + String wrongVersion = "123456789"; // Act Assert assertThatCode(() -> wrapper.delete(TEST_KEY1, wrongVersion)) @@ -253,7 +266,7 @@ public void getKeys_WithNonExistingPrefix_ShouldReturnEmptySet() throws Exceptio public void getKeys_WithPrefixForTheNumberOfObjectsExceedingTheListLimit_ShouldReturnAllKeys() throws Exception { String prefix = "prefix-"; - int numberOfObjects = BLOB_STORAGE_LIST_MAX_KEYS + 1; + int numberOfObjects = listMaxKeys + 1; try { // Arrange for (int i = 0; i < numberOfObjects; i++) { @@ -313,7 +326,7 @@ public void deleteByPrefix_WithNonExistingPrefix_ShouldDoNothing() throws Except deleteByPrefix_WithPrefixForTheNumberOfObjectsExceedingTheListLimit_ShouldDeleteAllObjects() throws Exception { String prefix = "prefix-"; - int numberOfObjects = BLOB_STORAGE_LIST_MAX_KEYS + 1; + int numberOfObjects = listMaxKeys + 1; try { // Arrange for (int i = 0; i < numberOfObjects; i++) { diff --git a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java index a2900e0991..2a80fc5047 100644 --- a/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java +++ b/core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java @@ -5,6 +5,7 @@ import com.scalar.db.config.DatabaseConfig; import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig; +import com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageConfig; import com.scalar.db.storage.objectstorage.s3.S3Config; import java.util.Arrays; import java.util.Optional; @@ -47,6 +48,13 @@ public void beforeAll() throws ObjectStorageWrapperException { BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, String.valueOf(parallelUploadUnit * 2)); parallelUploadThresholdInBytes = parallelUploadUnit * 2; + } else if (ObjectStorageEnv.isCloudStorage()) { + // Minimum block size must be greater than or equal to 256KB for Cloud Storage + Long parallelUploadUnit = 256 * 1024L; // 256KB + properties.setProperty( + CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, + String.valueOf(parallelUploadUnit)); + parallelUploadThresholdInBytes = parallelUploadUnit * 2; } else if (ObjectStorageEnv.isS3()) { // Minimum part size must be greater than or equal to 5MB for S3 Long parallelUploadUnit = 5 * 1024 * 1024L; // 5MB @@ -59,7 +67,7 @@ public void beforeAll() throws ObjectStorageWrapperException { throw new AssertionError(); } - char[] charArray = new char[(int) parallelUploadThresholdInBytes]; + char[] charArray = new char[(int) parallelUploadThresholdInBytes + 1]; Arrays.fill(charArray, 'a'); testObject1 = new String(charArray); Arrays.fill(charArray, 'b'); @@ -163,14 +171,14 @@ public void update_NonExistingObjectKeyGiven_ShouldThrowPreconditionFailedExcept String objectKey = "non-existing-key"; // Act Assert - assertThatCode(() -> wrapper.update(objectKey, "some-object", "some-version")) + assertThatCode(() -> wrapper.update(objectKey, "some-object", "123456789")) .isInstanceOf(PreconditionFailedException.class); } @Test public void update_WrongVersionGiven_ShouldThrowPreconditionFailedException() { // Arrange - String wrongVersion = "wrong-version"; + String wrongVersion = "123456789"; // Act Assert assertThatCode(() -> wrapper.update(TEST_KEY2, "another-object", wrongVersion)) diff --git a/core/src/main/java/com/scalar/db/common/CoreError.java b/core/src/main/java/com/scalar/db/common/CoreError.java index 87779cd038..8473597b65 100644 --- a/core/src/main/java/com/scalar/db/common/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/CoreError.java @@ -931,6 +931,18 @@ public enum CoreError implements ScalarDbError { "Conditions on indexed columns in cross-partition scan operations are not allowed in the SERIALIZABLE isolation level", "", ""), + OBJECT_STORAGE_CLOUD_STORAGE_SERVICE_ACCOUNT_KEY_NOT_FOUND( + Category.USER_ERROR, + "0263", + "The service account key for Cloud Storage was not found.", + "", + ""), + OBJECT_STORAGE_CLOUD_STORAGE_SERVICE_ACCOUNT_KEY_LOAD_FAILED( + Category.USER_ERROR, + "0264", + "Failed to load the service account key for Cloud Storage.", + "", + ""), // // Errors for the concurrency error category diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorage.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorage.java index 88170ac942..627d7718c9 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorage.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorage.java @@ -154,6 +154,10 @@ public void mutate(List mutations) throws ExecutionException @Override public void close() { - wrapper.close(); + try { + wrapper.close(); + } catch (ObjectStorageWrapperException e) { + logger.warn("Failed to close the ObjectStorageWrapper", e); + } } } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageAdmin.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageAdmin.java index 74eae7fddc..594cbcefe7 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageAdmin.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageAdmin.java @@ -22,12 +22,15 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ThreadSafe public class ObjectStorageAdmin implements DistributedStorageAdmin { public static final String NAMESPACE_METADATA_TABLE = "namespaces"; public static final String TABLE_METADATA_TABLE = "metadata"; + private static final Logger logger = LoggerFactory.getLogger(ObjectStorageAdmin.class); private static final StorageInfo STORAGE_INFO = new StorageInfoImpl( "object_storage", @@ -81,7 +84,11 @@ public StorageInfo getStorageInfo(String namespace) throws ExecutionException { @Override public void close() { - wrapper.close(); + try { + wrapper.close(); + } catch (ObjectStorageWrapperException e) { + logger.warn("Failed to close the ObjectStorageWrapper", e); + } } @Override diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageConfig.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageConfig.java index b795506107..3866e66f43 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageConfig.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageConfig.java @@ -9,13 +9,6 @@ public interface ObjectStorageConfig { */ String getStorageName(); - /** - * Returns the username for authentication. - * - * @return the username - */ - String getUsername(); - /** * Returns the password for authentication. * diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageUtils.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageUtils.java index 8e56a82f52..f95d6cf8c0 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageUtils.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageUtils.java @@ -2,6 +2,7 @@ import com.scalar.db.config.DatabaseConfig; import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig; +import com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageConfig; import com.scalar.db.storage.objectstorage.s3.S3Config; import java.util.Objects; @@ -26,6 +27,8 @@ public static ObjectStorageConfig getObjectStorageConfig(DatabaseConfig database return new BlobStorageConfig(databaseConfig); } else if (Objects.equals(databaseConfig.getStorage(), S3Config.STORAGE_NAME)) { return new S3Config(databaseConfig); + } else if (Objects.equals(databaseConfig.getStorage(), CloudStorageConfig.STORAGE_NAME)) { + return new CloudStorageConfig(databaseConfig); } else { throw new IllegalArgumentException( "Unsupported Object Storage: " + databaseConfig.getStorage()); diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapper.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapper.java index 04958383cf..96e4117c41 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapper.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapper.java @@ -66,7 +66,10 @@ public interface ObjectStorageWrapper { void delete(String key, String version) throws ObjectStorageWrapperException; /** - * Delete objects with the specified prefix from the storage. + * Delete objects with the specified prefix from the storage.
+ *
+ * Attention: This method does not guarantee atomicity and is assumed to be used + * where concurrent operations do not occur. * * @param prefix the prefix of the objects to delete * @throws ObjectStorageWrapperException if an error occurs @@ -74,5 +77,5 @@ public interface ObjectStorageWrapper { void deleteByPrefix(String prefix) throws ObjectStorageWrapperException; /** Close the storage wrapper. */ - void close(); + void close() throws ObjectStorageWrapperException; } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperException.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperException.java index 7fb8df1240..9590101b7d 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperException.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperException.java @@ -5,4 +5,8 @@ public class ObjectStorageWrapperException extends Exception { public ObjectStorageWrapperException(String message, Throwable cause) { super(message, cause); } + + public ObjectStorageWrapperException(String message) { + super(message); + } } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperFactory.java b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperFactory.java index b090efba71..a067e6ce88 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperFactory.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperFactory.java @@ -2,6 +2,8 @@ import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageConfig; import com.scalar.db.storage.objectstorage.blobstorage.BlobStorageWrapper; +import com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageConfig; +import com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageWrapper; import com.scalar.db.storage.objectstorage.s3.S3Config; import com.scalar.db.storage.objectstorage.s3.S3Wrapper; import java.util.Objects; @@ -15,6 +17,10 @@ public static ObjectStorageWrapper create(ObjectStorageConfig objectStorageConfi } else if (Objects.equals(objectStorageConfig.getStorageName(), S3Config.STORAGE_NAME)) { assert objectStorageConfig instanceof S3Config; return new S3Wrapper((S3Config) objectStorageConfig); + } else if (Objects.equals( + objectStorageConfig.getStorageName(), CloudStorageConfig.STORAGE_NAME)) { + assert objectStorageConfig instanceof CloudStorageConfig; + return new CloudStorageWrapper((CloudStorageConfig) objectStorageConfig); } else { throw new IllegalArgumentException( "Unsupported Object Storage: " + objectStorageConfig.getStorageName()); diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/PreconditionFailedException.java b/core/src/main/java/com/scalar/db/storage/objectstorage/PreconditionFailedException.java index 5eec7c0105..a57f50e574 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/PreconditionFailedException.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/PreconditionFailedException.java @@ -5,4 +5,8 @@ public class PreconditionFailedException extends ObjectStorageWrapperException { public PreconditionFailedException(String message, Throwable cause) { super(message, cause); } + + public PreconditionFailedException(String message) { + super(message); + } } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java b/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java index 713e809a74..a2839d9ec5 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java @@ -84,11 +84,6 @@ public String getStorageName() { return STORAGE_NAME; } - @Override - public String getUsername() { - return username; - } - @Override public String getPassword() { return password; @@ -108,6 +103,10 @@ public String getEndpoint() { return endpoint; } + public String getUsername() { + return username; + } + public Optional getParallelUploadBlockSizeInBytes() { return Optional.ofNullable(parallelUploadBlockSizeInBytes); } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java new file mode 100644 index 0000000000..41f2929ef0 --- /dev/null +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java @@ -0,0 +1,98 @@ +package com.scalar.db.storage.objectstorage.cloudstorage; + +import static com.scalar.db.config.ConfigUtils.getInt; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.scalar.db.common.CoreError; +import com.scalar.db.config.DatabaseConfig; +import com.scalar.db.storage.objectstorage.ObjectStorageConfig; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CloudStorageConfig implements ObjectStorageConfig { + public static final String STORAGE_NAME = "cloud-storage"; + public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + "."; + + public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = + PREFIX + "parallel_upload_block_size_in_bytes"; + + private static final Logger logger = LoggerFactory.getLogger(CloudStorageConfig.class); + private final String password; + private final String bucket; + private final String metadataNamespace; + private final String projectId; + private final Integer parallelUploadBlockSizeInBytes; + + public CloudStorageConfig(DatabaseConfig databaseConfig) { + String storage = databaseConfig.getStorage(); + if (!storage.equals(STORAGE_NAME)) { + throw new IllegalArgumentException( + DatabaseConfig.STORAGE + " should be '" + STORAGE_NAME + "'"); + } + if (databaseConfig.getContactPoints().isEmpty()) { + throw new IllegalArgumentException("Contact points are not specified."); + } + bucket = databaseConfig.getContactPoints().get(0); + projectId = databaseConfig.getUsername().orElse(null); + password = databaseConfig.getPassword().orElse(null); + metadataNamespace = databaseConfig.getSystemNamespaceName(); + + if (databaseConfig.getScanFetchSize() != DatabaseConfig.DEFAULT_SCAN_FETCH_SIZE) { + logger.warn( + "The configuration property \"" + + DatabaseConfig.SCAN_FETCH_SIZE + + "\" is not applicable to Cloud Storage and will be ignored."); + } + + parallelUploadBlockSizeInBytes = + getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null); + } + + @Override + public String getStorageName() { + return STORAGE_NAME; + } + + @Override + public String getPassword() { + return password; + } + + @Override + public String getBucket() { + return bucket; + } + + @Override + public String getMetadataNamespace() { + return metadataNamespace; + } + + public String getProjectId() { + return projectId; + } + + public Credentials getCredentials() { + String serviceAccountJson = getPassword(); + if (serviceAccountJson == null) { + throw new IllegalArgumentException( + CoreError.OBJECT_STORAGE_CLOUD_STORAGE_SERVICE_ACCOUNT_KEY_NOT_FOUND.buildMessage()); + } + try (ByteArrayInputStream keyStream = + new ByteArrayInputStream(serviceAccountJson.getBytes(StandardCharsets.UTF_8))) { + return ServiceAccountCredentials.fromStream(keyStream); + } catch (IOException e) { + throw new IllegalArgumentException( + CoreError.OBJECT_STORAGE_CLOUD_STORAGE_SERVICE_ACCOUNT_KEY_LOAD_FAILED.buildMessage()); + } + } + + public Optional getParallelUploadBlockSizeInBytes() { + return Optional.ofNullable(parallelUploadBlockSizeInBytes); + } +} diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageErrorCode.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageErrorCode.java new file mode 100644 index 0000000000..807556fa37 --- /dev/null +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageErrorCode.java @@ -0,0 +1,16 @@ +package com.scalar.db.storage.objectstorage.cloudstorage; + +enum CloudStorageErrorCode { + NOT_FOUND(404), + PRECONDITION_FAILED(412); + + private final int code; + + CloudStorageErrorCode(int code) { + this.code = code; + } + + public int get() { + return this.code; + } +} diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageProvider.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageProvider.java new file mode 100644 index 0000000000..632d2a410a --- /dev/null +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageProvider.java @@ -0,0 +1,11 @@ +package com.scalar.db.storage.objectstorage.cloudstorage; + +import com.scalar.db.storage.objectstorage.ObjectStorageProvider; + +public class CloudStorageProvider implements ObjectStorageProvider { + + @Override + public String getName() { + return CloudStorageConfig.STORAGE_NAME; + } +} diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java new file mode 100644 index 0000000000..ab89310ed7 --- /dev/null +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java @@ -0,0 +1,221 @@ +package com.scalar.db.storage.objectstorage.cloudstorage; + +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import com.google.common.annotations.VisibleForTesting; +import com.scalar.db.storage.objectstorage.ObjectStorageWrapper; +import com.scalar.db.storage.objectstorage.ObjectStorageWrapperException; +import com.scalar.db.storage.objectstorage.ObjectStorageWrapperResponse; +import com.scalar.db.storage.objectstorage.PreconditionFailedException; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.concurrent.ThreadSafe; + +@ThreadSafe +public class CloudStorageWrapper implements ObjectStorageWrapper { + // Batch API has a limit of 100 operations per request + public static final int BATCH_DELETE_SIZE_LIMIT = 100; + + private final Storage storage; + private final String bucket; + private final Integer parallelUploadBlockSizeInBytes; + + public CloudStorageWrapper(CloudStorageConfig config) { + storage = + StorageOptions.newBuilder() + .setProjectId(config.getProjectId()) + .setCredentials(config.getCredentials()) + .build() + .getService(); + bucket = config.getBucket(); + parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null); + } + + @VisibleForTesting + @SuppressFBWarnings("EI_EXPOSE_REP2") + public CloudStorageWrapper(CloudStorageConfig config, Storage storage) { + this.storage = storage; + this.bucket = config.getBucket(); + parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null); + } + + @Override + public Optional get(String key) + throws ObjectStorageWrapperException { + try { + Blob blob = storage.get(BlobId.of(bucket, key)); + if (blob == null) { + return Optional.empty(); + } + String payload = new String(blob.getContent(), StandardCharsets.UTF_8); + String generation = String.valueOf(blob.getGeneration()); + return Optional.of(new ObjectStorageWrapperResponse(payload, generation)); + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to get the object with key '%s'", key), e); + } + } + + @Override + public Set getKeys(String prefix) throws ObjectStorageWrapperException { + try { + Iterable blobs = + storage.list(bucket, Storage.BlobListOption.prefix(prefix)).iterateAll(); + return StreamSupport.stream(blobs.spliterator(), false) + .map(Blob::getName) + .collect(Collectors.toSet()); + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to get the object keys with prefix '%s'", prefix), e); + } + } + + @Override + public void insert(String key, String object) throws ObjectStorageWrapperException { + try { + Storage.BlobWriteOption precondition = Storage.BlobWriteOption.doesNotExist(); + writeData(key, object, precondition); + } catch (StorageException e) { + if (e.getCode() == CloudStorageErrorCode.PRECONDITION_FAILED.get()) { + throw new PreconditionFailedException( + String.format( + "Failed to insert the object with key '%s' due to precondition failure", key), + e); + } + throw new ObjectStorageWrapperException( + String.format("Failed to insert the object with key '%s'", key), e); + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to insert the object with key '%s'", key), e); + } + } + + @Override + public void update(String key, String object, String version) + throws ObjectStorageWrapperException { + try { + Storage.BlobWriteOption precondition = + Storage.BlobWriteOption.generationMatch(Long.parseLong(version)); + writeData(key, object, precondition); + } catch (StorageException e) { + if (e.getCode() == CloudStorageErrorCode.PRECONDITION_FAILED.get()) { + throw new PreconditionFailedException( + String.format( + "Failed to update the object with key '%s' due to precondition failure", key), + e); + } + throw new ObjectStorageWrapperException( + String.format("Failed to update the object with key '%s'", key), e); + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to update the object with key '%s'", key), e); + } + } + + @Override + public void delete(String key) throws ObjectStorageWrapperException { + try { + if (!storage.delete(BlobId.of(bucket, key))) { + throw new PreconditionFailedException( + String.format( + "Failed to delete the object with key '%s' due to precondition failure", key)); + } + } catch (PreconditionFailedException e) { + throw e; + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to delete the object with key '%s'", key), e); + } + } + + @Override + public void delete(String key, String version) throws ObjectStorageWrapperException { + try { + if (!storage.delete( + BlobId.of(bucket, key), + Storage.BlobSourceOption.generationMatch(Long.parseLong(version)))) { + throw new PreconditionFailedException( + String.format( + "Failed to delete the object with key '%s' due to precondition failure", key)); + } + } catch (PreconditionFailedException e) { + throw e; + } catch (StorageException e) { + if (e.getCode() == CloudStorageErrorCode.PRECONDITION_FAILED.get()) { + throw new PreconditionFailedException( + String.format( + "Failed to delete the object with key '%s' due to precondition failure", key), + e); + } + throw new ObjectStorageWrapperException( + String.format("Failed to delete the object with key '%s'", key), e); + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to delete the object with key '%s'", key), e); + } + } + + @Override + public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException { + try { + // Collect all blob IDs with the specified prefix + Iterable blobs = + storage.list(bucket, Storage.BlobListOption.prefix(prefix)).iterateAll(); + List blobIds = + StreamSupport.stream(blobs.spliterator(), false) + .map(blob -> BlobId.of(bucket, blob.getName())) + .collect(Collectors.toList()); + // Delete blobs in batches + for (int i = 0; i < blobIds.size(); i += BATCH_DELETE_SIZE_LIMIT) { + int endIndex = Math.min(i + BATCH_DELETE_SIZE_LIMIT, blobIds.size()); + List batch = blobIds.subList(i, endIndex); + StorageBatch storageBatch = storage.batch(); + for (BlobId blobId : batch) { + storageBatch.delete(blobId); + } + storageBatch.submit(); + } + } catch (Exception e) { + throw new ObjectStorageWrapperException( + String.format("Failed to delete the objects with prefix '%s'", prefix), e); + } + } + + @Override + public void close() throws ObjectStorageWrapperException { + try { + storage.close(); + } catch (Exception e) { + throw new ObjectStorageWrapperException("Failed to close the storage wrapper", e); + } + } + + private void writeData(String key, String object, Storage.BlobWriteOption precondition) + throws IOException { + byte[] data = object.getBytes(StandardCharsets.UTF_8); + BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucket, key)).build(); + + try (WriteChannel writer = storage.writer(blobInfo, precondition)) { + if (parallelUploadBlockSizeInBytes != null) { + writer.setChunkSize(parallelUploadBlockSizeInBytes); + } + ByteBuffer buffer = ByteBuffer.wrap(data); + while (buffer.hasRemaining()) { + writer.write(buffer); + } + } + } +} diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java index 5cd1a98e8c..3312f92a22 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Config.java @@ -26,8 +26,6 @@ public class S3Config implements ObjectStorageConfig { PREFIX + "parallel_upload_threshold_in_bytes"; public static final String REQUEST_TIMEOUT_IN_SECONDS = PREFIX + "request_timeout_in_seconds"; - public static final int DEFAULT_REQUEST_TIMEOUT_IN_SECONDS = 15; - private static final Logger logger = LoggerFactory.getLogger(S3Config.class); private final String username; private final String password; @@ -90,11 +88,6 @@ public String getStorageName() { return STORAGE_NAME; } - @Override - public String getUsername() { - return username; - } - @Override public String getPassword() { return password; @@ -114,6 +107,10 @@ public String getRegion() { return region; } + public String getUsername() { + return username; + } + public Optional getParallelUploadBlockSizeInBytes() { return Optional.ofNullable(parallelUploadBlockSizeInBytes); } diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java index 71e0052e2c..2f7342d2b4 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/s3/S3Wrapper.java @@ -35,7 +35,9 @@ @ThreadSafe public class S3Wrapper implements ObjectStorageWrapper { + // DeleteObjects API has a limit of 1000 objects per request public static final int BATCH_DELETE_SIZE_LIMIT = 1000; + private final S3AsyncClient client; private final String bucket; @@ -281,8 +283,12 @@ public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException { } @Override - public void close() { - client.close(); + public void close() throws ObjectStorageWrapperException { + try { + client.close(); + } catch (Exception e) { + throw new ObjectStorageWrapperException("Failed to close the storage wrapper", e); + } } private Optional findS3Exception(Throwable throwable) { diff --git a/core/src/main/resources/META-INF/services/com.scalar.db.api.DistributedStorageProvider b/core/src/main/resources/META-INF/services/com.scalar.db.api.DistributedStorageProvider index 90486ebde4..d26885ba06 100644 --- a/core/src/main/resources/META-INF/services/com.scalar.db.api.DistributedStorageProvider +++ b/core/src/main/resources/META-INF/services/com.scalar.db.api.DistributedStorageProvider @@ -3,5 +3,6 @@ com.scalar.db.storage.cosmos.CosmosProvider com.scalar.db.storage.dynamo.DynamoProvider com.scalar.db.storage.jdbc.JdbcProvider com.scalar.db.storage.objectstorage.blobstorage.BlobStorageProvider +com.scalar.db.storage.objectstorage.cloudstorage.CloudStorageProvider com.scalar.db.storage.objectstorage.s3.S3Provider com.scalar.db.storage.multistorage.MultiStorageProvider diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java new file mode 100644 index 0000000000..254e695427 --- /dev/null +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java @@ -0,0 +1,88 @@ +package com.scalar.db.storage.objectstorage.cloudstorage; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.scalar.db.config.DatabaseConfig; +import java.util.Properties; +import org.junit.jupiter.api.Test; + +public class CloudStorageConfigTest { + private static final String ANY_PROJECT_ID = "any_project_id"; + private static final String ANY_PASSWORD = "any_password"; + private static final String ANY_BUCKET = "bucket"; + private static final String ANY_CONTACT_POINT = ANY_BUCKET; + private static final String CloudStorage_STORAGE = "cloud-storage"; + private static final String ANY_TABLE_METADATA_NAMESPACE = "any_namespace"; + private static final String ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = "5242880"; // 5MB + + @Test + public void constructor_AllPropertiesGiven_ShouldLoadProperly() { + // Arrange + Properties props = new Properties(); + props.setProperty(DatabaseConfig.CONTACT_POINTS, ANY_CONTACT_POINT); + props.setProperty(DatabaseConfig.USERNAME, ANY_PROJECT_ID); + props.setProperty(DatabaseConfig.PASSWORD, ANY_PASSWORD); + props.setProperty(DatabaseConfig.STORAGE, CloudStorage_STORAGE); + props.setProperty(DatabaseConfig.SYSTEM_NAMESPACE_NAME, ANY_TABLE_METADATA_NAMESPACE); + props.setProperty( + CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, + ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES); + + // Act + CloudStorageConfig config = new CloudStorageConfig(new DatabaseConfig(props)); + + // Assert + assertThat(config.getProjectId()).isEqualTo(ANY_PROJECT_ID); + assertThat(config.getBucket()).isEqualTo(ANY_BUCKET); + assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); + assertThat(config.getMetadataNamespace()).isEqualTo(ANY_TABLE_METADATA_NAMESPACE); + assertThat(config.getParallelUploadBlockSizeInBytes()).isNotEmpty(); + assertThat(config.getParallelUploadBlockSizeInBytes().get()).isEqualTo(5242880); + } + + @Test + public void constructor_PropertiesWithoutNonMandatoryOptionsGiven_ShouldLoadProperly() { + // Arrange + Properties props = new Properties(); + props.setProperty(DatabaseConfig.CONTACT_POINTS, ANY_CONTACT_POINT); + props.setProperty(DatabaseConfig.USERNAME, ANY_PROJECT_ID); + props.setProperty(DatabaseConfig.PASSWORD, ANY_PASSWORD); + props.setProperty(DatabaseConfig.STORAGE, CloudStorage_STORAGE); + + // Act + CloudStorageConfig config = new CloudStorageConfig(new DatabaseConfig(props)); + + // Assert + assertThat(config.getProjectId()).isEqualTo(ANY_PROJECT_ID); + assertThat(config.getBucket()).isEqualTo(ANY_BUCKET); + assertThat(config.getPassword()).isEqualTo(ANY_PASSWORD); + assertThat(config.getMetadataNamespace()) + .isEqualTo(DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME); + assertThat(config.getParallelUploadBlockSizeInBytes()).isEmpty(); + } + + @Test + public void constructor_WithoutStorage_ShouldThrowIllegalArgumentException() { + // Arrange + Properties props = new Properties(); + props.setProperty(DatabaseConfig.CONTACT_POINTS, ANY_CONTACT_POINT); + + // Act Assert + assertThatThrownBy(() -> new CloudStorageConfig(new DatabaseConfig(props))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void + constructor_PropertiesWithEmptyContactPointsGiven_ShouldThrowIllegalArgumentException() { + // Arrange + Properties props = new Properties(); + props.setProperty(DatabaseConfig.CONTACT_POINTS, ""); + props.setProperty(DatabaseConfig.STORAGE, CloudStorage_STORAGE); + + // Act Assert + assertThatThrownBy(() -> new CloudStorageConfig(new DatabaseConfig(props))) + .isInstanceOf(IllegalArgumentException.class); + } +} diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java new file mode 100644 index 0000000000..62e0f5de98 --- /dev/null +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapperTest.java @@ -0,0 +1,435 @@ +package com.scalar.db.storage.objectstorage.cloudstorage; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.gax.paging.Page; +import com.google.cloud.WriteChannel; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageBatchResult; +import com.google.cloud.storage.StorageException; +import com.scalar.db.storage.objectstorage.ObjectStorageWrapperException; +import com.scalar.db.storage.objectstorage.ObjectStorageWrapperResponse; +import com.scalar.db.storage.objectstorage.PreconditionFailedException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.Set; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class CloudStorageWrapperTest { + private static final String METADATA_NAMESPACE = "scalardb"; + private static final String PROJECT_ID = "project_id"; + private static final String BUCKET = "bucket"; + private static final String ANY_OBJECT_KEY = "any_object_key"; + private static final String ANY_PREFIX = "any_prefix/"; + private static final String ANY_DATA = "any_data"; + private static final long ANY_GENERATION = 12345L; + + @Mock private CloudStorageConfig config; + @Mock private Storage storage; + private CloudStorageWrapper wrapper; + + @BeforeEach + public void setUp() throws Exception { + MockitoAnnotations.openMocks(this).close(); + + when(config.getMetadataNamespace()).thenReturn(METADATA_NAMESPACE); + when(config.getProjectId()).thenReturn(PROJECT_ID); + when(config.getBucket()).thenReturn(BUCKET); + when(config.getParallelUploadBlockSizeInBytes()).thenReturn(Optional.empty()); + wrapper = new CloudStorageWrapper(config, storage); + } + + @Test + public void get_ExistingObjectKeyGiven_ShouldReturnObjectData() throws Exception { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + Blob blob = mock(Blob.class); + when(storage.get(blobId)).thenReturn(blob); + when(blob.getContent()).thenReturn(ANY_DATA.getBytes(StandardCharsets.UTF_8)); + when(blob.getGeneration()).thenReturn(ANY_GENERATION); + + // Act + Optional result = wrapper.get(ANY_OBJECT_KEY); + + // Assert + verify(storage).get(blobId); + assertThat(result).isPresent(); + assertThat(result.get().getPayload()).isEqualTo(ANY_DATA); + assertThat(result.get().getVersion()).isEqualTo(String.valueOf(ANY_GENERATION)); + } + + @Test + public void get_NonExistingObjectKeyGiven_ShouldReturnEmptyOptional() throws Exception { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.get(blobId)).thenReturn(null); + + // Act + Optional result = wrapper.get(ANY_OBJECT_KEY); + + // Assert + assertThat(result).isNotPresent(); + } + + @Test + public void get_StorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.get(blobId)).thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.get(ANY_OBJECT_KEY)) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void getKeys_PrefixGiven_ShouldReturnObjectKeys() throws Exception { + // Arrange + String objectKey1 = ANY_PREFIX + "test1/object1"; + String objectKey2 = ANY_PREFIX + "test1/object2"; + String objectKey3 = ANY_PREFIX + "test2/object3"; + + Blob blob1 = mock(Blob.class); + Blob blob2 = mock(Blob.class); + Blob blob3 = mock(Blob.class); + when(blob1.getName()).thenReturn(objectKey1); + when(blob2.getName()).thenReturn(objectKey2); + when(blob3.getName()).thenReturn(objectKey3); + + @SuppressWarnings("unchecked") + Page page = mock(Page.class); + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page); + when(page.iterateAll()).thenReturn(Arrays.asList(blob1, blob2, blob3)); + + // Act + Set result = wrapper.getKeys(ANY_PREFIX); + + // Assert + assertThat(result).containsExactlyInAnyOrder(objectKey1, objectKey2, objectKey3); + } + + @Test + public void getKeys_NoObjectsWithPrefix_ShouldReturnEmptySet() throws Exception { + // Arrange + @SuppressWarnings("unchecked") + Page page = mock(Page.class); + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page); + when(page.iterateAll()).thenReturn(Collections.emptyList()); + + // Act + Set result = wrapper.getKeys(ANY_PREFIX); + + // Assert + assertThat(result).isEmpty(); + } + + @Test + public void getKeys_StorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))) + .thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.getKeys(ANY_PREFIX)) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void insert_NonExistingObjectKeyGiven_ShouldInsertObject() throws Exception { + // Arrange + WriteChannel writeChannel = mock(WriteChannel.class); + doAnswer( + invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }) + .when(writeChannel) + .write(any(ByteBuffer.class)); + when(storage.writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class))) + .thenReturn(writeChannel); + + // Act + wrapper.insert(ANY_OBJECT_KEY, ANY_DATA); + + // Assert + verify(storage).writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class)); + } + + @Test + public void insert_PreconditionFailed_ShouldThrowPreconditionFailedException() { + // Arrange + when(storage.writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class))) + .thenThrow(new StorageException(412, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.insert(ANY_OBJECT_KEY, ANY_DATA)) + .isInstanceOf(PreconditionFailedException.class); + } + + @Test + public void insert_OtherStorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + when(storage.writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class))) + .thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.insert(ANY_OBJECT_KEY, ANY_DATA)) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void update_ExistingObjectKeyGiven_ShouldUpdateObject() throws Exception { + // Arrange + WriteChannel writeChannel = mock(WriteChannel.class); + doAnswer( + invocation -> { + ByteBuffer buffer = invocation.getArgument(0); + int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }) + .when(writeChannel) + .write(any(ByteBuffer.class)); + when(storage.writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class))) + .thenReturn(writeChannel); + + // Act + wrapper.update(ANY_OBJECT_KEY, ANY_DATA, String.valueOf(ANY_GENERATION)); + + // Assert + verify(storage).writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class)); + } + + @Test + public void update_PreconditionFailed_ShouldThrowPreconditionFailedException() { + // Arrange + when(storage.writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class))) + .thenThrow(new StorageException(412, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.update(ANY_OBJECT_KEY, ANY_DATA, String.valueOf(ANY_GENERATION))) + .isInstanceOf(PreconditionFailedException.class); + } + + @Test + public void update_OtherStorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + when(storage.writer(any(BlobInfo.class), any(Storage.BlobWriteOption.class))) + .thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.update(ANY_OBJECT_KEY, ANY_DATA, String.valueOf(ANY_GENERATION))) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void delete_ExistingObjectKeyGiven_ShouldDeleteObject() throws Exception { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(blobId)).thenReturn(true); + + // Act + wrapper.delete(ANY_OBJECT_KEY); + + // Assert + verify(storage).delete(blobId); + } + + @Test + public void delete_NonExistingObjectKeyGiven_ShouldThrowPreconditionFailedException() { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(blobId)).thenReturn(false); + + // Act Assert + assertThatCode(() -> wrapper.delete(ANY_OBJECT_KEY)) + .isInstanceOf(PreconditionFailedException.class); + } + + @Test + public void delete_OtherStorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(blobId)).thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.delete(ANY_OBJECT_KEY)) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void delete_WithVersion_ExistingObjectKeyGiven_ShouldDeleteObject() throws Exception { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(eq(blobId), any(Storage.BlobSourceOption.class))).thenReturn(true); + + // Act + wrapper.delete(ANY_OBJECT_KEY, String.valueOf(ANY_GENERATION)); + + // Assert + verify(storage).delete(eq(blobId), any(Storage.BlobSourceOption.class)); + } + + @Test + public void + delete_WithVersion_NonExistingObjectKeyGiven_ShouldThrowPreconditionFailedException() { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(eq(blobId), any(Storage.BlobSourceOption.class))).thenReturn(false); + + // Act Assert + assertThatCode(() -> wrapper.delete(ANY_OBJECT_KEY, String.valueOf(ANY_GENERATION))) + .isInstanceOf(PreconditionFailedException.class); + } + + @Test + public void delete_WithVersion_PreconditionFailed_ShouldThrowPreconditionFailedException() { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(eq(blobId), any(Storage.BlobSourceOption.class))) + .thenThrow(new StorageException(412, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.delete(ANY_OBJECT_KEY, String.valueOf(ANY_GENERATION))) + .isInstanceOf(PreconditionFailedException.class); + } + + @Test + public void + delete_WithVersion_OtherStorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + BlobId blobId = BlobId.of(BUCKET, ANY_OBJECT_KEY); + when(storage.delete(eq(blobId), any(Storage.BlobSourceOption.class))) + .thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.delete(ANY_OBJECT_KEY, String.valueOf(ANY_GENERATION))) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void deleteByPrefix_PrefixGiven_ShouldDeleteAllObjectsWithThePrefix() throws Exception { + // Arrange + String objectKey1 = ANY_PREFIX + "test1/object1"; + String objectKey2 = ANY_PREFIX + "test1/object2"; + String objectKey3 = ANY_PREFIX + "test2/object3"; + + Blob blob1 = mock(Blob.class); + Blob blob2 = mock(Blob.class); + Blob blob3 = mock(Blob.class); + when(blob1.getName()).thenReturn(objectKey1); + when(blob2.getName()).thenReturn(objectKey2); + when(blob3.getName()).thenReturn(objectKey3); + + @SuppressWarnings("unchecked") + Page page = mock(Page.class); + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page); + when(page.iterateAll()).thenReturn(Arrays.asList(blob1, blob2, blob3)); + + StorageBatch batch = mock(StorageBatch.class); + when(storage.batch()).thenReturn(batch); + @SuppressWarnings("unchecked") + StorageBatchResult batchResult = mock(StorageBatchResult.class); + doReturn(batchResult).when(batch).delete(any(BlobId.class)); + + // Act + wrapper.deleteByPrefix(ANY_PREFIX); + + // Assert + verify(storage).batch(); + verify(batch).submit(); + } + + @Test + public void deleteByPrefix_MultiplePagesGiven_ShouldDeleteAllObjectsAcrossPages() + throws Exception { + // Arrange + String objectKey1 = ANY_PREFIX + "test1/object1"; + String objectKey2 = ANY_PREFIX + "test1/object2"; + String objectKey3 = ANY_PREFIX + "test2/object3"; + + Blob blob1 = mock(Blob.class); + Blob blob2 = mock(Blob.class); + Blob blob3 = mock(Blob.class); + when(blob1.getName()).thenReturn(objectKey1); + when(blob2.getName()).thenReturn(objectKey2); + when(blob3.getName()).thenReturn(objectKey3); + + // Mock with iterateAll() that returns all blobs across pages + @SuppressWarnings("unchecked") + Page page = mock(Page.class); + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page); + when(page.iterateAll()).thenReturn(Arrays.asList(blob1, blob2, blob3)); + + StorageBatch batch = mock(StorageBatch.class); + when(storage.batch()).thenReturn(batch); + @SuppressWarnings("unchecked") + StorageBatchResult batchResult = mock(StorageBatchResult.class); + doReturn(batchResult).when(batch).delete(any(BlobId.class)); + + // Act + wrapper.deleteByPrefix(ANY_PREFIX); + + // Assert + verify(storage).batch(); + verify(batch).submit(); + } + + @Test + public void deleteByPrefix_NoObjectsWithPrefix_ShouldDoNothing() throws Exception { + // Arrange + @SuppressWarnings("unchecked") + Page page = mock(Page.class); + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))).thenReturn(page); + when(page.iterateAll()).thenReturn(Collections.emptyList()); + + // Act + wrapper.deleteByPrefix(ANY_PREFIX); + + // Assert + verify(storage, never()).batch(); + } + + @Test + public void deleteByPrefix_StorageExceptionThrown_ShouldThrowObjectStorageWrapperException() { + // Arrange + when(storage.list(eq(BUCKET), any(Storage.BlobListOption.class))) + .thenThrow(new StorageException(500, "Any Error")); + + // Act Assert + assertThatCode(() -> wrapper.deleteByPrefix(ANY_PREFIX)) + .isInstanceOf(ObjectStorageWrapperException.class); + } + + @Test + public void close_ShouldCloseTheStorage() throws Exception { + // Arrange + + // Act + wrapper.close(); + + // Assert + verify(storage).close(); + } +} diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3WrapperTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3WrapperTest.java index a4996ffd11..d0238465ac 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3WrapperTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/s3/S3WrapperTest.java @@ -47,10 +47,10 @@ public class S3WrapperTest { private static final String METADATA_NAMESPACE = "scalardb"; private static final String REGION = "us-west-2"; private static final String BUCKET = "bucket"; - private static final String ANY_OBJECT_KEY = "any-object-key"; - private static final String ANY_PREFIX = "any-prefix/"; - private static final String ANY_DATA = "any-data"; - private static final String ANY_ETAG = "any-etag"; + private static final String ANY_OBJECT_KEY = "any_object_key"; + private static final String ANY_PREFIX = "any_prefix/"; + private static final String ANY_DATA = "any_data"; + private static final String ANY_ETAG = "any_etag"; @Mock private S3Config config; @Mock private S3AsyncClient client; @@ -552,7 +552,7 @@ public void deleteByPrefix_S3ExceptionThrown_ShouldThrowObjectStorageWrapperExce } @Test - public void close_ShouldCloseTheClient() { + public void close_ShouldCloseTheClient() throws Exception { // Arrange // Act From c044507d15338945498e11706abd388ed2f293f5 Mon Sep 17 00:00:00 2001 From: Kodai Doki Date: Wed, 19 Nov 2025 20:23:07 +0900 Subject: [PATCH 2/2] Resolve conflicts --- .../objectstorage/cloudstorage/CloudStorageConfig.java | 8 +++++++- .../cloudstorage/CloudStorageConfigTest.java | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java index 41f2929ef0..ab80128269 100644 --- a/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java +++ b/core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java @@ -1,6 +1,7 @@ package com.scalar.db.storage.objectstorage.cloudstorage; import static com.scalar.db.config.ConfigUtils.getInt; +import static com.scalar.db.config.ConfigUtils.getString; import com.google.auth.Credentials; import com.google.auth.oauth2.ServiceAccountCredentials; @@ -17,6 +18,7 @@ public class CloudStorageConfig implements ObjectStorageConfig { public static final String STORAGE_NAME = "cloud-storage"; public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + "."; + public static final String TABLE_METADATA_NAMESPACE = PREFIX + "table_metadata.namespace"; public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES = PREFIX + "parallel_upload_block_size_in_bytes"; @@ -40,7 +42,11 @@ public CloudStorageConfig(DatabaseConfig databaseConfig) { bucket = databaseConfig.getContactPoints().get(0); projectId = databaseConfig.getUsername().orElse(null); password = databaseConfig.getPassword().orElse(null); - metadataNamespace = databaseConfig.getSystemNamespaceName(); + metadataNamespace = + getString( + databaseConfig.getProperties(), + TABLE_METADATA_NAMESPACE, + DatabaseConfig.DEFAULT_SYSTEM_NAMESPACE_NAME); if (databaseConfig.getScanFetchSize() != DatabaseConfig.DEFAULT_SCAN_FETCH_SIZE) { logger.warn( diff --git a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java index 254e695427..fd83e46dda 100644 --- a/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java +++ b/core/src/test/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfigTest.java @@ -24,7 +24,7 @@ public void constructor_AllPropertiesGiven_ShouldLoadProperly() { props.setProperty(DatabaseConfig.USERNAME, ANY_PROJECT_ID); props.setProperty(DatabaseConfig.PASSWORD, ANY_PASSWORD); props.setProperty(DatabaseConfig.STORAGE, CloudStorage_STORAGE); - props.setProperty(DatabaseConfig.SYSTEM_NAMESPACE_NAME, ANY_TABLE_METADATA_NAMESPACE); + props.setProperty(CloudStorageConfig.TABLE_METADATA_NAMESPACE, ANY_TABLE_METADATA_NAMESPACE); props.setProperty( CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, ANY_PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES);