
Commit 0e5f382

cleanup-cloud-storage
1 parent 6d0c41b commit 0e5f382


3 files changed: +145 -10 lines changed

Lines changed: 68 additions & 1 deletion
@@ -1 +1,68 @@
-xxxx
+[source,json]
+----
+{
+  "archive": "builtin://bigquery",
+  "autoAck": true,
+  "className": "org.apache.pulsar.io.kafka.connect.KafkaConnectSink",
+  "cleanupSubscription": false,
+  "configs": {
+    "batchSize": "1000",
+    "kafkaConnectorConfigProperties": {
+      "autoCreateBucket": true,
+      "autoCreateTables": false,
+      "keySource": "JSON",
+      "queueSize": "-1",
+      "sanitizeTopics": false,
+      "topics": "homelab/default/clue-sensors"
+    },
+    "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
+    "lingerTimeMs": "1000",
+    "offsetStorageTopic": "homelab/default/clue-sensors",
+    "sanitizeTopicName": true,
+    "topic": "homelab/default/clue-sensors"
+  },
+  "customRuntimeOptions": "internal_data",
+  "deadLetterTopic": null,
+  "inputSpecs": {
+    "homelab/default/clue-sensors": {
+      "consumerProperties": {},
+      "cryptoConfig": null,
+      "poolMessages": false,
+      "receiverQueueSize": null,
+      "regexPattern": false,
+      "schemaProperties": {},
+      "schemaType": null,
+      "serdeClassName": null
+    }
+  },
+  "inputs": [
+    "homelab/default/clue-sensors"
+  ],
+  "maxMessageRetries": null,
+  "name": "bq-sink",
+  "namespace": "default",
+  "negativeAckRedeliveryDelayMs": null,
+  "parallelism": 1,
+  "processingGuarantees": "EFFECTIVELY_ONCE",
+  "resources": {
+    "cpu": 0.25,
+    "disk": 1000000000,
+    "ram": 1000000000
+  },
+  "retainKeyOrdering": false,
+  "retainOrdering": true,
+  "runtimeFlags": null,
+  "secrets": null,
+  "sourceSubscriptionName": null,
+  "sourceSubscriptionPosition": "Latest",
+  "tenant": "homelab",
+  "timeoutMs": 2000,
+  "topicToSchemaProperties": null,
+  "topicToSchemaType": null,
+  "topicToSerdeClassName": null,
+  "topicsPattern": null,
+  "transformFunction": null,
+  "transformFunctionClassName": null,
+  "transformFunctionConfig": null
+}
+----
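
For reference, a sink definition like the one above could be created with pulsar-admin. This is a minimal, hypothetical sketch (not part of the commit): the flag values mirror the JSON, and the `--sink-config` payload shows only a few of the connector-specific `configs` keys.

[source,bash]
----
# Hypothetical sketch: values mirror the bq-sink JSON above.
pulsar-admin sinks create \
  --archive builtin://bigquery \
  --tenant homelab \
  --namespace default \
  --name bq-sink \
  --inputs homelab/default/clue-sensors \
  --parallelism 1 \
  --processing-guarantees EFFECTIVELY_ONCE \
  --sink-config '{"batchSize": "1000", "lingerTimeMs": "1000", "topic": "homelab/default/clue-sensors"}'
----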
Lines changed: 69 additions & 1 deletion
@@ -1 +1,69 @@
-xxxx
+[source,json]
+----
+{
+  "archive": "builtin://cloud-storage",
+  "autoAck": true,
+  "className": "org.apache.pulsar.io.jcloud.sink.CloudStorageGenericRecordSink",
+  "cleanupSubscription": false,
+  "configs": {
+    "batchSize": "10",
+    "batchTimeMs": "1000",
+    "bucket": "S3",
+    "formatType": "json",
+    "maxBatchBytes": "10000000",
+    "partitionerType": "partition",
+    "partitionerUseIndexAsOffset": false,
+    "pendingQueueSize": "10",
+    "provider": "AWS",
+    "skipFailedMessages": false,
+    "sliceTopicPartitionPath": false,
+    "useHumanReadableMessageId": false,
+    "useHumanReadableSchemaVersion": false,
+    "withMetadata": false,
+    "withTopicPartitionNumber": true
+  },
+  "customRuntimeOptions": "internal_data",
+  "deadLetterTopic": null,
+  "inputSpecs": {
+    "persistent://homelab/default/clue-sensors": {
+      "consumerProperties": {},
+      "cryptoConfig": null,
+      "poolMessages": false,
+      "receiverQueueSize": null,
+      "regexPattern": false,
+      "schemaProperties": {},
+      "schemaType": null,
+      "serdeClassName": null
+    }
+  },
+  "inputs": [
+    "persistent://homelab/default/clue-sensors"
+  ],
+  "maxMessageRetries": null,
+  "name": "cloud-storage-sink",
+  "namespace": "default",
+  "negativeAckRedeliveryDelayMs": null,
+  "parallelism": 1,
+  "processingGuarantees": "ATLEAST_ONCE",
+  "resources": {
+    "cpu": 0.25,
+    "disk": 1000000000,
+    "ram": 1000000000
+  },
+  "retainKeyOrdering": false,
+  "retainOrdering": false,
+  "runtimeFlags": null,
+  "secrets": null,
+  "sourceSubscriptionName": null,
+  "sourceSubscriptionPosition": "Latest",
+  "tenant": "homelab",
+  "timeoutMs": 5000,
+  "topicToSchemaProperties": null,
+  "topicToSchemaType": null,
+  "topicToSerdeClassName": null,
+  "topicsPattern": null,
+  "transformFunction": null,
+  "transformFunctionClassName": null,
+  "transformFunctionConfig": null
+}
+----
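
Once a sink like this is running, pulsar-admin can report on it; a quick sketch using the tenant, namespace, and name from the JSON above:

[source,bash]
----
# Check the running state and instance health of the sink.
pulsar-admin sinks status --tenant homelab --namespace default --name cloud-storage-sink

# Fetch the stored sink definition to compare against the intended config.
pulsar-admin sinks get --tenant homelab --namespace default --name cloud-storage-sink
----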

modules/pulsar-io/pages/connectors/sinks/cloud-storage.adoc

Lines changed: 8 additions & 8 deletions
@@ -22,32 +22,32 @@ include::partial$connectors/sinks/get-started.adoc[]
 
 == Data format types
 
-Cloud Storage Sink Connector provides multiple output format options, including JSON, Avro, Bytes, or Parquet. The default format is JSON. With current implementation, there are some limitations for different formats:
+The Cloud Storage sink connector provides multiple output format options, including JSON, Avro, Bytes, and Parquet. The default format is JSON. With the current implementation, there are some limitations for the different formats:
 
-The Pulsar Schema types supported by the writers.
+Pulsar Schema types supported by the writers:
 
 [%header,format=csv,cols="1,^1,^1,^1,^1"]
 |===
 include::example$connectors/sinks/cloud-storage/data-format.csv[]
 |===
 
 ____
-*The JSON writer will try to convert the data with a String or Bytes schema to JSON-format data if convertable.
+*The JSON writer will try to convert data with a String or Bytes schema to JSON-format data if convertible.
 
 **The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so it may not provide the best effort conversion.
 
 \*** The ProtobufNative record holds the Protobuf descriptor and the message. When writing to Avro format, the connector uses avro-protobuf to do the conversion.
 ____
 
-The support of `withMetadata` configurations for different writer formats:
+Supported `withMetadata` configurations for different writer formats:
 
 [%header,format=csv,cols="1,^1",width="50%"]
 |===
 include::example$connectors/sinks/cloud-storage/with-meta-data.csv[]
 |===
 
 ____
-*When using Parquet with PROTOBUF_NATIVE format, the connector will write the messages with DynamicMessage format. When withMetadata is set to true, the connector will add __message_metadata__ to the messages with PulsarIOCSCProtobufMessageMetadata format.
+*When using Parquet with the PROTOBUF_NATIVE format, the connector will write the messages in the DynamicMessage format. When withMetadata is set to true, the connector will add __message_metadata__ to the messages in the PulsarIOCSCProtobufMessageMetadata format.
 
 For example, if a message User has the following schema:
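
To illustrate the `withMetadata` option discussed in the hunk above, enabling it in the sink's `configs` map might look like this (a sketch showing only the relevant keys; both appear in the cloud-storage sink config earlier in this commit):

[source,json]
----
{
  "formatType": "parquet",
  "withMetadata": true
}
----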
@@ -85,7 +85,7 @@ By default, when the connector receives a message with a non-supported schema type
 
 == Dead-letter topics
 
-To use a dead-letter topic, set `skipFailedMessages` to `false` in the cloud provider config. Then using either pulsar-admin or curl, set `--max-redeliver-count` and `--dead-letter-topic` . For more info about dead-letter topics, see the https://pulsar.apache.org/docs/en/concepts-messaging/#dead-letter-topic[Pulsar documentation^]{external-link-icon}. If a message fails to be sent to the Cloud Storage and there is a dead-letter topic, the connector will send the message to the assigned topic.
+To use a dead-letter topic, set `skipFailedMessages` to `false` in the cloud provider config. Then, using either pulsar-admin or curl, set `--max-redeliver-count` and `--dead-letter-topic`. For more information about dead-letter topics, see the https://pulsar.apache.org/docs/en/concepts-messaging/#dead-letter-topic[Pulsar documentation^]{external-link-icon}. If a message fails to be sent to the Cloud Storage sink and there is a dead-letter topic, the connector will send the message to the assigned topic.
 
 == Managing the Connector

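As a sketch of the dead-letter setup described in the hunk above (the redelivery count and DLQ topic name are illustrative, and flag availability can vary by Pulsar version):

[source,bash]
----
# Redeliver a failed message a few times, then route it to a dead-letter topic.
pulsar-admin sinks update \
  --tenant homelab \
  --namespace default \
  --name cloud-storage-sink \
  --max-redeliver-count 3 \
  --dead-letter-topic persistent://homelab/default/clue-sensors-dlq
----
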
@@ -97,7 +97,7 @@ include::partial$connectors/sinks/monitoring.adoc[]
 
 == Connector Reference
 
-With the Cloud Storage Sink there a two sets of parameters. First the Astra Streaming parameters, then the params specific to your chosen cloud store.
+With the Cloud Storage sink there are two sets of parameters: first the Astra Streaming parameters, then the parameters specific to your chosen cloud store.
 
 === Astra Streaming

@@ -132,7 +132,7 @@ The suggested permission policies for AWS S3 are:
 - s3:PutObject*
 - s3:List*
 
-If you do not want to provide region in the configuration, you should enable s3:GetBucketLocation permission policy as well.
+If you do not want to provide a region in the configuration, you should enable the s3:GetBucketLocation permission policy as well.
 
 [%header,format=csv,cols="2,1,1,3"]
 |===
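
The suggested S3 permissions in the hunk above could be expressed as an IAM policy along these lines (a sketch; the bucket name is illustrative):

[source,json]
----
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject*",
        "s3:List*",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::my-bucket",
        "arn:aws:s3:::my-bucket/*"
      ]
    }
  ]
}
----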
