
Commit a8c843a

[yaml] : Update kafkaToBigquery yaml template (#3178)
* add bigquery options
* update some language
* update pipeline and parameters based on option files
* update table parameter language
* update default for numStreams
* update kafka option to fix spotless error and template
* forgot to update IT :)
* fix spotless and correct parameter
1 parent 450ba64 commit a8c843a

5 files changed: +243 −135 lines changed

yaml/src/main/java/com/google/cloud/teleport/templates/yaml/KafkaToBigQueryYaml.java

Lines changed: 159 additions & 48 deletions
@@ -45,85 +45,196 @@ public interface KafkaToBigQueryYaml {
 
   @TemplateParameter.Text(
       order = 1,
-      name = "readBootstrapServers",
+      name = "bootstrapServers",
       optional = false,
-      description = "Kafka Bootstrap Server list",
-      helpText = "Kafka Bootstrap Server list, separated by commas.",
-      example = "localhost:9092,127.0.0.1:9093")
+      description =
+          "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.",
+      helpText =
+          "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. For example: host1:port1,host2:port2",
+      example = "host1:port1,host2:port2,localhost:9092,127.0.0.1:9093")
   @Validation.Required
-  String getReadBootstrapServers();
+  String getBootstrapServers();
 
   @TemplateParameter.Text(
       order = 2,
-      name = "kafkaReadTopics",
+      name = "topic",
       optional = false,
-      description = "Kafka topic(s) to read the input from.",
-      helpText = "Kafka topic(s) to read the input from.",
-      example = "topic1,topic2")
+      description = "Kafka topic to read from.",
+      helpText = "Kafka topic to read from. For example: my_topic",
+      example = "my_topic")
   @Validation.Required
-  String getKafkaReadTopics();
+  String getTopic();
 
-  @TemplateParameter.Text(
+  @TemplateParameter.Boolean(
       order = 3,
-      name = "outputTableSpec",
-      optional = false,
-      description = "BigQuery output table",
-      helpText =
-          "BigQuery table location to write the output to. The name should be in the format <project>:<dataset>.<table_name>`. The table's schema must match input objects.",
-      example = "")
-  @Validation.Required
-  String getOutputTableSpec();
+      name = "allowDuplicates",
+      optional = true,
+      description = "If the Kafka read allows duplicates.",
+      helpText = "If the Kafka read allows duplicates. For example: true",
+      example = "true")
+  Boolean getAllowDuplicates();
 
   @TemplateParameter.Text(
       order = 4,
-      name = "outputDeadletterTable",
-      optional = false,
-      description = "The dead-letter table name to output failed messages to BigQuery",
-      helpText =
-          "BigQuery table for failed messages. Messages failed to reach the output table for different reasons (e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will be created during pipeline execution. If not specified, 'outputTableSpec_error_records' is used instead. The dead-letter table name to output failed messages to BigQuery.",
-      example = "your-project-id:your-dataset.your-table-name")
-  @Validation.Required
-  String getOutputDeadletterTable();
+      name = "confluentSchemaRegistrySubject",
+      optional = true,
+      description = "The subject name for the Confluent Schema Registry.",
+      helpText = "The subject name for the Confluent Schema Registry. For example: my_subject",
+      example = "my_subject")
+  String getConfluentSchemaRegistrySubject();
 
   @TemplateParameter.Text(
       order = 5,
-      name = "messageFormat",
+      name = "confluentSchemaRegistryUrl",
       optional = true,
-      description = "The message format.",
-      helpText = "The message format. One of: AVRO, JSON, PROTO, RAW, or STRING.",
-      example = "")
-  @Default.String("JSON")
-  String getMessageFormat();
+      description = "The URL for the Confluent Schema Registry.",
+      helpText =
+          "The URL for the Confluent Schema Registry. For example: http://schema-registry:8081",
+      example = "http://schema-registry:8081")
+  String getConfluentSchemaRegistryUrl();
 
   @TemplateParameter.Text(
       order = 6,
+      name = "consumerConfigUpdates",
+      optional = true,
+      description =
+          "A list of key-value pairs that act as configuration parameters for Kafka consumers.",
+      helpText =
+          "A list of key-value pairs that act as configuration parameters for Kafka consumers. For example: {'group.id': 'my_group'}",
+      example = "{\"group.id\": \"my_group\"}")
+  String getConsumerConfigUpdates();
+
+  @TemplateParameter.Text(
+      order = 7,
+      name = "fileDescriptorPath",
+      optional = true,
+      description = "The path to the Protocol Buffer File Descriptor Set file.",
+      helpText =
+          "The path to the Protocol Buffer File Descriptor Set file. For example: gs://bucket/path/to/descriptor.pb",
+      example = "gs://bucket/path/to/descriptor.pb")
+  String getFileDescriptorPath();
+
+  @TemplateParameter.Text(
+      order = 8,
+      name = "format",
+      optional = true,
+      description = "The encoding format for the data stored in Kafka.",
+      helpText =
+          "The encoding format for the data stored in Kafka. Valid options are: RAW,STRING,AVRO,JSON,PROTO. For example: JSON",
+      example = "JSON")
+  @Default.String("JSON")
+  String getFormat();
+
+  @TemplateParameter.Text(
+      order = 9,
+      name = "messageName",
+      optional = true,
+      description =
+          "The name of the Protocol Buffer message to be used for schema extraction and data conversion.",
+      helpText =
+          "The name of the Protocol Buffer message to be used for schema extraction and data conversion. For example: MyMessage",
+      example = "MyMessage")
+  String getMessageName();
+
+  @TemplateParameter.Boolean(
+      order = 10,
+      name = "offsetDeduplication",
+      optional = true,
+      description = "If the redistribute is using offset deduplication mode.",
+      helpText = "If the redistribute is using offset deduplication mode. For example: true",
+      example = "true")
+  Boolean getOffsetDeduplication();
+
+  @TemplateParameter.Boolean(
+      order = 11,
+      name = "redistributeByRecordKey",
+      optional = true,
+      description = "If the redistribute keys by the Kafka record key.",
+      helpText = "If the redistribute keys by the Kafka record key. For example: true",
+      example = "true")
+  Boolean getRedistributeByRecordKey();
+
+  @TemplateParameter.Integer(
+      order = 12,
+      name = "redistributeNumKeys",
+      optional = true,
+      description = "The number of keys for redistributing Kafka inputs.",
+      helpText = "The number of keys for redistributing Kafka inputs. For example: 10",
+      example = "10")
+  Integer getRedistributeNumKeys();
+
+  @TemplateParameter.Boolean(
+      order = 13,
+      name = "redistributed",
+      optional = true,
+      description = "If the Kafka read should be redistributed.",
+      helpText = "If the Kafka read should be redistributed. For example: true",
+      example = "true")
+  Boolean getRedistributed();
+
+  @TemplateParameter.Text(
+      order = 14,
       name = "schema",
+      optional = true,
+      description = "The schema in which the data is encoded in the Kafka topic.",
+      helpText =
+          "The schema in which the data is encoded in the Kafka topic. For example: {'type': 'record', 'name': 'User', 'fields': [{'name': 'name', 'type': 'string'}]}. A schema is required if data format is JSON, AVRO or PROTO.",
+      example =
+          "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}")
+  String getSchema();
+
+  @TemplateParameter.Text(
+      order = 15,
+      name = "table",
       optional = false,
-      description = "Kafka schema.",
-      helpText = "Kafka schema. A schema is required if data format is JSON, AVRO or PROTO.",
+      description = "BigQuery table",
+      helpText =
+          "BigQuery table location to write the output to or read from. The name should be in the format <project>:<dataset>.<table_name>`. For write, the table's schema must match input objects.",
       example = "")
   @Validation.Required
-  String getSchema();
+  String getTable();
+
+  @TemplateParameter.Text(
+      order = 16,
+      name = "createDisposition",
+      optional = true,
+      description = "How to create",
+      helpText =
+          "Specifies whether a table should be created if it does not exist. Valid inputs are 'Never' and 'IfNeeded'.",
+      example = "")
+  @Default.String("CREATE_IF_NEEDED")
+  String getCreateDisposition();
+
+  @TemplateParameter.Text(
+      order = 17,
+      name = "writeDisposition",
+      optional = true,
+      description = "How to write",
+      helpText =
+          "How to specify if a write should append to an existing table, replace the table, or verify that the table is empty. Note that the my_dataset being written to must already exist. Unbounded collections can only be written using 'WRITE_EMPTY' or 'WRITE_APPEND'.",
+      example = "")
+  @Default.String("WRITE_APPEND")
+  String getWriteDisposition();
 
   @TemplateParameter.Integer(
-      order = 7,
-      name = "numStorageWriteApiStreams",
+      order = 18,
+      name = "numStreams",
       optional = true,
       description = "Number of streams for BigQuery Storage Write API",
       helpText =
           "Number of streams defines the parallelism of the BigQueryIO’s Write transform and roughly corresponds to the number of Storage Write API’s streams which will be used by the pipeline. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. The default value is 1.",
       example = "")
   @Default.Integer(1)
-  Integer getNumStorageWriteApiStreams();
+  Integer getNumStreams();
 
-  @TemplateParameter.Integer(
-      order = 8,
-      name = "storageWriteApiTriggeringFrequencySec",
-      optional = true,
-      description = "Triggering frequency in seconds for BigQuery Storage Write API",
+  @TemplateParameter.Text(
+      order = 19,
+      name = "outputDeadletterTable",
+      optional = false,
+      description = "The dead-letter table name to output failed messages to BigQuery",
       helpText =
-          "Triggering frequency will determine how soon the data will be visible for querying in BigQuery. See https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api for the recommended values. The default value is 5.",
-      example = "")
-  @Default.Integer(5)
-  Integer getStorageWriteApiTriggeringFrequencySec();
+          "BigQuery table for failed messages. Messages failed to reach the output table for different reasons (e.g., mismatched schema, malformed json) are written to this table. If it doesn't exist, it will be created during pipeline execution. If not specified, 'outputTableSpec_error_records' is used instead. The dead-letter table name to output failed messages to BigQuery.",
      example = "your-project-id:your-dataset.your-table-name")
+  @Validation.Required
+  String getOutputDeadletterTable();
 }
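The renamed read-side parameters above (bootstrapServers, topic, consumerConfigUpdates, format) follow Beam's KafkaIO vocabulary. The following is a minimal sketch of how such values could be wired into a KafkaIO read, assuming string key/value payloads; the broker list, topic name, and consumer group are placeholders, and this is an illustration rather than the template's actual generated pipeline.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Placeholder values mirroring the template parameters above.
    String bootstrapServers = "host1:port1,host2:port2";
    String topic = "my_topic";

    PCollection<KV<String, String>> records =
        pipeline.apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrapServers) // bootstrapServers parameter
                .withTopic(topic) // topic parameter
                .withConsumerConfigUpdates( // consumerConfigUpdates parameter
                    Map.<String, Object>of("group.id", "my_group"))
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata());

    pipeline.run();
  }
}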
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
+options:
+  - name: "bigquery_common_options"
+    parameters:
+      - order: 1
+        name: "table"
+        description: "BigQuery table"
+        help: >
+          BigQuery table location to write the output to or read from. The name
+          should be in the format <project>:<dataset>.<table_name>`. For write,
+          the table's schema must match input objects.
+        required: true
+        type: text
+
+  - name: "bigquery_read_options"
+    parameters:
+
+  - name: "bigquery_write_options"
+    parameters:
+      - order: 1
+        name: "createDisposition"
+        description: "How to create"
+        help: >
+          Specifies whether a table should be created if it does not exist.
+          Valid inputs are 'Never' and 'IfNeeded'.
+        default: CREATE_IF_NEEDED
+        required: false
+        type: text
+      - order: 2
+        name: "writeDisposition"
+        description: "How to write"
+        help: >
+          How to specify if a write should append to an existing table, replace
+          the table, or verify that the table is empty. Note that the my_dataset
+          being written to must already exist. Unbounded collections can only
+          be written using 'WRITE_EMPTY' or 'WRITE_APPEND'.
+        default: WRITE_APPEND
+        required: false
+        type: text
+      - order: 3
+        name: "numStreams"
+        description: "Number of streams for BigQuery Storage Write API"
+        help: >
+          Number of streams defines the parallelism of the BigQueryIO’s Write
+          transform and roughly corresponds to the number of Storage Write API’s
+          streams which will be used by the pipeline. See
+          https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api
+          for the recommended values. The default value is 1.
+        required: false
+        default: 1
+        type: integer
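These write options correspond closely to Beam's BigQueryIO sink settings. Below is a minimal sketch under that assumption, using the defaults declared above (CREATE_IF_NEEDED, WRITE_APPEND, one Storage Write API stream); the table spec is a placeholder and the wiring is illustrative, not taken from the template.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class BigQueryWriteSketch {

  /** Writes rows with the option defaults above; the table spec is a placeholder. */
  static void writeToBigQuery(PCollection<TableRow> rows) {
    rows.apply(
        "WriteToBigQuery",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table") // table parameter
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) // createDisposition default
            .withWriteDisposition(WriteDisposition.WRITE_APPEND) // writeDisposition default
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            .withNumStorageWriteApiStreams(1) // numStreams default
            .withTriggeringFrequency(Duration.standardSeconds(5)));
  }
}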

yaml/src/main/python/options/kafka_options.yaml

Lines changed: 5 additions & 2 deletions
@@ -5,7 +5,7 @@ options:
         name: "bootstrapServers"
         description: "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster."
         help: "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. For example: host1:port1,host2:port2"
-        example: "host1:port1,host2:port2"
+        example: "host1:port1,host2:port2,localhost:9092,127.0.0.1:9093"
         required: true
         type: text
       - order: 2
@@ -96,7 +96,10 @@ options:
       - order: 14
         name: "schema"
         description: "The schema in which the data is encoded in the Kafka topic."
-        help: "The schema in which the data is encoded in the Kafka topic. For example: {'type': 'record', 'name': 'User', 'fields': [{'name': 'name', 'type': 'string'}]}"
+        help: >
+          The schema in which the data is encoded in the Kafka topic.
+          For example: {'type': 'record', 'name': 'User', 'fields': [{'name': 'name', 'type': 'string'}]}.
+          A schema is required if data format is JSON, AVRO or PROTO.
         example: '{"type": "record", "name": "User", "fields": [{"name": "name", "type": "string"}]}'
         required: false
         type: text
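The updated help text notes that a schema is required when the format is JSON, AVRO, or PROTO. The example value is an Avro-style record schema; assuming that format, a small sketch of validating such a string with the Avro library is shown below (the schema content is just the example from the option file).

import org.apache.avro.Schema;

public class SchemaParseSketch {
  public static void main(String[] args) {
    // The same example value used by the "schema" option above.
    String schemaJson =
        "{\"type\": \"record\", \"name\": \"User\", "
            + "\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}";

    // Parsing fails fast with a SchemaParseException if the string is not a valid Avro schema.
    Schema schema = new Schema.Parser().parse(schemaJson);
    System.out.println(schema.getFullName() + " has " + schema.getFields().size() + " field(s)");
  }
}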
