Skip to content

Commit 4318570

Browse files
authored
Fix option issues in Object Storage adapter (#3237)
1 parent 7d4b2b9 commit 4318570

File tree

12 files changed

+167
-166
lines changed

12 files changed

+167
-166
lines changed

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageEnv.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,17 @@ public static Properties getPropertiesWithPerformanceOptions(String testName) {
5353
Properties properties = getProperties(testName);
5454

5555
// For Blob Storage
56-
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
57-
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
56+
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, "5242880"); // 5MB
57+
properties.setProperty(BlobStorageConfig.PARALLEL_UPLOAD_MAX_CONCURRENCY, "4");
5858
properties.setProperty(
59-
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
60-
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_IN_SECONDS, "30");
59+
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB
60+
properties.setProperty(BlobStorageConfig.REQUEST_TIMEOUT_SECS, "30");
6161

6262
// For S3
63-
properties.setProperty(S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, "5242880"); // 5MB
64-
properties.setProperty(S3Config.PARALLEL_UPLOAD_MAX_PARALLELISM, "4");
65-
properties.setProperty(S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, "10485760"); // 10MB
66-
properties.setProperty(S3Config.REQUEST_TIMEOUT_IN_SECONDS, "30");
63+
properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, "5242880"); // 5MB
64+
properties.setProperty(S3Config.MULTIPART_UPLOAD_MAX_CONCURRENCY, "4");
65+
properties.setProperty(S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, "10485760"); // 10MB
66+
properties.setProperty(S3Config.REQUEST_TIMEOUT_SECS, "30");
6767

6868
return properties;
6969
}

core/src/integration-test/java/com/scalar/db/storage/objectstorage/ObjectStorageWrapperLargeObjectWriteIntegrationTest.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,34 @@ public class ObjectStorageWrapperLargeObjectWriteIntegrationTest {
3636
@BeforeAll
3737
public void beforeAll() throws ObjectStorageWrapperException {
3838
Properties properties = getProperties(TEST_NAME);
39-
long parallelUploadThresholdInBytes;
39+
long payloadSizeBytes;
4040

4141
if (ObjectStorageEnv.isBlobStorage()) {
4242
// Minimum block size must be greater than or equal to 256KB for Blob Storage
43-
Long parallelUploadUnit = 256 * 1024L; // 256KB
43+
Long uploadUnit = 256 * 1024L; // 256KB
4444
properties.setProperty(
45-
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
46-
String.valueOf(parallelUploadUnit));
45+
BlobStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, String.valueOf(uploadUnit));
4746
properties.setProperty(
48-
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES,
49-
String.valueOf(parallelUploadUnit * 2));
50-
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
47+
BlobStorageConfig.PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2));
48+
payloadSizeBytes = uploadUnit * 2 + 1;
5149
} else if (ObjectStorageEnv.isCloudStorage()) {
5250
// Minimum block size must be greater than or equal to 256KB for Cloud Storage
53-
Long parallelUploadUnit = 256 * 1024L; // 256KB
51+
Long uploadUnit = 256 * 1024L; // 256KB
5452
properties.setProperty(
55-
CloudStorageConfig.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES,
56-
String.valueOf(parallelUploadUnit));
57-
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
53+
CloudStorageConfig.UPLOAD_CHUNK_SIZE_BYTES, String.valueOf(uploadUnit));
54+
payloadSizeBytes = uploadUnit * 2 + 1;
5855
} else if (ObjectStorageEnv.isS3()) {
5956
// Minimum part size must be greater than or equal to 5MB for S3
60-
Long parallelUploadUnit = 5 * 1024 * 1024L; // 5MB
57+
Long uploadUnit = 5 * 1024 * 1024L; // 5MB
58+
properties.setProperty(S3Config.MULTIPART_UPLOAD_PART_SIZE_BYTES, String.valueOf(uploadUnit));
6159
properties.setProperty(
62-
S3Config.PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, String.valueOf(parallelUploadUnit));
63-
properties.setProperty(
64-
S3Config.PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, String.valueOf(parallelUploadUnit * 2));
65-
parallelUploadThresholdInBytes = parallelUploadUnit * 2;
60+
S3Config.MULTIPART_UPLOAD_THRESHOLD_SIZE_BYTES, String.valueOf(uploadUnit * 2));
61+
payloadSizeBytes = uploadUnit * 2 + 1;
6662
} else {
6763
throw new AssertionError();
6864
}
6965

70-
char[] charArray = new char[(int) parallelUploadThresholdInBytes + 1];
66+
char[] charArray = new char[(int) payloadSizeBytes];
7167
Arrays.fill(charArray, 'a');
7268
testObject1 = new String(charArray);
7369
Arrays.fill(charArray, 'b');

