diff --git a/.gitignore b/.gitignore
index d8778b2a..93716dbe 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,6 +3,7 @@
.project
.settings/
.recommenders/
+.factorypath
# Intellij
.idea/
diff --git a/README.md b/README.md
index b7668316..0e0c7e45 100644
--- a/README.md
+++ b/README.md
@@ -43,6 +43,7 @@ The Cloud Storage sink connector supports the following properties.
| `role` | String | False | null | The Cloud Storage role. |
| `roleSessionName` | String | False | null | The Cloud Storage role session name. |
| `endpoint` | String | True | null | The Cloud Storage endpoint. |
+| `s3StorageClass` | String | False | "STANDARD" | The S3 storage class to use when writing objects. Only applies when `provider` is `s3v2`. The value is passed directly to the S3 API and must be a valid [S3 storage class](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) string (e.g. `STANDARD`, `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`, `GLACIER`, `GLACIER_IR`, `DEEP_ARCHIVE`, `REDUCED_REDUNDANCY`). |
| `bucket` | String | True | null | The Cloud Storage bucket. |
| `formatType` | String | False | "json" | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
| `partitionerType` | String | False | "partition" | The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions. |
diff --git a/docs/aws-s3-sink.md b/docs/aws-s3-sink.md
index 0c192e48..611fe46c 100644
--- a/docs/aws-s3-sink.md
+++ b/docs/aws-s3-sink.md
@@ -60,7 +60,8 @@ pulsarctl sinks create \
"bucket": "Your bucket name",
"region": "Your AWS S3 region",
"formatType": "json",
- "partitionerType": "PARTITION"
+ "partitionerType": "PARTITION",
+ "s3StorageClass": "STANDARD"
}'
```
@@ -134,6 +135,7 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
| `partitionerType` | String | False | false | null | The partitioning type. It can be configured by topic `PARTITION` or `TIME`. By default, the partition type is configured by topic partitions. |
| `region` | String | False | false | null | The AWS S3 region. Either the endpoint or region must be set. |
| `endpoint` | String | False | false | null | The AWS S3 endpoint. Either the endpoint or region must be set. |
+| `s3StorageClass` | String | False | false | "STANDARD" | The S3 storage class to use when writing objects. Only applies when `provider` is `s3v2`. The value is passed directly to the S3 API and must be a valid [S3 storage class](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) string (e.g. `STANDARD`, `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`, `GLACIER`, `GLACIER_IR`, `DEEP_ARCHIVE`, `REDUCED_REDUNDANCY`). |
| `role` | String | False | false | null | The AWS role. |
| `roleSessionName` | String | False | false | null | The AWS role session name. |
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
diff --git a/pom.xml b/pom.xml
index 9f0a00a9..4c170669 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,7 +70,8 @@
4.2.0
3.7.7
2.0.2
- 1.15.2
+ 1.19.3
+ 5.10.1
3.0
@@ -248,6 +249,21 @@
pulsar
${testcontainers.version}
+
+ org.testcontainers
+ junit-jupiter
+ ${testcontainers.version}
+
+
+ org.testcontainers
+ localstack
+ ${testcontainers.version}
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit-jupiter.version}
+
software.amazon.awssdk
s3
@@ -605,6 +621,21 @@
${pulsar.version}
test
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+ org.testcontainers
+ localstack
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter
+ test
+
@@ -631,6 +662,9 @@
org.apache.maven.plugins
maven-surefire-plugin
+
+ **/*IT.java
+
false
1800
${testRetryCount}
@@ -796,6 +830,47 @@
maven-surefire-plugin
+
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 3.2.3
+
+
+ **/*IT.java
+
+
+ --add-opens java.base/java.io=ALL-UNNAMED
+ --add-opens java.base/java.lang=ALL-UNNAMED
+ --add-opens java.base/java.lang.reflect=ALL-UNNAMED
+ --add-opens java.base/java.lang.invoke=ALL-UNNAMED
+ --add-opens java.base/java.net=ALL-UNNAMED
+ --add-opens java.base/java.nio=ALL-UNNAMED
+ --add-opens java.base/java.nio.channels.spi=ALL-UNNAMED
+ --add-opens java.base/java.nio.file=ALL-UNNAMED
+ --add-opens java.base/java.util=ALL-UNNAMED
+ --add-opens java.base/java.util.concurrent=ALL-UNNAMED
+ --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED
+ --add-opens java.base/java.util.stream=ALL-UNNAMED
+ --add-opens java.base/java.util.zip=ALL-UNNAMED
+ --add-opens java.base/java.time=ALL-UNNAMED
+ --add-opens java.base/jdk.internal.loader=ALL-UNNAMED
+ --add-opens java.base/sun.net.dns=ALL-UNNAMED
+ --add-opens java.base/sun.nio.ch=ALL-UNNAMED
+ --add-opens java.base/sun.security.jca=ALL-UNNAMED
+ --add-opens java.xml/jdk.xml.internal=ALL-UNNAMED
+
+
+
+
+
+ integration-test
+ verify
+
+
+
+
+
org.apache.nifi
diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java
index 59031718..96dd4dfb 100644
--- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java
+++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java
@@ -79,6 +79,7 @@ public class BlobStoreAbstractConfig implements Serializable {
// #### common configuration ####
private boolean usePathStyleUrl = true;
private String awsCannedAcl = "";
+ private String s3StorageClass = "STANDARD";
private boolean skipFailedMessages = false;
// #### partitioner configuration ####
@@ -126,6 +127,10 @@ public void validate() {
checkArgument(isNotBlank(region) || isNotBlank(endpoint),
"Either the aws-end-point or aws-region must be set.");
}
+ if (provider.equalsIgnoreCase(PROVIDER_AWSS3V2)) {
+ checkArgument(isNotBlank(s3StorageClass),
+ "s3StorageClass property must not be empty for s3v2 provider.");
+ }
if (isNotBlank(endpoint)) {
checkArgument(hasURIScheme(endpoint), "endpoint property needs to specify URI scheme.");
}
diff --git a/src/main/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriter.java b/src/main/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriter.java
index 5a1b1c30..bddf8145 100644
--- a/src/main/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriter.java
+++ b/src/main/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriter.java
@@ -51,6 +51,7 @@ public class S3BlobWriter implements BlobWriter {
private final S3Client s3;
private final String bucket;
private ObjectCannedACL acl;
+ private String storageClass;
public S3BlobWriter(CloudStorageSinkConfig sinkConfig) {
@@ -67,6 +68,9 @@ public S3BlobWriter(CloudStorageSinkConfig sinkConfig) {
if (StringUtils.isNotEmpty(sinkConfig.getAwsCannedAcl())) {
acl = ObjectCannedACL.fromValue(sinkConfig.getAwsCannedAcl());
}
+ if (StringUtils.isNotEmpty(sinkConfig.getS3StorageClass())) {
+ storageClass = sinkConfig.getS3StorageClass();
+ }
}
@Override
@@ -75,6 +79,9 @@ public void uploadBlob(String key, ByteBuffer payload) throws IOException {
if (acl != null) {
req.acl(acl);
}
+ if (storageClass != null) {
+ req.storageClass(storageClass);
+ }
s3.putObject(req.build(), RequestBody.fromByteBuffer(payload));
}
diff --git a/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkS3IT.java b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkS3IT.java
new file mode 100755
index 00000000..a04649b3
--- /dev/null
+++ b/src/test/java/org/apache/pulsar/io/jcloud/sink/CloudStorageSinkS3IT.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.jcloud.sink;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageIdAdv;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyExistsException;
+import software.amazon.awssdk.services.s3.model.BucketAlreadyOwnedByYouException;
+import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.services.s3.model.StorageClass;
+
+/**
+ * Integration tests for the S3 storage class feature (s3v2 provider) using
+ * Testcontainers with LocalStack. Requires Docker to be running.
+ */
+@Testcontainers
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CloudStorageSinkS3IT {
+
+ private static final String TEST_BUCKET = "test-sink-bucket";
+ private static final String TEST_TOPIC = "persistent://public/default/test-topic-partition-0";
+ private static final DockerImageName LOCALSTACK_IMAGE =
+ DockerImageName.parse("localstack/localstack:4.13");
+
+ @Container
+ static LocalStackContainer localStack = new LocalStackContainer(LOCALSTACK_IMAGE)
+ .withServices(LocalStackContainer.Service.S3);
+
+ private S3Client s3Client;
+
+ @BeforeAll
+ void setupS3Client() {
+ s3Client = S3Client.builder()
+ .endpointOverride(localStack.getEndpointOverride(LocalStackContainer.Service.S3))
+ .credentialsProvider(StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(
+ localStack.getAccessKey(),
+ localStack.getSecretKey())))
+ .region(Region.of(localStack.getRegion()))
+ .serviceConfiguration(
+ software.amazon.awssdk.services.s3.S3Configuration.builder()
+ .pathStyleAccessEnabled(true)
+ .build())
+ .build();
+ }
+
+ @BeforeEach
+ void createBucket() {
+ try {
+ s3Client.createBucket(CreateBucketRequest.builder()
+ .bucket(TEST_BUCKET)
+ .build());
+ } catch (BucketAlreadyExistsException | BucketAlreadyOwnedByYouException ignored) {
+ // no need to recreate existing buckets
+ }
+ }
+
+ @AfterEach
+ void cleanBucket() {
+ ListObjectsV2Response listing = s3Client.listObjectsV2(
+ ListObjectsV2Request.builder().bucket(TEST_BUCKET).build());
+
+ for (S3Object obj : listing.contents()) {
+ s3Client.deleteObject(DeleteObjectRequest.builder()
+ .bucket(TEST_BUCKET)
+ .key(obj.key())
+ .build());
+ }
+ }
+
+ private Map buildSinkConfig(String formatType) {
+ Map config = new HashMap<>();
+ config.put("provider", "s3v2");
+ config.put("accessKeyId", localStack.getAccessKey());
+ config.put("secretAccessKey", localStack.getSecretKey());
+ config.put("bucket", TEST_BUCKET);
+ config.put("region", localStack.getRegion());
+ config.put("endpoint",
+ localStack.getEndpointOverride(LocalStackContainer.Service.S3).toString());
+ config.put("formatType", formatType);
+ config.put("partitionerType", "partition");
+ config.put("batchSize", 2);
+ config.put("batchTimeMs", 5000);
+ config.put("maxBatchBytes", 10_000_000);
+ config.put("pendingQueueSize", 10);
+ config.put("withTopicPartitionNumber", true);
+ config.put("withMetadata", false);
+ return config;
+ }
+
+ private Record createMockBytesRecord(String payload, long sequenceId) {
+ GenericRecord genericRecord = mock(GenericRecord.class);
+ when(genericRecord.getSchemaType()).thenReturn(SchemaType.BYTES);
+ when(genericRecord.getNativeObject()).thenReturn(payload.getBytes());
+ when(genericRecord.getSchemaVersion()).thenReturn(null);
+ when(genericRecord.getFields()).thenReturn(java.util.Collections.emptyList());
+
+ Schema bytesSchema = mock(Schema.class);
+ when(bytesSchema.getSchemaInfo()).thenReturn(
+ org.apache.pulsar.common.schema.SchemaInfo.builder()
+ .name("")
+ .type(SchemaType.BYTES)
+ .schema(new byte[0])
+ .build());
+
+ MessageIdAdv messageId = mock(MessageIdAdv.class);
+ when(messageId.getLedgerId()).thenReturn(sequenceId);
+ when(messageId.getEntryId()).thenReturn(sequenceId);
+ when(messageId.getBatchIndex()).thenReturn(0);
+
+ Message message = mock(Message.class);
+ when(message.getValue()).thenReturn(genericRecord);
+ when(message.getData()).thenReturn(payload.getBytes());
+ when(message.hasKey()).thenReturn(false);
+ when(message.hasIndex()).thenReturn(false);
+ when(message.getTopicName()).thenReturn(TEST_TOPIC);
+ when(message.getMessageId()).thenReturn(messageId);
+ when(message.getProperties()).thenReturn(java.util.Collections.emptyMap());
+ when(message.getEventTime()).thenReturn(System.currentTimeMillis());
+
+ Record record = mock(Record.class);
+ when(record.getValue()).thenReturn(genericRecord);
+ when(record.getSchema()).thenReturn(bytesSchema);
+ when(record.getMessage()).thenReturn(Optional.of(message));
+ when(record.getTopicName()).thenReturn(Optional.of(TEST_TOPIC));
+ when(record.getRecordSequence()).thenReturn(Optional.of(sequenceId));
+ when(record.getPartitionId()).thenReturn(Optional.of("0"));
+ when(record.getPartitionIndex()).thenReturn(Optional.of(0));
+
+ return record;
+ }
+
+ private final java.util.concurrent.atomic.AtomicReference fatalError =
+ new java.util.concurrent.atomic.AtomicReference<>();
+
+ private SinkContext mockSinkContext() {
+ SinkContext ctx = mock(SinkContext.class);
+ when(ctx.getSinkName()).thenReturn("cloud-storage-sink-test");
+ org.mockito.Mockito.doAnswer(inv -> {
+ Throwable t = inv.getArgument(0);
+ fatalError.set(t);
+ t.printStackTrace();
+ return null;
+ }).when(ctx).fatal(org.mockito.ArgumentMatchers.any(Throwable.class));
+ return ctx;
+ }
+
+ private java.util.List listAllObjectKeys() {
+ ListObjectsV2Response response = s3Client.listObjectsV2(
+ ListObjectsV2Request.builder().bucket(TEST_BUCKET).build());
+ return response.contents().stream()
+ .map(S3Object::key)
+ .collect(Collectors.toList());
+ }
+
+ private void awaitObjectCount(int expectedMinCount) {
+ await().atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofMillis(500))
+ .untilAsserted(() -> {
+ java.util.List keys = listAllObjectKeys();
+ assertTrue(keys.size() >= expectedMinCount,
+ "Expected at least " + expectedMinCount + " objects, found: " + keys.size());
+ });
+ }
+
+ private StorageClass getObjectStorageClass(String key) {
+ HeadObjectResponse head = s3Client.headObject(
+ HeadObjectRequest.builder()
+ .bucket(TEST_BUCKET)
+ .key(key)
+ .build());
+
+ return head.storageClass();
+ }
+
+ @Test
+ void testDefaultStorageClassIsStandard() throws Exception {
+ CloudStorageGenericRecordSink sink = new CloudStorageGenericRecordSink();
+ Map config = buildSinkConfig("json");
+ SinkContext ctx = mockSinkContext();
+
+ try {
+ sink.open(config, ctx);
+ sink.write(createMockBytesRecord("{\"key\":\"DefaultSC\"}", 1L));
+ sink.write(createMockBytesRecord("{\"key\":\"DefaultSC2\"}", 2L));
+ awaitObjectCount(1);
+ } finally {
+ sink.close();
+ }
+
+ java.util.List keys = listAllObjectKeys();
+ assertFalse(keys.isEmpty(),
+ "Expected objects in S3 with default storage class");
+
+ for (String key : keys) {
+ StorageClass sc = getObjectStorageClass(key);
+ // S3 returns null storage class for STANDARD objects on HeadObject calls
+ assertTrue(sc == null, "Default storage class should be null for key: " + key);
+ }
+ }
+
+ @Test
+ void testExplicitStandardStorageClass() throws Exception {
+ CloudStorageGenericRecordSink sink = new CloudStorageGenericRecordSink();
+ Map config = buildSinkConfig("json");
+ config.put("s3StorageClass", "STANDARD");
+ SinkContext ctx = mockSinkContext();
+
+ try {
+ sink.open(config, ctx);
+ sink.write(createMockBytesRecord("{\"key\":\"Explicit\"}", 1L));
+ sink.write(createMockBytesRecord("{\"key\":\"Standard\"}", 2L));
+ awaitObjectCount(1);
+ } finally {
+ sink.close();
+ }
+
+ java.util.List keys = listAllObjectKeys();
+ assertFalse(keys.isEmpty(),
+ "Expected objects in S3 with explicit STANDARD storage class");
+
+ for (String key : keys) {
+ StorageClass sc = getObjectStorageClass(key);
+ // S3 returns null storage class for STANDARD objects on HeadObject calls
+ assertTrue(sc == null, "Storage class should be null for key: " + key);
+ }
+ }
+
+ @Test
+ void testStandardIAStorageClass() throws Exception {
+ CloudStorageGenericRecordSink sink = new CloudStorageGenericRecordSink();
+ Map config = buildSinkConfig("json");
+ config.put("s3StorageClass", "STANDARD_IA");
+ SinkContext ctx = mockSinkContext();
+
+ try {
+ sink.open(config, ctx);
+ sink.write(createMockBytesRecord("{\"key\":\"IA_Record1\"}", 1L));
+ sink.write(createMockBytesRecord("{\"key\":\"IA_Record2\"}", 2L));
+ awaitObjectCount(1);
+ } finally {
+ sink.close();
+ }
+
+ java.util.List keys = listAllObjectKeys();
+ assertFalse(keys.isEmpty(),
+ "Expected objects in S3 with STANDARD_IA storage class");
+
+ for (String key : keys) {
+ StorageClass sc = getObjectStorageClass(key);
+ assertNotNull(sc);
+ assertEquals("STANDARD_IA", sc.toString(),
+ "Storage class should be STANDARD_IA for key: " + key);
+ }
+ }
+
+ @Test
+ void testGlacierStorageClass() throws Exception {
+ CloudStorageGenericRecordSink sink = new CloudStorageGenericRecordSink();
+ Map config = buildSinkConfig("json");
+ config.put("s3StorageClass", "GLACIER");
+ SinkContext ctx = mockSinkContext();
+
+ try {
+ sink.open(config, ctx);
+ sink.write(createMockBytesRecord("{\"key\":\"GLACIER_Record1\"}", 1L));
+ sink.write(createMockBytesRecord("{\"key\":\"GLACIER_Record2\"}", 2L));
+ awaitObjectCount(1);
+ } finally {
+ sink.close();
+ }
+
+ java.util.List keys = listAllObjectKeys();
+ assertFalse(keys.isEmpty(),
+ "Expected objects in S3 with GLACIER storage class");
+
+ for (String key : keys) {
+ StorageClass sc = getObjectStorageClass(key);
+ assertNotNull(sc);
+ assertEquals("GLACIER", sc.toString(),
+ "Storage class should be GLACIER for key: " + key);
+ }
+ }
+
+ @Test
+ void testIntelligentTieringStorageClass() throws Exception {
+ CloudStorageGenericRecordSink sink = new CloudStorageGenericRecordSink();
+ Map config = buildSinkConfig("json");
+ config.put("s3StorageClass", "INTELLIGENT_TIERING");
+ SinkContext ctx = mockSinkContext();
+
+ try {
+ sink.open(config, ctx);
+ sink.write(createMockBytesRecord("{\"key\":\"IT_Record1\"}", 1L));
+ sink.write(createMockBytesRecord("{\"key\":\"IT_Record2\"}", 2L));
+ awaitObjectCount(1);
+ } finally {
+ sink.close();
+ }
+
+ java.util.List keys = listAllObjectKeys();
+ assertFalse(keys.isEmpty(),
+ "Expected objects in S3 with INTELLIGENT_TIERING storage class");
+
+ for (String key : keys) {
+ StorageClass sc = getObjectStorageClass(key);
+ assertNotNull(sc);
+ assertEquals("INTELLIGENT_TIERING", sc.toString(),
+ "Storage class should be INTELLIGENT_TIERING for key: " + key);
+ }
+ }
+
+ @Test
+ void testEmptyStorageClassFailsValidationForS3v2() {
+ CloudStorageGenericRecordSink sink = new CloudStorageGenericRecordSink();
+ Map config = buildSinkConfig("json");
+ config.put("s3StorageClass", "");
+ SinkContext ctx = mockSinkContext();
+
+ assertThrows(Exception.class, () -> sink.open(config, ctx),
+ "Empty s3StorageClass should fail validation for s3v2 provider");
+ }
+}
diff --git a/src/test/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriterTest.java b/src/test/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriterTest.java
index 3abd5f09..3af61cd2 100644
--- a/src/test/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriterTest.java
+++ b/src/test/java/org/apache/pulsar/io/jcloud/writer/S3BlobWriterTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.jcloud.writer;
import static org.apache.pulsar.io.jcloud.BlobStoreAbstractConfig.PROVIDER_AWSS3V2;
+import static org.junit.Assert.assertThrows;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -50,4 +51,83 @@ public void initTest() throws IOException {
S3BlobWriter o = new S3BlobWriter(cloudStorageSinkConfig);
o.close();
}
+
+ @Test
+ public void testValidStorageClass() throws IOException {
+ Map config = new HashMap<>();
+ config.put("provider", PROVIDER_AWSS3V2);
+ config.put("accessKeyId", "aws-s3");
+ config.put("secretAccessKey", "aws-s3");
+ config.put("bucket", "testbucket");
+ config.put("region", "us-east-1");
+ config.put("endpoint", "https://us-standard");
+ config.put("pathPrefix", "pulsar/");
+ config.put("formatType", "avro");
+ config.put("partitionerType", "default");
+ config.put("batchSize", 10);
+ config.put("s3StorageClass", "STANDARD_IA");
+ CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
+ cloudStorageSinkConfig.validate();
+
+ S3BlobWriter o = new S3BlobWriter(cloudStorageSinkConfig);
+ o.close();
+ }
+
+ @Test
+ public void testEmptyStorageClassFailsValidation() throws IOException {
+ Map config = new HashMap<>();
+ config.put("provider", PROVIDER_AWSS3V2);
+ config.put("accessKeyId", "aws-s3");
+ config.put("secretAccessKey", "aws-s3");
+ config.put("bucket", "testbucket");
+ config.put("region", "us-east-1");
+ config.put("endpoint", "https://us-standard");
+ config.put("pathPrefix", "pulsar/");
+ config.put("formatType", "avro");
+ config.put("partitionerType", "default");
+ config.put("batchSize", 10);
+ config.put("s3StorageClass", "");
+ CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
+
+ assertThrows(IllegalArgumentException.class, cloudStorageSinkConfig::validate);
+ }
+
+ @Test
+ public void testKnownStorageClassPassesValidation() throws IOException {
+ Map config = new HashMap<>();
+ config.put("provider", PROVIDER_AWSS3V2);
+ config.put("accessKeyId", "aws-s3");
+ config.put("secretAccessKey", "aws-s3");
+ config.put("bucket", "testbucket");
+ config.put("region", "us-east-1");
+ config.put("endpoint", "https://us-standard");
+ config.put("pathPrefix", "pulsar/");
+ config.put("formatType", "avro");
+ config.put("partitionerType", "default");
+ config.put("batchSize", 10);
+ config.put("s3StorageClass", "GLACIER_IR");
+ CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
+ cloudStorageSinkConfig.validate();
+ }
+
+ @Test
+ public void testDefaultStorageClass() throws IOException {
+ Map config = new HashMap<>();
+ config.put("provider", PROVIDER_AWSS3V2);
+ config.put("accessKeyId", "aws-s3");
+ config.put("secretAccessKey", "aws-s3");
+ config.put("bucket", "testbucket");
+ config.put("region", "us-east-1");
+ config.put("endpoint", "https://us-standard");
+ config.put("pathPrefix", "pulsar/");
+ config.put("formatType", "avro");
+ config.put("partitionerType", "default");
+ config.put("batchSize", 10);
+ // s3StorageClass not set, should default to STANDARD
+ CloudStorageSinkConfig cloudStorageSinkConfig = CloudStorageSinkConfig.load(config);
+ cloudStorageSinkConfig.validate();
+
+ S3BlobWriter o = new S3BlobWriter(cloudStorageSinkConfig);
+ o.close();
+ }
}