Skip to content

Commit a0c4b8a

Browse files
committed
Merge branch 'stage' of https://github.com/datastax/streaming-learning-docs into stage
2 parents f22047c + 15537cf commit a0c4b8a

File tree

12 files changed

+290
-0
lines changed

12 files changed

+290
-0
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
Name,Required,Default,Description
2+
accessKeyId,true,null,The Cloud Storage access key ID. It requires permission to write objects.
3+
bucket,true,null,The Cloud Storage bucket.
4+
endpoint,true,null,The Cloud Storage endpoint.
5+
provider,true,null,"The Cloud Storage type, such as aws-s3,s3v2(s3v2 uses the AWS client but not the JCloud client)."
6+
secretAccessKey,true,null,The Cloud Storage secret access key.
7+
avroCodec,false,snappy,"Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy."
8+
avroCodec,false,snappy,"Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy."
9+
batchSize,false,10,The number of records submitted in batch.
10+
batchTimeMs,false,1000,The interval for batch submission.
11+
bytesFormatTypeSeparator,false,0x10,"It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object."
12+
formatType,false,json,"The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON."
13+
jsonAllowNaN,false,false,"Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=json. Since JSON specification does not allow such values this is a non-standard feature and disabled by default."
14+
jsonAllowNaN,false,false,"Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=json. Since JSON specification does not allow such values this is a non-standard feature and disabled by default."
15+
maxBatchBytes,false,10000000,The maximum number of bytes in a batch.
16+
parquetCodec,false,gzip,"Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd."
17+
parquetCodec,false,gzip,"Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd."
18+
partitionerType,false,partition,"The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions."
19+
partitionerUseIndexAsOffset,false,false,"Whether to use the Pulsar's message index as offset or the record sequence. It's recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it's not present on the record, the sequence will be used. See PIP-70 for more details."
20+
pathPrefix,false,false,"If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/."
21+
pendingQueueSize,false,10,"The number of records buffered in queue. By default, it is equal tobatchSize. You can set it manually."
22+
role,false,null,The Cloud Storage role.
23+
roleSessionName,false,null,The Cloud Storage role session name.
24+
skipFailedMessages,false,false,"Configure whether to skip a message which it fails to be processed. If it is set to true, the connector will skip the failed messages by ack it. Otherwise, the connector will fail the message."
25+
sliceTopicPartitionPath,false,false,"When it is set to true, split the partitioned topic name into separate folders in the bucket path."
26+
timePartitionDuration,false,86400000,"The time interval for time-based partitioning. Support formatted interval string, such as 30d, 24h, 30m, 10s, and also support number in milliseconds precision, such as 86400000 refers to 24h or 1d."
27+
timePartitionPattern,false,yyyy-MM-dd,"The format pattern of the time-based partitioning. For details, refer to the Java date and time format."
28+
useHumanReadableMessageId,false,false,"Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a Hex-encoded string."
29+
useHumanReadableSchemaVersion,false,false,"Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format."
30+
withMetadata,false,false,Save message attributes to metadata.
31+
withTopicPartitionNumber,false,true,"When it is set to true, include the topic partition number to the object path."
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
Name,Required,Default,Description
2+
azureStorageAccountConnectionString,true,,The Azure Blob Storage connection string. Required when authenticating via connection string.
3+
azureStorageAccountKey,true,,The Azure Blob Storage account key. Required when authenticating via account name and account key.
4+
azureStorageAccountName,true,,The Azure Blob Storage account name. Required when authenticating via account name and account key.
5+
azureStorageAccountSASToken,true,,The Azure Blob Storage account SAS token. Required when authenticating via SAS token.
6+
bucket,true,null,The Cloud Storage bucket.
7+
endpoint,true,null,The Azure Blob Storage endpoint.
8+
provider,true,null,The Cloud Storage type. Azure Blob Storage only supports the azure-blob-storage provider.
9+
avroCodec,false,snappy,"Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy."
10+
batchSize,false,10,The number of records submitted in batch.
11+
batchTimeMs,false,1000,The interval for batch submission.
12+
bytesFormatTypeSeparator,false,0x10,"It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object."
13+
formatType,false,json,"The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON."
14+
jsonAllowNaN,false,false,"Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=json. Since JSON specification does not allow such values this is a non-standard feature and disabled by default."
15+
maxBatchBytes,false,10000000,The maximum number of bytes in a batch.
16+
parquetCodec,false,gzip,"Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd."
17+
partitionerType,false,partition,"The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions."
18+
partitionerUseIndexAsOffset,false,false,"Whether to use the Pulsar's message index as offset or the record sequence. It's recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it's not present on the record, the sequence will be used. See PIP-70 for more details."
19+
pathPrefix,false,false,"If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/."
20+
pendingQueueSize,false,10,"The number of records buffered in queue. By default, it is equal to batchSize. You can set it manually."
21+
skipFailedMessages,false,false,"Configure whether to skip a message which it fails to be processed. If it is set to true, the connector will skip the failed messages by ack it. Otherwise, the connector will fail the message."
22+
sliceTopicPartitionPath,false,false,"When it is set to true, split the partitioned topic name into separate folders in the bucket path."
23+
timePartitionDuration,false,86400000,"The time interval for time-based partitioning. Support formatted interval string, such as 30d, 24h, 30m, 10s, and also support number in milliseconds precision, such as 86400000 refers to 24h or 1d."
24+
timePartitionPattern,false,yyyy-MM-dd,"The format pattern of the time-based partitioning. For details, refer to the Java date and time format."
25+
useHumanReadableMessageId,false,false,"Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a Hex-encoded string."
26+
useHumanReadableSchemaVersion,false,false,"Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format."
27+
withMetadata,false,false,Save message attributes to metadata.
28+
withTopicPartitionNumber,false,true,"When it is set to true, include the topic partition number to the object path."
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
2+
--header "Authorization: Bearer $PULSAR_TOKEN" \
3+
--form 'sinkConfig="{
4+
\"archive\":\"builtin:\/\/cloud-storage\",
5+
\"tenant\":\"'$TENANT'\",
6+
\"namespace\":\"'$NAMESPACE'\",
7+
\"name\":\"'$SINK_NAME'\",
8+
\"parallelism\": 1,
9+
\"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"],
10+
\"configs\":{ <see below reference for storage specifics> }
11+
}"'
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
curl -sS --fail --request PUT ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
2+
--header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
3+
--form 'sinkConfig="{
4+
\"archive\":\"builtin:\/\/cloud-storage\",
5+
\"tenant\":\"'$TENANT'\",
6+
\"namespace\":\"'$NAMESPACE'\",
7+
\"name\":\"'$SINK_NAME'\",
8+
\"parallelism\": 2,
9+
\"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"]
10+
}"'
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Pulsar Schema,Writer: Avro,Writer: JSON,Writer: Parquet,Writer: Bytes
2+
Primitive,❌,✅ *,❌,✅
3+
Avro,✅,✅,✅,✅
4+
Json,✅,✅,✅,✅
5+
Protobuf **,✅,✅,✅,✅
6+
ProtobufNative,✅ * * *,❌,✅,✅
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
Name,Required,Default,Description
2+
bucket,true,null,The Cloud Storage bucket.
3+
provider,true,null,The Cloud Storage type. Google cloud storage only supports the google-cloud-storage provider.
4+
avroCodec,false,snappy,"Compression codec used when formatType=avro. Available compression types are: null (no compression), deflate, bzip2, xz, zstandard, snappy."
5+
batchSize,false,10,The number of records submitted in batch.
6+
batchTimeMs,false,1000,The interval for batch submission.
7+
bytesFormatTypeSeparator,false,0x10,"It is inserted between records for the formatType of bytes. By default, it is set to '0x10'. An input record that contains the line separator looks like multiple records in the output object."
8+
formatType,false,json,"The data format type. Available options are JSON, Avro, Bytes, or Parquet. By default, it is set to JSON."
9+
gcsServiceAccountKeyFileContent,false,,"The contents of the JSON service key file. If empty, credentials are read from gcsServiceAccountKeyFilePath file."
10+
gcsServiceAccountKeyFilePath,false,,"Path to the GCS credentials file. If empty, the credentials file are read from the GOOGLE_APPLICATION_CREDENTIALS environment variable."
11+
jsonAllowNaN,false,false,"Recognize 'NaN', 'INF', '-INF' as legal floating number values when formatType=json. Since JSON specification does not allow such values this is a non-standard feature and disabled by default."
12+
maxBatchBytes,false,10000000,The maximum number of bytes in a batch.
13+
parquetCodec,false,gzip,"Compression codec used when formatType=parquet. Available compression types are: null (no compression), snappy, gzip, lzo, brotli, lz4, zstd."
14+
partitionerType,false,partition,"The partitioning type. It can be configured by topic partitions or by time. By default, the partition type is configured by topic partitions."
15+
partitionerUseIndexAsOffset,false,false,"Whether to use the Pulsar's message index as offset or the record sequence. It's recommended if the incoming messages may be batched. The brokers may or not expose the index metadata and, if it's not present on the record, the sequence will be used. See PIP-70 for more details."
16+
pathPrefix,false,false,"If it is set, the output files are stored in a folder under the given bucket path. The pathPrefix must be in the format of xx/xxx/."
17+
pendingQueueSize,false,10,"The number of records buffered in queue. By default, it is equal to batchSize. You can set it manually."
18+
skipFailedMessages,false,false,"Configure whether to skip a message which it fails to be processed. If it is set to true, the connector will skip the failed messages by ack it. Otherwise, the connector will fail the message."
19+
sliceTopicPartitionPath,false,false,"When it is set to true, split the partitioned topic name into separate folders in the bucket path."
20+
timePartitionDuration,false,86400000,"The time interval for time-based partitioning. Support formatted interval string, such as 30d, 24h, 30m, 10s, and also support number in milliseconds precision, such as 86400000 refers to 24h or 1d."
21+
timePartitionPattern,false,yyyy-MM-dd,"The format pattern of the time-based partitioning. For details, refer to the Java date and time format."
22+
useHumanReadableMessageId,false,false,"Use a human-readable format string for messageId in message metadata. The messageId is in a format like ledgerId:entryId:partitionIndex:batchIndex. Otherwise, the messageId is a Hex-encoded string."
23+
useHumanReadableSchemaVersion,false,false,"Use a human-readable format string for the schema version in the message metadata. If it is set to true, the schema version is in plain string format. Otherwise, the schema version is in hex-encoded string format."
24+
withMetadata,false,false,Save message attributes to metadata.
25+
withTopicPartitionNumber,false,true,"When it is set to true, include the topic partition number to the object path."
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
./bin/pulsar-admin sinks create \
2+
--sink-type cloud-storage \
3+
--name "$SINK_NAME" \
4+
--inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
5+
--tenant "$TENANT" \
6+
--processing-guarantees EFFECTIVELY_ONCE \
7+
--sink-config '{ <see below reference for storage specifics> }'
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
./bin/pulsar-admin sinks update \
2+
--sink-type cloud-storage \
3+
--name "$SINK_NAME" \
4+
--inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
5+
--tenant "$TENANT" \
6+
--parallelism 2
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
xxxx
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Writer Format,withMetadata
2+
Avro,✅
3+
JSON,✅
4+
Parquet,✅ *
5+
Bytes,❌

0 commit comments

Comments
 (0)