core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageConfig.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@
1212

1313
public class BlobStorageConfig implements ObjectStorageConfig {
1414
public static final String STORAGE_NAME = "blob-storage";
15-
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
15+
public static final String STORAGE_NAME_IN_PREFIX = "blob_storage";
16+
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + ".";
1617

17-
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES =
18-
PREFIX + "parallel_upload_block_size_in_bytes";
19-
public static final String PARALLEL_UPLOAD_MAX_PARALLELISM =
20-
PREFIX + "parallel_upload_max_parallelism";
21-
public static final String PARALLEL_UPLOAD_THRESHOLD_IN_BYTES =
22-
PREFIX + "parallel_upload_threshold_in_bytes";
23-
public static final String REQUEST_TIMEOUT_IN_SECONDS = PREFIX + "request_timeout_in_seconds";
18+
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_BYTES =
19+
PREFIX + "parallel_upload_block_size_bytes";
20+
public static final String PARALLEL_UPLOAD_MAX_CONCURRENCY =
21+
PREFIX + "parallel_upload_max_concurrency";
22+
public static final String PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES =
23+
PREFIX + "parallel_upload_threshold_size_bytes";
24+
public static final String REQUEST_TIMEOUT_SECS = PREFIX + "request_timeout_secs";
2425

2526
private static final Logger logger = LoggerFactory.getLogger(BlobStorageConfig.class);
2627
private final String endpoint;
@@ -29,10 +30,10 @@ public class BlobStorageConfig implements ObjectStorageConfig {
2930
private final String bucket;
3031
private final String metadataNamespace;
3132

32-
private final Long parallelUploadBlockSizeInBytes;
33-
private final Integer parallelUploadMaxParallelism;
34-
private final Long parallelUploadThresholdInBytes;
35-
private final Integer requestTimeoutInSeconds;
33+
private final Long parallelUploadBlockSizeBytes;
34+
private final Integer parallelUploadMaxConcurrency;
35+
private final Long parallelUploadThresholdSizeBytes;
36+
private final Integer requestTimeoutSecs;
3637

3738
public BlobStorageConfig(DatabaseConfig databaseConfig) {
3839
String storage = databaseConfig.getStorage();
@@ -63,14 +64,13 @@ public BlobStorageConfig(DatabaseConfig databaseConfig) {
6364
+ "\" is not applicable to Blob Storage and will be ignored.");
6465
}
6566

66-
parallelUploadBlockSizeInBytes =
67-
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null);
68-
parallelUploadMaxParallelism =
69-
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_PARALLELISM, null);
70-
parallelUploadThresholdInBytes =
71-
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_IN_BYTES, null);
72-
requestTimeoutInSeconds =
73-
getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_IN_SECONDS, null);
67+
parallelUploadBlockSizeBytes =
68+
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_BYTES, null);
69+
parallelUploadMaxConcurrency =
70+
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_MAX_CONCURRENCY, null);
71+
parallelUploadThresholdSizeBytes =
72+
getLong(databaseConfig.getProperties(), PARALLEL_UPLOAD_THRESHOLD_SIZE_BYTES, null);
73+
requestTimeoutSecs = getInt(databaseConfig.getProperties(), REQUEST_TIMEOUT_SECS, null);
7474
}
7575

7676
@Override
@@ -101,19 +101,19 @@ public String getUsername() {
101101
return username;
102102
}
103103

