Skip to content

Commit 83fb7ee

Browse files
committed
support defining multiple source Kafka topics and the possibility to persist the Kafka topic with the message
1 parent ff60a5f commit 83fb7ee

File tree

9 files changed

+111
-19
lines changed

9 files changed

+111
-19
lines changed

v2/kafka-common/src/main/java/com/google/cloud/teleport/v2/kafka/utils/KafkaTopicUtils.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.util.List;
1919
import java.util.regex.Matcher;
2020
import java.util.regex.Pattern;
21+
import java.util.stream.Collectors;
22+
import java.util.stream.Stream;
2123

2224
public class KafkaTopicUtils {
2325

@@ -27,23 +29,22 @@ public class KafkaTopicUtils {
2729
public static List<String> getBootstrapServerAndTopic(
2830
String bootstrapServerAndTopicString, String project) {
2931
Matcher matcher = GMK_PATTERN.matcher(bootstrapServerAndTopicString);
30-
String bootstrapServer = null;
31-
String topicName = null;
3232
if (matcher.matches()) {
33-
bootstrapServer =
33+
String bootstrapServer =
3434
"bootstrap."
3535
+ matcher.group(3)
3636
+ "."
3737
+ matcher.group(2)
3838
+ ".managedkafka."
3939
+ project
4040
+ ".cloud.goog:9092";
41-
topicName = matcher.group(4);
42-
} else {
43-
String[] list = bootstrapServerAndTopicString.split(";");
44-
bootstrapServer = list[0];
45-
topicName = list[1];
41+
String topicName = matcher.group(4);
42+
return List.of(bootstrapServer, topicName);
4643
}
47-
return List.of(bootstrapServer, topicName);
44+
String[] list = bootstrapServerAndTopicString.split(";");
45+
String bootstrapServer = list[0];
46+
String[] topicNames = list[1].split(",");
47+
return Stream.concat(Stream.of(bootstrapServer), Stream.of(topicNames))
48+
.collect(Collectors.toList());
4849
}
4950
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (C) 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package com.google.cloud.teleport.v2.kafka.utils;
17+
18+
import java.util.List;
19+
import org.junit.Assert;
20+
import org.junit.Test;
21+
22+
public class KafkaTopicUtilsTest {
23+
24+
private static final String PROJECT = "test-project";
25+
26+
@Test
27+
public void testParsingBootstrapServerAndTopicFromGMK() {
28+
String input = "projects/project1/locations/us-central1/clusters/cluster1/topics/topic1";
29+
List<String> result = KafkaTopicUtils.getBootstrapServerAndTopic(input, PROJECT);
30+
Assert.assertEquals(
31+
result,
32+
List.of(
33+
"bootstrap.cluster1.us-central1.managedkafka.test-project.cloud.goog:9092", "topic1"));
34+
}
35+
36+
@Test
37+
public void testParsingBootstrapServerAndTopic() {
38+
String input = "1.1.1.1:9094;topic1";
39+
List<String> result = KafkaTopicUtils.getBootstrapServerAndTopic(input, PROJECT);
40+
Assert.assertEquals(result, List.of("1.1.1.1:9094", "topic1"));
41+
}
42+
43+
@Test
44+
public void testParsingBootstrapServerAndMultipleTopics() {
45+
String input = "1.1.1.1:9094;topic1,topic2,topic3";
46+
List<String> result = KafkaTopicUtils.getBootstrapServerAndTopic(input, PROJECT);
47+
Assert.assertEquals(result, List.of("1.1.1.1:9094", "topic1", "topic2", "topic3"));
48+
}
49+
}

v2/kafka-to-bigquery/README_Kafka_to_BigQuery_Flex.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat
3333

3434
* **outputTableSpec**: BigQuery table location to write the output to. The name should be in the format `<project>:<dataset>.<table_name>`. The table's schema must match input objects.
3535
* **persistKafkaKey**: If true, the pipeline will persist the Kafka message key in the BigQuery table, in a `_key` field of type `BYTES`. Default is `false` (Key is ignored).
36+
* **persistKafkaTopic**: If true, the pipeline will persist the Kafka message topic in the BigQuery table, in a `_topic` field of type `STRING`. Default is `false` (Topic is ignored).
3637
* **outputProject**: BigQuery output project in which the dataset resides. Tables will be created dynamically in the dataset. Defaults to empty.
3738
* **outputDataset**: BigQuery output dataset to write the output to. Tables will be created dynamically in the dataset. If the tables are created beforehand, the table names should follow the specified naming convention. The name should be `bqTableNamePrefix + Avro Schema FullName` , each word will be separated by a hyphen `-`. Defaults to empty.
3839
* **bqTableNamePrefix**: Naming prefix to be used while creating BigQuery output tables. Only applicable when using schema registry. Defaults to empty.
@@ -154,6 +155,7 @@ export USE_BIG_QUERY_DLQ=false
154155
### Optional
155156
export OUTPUT_TABLE_SPEC=<outputTableSpec>
156157
export PERSIST_KAFKA_KEY=false
158+
export PERSIST_KAFKA_TOPIC=false
157159
export OUTPUT_PROJECT=""
158160
export OUTPUT_DATASET=""
159161
export BQ_TABLE_NAME_PREFIX=""
@@ -196,6 +198,7 @@ gcloud dataflow flex-template run "kafka-to-bigquery-flex-job" \
196198
--parameters "readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC" \
197199
--parameters "outputTableSpec=$OUTPUT_TABLE_SPEC" \
198200
--parameters "persistKafkaKey=$PERSIST_KAFKA_KEY" \
201+
--parameters "persistKafkaTopic=$PERSIST_KAFKA_TOPIC" \
199202
--parameters "writeMode=$WRITE_MODE" \
200203
--parameters "outputProject=$OUTPUT_PROJECT" \
201204
--parameters "outputDataset=$OUTPUT_DATASET" \
@@ -261,6 +264,7 @@ export USE_BIG_QUERY_DLQ=false
261264
### Optional
262265
export OUTPUT_TABLE_SPEC=<outputTableSpec>
263266
export PERSIST_KAFKA_KEY=false
267+
export PERSIST_KAFKA_TOPIC=false
264268
export OUTPUT_PROJECT=""
265269
export OUTPUT_DATASET=""
266270
export BQ_TABLE_NAME_PREFIX=""
@@ -303,7 +307,7 @@ mvn clean package -PtemplatesRun \
303307
-Dregion="$REGION" \
304308
-DjobName="kafka-to-bigquery-flex-job" \
305309
-DtemplateName="Kafka_to_BigQuery_Flex" \
306-
-Dparameters="readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC,outputTableSpec=$OUTPUT_TABLE_SPEC,persistKafkaKey=$PERSIST_KAFKA_KEY,writeMode=$WRITE_MODE,outputProject=$OUTPUT_PROJECT,outputDataset=$OUTPUT_DATASET,bqTableNamePrefix=$BQ_TABLE_NAME_PREFIX,createDisposition=$CREATE_DISPOSITION,writeDisposition=$WRITE_DISPOSITION,useAutoSharding=$USE_AUTO_SHARDING,numStorageWriteApiStreams=$NUM_STORAGE_WRITE_API_STREAMS,storageWriteApiTriggeringFrequencySec=$STORAGE_WRITE_API_TRIGGERING_FREQUENCY_SEC,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE,enableCommitOffsets=$ENABLE_COMMIT_OFFSETS,consumerGroupId=$CONSUMER_GROUP_ID,kafkaReadOffset=$KAFKA_READ_OFFSET,kafkaReadAuthenticationMode=$KAFKA_READ_AUTHENTICATION_MODE,kafkaReadUsernameSecretId=$KAFKA_READ_USERNAME_SECRET_ID,kafkaReadPasswordSecretId=$KAFKA_READ_PASSWORD_SECRET_ID,kafkaReadKeystoreLocation=$KAFKA_READ_KEYSTORE_LOCATION,kafkaReadTruststoreLocation=$KAFKA_READ_TRUSTSTORE_LOCATION,kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID,kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID,kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID,messageFormat=$MESSAGE_FORMAT,schemaFormat=$SCHEMA_FORMAT,confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH,schemaRegistryConnectionUrl=$SCHEMA_REGISTRY_CONNECTION_URL,binaryAvroSchemaPath=$BINARY_AVRO_SCHEMA_PATH,schemaRegistryAuthenticationMode=$SCHEMA_REGISTRY_AUTHENTICATION_MODE,schemaRegistryTruststoreLocation=$SCHEMA_REGISTRY_TRUSTSTORE_LOCATION,schemaRegistryTruststorePasswordSecretId=$SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD_SECRET_ID,schemaRegistryKeystoreLocation=$SCHEMA_REGISTRY_KEYSTORE_LOCATION,schemaRegistryKeystorePasswordSecretId=$SCHEMA_REGISTRY_KEYSTORE_PASSWORD_SECRET_ID,schemaRegistryKeyPasswordSecretId=$SCHEMA_REGISTRY_KEY_PASSWORD_SECRET_ID,schemaRegistryOauthClientId=$SCHEMA_REGISTRY_OAUTH_CLIENT_ID,schemaRegistryOauthClientSecretId=$SCHEMA_REGISTRY_OAUTH_CLIENT_SECRET
_ID,schemaRegistryOauthScope=$SCHEMA_REGISTRY_OAUTH_SCOPE,schemaRegistryOauthTokenEndpointUrl=$SCHEMA_REGISTRY_OAUTH_TOKEN_ENDPOINT_URL,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,useBigQueryDLQ=$USE_BIG_QUERY_DLQ" \
310+
-Dparameters="readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC,outputTableSpec=$OUTPUT_TABLE_SPEC,persistKafkaKey=$PERSIST_KAFKA_KEY,persistKafkaTopic=$PERSIST_KAFKA_TOPIC,writeMode=$WRITE_MODE,outputProject=$OUTPUT_PROJECT,outputDataset=$OUTPUT_DATASET,bqTableNamePrefix=$BQ_TABLE_NAME_PREFIX,createDisposition=$CREATE_DISPOSITION,writeDisposition=$WRITE_DISPOSITION,useAutoSharding=$USE_AUTO_SHARDING,numStorageWriteApiStreams=$NUM_STORAGE_WRITE_API_STREAMS,storageWriteApiTriggeringFrequencySec=$STORAGE_WRITE_API_TRIGGERING_FREQUENCY_SEC,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE,enableCommitOffsets=$ENABLE_COMMIT_OFFSETS,consumerGroupId=$CONSUMER_GROUP_ID,kafkaReadOffset=$KAFKA_READ_OFFSET,kafkaReadAuthenticationMode=$KAFKA_READ_AUTHENTICATION_MODE,kafkaReadUsernameSecretId=$KAFKA_READ_USERNAME_SECRET_ID,kafkaReadPasswordSecretId=$KAFKA_READ_PASSWORD_SECRET_ID,kafkaReadKeystoreLocation=$KAFKA_READ_KEYSTORE_LOCATION,kafkaReadTruststoreLocation=$KAFKA_READ_TRUSTSTORE_LOCATION,kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID,kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID,kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID,messageFormat=$MESSAGE_FORMAT,schemaFormat=$SCHEMA_FORMAT,confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH,schemaRegistryConnectionUrl=$SCHEMA_REGISTRY_CONNECTION_URL,binaryAvroSchemaPath=$BINARY_AVRO_SCHEMA_PATH,schemaRegistryAuthenticationMode=$SCHEMA_REGISTRY_AUTHENTICATION_MODE,schemaRegistryTruststoreLocation=$SCHEMA_REGISTRY_TRUSTSTORE_LOCATION,schemaRegistryTruststorePasswordSecretId=$SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD_SECRET_ID,schemaRegistryKeystoreLocation=$SCHEMA_REGISTRY_KEYSTORE_LOCATION,schemaRegistryKeystorePasswordSecretId=$SCHEMA_REGISTRY_KEYSTORE_PASSWORD_SECRET_ID,schemaRegistryKeyPasswordSecretId=$SCHEMA_REGISTRY_KEY_PASSWORD_SECRET_ID,schemaRegistryOauthClientId=$SCHEMA_REGISTRY_OAUTH_CLIENT_ID,schemaRegistryOauthClientSecret
Id=$SCHEMA_REGISTRY_OAUTH_CLIENT_SECRET_ID,schemaRegistryOauthScope=$SCHEMA_REGISTRY_OAUTH_SCOPE,schemaRegistryOauthTokenEndpointUrl=$SCHEMA_REGISTRY_OAUTH_TOKEN_ENDPOINT_URL,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,useBigQueryDLQ=$USE_BIG_QUERY_DLQ" \
307311
-f v2/kafka-to-bigquery
308312
```
309313

@@ -355,6 +359,7 @@ resource "google_dataflow_flex_template_job" "kafka_to_bigquery_flex" {
355359
useBigQueryDLQ = "false"
356360
# outputTableSpec = "<outputTableSpec>"
357361
# persistKafkaKey = "false"
362+
# persistKafkaTopic = "false"
358363
# outputProject = ""
359364
# outputDataset = ""
360365
# bqTableNamePrefix = ""

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/options/KafkaToBigQueryFlexOptions.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public interface KafkaToBigQueryFlexOptions
4747
void setReadBootstrapServerAndTopic(String value);
4848

4949
@TemplateParameter.Boolean(
50-
order = 3,
50+
order = 2,
5151
groupName = "Source",
5252
optional = true,
5353
description = "Persist the Kafka Message Key to the BigQuery table",
@@ -58,6 +58,18 @@ public interface KafkaToBigQueryFlexOptions
5858

5959
void setPersistKafkaKey(Boolean value);
6060

61+
@TemplateParameter.Boolean(
62+
order = 3,
63+
groupName = "Source",
64+
optional = true,
65+
description = "Persist the Kafka Message Topic to the BigQuery table",
66+
helpText =
67+
"If true, the pipeline will persist the Kafka message topic in the BigQuery table, in a `_topic` field of type `STRING`. Default is `false` (Topic is ignored).")
68+
@Default.Boolean(false)
69+
Boolean getPersistKafkaTopic();
70+
71+
void setPersistKafkaTopic(Boolean value);
72+
6173
@TemplateParameter.Enum(
6274
order = 4,
6375
name = "writeMode",

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.ArrayList;
4343
import java.util.List;
4444
import java.util.Map;
45+
import java.util.stream.Collectors;
4546
import org.apache.beam.sdk.Pipeline;
4647
import org.apache.beam.sdk.PipelineResult;
4748
import org.apache.beam.sdk.coders.ByteArrayCoder;
@@ -241,7 +242,7 @@ public static PipelineResult run(KafkaToBigQueryFlexOptions options) throws Exce
241242
List<String> bootstrapServerAndTopicList =
242243
KafkaTopicUtils.getBootstrapServerAndTopic(
243244
options.getReadBootstrapServerAndTopic(), options.getProject());
244-
topicsList = List.of(bootstrapServerAndTopicList.get(1));
245+
topicsList = bootstrapServerAndTopicList.stream().skip(1).collect(Collectors.toList());
245246
bootstrapServers = bootstrapServerAndTopicList.get(0);
246247
} else {
247248
throw new IllegalArgumentException(
@@ -313,6 +314,7 @@ private static WriteResult handleAvroBinaryEncoding(
313314
options.getNumStorageWriteApiStreams(),
314315
options.getStorageWriteApiTriggeringFrequencySec(),
315316
options.getPersistKafkaKey(),
317+
options.getPersistKafkaTopic(),
316318
options.getUseAutoSharding(),
317319
errorHandler);
318320
} else {
@@ -325,6 +327,7 @@ private static WriteResult handleAvroBinaryEncoding(
325327
options.getNumStorageWriteApiStreams(),
326328
options.getStorageWriteApiTriggeringFrequencySec(),
327329
options.getPersistKafkaKey(),
330+
options.getPersistKafkaTopic(),
328331
options.getUseAutoSharding());
329332
}
330333
writeResult =
@@ -369,6 +372,7 @@ private static WriteResult handleSingleSchemaFileFormat(
369372
options.getNumStorageWriteApiStreams(),
370373
options.getStorageWriteApiTriggeringFrequencySec(),
371374
options.getPersistKafkaKey(),
375+
options.getPersistKafkaTopic(),
372376
options.getUseAutoSharding(),
373377
errorHandler);
374378
} else {
@@ -381,6 +385,7 @@ private static WriteResult handleSingleSchemaFileFormat(
381385
options.getNumStorageWriteApiStreams(),
382386
options.getStorageWriteApiTriggeringFrequencySec(),
383387
options.getPersistKafkaKey(),
388+
options.getPersistKafkaTopic(),
384389
options.getUseAutoSharding());
385390
}
386391
writeResult =

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public static class BigQueryWrite
6060

6161
private Boolean persistKafkaKey;
6262

63+
private Boolean persistKafkaTopic;
64+
6365
private String writeDisposition;
6466

6567
private String createDisposition;
@@ -82,6 +84,7 @@ public BigQueryWrite(
8284
Integer numStorageWriteApiStreams,
8385
Integer storageWriteApiTriggeringFrequencySec,
8486
Boolean persistKafkaKey,
87+
Boolean persistKafkaTopic,
8588
Boolean useAutoSharding) {
8689
this.avroSchema = avroSchema;
8790
this.outputTableSpec = outputTableSpec;
@@ -90,6 +93,7 @@ public BigQueryWrite(
9093
this.numStorageWriteApiStreams = numStorageWriteApiStreams;
9194
this.storageWriteApiTriggeringFrequencySec = storageWriteApiTriggeringFrequencySec;
9295
this.persistKafkaKey = persistKafkaKey;
96+
this.persistKafkaTopic = persistKafkaTopic;
9397
this.useAutoSharding = useAutoSharding;
9498
this.errorHandler = new ErrorHandler.DefaultErrorHandler<>();
9599
}
@@ -103,6 +107,7 @@ public BigQueryWrite(
103107
Integer numStorageWriteApiStreams,
104108
Integer storageWriteApiTriggeringFrequencySec,
105109
Boolean persistKafkaKey,
110+
Boolean persistKafkaTopic,
106111
Boolean useAutoSharding,
107112
ErrorHandler<BadRecord, ?> errorHandler) {
108113
this.avroSchema = avroSchema;
@@ -112,6 +117,7 @@ public BigQueryWrite(
112117
this.numStorageWriteApiStreams = numStorageWriteApiStreams;
113118
this.storageWriteApiTriggeringFrequencySec = storageWriteApiTriggeringFrequencySec;
114119
this.persistKafkaKey = persistKafkaKey;
120+
this.persistKafkaTopic = persistKafkaTopic;
115121
this.useAutoSharding = useAutoSharding;
116122
this.errorHandler = errorHandler;
117123
}
@@ -124,6 +130,7 @@ public static BigQueryWriteUtils.BigQueryWrite of(
124130
Integer numStorageWriteApiStreams,
125131
Integer storageWriteApiTriggeringFrequencySec,
126132
Boolean persistKafkaKey,
133+
Boolean persistKafkaTopic,
127134
Boolean useAutoSharding) {
128135
return new BigQueryWriteUtils.BigQueryWrite(
129136
avroSchema,
@@ -133,6 +140,7 @@ public static BigQueryWriteUtils.BigQueryWrite of(
133140
numStorageWriteApiStreams,
134141
storageWriteApiTriggeringFrequencySec,
135142
persistKafkaKey,
143+
persistKafkaTopic,
136144
useAutoSharding);
137145
}
138146

@@ -144,6 +152,7 @@ public static BigQueryWriteUtils.BigQueryWrite of(
144152
Integer numStorageWriteApiStreams,
145153
Integer storageWriteApiTriggeringFrequencySec,
146154
Boolean persistKafkaKey,
155+
Boolean persistKafkaTopic,
147156
Boolean useAutoSharding,
148157
ErrorHandler<BadRecord, ?> errorHandler) {
149158
return new BigQueryWriteUtils.BigQueryWrite(
@@ -154,6 +163,7 @@ public static BigQueryWriteUtils.BigQueryWrite of(
154163
numStorageWriteApiStreams,
155164
storageWriteApiTriggeringFrequencySec,
156165
persistKafkaKey,
166+
persistKafkaTopic,
157167
useAutoSharding,
158168
errorHandler);
159169
}
@@ -166,8 +176,11 @@ private static class GenericRecordToTableRowFn
166176

167177
private boolean persistKafkaKey;
168178

169-
GenericRecordToTableRowFn(boolean persistKafkaKey) {
179+
private boolean persistKafkaTopic;
180+
181+
GenericRecordToTableRowFn(boolean persistKafkaKey, boolean persistKafkaTopic) {
170182
this.persistKafkaKey = persistKafkaKey;
183+
this.persistKafkaTopic = persistKafkaTopic;
171184
}
172185

173186
@ProcessElement
@@ -181,6 +194,9 @@ public void processElement(ProcessContext context) {
181194
if (this.persistKafkaKey) {
182195
row.set(BigQueryConstants.KAFKA_KEY_FIELD, element.getOriginalPayload().getKV().getKey());
183196
}
197+
if (this.persistKafkaTopic) {
198+
row.set(BigQueryConstants.KAFKA_TOPIC_FIELD, element.getOriginalPayload().getTopic());
199+
}
184200
context.output(FailsafeElement.of(element.getOriginalPayload(), row));
185201
}
186202
}
@@ -229,7 +245,8 @@ public WriteResult expand(
229245
input
230246
.apply(
231247
"ConvertGenericRecordToTableRow",
232-
ParDo.of(new GenericRecordToTableRowFn(this.persistKafkaKey)))
248+
ParDo.of(
249+
new GenericRecordToTableRowFn(this.persistKafkaKey, this.persistKafkaTopic)))
233250
.setCoder(
234251
FailsafeElementCoder.of(
235252
KafkaRecordCoder.of(

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/utils/BigQueryConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ public class BigQueryConstants {
2020
private BigQueryConstants() {}
2121

2222
public static final String KAFKA_KEY_FIELD = "_key";
23+
public static final String KAFKA_TOPIC_FIELD = "_topic";
2324
}

v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.HashMap;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.stream.Collectors;
3233
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
3334
import org.apache.beam.sdk.Pipeline;
3435
import org.apache.beam.sdk.PipelineResult;
@@ -135,7 +136,7 @@ public static PipelineResult run(KafkaToGcsOptions options) throws Exception {
135136
List<String> bootstrapServerAndTopicList =
136137
KafkaTopicUtils.getBootstrapServerAndTopic(
137138
options.getReadBootstrapServerAndTopic(), options.getProject());
138-
topicsList = List.of(bootstrapServerAndTopicList.get(1));
139+
topicsList = bootstrapServerAndTopicList.stream().skip(1).collect(Collectors.toList());
139140
bootstrapServes = bootstrapServerAndTopicList.get(0);
140141
} else {
141142
throw new IllegalArgumentException(

0 commit comments

Comments
 (0)