Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
.project
.settings/
.recommenders/
.factorypath

# Intellij
.idea/
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ The Cloud Storage sink connector supports the following properties.
| `role` | String | False | null | The Cloud Storage role. |
| `roleSessionName` | String | False | null | The Cloud Storage role session name. |
| `endpoint` | String | True | null | The Cloud Storage endpoint. |
| `s3StorageClass` | String | False | "STANDARD" | The S3 storage class to use when writing objects. Only applies when `provider` is `s3v2`. The value is passed directly to the S3 API and must be a valid [S3 storage class](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) string (e.g. `STANDARD`, `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`, `GLACIER`, `GLACIER_IR`, `DEEP_ARCHIVE`, `REDUCED_REDUNDANCY`). |
| `bucket` | String | True | null | The Cloud Storage bucket. |
| `formatType` | String | False | "json" | The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON. |
| `partitionerType` | String | False | "partition" | The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions. |
Expand Down
4 changes: 3 additions & 1 deletion docs/aws-s3-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ pulsarctl sinks create \
"bucket": "Your bucket name",
"region": "Your AWS S3 region",
"formatType": "json",
"partitionerType": "PARTITION"
"partitionerType": "PARTITION",
"s3StorageClass": "STANDARD"
}'
```

Expand Down Expand Up @@ -134,6 +135,7 @@ Before using the AWS S3 sink connector, you need to configure it. This table out
| `partitionerType` | String | False | false | null | The partitioning type. It can be configured by topic `PARTITION` or `TIME`. By default, the partition type is configured by topic partitions. |
| `region` | String | False | false | null | The AWS S3 region. Either the endpoint or region must be set. |
| `endpoint` | String | False | false | null | The AWS S3 endpoint. Either the endpoint or region must be set. |
| `s3StorageClass` | String | False | false | "STANDARD" | The S3 storage class to use when writing objects. Only applies when `provider` is `s3v2`. The value is passed directly to the S3 API and must be a valid [S3 storage class](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html#AmazonS3-PutObject-request-header-StorageClass) string (e.g. `STANDARD`, `STANDARD_IA`, `ONEZONE_IA`, `INTELLIGENT_TIERING`, `GLACIER`, `GLACIER_IR`, `DEEP_ARCHIVE`, `REDUCED_REDUNDANCY`). |
| `role` | String | False | false | null | The AWS role. |
| `roleSessionName` | String | False | false | null | The AWS role session name. |
| `timePartitionPattern` | String | False | false | "yyyy-MM-dd" | The format pattern of the time-based partitioning. For details, refer to the Java date and time format. |
Expand Down
77 changes: 76 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@
<awaitility.version>4.2.0</awaitility.version>
<mockito.version>3.7.7</mockito.version>
<powermock.version>2.0.2</powermock.version>
<testcontainers.version>1.15.2</testcontainers.version>
<testcontainers.version>1.19.3</testcontainers.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>

<!-- build plugin dependencies -->
<license.plugin.version>3.0</license.plugin.version>
Expand Down Expand Up @@ -248,6 +249,21 @@
<artifactId>pulsar</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${testcontainers.version}</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
Expand Down Expand Up @@ -605,6 +621,21 @@
<version>${pulsar.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Expand All @@ -631,6 +662,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/*IT.java</exclude>
</excludes>
<reuseForks>false</reuseForks>
<forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
<rerunFailingTestsCount>${testRetryCount}</rerunFailingTestsCount>
Expand Down Expand Up @@ -796,6 +830,47 @@
<artifactId>maven-surefire-plugin</artifactId>
</plugin>

<!-- integration test -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.2.3</version>
<configuration>
<includes>
<include>**/*IT.java</include>
</includes>
<argLine>
--add-opens java.base/java.io=ALL-UNNAMED
--add-opens java.base/java.lang=ALL-UNNAMED
--add-opens java.base/java.lang.reflect=ALL-UNNAMED
--add-opens java.base/java.lang.invoke=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED
--add-opens java.base/java.nio=ALL-UNNAMED
--add-opens java.base/java.nio.channels.spi=ALL-UNNAMED
--add-opens java.base/java.nio.file=ALL-UNNAMED
--add-opens java.base/java.util=ALL-UNNAMED
--add-opens java.base/java.util.concurrent=ALL-UNNAMED
--add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens java.base/java.util.stream=ALL-UNNAMED
--add-opens java.base/java.util.zip=ALL-UNNAMED
--add-opens java.base/java.time=ALL-UNNAMED
--add-opens java.base/jdk.internal.loader=ALL-UNNAMED
--add-opens java.base/sun.net.dns=ALL-UNNAMED
--add-opens java.base/sun.nio.ch=ALL-UNNAMED
--add-opens java.base/sun.security.jca=ALL-UNNAMED
--add-opens java.xml/jdk.xml.internal=ALL-UNNAMED
</argLine>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- package -->
<plugin>
<groupId>org.apache.nifi</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class BlobStoreAbstractConfig implements Serializable {
// #### common configuration ####
private boolean usePathStyleUrl = true;
private String awsCannedAcl = "";
private String s3StorageClass = "STANDARD";
private boolean skipFailedMessages = false;

// #### partitioner configuration ####
Expand Down Expand Up @@ -126,6 +127,10 @@ public void validate() {
checkArgument(isNotBlank(region) || isNotBlank(endpoint),
"Either the aws-end-point or aws-region must be set.");
}
if (provider.equalsIgnoreCase(PROVIDER_AWSS3V2)) {
checkArgument(isNotBlank(s3StorageClass),
"s3StorageClass property must not be empty for s3v2 provider.");
}
if (isNotBlank(endpoint)) {
checkArgument(hasURIScheme(endpoint), "endpoint property needs to specify URI scheme.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class S3BlobWriter implements BlobWriter {
private final S3Client s3;
private final String bucket;
private ObjectCannedACL acl;
private String storageClass;

public S3BlobWriter(CloudStorageSinkConfig sinkConfig) {

Expand All @@ -67,6 +68,9 @@ public S3BlobWriter(CloudStorageSinkConfig sinkConfig) {
if (StringUtils.isNotEmpty(sinkConfig.getAwsCannedAcl())) {
acl = ObjectCannedACL.fromValue(sinkConfig.getAwsCannedAcl());
}
if (StringUtils.isNotEmpty(sinkConfig.getS3StorageClass())) {
storageClass = sinkConfig.getS3StorageClass();
}
}

@Override
Expand All @@ -75,6 +79,9 @@ public void uploadBlob(String key, ByteBuffer payload) throws IOException {
if (acl != null) {
req.acl(acl);
}
if (storageClass != null) {
req.storageClass(storageClass);
}

s3.putObject(req.build(), RequestBody.fromByteBuffer(payload));
}
Expand Down
Loading