104-
public Optional<Long> getParallelUploadBlockSizeInBytes() {
105-
return Optional.ofNullable(parallelUploadBlockSizeInBytes);
104+
public Optional<Long> getParallelUploadBlockSizeBytes() {
105+
return Optional.ofNullable(parallelUploadBlockSizeBytes);
106106
}
107107

108-
public Optional<Integer> getParallelUploadMaxParallelism() {
109-
return Optional.ofNullable(parallelUploadMaxParallelism);
108+
public Optional<Integer> getParallelUploadMaxConcurrency() {
109+
return Optional.ofNullable(parallelUploadMaxConcurrency);
110110
}
111111

112-
public Optional<Long> getParallelUploadThresholdInBytes() {
113-
return Optional.ofNullable(parallelUploadThresholdInBytes);
112+
public Optional<Long> getParallelUploadThresholdSizeBytes() {
113+
return Optional.ofNullable(parallelUploadThresholdSizeBytes);
114114
}
115115

116-
public Optional<Integer> getRequestTimeoutInSeconds() {
117-
return Optional.ofNullable(requestTimeoutInSeconds);
116+
public Optional<Integer> getRequestTimeoutSecs() {
117+
return Optional.ofNullable(requestTimeoutSecs);
118118
}
119119
}

core/src/main/java/com/scalar/db/storage/objectstorage/blobstorage/BlobStorageWrapper.java

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
@ThreadSafe
2828
public class BlobStorageWrapper implements ObjectStorageWrapper {
2929
private final BlobContainerClient client;
30-
private final Duration requestTimeoutInSeconds;
30+
private final Duration requestTimeoutSecs;
3131
private final ParallelTransferOptions parallelTransferOptions;
3232

3333
public BlobStorageWrapper(BlobStorageConfig config) {
@@ -37,18 +37,17 @@ public BlobStorageWrapper(BlobStorageConfig config) {
3737
.credential(new StorageSharedKeyCredential(config.getUsername(), config.getPassword()))
3838
.buildClient()
3939
.getBlobContainerClient(config.getBucket());
40-
this.requestTimeoutInSeconds =
41-
config.getRequestTimeoutInSeconds().map(Duration::ofSeconds).orElse(null);
40+
this.requestTimeoutSecs = config.getRequestTimeoutSecs().map(Duration::ofSeconds).orElse(null);
4241
this.parallelTransferOptions = new ParallelTransferOptions();
43-
if (config.getParallelUploadBlockSizeInBytes().isPresent()) {
44-
parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeInBytes().get());
42+
if (config.getParallelUploadBlockSizeBytes().isPresent()) {
43+
parallelTransferOptions.setBlockSizeLong(config.getParallelUploadBlockSizeBytes().get());
4544
}
46-
if (config.getParallelUploadMaxParallelism().isPresent()) {
47-
parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxParallelism().get());
45+
if (config.getParallelUploadMaxConcurrency().isPresent()) {
46+
parallelTransferOptions.setMaxConcurrency(config.getParallelUploadMaxConcurrency().get());
4847
}
49-
if (config.getParallelUploadThresholdInBytes().isPresent()) {
48+
if (config.getParallelUploadThresholdSizeBytes().isPresent()) {
5049
parallelTransferOptions.setMaxSingleUploadSizeLong(
51-
config.getParallelUploadThresholdInBytes().get());
50+
config.getParallelUploadThresholdSizeBytes().get());
5251
}
5352
}
5453

@@ -58,7 +57,7 @@ public Optional<ObjectStorageWrapperResponse> get(String key)
5857
try {
5958
BlobClient blobClient = client.getBlobClient(key);
6059
BlobDownloadContentResponse response =
61-
blobClient.downloadContentWithResponse(null, null, requestTimeoutInSeconds, null);
60+
blobClient.downloadContentWithResponse(null, null, requestTimeoutSecs, null);
6261
String data = response.getValue().toString();
6362
String eTag = response.getHeaders().getValue(HttpHeaderName.ETAG);
6463
return Optional.of(new ObjectStorageWrapperResponse(data, eTag));
@@ -77,8 +76,7 @@ public Optional<ObjectStorageWrapperResponse> get(String key)
7776
@Override
7877
public Set<String> getKeys(String prefix) throws ObjectStorageWrapperException {
7978
try {
80-
return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds)
81-
.stream()
79+
return client.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs).stream()
8280
.map(BlobItem::getName)
8381
.collect(Collectors.toSet());
8482
} catch (Exception e) {
@@ -95,7 +93,7 @@ public void insert(String key, String object) throws ObjectStorageWrapperExcepti
9593
new BlobParallelUploadOptions(BinaryData.fromString(object))
9694
.setRequestConditions(new BlobRequestConditions().setIfNoneMatch("*"))
9795
.setParallelTransferOptions(parallelTransferOptions);
98-
blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null);
96+
blobClient.uploadWithResponse(options, requestTimeoutSecs, null);
9997
} catch (BlobStorageException e) {
10098
if (e.getErrorCode().equals(BlobErrorCode.BLOB_ALREADY_EXISTS)) {
10199
throw new PreconditionFailedException(
@@ -120,7 +118,7 @@ public void update(String key, String object, String version)
120118
new BlobParallelUploadOptions(BinaryData.fromString(object))
121119
.setRequestConditions(new BlobRequestConditions().setIfMatch(version))
122120
.setParallelTransferOptions(parallelTransferOptions);
123-
blobClient.uploadWithResponse(options, requestTimeoutInSeconds, null);
121+
blobClient.uploadWithResponse(options, requestTimeoutSecs, null);
124122
} catch (BlobStorageException e) {
125123
if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET)
126124
|| e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
@@ -162,7 +160,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
162160
try {
163161
BlobClient blobClient = client.getBlobClient(key);
164162
blobClient.deleteWithResponse(
165-
null, new BlobRequestConditions().setIfMatch(version), requestTimeoutInSeconds, null);
163+
null, new BlobRequestConditions().setIfMatch(version), requestTimeoutSecs, null);
166164
} catch (BlobStorageException e) {
167165
if (e.getErrorCode().equals(BlobErrorCode.CONDITION_NOT_MET)
168166
|| e.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
@@ -183,7 +181,7 @@ public void delete(String key, String version) throws ObjectStorageWrapperExcept
183181
public void deleteByPrefix(String prefix) throws ObjectStorageWrapperException {
184182
try {
185183
client
186-
.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutInSeconds)
184+
.listBlobs(new ListBlobsOptions().setPrefix(prefix), requestTimeoutSecs)
187185
.forEach(
188186
blobItem -> {
189187
try {

core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageConfig.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,17 @@
1616

1717
public class CloudStorageConfig implements ObjectStorageConfig {
1818
public static final String STORAGE_NAME = "cloud-storage";
19-
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME + ".";
19+
public static final String STORAGE_NAME_IN_PREFIX = "cloud_storage";
20+
public static final String PREFIX = DatabaseConfig.PREFIX + STORAGE_NAME_IN_PREFIX + ".";
2021

21-
public static final String PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES =
22-
PREFIX + "parallel_upload_block_size_in_bytes";
22+
public static final String UPLOAD_CHUNK_SIZE_BYTES = PREFIX + "upload_chunk_size_bytes";
2323

2424
private static final Logger logger = LoggerFactory.getLogger(CloudStorageConfig.class);
2525
private final String password;
2626
private final String bucket;
2727
private final String metadataNamespace;
2828
private final String projectId;
29-
private final Integer parallelUploadBlockSizeInBytes;
29+
private final Integer uploadChunkSizeBytes;
3030

3131
public CloudStorageConfig(DatabaseConfig databaseConfig) {
3232
String storage = databaseConfig.getStorage();
@@ -49,8 +49,7 @@ public CloudStorageConfig(DatabaseConfig databaseConfig) {
4949
+ "\" is not applicable to Cloud Storage and will be ignored.");
5050
}
5151

52-
parallelUploadBlockSizeInBytes =
53-
getInt(databaseConfig.getProperties(), PARALLEL_UPLOAD_BLOCK_SIZE_IN_BYTES, null);
52+
uploadChunkSizeBytes = getInt(databaseConfig.getProperties(), UPLOAD_CHUNK_SIZE_BYTES, null);
5453
}
5554

5655
@Override
@@ -92,7 +91,7 @@ public Credentials getCredentials() {
9291
}
9392
}
9493

95-
public Optional<Integer> getParallelUploadBlockSizeInBytes() {
96-
return Optional.ofNullable(parallelUploadBlockSizeInBytes);
94+
public Optional<Integer> getUploadChunkSizeBytes() {
95+
return Optional.ofNullable(uploadChunkSizeBytes);
9796
}
9897
}

core/src/main/java/com/scalar/db/storage/objectstorage/cloudstorage/CloudStorageWrapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class CloudStorageWrapper implements ObjectStorageWrapper {
3131

3232
private final Storage storage;
3333
private final String bucket;
34-
private final Integer parallelUploadBlockSizeInBytes;
34+
private final Integer uploadChunkSizeBytes;
3535

3636
public CloudStorageWrapper(CloudStorageConfig config) {
3737
storage =
@@ -41,15 +41,15 @@ public CloudStorageWrapper(CloudStorageConfig config) {
4141
.build()
4242
.getService();
4343
bucket = config.getBucket();
44-
parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null);
44+
uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null);
4545
}
4646

4747
@VisibleForTesting
4848
@SuppressFBWarnings("EI_EXPOSE_REP2")
4949
public CloudStorageWrapper(CloudStorageConfig config, Storage storage) {
5050
this.storage = storage;
5151
this.bucket = config.getBucket();
52-
parallelUploadBlockSizeInBytes = config.getParallelUploadBlockSizeInBytes().orElse(null);
52+
uploadChunkSizeBytes = config.getUploadChunkSizeBytes().orElse(null);
5353
}
5454

5555
@Override
@@ -209,8 +209,8 @@ private void writeData(String key, String object, Storage.BlobWriteOption precon
209209
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(bucket, key)).build();
210210

211211
try (WriteChannel writer = storage.writer(blobInfo, precondition)) {
212-
if (parallelUploadBlockSizeInBytes != null) {
213-
writer.setChunkSize(parallelUploadBlockSizeInBytes);
212+
if (uploadChunkSizeBytes != null) {
213+
writer.setChunkSize(uploadChunkSizeBytes);
214214
}
215215
ByteBuffer buffer = ByteBuffer.wrap(data);
216216
while (buffer.hasRemaining()) {

0 commit comments

Comments
 (0)