
Commit 3a1be35

Merge pull request #32 from datastax/bigquery-sink
Created docs for BigQuery sink
2 parents 539c693 + 4e84300

File tree

10 files changed: +205 -1 lines changed

modules/pulsar-io/examples/connectors/sinks/astra.csv

Lines changed: 1 addition & 1 deletion
@@ -3,7 +3,7 @@
 "autoAck","true","false","Boolean denotes whether or not the framework will automatically acknowledge messages"
 "className","true","","The connector type's class reference, like 'org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource'"
 "cleanupSubscription","false","false","Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted"
-"configs","false","{}","JSON key/value config of sink type specific settings. Example: {""property1"":""1234"",""property2"":{""subProperty"":""asdf""}}"
+"configs","false","{}","A key/value map of config properties specific to the type of connector. See the reference table below for values."
 "customRuntimeOptions","false","","A string that encodes options to customize the runtime, see Apache Pulsar docs for configured runtime for details"
 "deadLetterTopic","false","","Name of the dead topic where the failing messages will be sent"
 "inputSpecs","false","","The map of input topics to its consumer configuration, each configuration has schema of {""schemaType"": ""type-x"", ""serdeClassName"": ""name-x"", ""isRegexPattern""": true, ""receiverQueueSize"": 5}"
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
Name,Required,Default,Description
kafkaConnectorSinkClass,true,,"The kafka-connector sink class to use. Unless you've developed your own, use the value ""com.wepay.kafka.connect.bigquery.BigQuerySinkConnector""."
offsetStorageTopic,true,,"Pulsar topic where offsets are stored. This is an additional topic, separate from the topic carrying the actual data going to BigQuery."
sanitizeTopicName,true,,"Some connectors cannot handle Pulsar topic names like persistent://a/b/topic and do not sanitize the topic name themselves. If enabled, all non-alphanumeric characters in the topic name are replaced with underscores. In some cases this may result in topic name collisions (topic_a and topic.a will become the same).

This value MUST be set to `true`. Any other value will result in an error."
topic,true,,The Kafka topic name that is passed to the Kafka sink.
batchSize,false,16384,"The size of messages, in bytes, that the sink will attempt to batch together before flushing."
collapsePartitionedTopics,false,false,Supply the Kafka record with the topic name without the -partition- suffix for partitioned topics.
kafkaConnectorConfigProperties,false,{},A key/value map of config properties to pass to the Kafka connector. See the reference table below.
lingerTimeMs,false,2147483647L,"Time interval, in milliseconds, during which the sink will attempt to batch messages together before flushing."
maxBatchBitsForOffset,false,12,"Number of bits (0 to 20) to use for the index of a message in its batch when translating it into an offset. Set to 0 to disable this behavior (messages from the same batch will then have the same offset, which can affect some connectors)."
unwrapKeyValueIfAvailable,false,true,"For Record<KeyValue<>> data, use the key from KeyValue<> instead of the one from the Record."
useIndexAsOffset,false,true,"Allows use of the message index instead of the message sequenceId as the offset, if available. Requires AppendIndexMetadataInterceptor and exposingBrokerEntryMetadataToClientEnabled=true on the brokers."
useOptionalPrimitives,false,false,"Pulsar schemas do not indicate whether a schema is optional, while Kafka schemas do. This provides a way to force all primitive schemas to be optional for Kafka."
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/bigquery\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 1,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"],
    \"configs\":{
      \"topic\": \"bq-test01\",
      \"kafkaConnectorSinkClass\": \"com.wepay.kafka.connect.bigquery.BigQuerySinkConnector\",
      \"offsetStorageTopic\": \"bq-sink-offsets01\",
      \"sanitizeTopicName\": \"true\",
      \"kafkaConnectorConfigProperties\":{
        \"name\": \"bq-sink1\",
        \"topics\": \"bq-test01\",
        \"project\": \"my-bigquery-project\",
        \"defaultDataset\": \"BQ_CONNECTOR_TEST\",
        \"keyfile\": \"{\"type\":\"service_account\",\"project_id\":\"XXXXXX\",\"private_key_id\":\"XXXXXXXXX\",\"private_key\":\"-----BEGIN PRIVATE KEY-----\\nMIIEvQIBADANBgkqhkiG9w … U=\\n-----END PRIVATE KEY-----\\n\",\"client_email\":\"XXXXXXXXX\",\"client_id\":\"XXXXXX\",\"auth_uri\":\"https://accounts.google.com/o/oauth2/auth\",\"token_uri\":\"https://oauth2.googleapis.com/token\",\"auth_provider_x509_cert_url\":\"https://www.googleapis.com/oauth2/v1/certs\",\"client_x509_cert_url\":\"https://www.googleapis.com/robot/v1/metadata/x509/XXXXXX\"}\",
        \"keySource\": \"JSON\",
        \"autoCreateTables\": \"true\",
        \"sanitizeTopics\": \"false\"
      }
    }
  }"'
Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
curl -sS --fail --request PUT ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/bigquery\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 2,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"]
  }"'
Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
Name,Required,Default,Description
defaultDataset,true,,The default dataset to be used.
keyfile,true,,"Can be either a string representation of the Google credentials file or the path to the Google credentials file itself.

When using the Astra Streaming UI, the string representation must be used. If using pulsar-admin with Astra Streaming, either the string representation or the file path can be used."
keySource,true,FILE,"Determines whether the keyfile configuration is the path to the credentials JSON file or the JSON itself. Available values are `FILE` and `JSON`.

When using the Astra Streaming UI, JSON is the only option. If using pulsar-admin with Astra Streaming, either option can be used."
name,true,,The name of the connector. Use the same value as the Pulsar sink name.
project,true,,The BigQuery project to write to.
sanitizeTopics,true,false,"Designates whether to automatically sanitize topic names before using them as table names. If not enabled, topic names are used as table names.

The only accepted value is `false`. Providing any other value will result in an error."
topics,true,,"A list of Kafka topics to read from. Use the same name as the Pulsar topic (not the whole address, just the topic name)."
allBQFieldsNullable,false,false,"If `true`, no fields in any produced BigQuery schema are REQUIRED. All non-nullable Avro fields are translated as NULLABLE (or REPEATED, if arrays)."
allowBigQueryRequiredFieldRelaxation,false,false,"If true, fields in the BigQuery schema can be changed from REQUIRED to NULLABLE."
allowNewBigQueryFields,false,false,"If true, new fields can be added to BigQuery tables during subsequent schema updates."
allowSchemaUnionization,false,false,"If true, the existing table schema (if one is present) will be unionized with new record schemas during schema updates. If false, the record of the last schema in a batch will be used for any necessary table creation and schema update attempts.

Setting allowSchemaUnionization to false and allowNewBigQueryFields and allowBigQueryRequiredFieldRelaxation to true is equivalent to setting autoUpdateSchemas to true in older (pre-2.0.0) versions of this connector.

In this case, if BigQuery raises a schema validation exception or a table doesn't exist when writing a batch, the connector will try to remediate by relaxing required fields and/or adding new fields.

If allowSchemaUnionization, allowNewBigQueryFields, and allowBigQueryRequiredFieldRelaxation are true, the connector will create or update tables with a schema whose fields are a union of the existing table schema's fields and the ones present in all of the records of the current batch.

The key difference is that with unionization disabled, new record schemas have to be a superset of the table schema in BigQuery.

In general, enabling allowSchemaUnionization is useful for making things work. For instance, if you'd like to remove fields from data upstream, the updated schemas still work in the connector. Similarly, it is useful when different tasks see records whose schemas contain different fields that are not in the table. Note with caution, however, that if allowSchemaUnionization is set and some bad records are in the topic, the BigQuery schema may be permanently changed. This presents two issues: first, since BigQuery doesn't allow columns to be dropped from tables, they'll add unnecessary noise to the schema. Second, since BigQuery doesn't allow column types to be modified, they could completely break pipelines down the road, where well-behaved records have schemas whose field names overlap with the accidentally added columns in the table but use a different type."
autoCreateBucket,false,true,"Whether to automatically create the given bucket, if it does not exist."
autoCreateTables,false,false,Automatically create BigQuery tables if they don't already exist.
avroDataCacheSize,false,100,The size of the cache to use when converting schemas from Avro to Kafka Connect.
batchLoadIntervalSec,false,120,"The interval, in seconds, in which to attempt to run GCS to BigQuery load jobs. Only relevant if `enableBatchLoad` is configured."
bigQueryMessageTimePartitioning,false,false,Whether or not to use the message time when inserting records. Default uses the connector processing time.
bigQueryPartitionDecorator,false,true,"Whether or not to append a partition decorator to the BigQuery table name when inserting records. Default is true. Setting this to true appends the partition decorator to the table name (e.g. table$yyyyMMdd, depending on the configuration set for bigQueryPartitionDecorator). Setting this to false bypasses the logic to append the partition decorator and uses the raw table name for inserts."
bigQueryRetry,false,0,The number of retry attempts made for a BigQuery request that fails with a backend error or a quota exceeded error.
bigQueryRetryWait,false,1000,"The minimum amount of time, in milliseconds, to wait between retry attempts for a BigQuery backend or quota exceeded error."
clusteringPartitionFieldNames,false,,Comma-separated list of fields where data is clustered in BigQuery.
convertDoubleSpecialValues,false,false,Designates whether +Infinity is converted to Double.MAX_VALUE and whether -Infinity and NaN are converted to Double.MIN_VALUE to ensure successful delivery to BigQuery.
deleteEnabled,false,false,"Enable delete functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. A delete will be performed when a record with a null value (that is, a tombstone record) is read. This feature will not work with SMTs that change the name of the topic."
enableBatchLoad,false,,"Beta feature; use with caution. The sublist of topics to be batch loaded through GCS."
gcsBucketName,false,,"The name of the bucket where Google Cloud Storage (GCS) blobs are located. These blobs are used to batch-load to BigQuery. This is applicable only if `enableBatchLoad` is configured."
includeKafkaData,false,false,"Whether to include an extra block containing the Kafka source topic, offset, and partition information in the resulting BigQuery rows."
intermediateTableSuffix,false,.tmp,"A suffix that will be appended to the names of destination tables to create the names for the corresponding intermediate tables. Multiple intermediate tables may be created for a single destination table, but their names will always start with the name of the destination table, followed by this suffix, and possibly followed by an additional suffix."
kafkaDataFieldName,false,,"The Kafka data field name. The default value is null, which means the Kafka Data field will not be included."
kafkaKeyFieldName,false,,"The Kafka key field name. The default value is null, which means the Kafka Key field will not be included."
mergeIntervalMs,false,60_000L,"How often (in milliseconds) to perform a merge flush, if upsert/delete is enabled. Can be set to -1 to disable periodic flushing."
mergeRecordsThreshold,false,-1,"How many records to write to an intermediate table before performing a merge flush, if upsert/delete is enabled. Can be set to -1 to disable record count-based flushing."
queueSize,false,-1,"The maximum size (or -1 for no maximum size) of the worker queue for BigQuery write requests before all topics are paused. This is a soft limit; the size of the queue can go over this before topics are paused. All topics resume once a flush is triggered or the size of the queue drops under half of the maximum size."
schemaRetriever,false,com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever,A class that can be used for automatically creating tables and/or updating schemas.
threadPoolSize,false,10,The size of the BigQuery write thread pool. This establishes the maximum number of concurrent writes to BigQuery.
timePartitioningType,false,DAY,"The time partitioning type to use when creating tables. Existing tables will not be altered to use this partitioning type. Valid values (case insensitive): MONTH, YEAR, HOUR, DAY."
timestampPartitionFieldName,false,,"The name of the field in the value that contains the timestamp to partition by in BigQuery, which enables timestamp partitioning for each table. Leave this configuration blank to enable ingestion-time partitioning for each table."
topic2TableMap,false,,"Map of topics to tables (optional).

Format: comma-separated tuples, e.g. <topic-1>:<table-1>,<topic-2>:<table-2>,...

Note that because `sanitizeTopicName` must be `true`, any non-alphanumeric character in the topic name will be replaced with an underscore (`_`), so you need to take the underscores into account when creating a mapping.

For example, if the topic name is provided as `persistent://a/b/c-d`, then the mapped topic name would be `persistent___a_b_c_d`."
upsertEnabled,false,false,"Enable upsert functionality on the connector through the use of record keys, intermediate tables, and periodic merge flushes. Row-matching will be performed based on the contents of record keys. This feature won't work with SMTs that change the name of the topic."
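
As an illustration of the topic2TableMap format with sanitized topic names (hypothetical topic and table names), a fragment of kafkaConnectorConfigProperties might look like:

"kafkaConnectorConfigProperties": {
  "topic2TableMap": "persistent___a_b_orders:orders,persistent___a_b_customers:customers"
}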
Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
./bin/pulsar-admin sinks create \
  --sink-type bigquery \
  --name "$SINK_NAME" \
  --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --processing-guarantees EFFECTIVELY_ONCE \
  --sink-config '{
    "topic": "bq-test01",
    "kafkaConnectorSinkClass": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "offsetStorageTopic": "bq-sink-offsets01",
    "sanitizeTopicName": "true",
    "kafkaConnectorConfigProperties": {
      "name": "bq-sink1",
      "topics": "bq-test01",
      "project": "my-bigquery-project",
      "defaultDataset": "BQ_CONNECTOR_TEST",
      "keyfile": "/Users/me/my-bigquery-key.json",
      "keySource": "FILE",
      "autoCreateTables": "true",
      "sanitizeTopics": "false"
    }
  }'
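
Once created, the sink's state can be checked with pulsar-admin; a sketch using the same variables:

# Check that the sink instances are running
./bin/pulsar-admin sinks status \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME"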
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
./bin/pulsar-admin sinks update \
  --sink-type bigquery \
  --name "$SINK_NAME" \
  --inputs "$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2
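
To confirm the new parallelism was applied, the sink's configuration can be read back; a sketch using the same variables:

# Fetch the sink's configuration and verify the parallelism value
./bin/pulsar-admin sinks get \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME"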
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
xxxx

modules/pulsar-io/nav.adoc

Lines changed: 1 addition & 0 deletions
@@ -2,6 +2,7 @@
 ** Sinks
 *** xref:connectors/sinks/astra-db.adoc[]
 *** xref:connectors/sinks/elastic-search.adoc[]
+*** xref:connectors/sinks/google-bigquery.adoc[]
 *** xref:connectors/sinks/jdbc-postgres.adoc[]
 *** xref:connectors/sinks/jdbc-mariadb.adoc[]
 *** xref:connectors/sinks/jdbc-clickhouse.adoc[]
Lines changed: 63 additions & 0 deletions
@@ -0,0 +1,63 @@
:connectorName: bigquery-sink
:connectorType: bigquery
:attribute-missing: skip
:slug: bigquery-sink-connector
:page-tag: bigquery,sink-connector

= Google BigQuery

BigQuery is a fully managed enterprise data warehouse that helps you manage and analyze your data with built-in features like machine learning, geospatial analysis, and business intelligence. BigQuery's serverless architecture lets you use SQL queries to answer your organization's biggest questions with zero infrastructure management. BigQuery's scalable, distributed analysis engine lets you query terabytes in seconds and petabytes in minutes.

The BigQuery Pulsar Sink is not integrated with BigQuery directly. It uses Pulsar's built-in https://pulsar.apache.org/docs/adaptors-kafka/[Kafka Connect adapter^]{external-link-icon} library to transform message data into a Kafka-compatible format. The https://docs.confluent.io/kafka-connectors/bigquery/current/kafka_connect_bigquery_config.html[Kafka Connect BigQuery Sink^]{external-link-icon} is then used as the actual BigQuery integration. The adapter provides a flexible and extensible framework for data transformation and processing. It supports various data formats, including JSON, Avro, and Protobuf, and enables users to apply transformations to the data as it is streamed from Pulsar.

You will notice references to Kafka throughout the configuration. *You don't need a running instance of Kafka to use this connector.* The Kafka references are used as "translation points" by this connector.

[NOTE]
====
If you would like some background on the Kafka Connect adapter and its use cases, read "https://www.datastax.com/blog/simplify-migrating-kafka-to-pulsar-kafka-connect-support[Simplify migrating from Kafka to Pulsar with Kafka Connect Support^]{external-link-icon}" and "https://medium.com/building-the-open-data-stack/datastax-presents-snowflake-sink-connector-for-apache-pulsar-53629b196064[DataStax's Snowflake Sink Connector for Apache Pulsar^]{external-link-icon}".
====

== Get Started

include::partial$connectors/sinks/get-started.adoc[]

== Managing the Connector

include::partial$connectors/sinks/manage.adoc[]

== Monitoring the Connector

include::partial$connectors/sinks/monitoring.adoc[]

== Connector Reference

The BigQuery sink uses multiple sets of parameters: the Astra Streaming parameters, the Kafka Connect Adapter parameters, and the Google BigQuery parameters. Each provides a way to coordinate how data will be streamed from Pulsar to BigQuery.

=== Astra Streaming

[%header,format=csv,cols="2,1,1,3"]
|===
include::example$connectors/sinks/astra.csv[]
|===

=== Kafka Connect Adapter Configuration (configs)

These values are provided in the "configs" area. View the code for these configurations https://github.com/apache/pulsar/blob/master/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java[here^]{external-link-icon}.

[%header,format=csv,cols="2,1,1,3"]
|===
include::example$connectors/sinks/bigquery/config.csv[]
|===

=== Google BigQuery Configuration (kafkaConnectorConfigProperties)

These values are provided in the "kafkaConnectorConfigProperties" area. View the code for these configurations https://github.com/confluentinc/kafka-connect-bigquery/blob/master/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java[here^]{external-link-icon}.

[%header,format=csv,cols="2,1,1,3"]
|===
include::example$connectors/sinks/bigquery/kafkaConnectorConfigProperties.csv[]
|===

== What's next?

Learn more about Google's BigQuery features and capabilities on https://cloud.google.com/bigquery[their site^]{external-link-icon}.
