@@ -45,85 +45,196 @@ public interface KafkaToBigQueryYaml {

@TemplateParameter.Text(
order = 1,
name = "readBootstrapServers",
name = "bootstrapServers",
optional = false,
description = "Kafka Bootstrap Server list",
helpText = "Kafka Bootstrap Server list, separated by commas.",
example = "localhost:9092,127.0.0.1:9093")
description =
"A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.",
helpText =
"A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. For example: host1:port1,host2:port2",
example = "host1:port1,host2:port2,localhost:9092,127.0.0.1:9093")
@Validation.Required
String getReadBootstrapServers();
String getBootstrapServers();

@TemplateParameter.Text(
order = 2,
name = "kafkaReadTopics",
name = "topic",
optional = false,
description = "Kafka topic(s) to read the input from.",
helpText = "Kafka topic(s) to read the input from.",
example = "topic1,topic2")
description = "Kafka topic to read from.",
helpText = "Kafka topic to read from. For example: my_topic",
example = "my_topic")
@Validation.Required
String getKafkaReadTopics();
String getTopic();

@TemplateParameter.Text(
@TemplateParameter.Boolean(
order = 3,
name = "outputTableSpec",
optional = false,
description = "BigQuery output table",
helpText =
"BigQuery table location to write the output to. The name should be in the format <project>:<dataset>.<table_name>`. The table's schema must match input objects.",
example = "")
@Validation.Required
String getOutputTableSpec();
name = "allowDuplicates",
optional = true,
description = "If the Kafka read allows duplicates.",
helpText = "If the Kafka read allows duplicates. For example: true",
example = "true")
Boolean getAllowDuplicates();

@TemplateParameter.Text(
order = 4,
name = "outputDeadletterTable",
optional = false,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"BigQuery table for failed messages. Messages failed to reach the output table for different reasons (e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will be created during pipeline execution. If not specified, 'outputTableSpec_error_records' is used instead. The dead-letter table name to output failed messages to BigQuery.",
example = "your-project-id:your-dataset.your-table-name")
@Validation.Required
String getOutputDeadletterTable();
name = "confluentSchemaRegistrySubject",
optional = true,
description = "The subject name for the Confluent Schema Registry.",
helpText = "The subject name for the Confluent Schema Registry. For example: my_subject",
example = "my_subject")
String getConfluentSchemaRegistrySubject();

@TemplateParameter.Text(
order = 5,
name = "messageFormat",
name = "confluentSchemaRegistryUrl",
optional = true,
description = "The message format.",
helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.",
example = "")
@Default.String("JSON")
String getMessageFormat();
description = "The URL for the Confluent Schema Registry.",
helpText =
"The URL for the Confluent Schema Registry. For example: http://schema-registry:8081",
example = "http://schema-registry:8081")
String getConfluentSchemaRegistryUrl();

@TemplateParameter.Text(
order = 6,
name = "consumerConfigUpdates",
optional = true,
description =
"A list of key-value pairs that act as configuration parameters for Kafka consumers.",
helpText =
"A list of key-value pairs that act as configuration parameters for Kafka consumers. For example: {'group.id': 'my_group'}",
example = "{\"group.id\": \"my_group\"}")
String getConsumerConfigUpdates();

@TemplateParameter.Text(
order = 7,
name = "fileDescriptorPath",
optional = true,
description = "The path to the Protocol Buffer File Descriptor Set file.",
helpText =
"The path to the Protocol Buffer File Descriptor Set file. For example: gs://bucket/path/to/descriptor.pb",
example = "gs://bucket/path/to/descriptor.pb")
String getFileDescriptorPath();

@TemplateParameter.Text(
order = 8,
name = "format",
optional = true,
description = "The encoding format for the data stored in Kafka.",
helpText =
"The encoding format for the data stored in Kafka. Valid options are: RAW,STRING,AVRO,JSON,PROTO. For example: JSON",
example = "JSON")
@Default.String("JSON")
String getFormat();

@TemplateParameter.Text(
order = 9,
name = "messageName",
optional = true,
description =
"The name of the Protocol Buffer message to be used for schema extraction and data conversion.",
helpText =
"The name of the Protocol Buffer message to be used for schema extraction and data conversion. For example: MyMessage",
example = "MyMessage")
String getMessageName();

@TemplateParameter.Boolean(
order = 10,
name = "offsetDeduplication",
optional = true,
description = "If the redistribute is using offset deduplication mode.",
helpText = "If the redistribute is using offset deduplication mode. For example: true",
example = "true")
Boolean getOffsetDeduplication();

@TemplateParameter.Boolean(
order = 11,
name = "redistributeByRecordKey",
optional = true,
description = "If the redistribute keys by the Kafka record key.",
helpText = "If the redistribute keys by the Kafka record key. For example: true",
example = "true")
Boolean getRedistributeByRecordKey();

@TemplateParameter.Integer(
order = 12,
name = "redistributeNumKeys",
optional = true,
description = "The number of keys for redistributing Kafka inputs.",
helpText = "The number of keys for redistributing Kafka inputs. For example: 10",
example = "10")
Integer getRedistributeNumKeys();

@TemplateParameter.Boolean(
order = 13,
name = "redistributed",
optional = true,
description = "If the Kafka read should be redistributed.",
helpText = "If the Kafka read should be redistributed. For example: true",
example = "true")
Boolean getRedistributed();

@TemplateParameter.Text(
order = 14,
name = "schema",
optional = true,
description = "The schema in which the data is encoded in the Kafka topic.",
helpText =
"The schema in which the data is encoded in the Kafka topic. For example: {'type': 'record', 'name': 'User', 'fields': [{'name': 'name', 'type': 'string'}]}. A schema is required if data format is JSON, AVRO or PROTO.",
example =
"{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}")
String getSchema();

@TemplateParameter.Text(
order = 15,
name = "table",
optional = false,
description = "Kafka schema.",
helpText = "Kafka schema. A schema is required if data format is JSON, AVRO or PROTO.",
description = "BigQuery table",
helpText =
"BigQuery table location to write the output to or read from. The name should be in the format <project>:<dataset>.<table_name>`. For write, the table's schema must match input objects.",
example = "")
@Validation.Required
String getSchema();
String getTable();

@TemplateParameter.Text(
order = 16,
name = "createDisposition",
optional = true,
description = "How to create",
helpText =
"Specifies whether a table should be created if it does not exist. Valid inputs are 'Never' and 'IfNeeded'.",
example = "")
@Default.String("CREATE_IF_NEEDED")
String getCreateDisposition();

@TemplateParameter.Text(
order = 17,
name = "writeDisposition",
optional = true,
description = "How to write",
helpText =
"How to specify if a write should append to an existing table, replace the table, or verify that the table is empty. Note that the my_dataset being written to must already exist. Unbounded collections can only be written using 'WRITE_EMPTY' or 'WRITE_APPEND'.",
example = "")
@Default.String("WRITE_APPEND")
String getWriteDisposition();

@TemplateParameter.Integer(
order = 7,
name = "numStorageWriteApiStreams",
order = 18,
name = "numStreams",
optional = true,
description = "Number of streams for BigQuery Storage Write API",
helpText =
"Number of streams defines the parallelism of the BigQueryIO’s Write transform and roughly corresponds to the number of Storage Write API’s streams which will be used by the pipeline. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. The default value is 1.",
example = "")
@Default.Integer(1)
Integer getNumStorageWriteApiStreams();
Integer getNumStreams();

@TemplateParameter.Integer(
order = 8,
name = "storageWriteApiTriggeringFrequencySec",
optional = true,
description = "Triggering frequency in seconds for BigQuery Storage Write API",
@TemplateParameter.Text(
order = 19,
name = "outputDeadletterTable",
optional = false,
description = "The dead-letter table name to output failed messages to BigQuery",
helpText =
"Triggering frequency will determine how soon the data will be visible for querying in BigQuery. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. The default value is 5.",
example = "")
@Default.Integer(5)
Integer getStorageWriteApiTriggeringFrequencySec();
"BigQuery table for failed messages. Messages failed to reach the output table for different reasons (e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will be created during pipeline execution. If not specified, 'outputTableSpec_error_records' is used instead. The dead-letter table name to output failed messages to BigQuery.",
example = "your-project-id:your-dataset.your-table-name")
@Validation.Required
String getOutputDeadletterTable();
}
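
For orientation, here is a minimal sketch of the kind of Beam YAML pipeline these template parameters are meant to drive. This is not part of the PR: the transform types and snake_case config keys are assumptions based on Beam YAML's ReadFromKafka and WriteToBigQuery providers, and the comments map each key back to the template parameter above.

# Hypothetical sketch only; config keys assume Beam YAML's
# ReadFromKafka and WriteToBigQuery providers.
pipeline:
  transforms:
    - type: ReadFromKafka
      config:
        bootstrap_servers: "host1:port1,host2:port2"   # bootstrapServers
        topic: "my_topic"                              # topic
        format: "JSON"                                 # format
        schema: '{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}'
    - type: WriteToBigQuery
      input: ReadFromKafka
      config:
        table: "my-project:my_dataset.my_table"        # table
        create_disposition: "CREATE_IF_NEEDED"         # createDisposition
        write_disposition: "WRITE_APPEND"              # writeDisposition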
50 changes: 50 additions & 0 deletions yaml/src/main/python/options/bigquery_options.yaml
@@ -0,0 +1,50 @@
options:
- name: "bigquery_common_options"
parameters:
- order: 1
name: "table"
description: "BigQuery table"
help: >
BigQuery table location to write the output to or read from. The name
should be in the format <project>:<dataset>.<table_name>. For writes,
the table's schema must match the input objects.
required: true
type: text

- name: "bigquery_read_options"
parameters:

- name: "bigquery_write_options"
parameters:
- order: 1
name: "createDisposition"
description: "How to create"
help: >
Specifies whether a table should be created if it does not exist.
Valid inputs are 'Never' and 'IfNeeded'.
default: CREATE_IF_NEEDED
required: false
type: text
- order: 2
name: "writeDisposition"
description: "How to write"
help: >
How to specify if a write should append to an existing table, replace
the table, or verify that the table is empty. Note that the my_dataset
being written to must already exist. Unbounded collections can only
be written using 'WRITE_EMPTY' or 'WRITE_APPEND'.
default: WRITE_APPEND
required: false
type: text
- order: 3
name: "numStreams"
description: "Number of streams for BigQuery Storage Write API"
help: >
The number of streams defines the parallelism of BigQueryIO's Write
transform and roughly corresponds to the number of Storage Write API
streams used by the pipeline. See
https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api
for the recommended values. The default value is 1.
required: false
default: 1
type: integer
7 changes: 5 additions & 2 deletions yaml/src/main/python/options/kafka_options.yaml
@@ -5,7 +5,7 @@ options:
name: "bootstrapServers"
description: "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster."
help: "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. For example: host1:port1,host2:port2"
example: "host1:port1,host2:port2"
example: "host1:port1,host2:port2,localhost:9092,127.0.0.1:9093"
required: true
type: text
- order: 2
@@ -96,7 +96,10 @@ options:
- order: 14
name: "schema"
description: "The schema in which the data is encoded in the Kafka topic."
help: "The schema in which the data is encoded in the Kafka topic. For example: {'type': 'record', 'name': 'User', 'fields': [{'name': 'name', 'type': 'string'}]}"
help: >
The schema in which the data is encoded in the Kafka topic.
For example: {'type': 'record', 'name': 'User', 'fields': [{'name': 'name', 'type': 'string'}]}.
A schema is required if the data format is JSON, AVRO, or PROTO.
example: '{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}'
required: false
type: text
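
As a readability aid, the single-quoted Avro schema in the example above can be expanded into equivalent YAML (same content, since YAML accepts JSON-style structures):

# The example schema from above, expanded field by field.
type: record
name: User
fields:
  - name: name
    type: string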