Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class KafkaTopicUtils {

Expand All @@ -27,23 +29,22 @@ public class KafkaTopicUtils {
public static List<String> getBootstrapServerAndTopic(
String bootstrapServerAndTopicString, String project) {
Matcher matcher = GMK_PATTERN.matcher(bootstrapServerAndTopicString);
String bootstrapServer = null;
String topicName = null;
if (matcher.matches()) {
bootstrapServer =
String bootstrapServer =
"bootstrap."
+ matcher.group(3)
+ "."
+ matcher.group(2)
+ ".managedkafka."
+ project
+ ".cloud.goog:9092";
topicName = matcher.group(4);
} else {
String[] list = bootstrapServerAndTopicString.split(";");
bootstrapServer = list[0];
topicName = list[1];
String topicName = matcher.group(4);
return List.of(bootstrapServer, topicName);
}
return List.of(bootstrapServer, topicName);
String[] list = bootstrapServerAndTopicString.split(";");
String bootstrapServer = list[0];
String[] topicNames = list[1].split(",");
return Stream.concat(Stream.of(bootstrapServer), Stream.of(topicNames))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.kafka.utils;

import java.util.List;
import org.junit.Assert;
import org.junit.Test;

/** Unit tests for {@link KafkaTopicUtils#getBootstrapServerAndTopic(String, String)}. */
public class KafkaTopicUtilsTest {

  private static final String PROJECT = "test-project";

  /** A GMK topic path resolves to the derived bootstrap endpoint plus the single topic. */
  @Test
  public void testParsingBootstrapServerAndTopicFromGMK() {
    String input = "projects/project1/locations/us-central1/clusters/cluster1/topics/topic1";
    List<String> result = KafkaTopicUtils.getBootstrapServerAndTopic(input, PROJECT);
    // JUnit's Assert.assertEquals takes (expected, actual) in that order; keeping the contract
    // order makes failure messages read correctly.
    Assert.assertEquals(
        List.of(
            "bootstrap.cluster1.us-central1.managedkafka.test-project.cloud.goog:9092", "topic1"),
        result);
  }

  /** The "<server>;<topic>" form yields the server followed by the single topic. */
  @Test
  public void testParsingBootstrapServerAndTopic() {
    String input = "1.1.1.1:9094;topic1";
    List<String> result = KafkaTopicUtils.getBootstrapServerAndTopic(input, PROJECT);
    Assert.assertEquals(List.of("1.1.1.1:9094", "topic1"), result);
  }

  /** A comma-separated topic list yields the server followed by every topic, in order. */
  @Test
  public void testParsingBootstrapServerAndMultipleTopics() {
    String input = "1.1.1.1:9094;topic1,topic2,topic3";
    List<String> result = KafkaTopicUtils.getBootstrapServerAndTopic(input, PROJECT);
    Assert.assertEquals(List.of("1.1.1.1:9094", "topic1", "topic2", "topic3"), result);
  }
}
7 changes: 6 additions & 1 deletion v2/kafka-to-bigquery/README_Kafka_to_BigQuery_Flex.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

* **outputTableSpec**: BigQuery table location to write the output to. The name should be in the format `<project>:<dataset>.<table_name>`. The table's schema must match input objects.
* **persistKafkaKey**: If true, the pipeline will persist the Kafka message key in the BigQuery table, in a `_key` field of type `BYTES`. Default is `false` (Key is ignored).
* **persistKafkaTopic**: If true, the pipeline will persist the Kafka message topic in the BigQuery table, in a `_topic` field of type `STRING`. Default is `false` (Topic is ignored).
* **outputProject**: BigQuery output project in which the dataset resides. Tables will be created dynamically in the dataset. Defaults to empty.
* **outputDataset**: BigQuery output dataset to write the output to. Tables will be created dynamically in the dataset. If the tables are created beforehand, the table names should follow the specified naming convention. The name should be `bqTableNamePrefix + Avro Schema FullName` , each word will be separated by a hyphen `-`. Defaults to empty.
* **bqTableNamePrefix**: Naming prefix to be used while creating BigQuery output tables. Only applicable when using schema registry. Defaults to empty.
Expand Down Expand Up @@ -171,6 +172,7 @@ export USE_BIG_QUERY_DLQ=false
### Optional
export OUTPUT_TABLE_SPEC=<outputTableSpec>
export PERSIST_KAFKA_KEY=false
export PERSIST_KAFKA_TOPIC=false
export OUTPUT_PROJECT=""
export OUTPUT_DATASET=""
export BQ_TABLE_NAME_PREFIX=""
Expand Down Expand Up @@ -220,6 +222,7 @@ gcloud dataflow flex-template run "kafka-to-bigquery-flex-job" \
--parameters "readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC" \
--parameters "outputTableSpec=$OUTPUT_TABLE_SPEC" \
--parameters "persistKafkaKey=$PERSIST_KAFKA_KEY" \
--parameters "persistKafkaTopic=$PERSIST_KAFKA_TOPIC" \
--parameters "writeMode=$WRITE_MODE" \
--parameters "outputProject=$OUTPUT_PROJECT" \
--parameters "outputDataset=$OUTPUT_DATASET" \
Expand Down Expand Up @@ -292,6 +295,7 @@ export USE_BIG_QUERY_DLQ=false
### Optional
export OUTPUT_TABLE_SPEC=<outputTableSpec>
export PERSIST_KAFKA_KEY=false
export PERSIST_KAFKA_TOPIC=false
export OUTPUT_PROJECT=""
export OUTPUT_DATASET=""
export BQ_TABLE_NAME_PREFIX=""
Expand Down Expand Up @@ -341,7 +345,7 @@ mvn clean package -PtemplatesRun \
-Dregion="$REGION" \
-DjobName="kafka-to-bigquery-flex-job" \
-DtemplateName="Kafka_to_BigQuery_Flex" \
-Dparameters="readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC,outputTableSpec=$OUTPUT_TABLE_SPEC,persistKafkaKey=$PERSIST_KAFKA_KEY,writeMode=$WRITE_MODE,outputProject=$OUTPUT_PROJECT,outputDataset=$OUTPUT_DATASET,bqTableNamePrefix=$BQ_TABLE_NAME_PREFIX,createDisposition=$CREATE_DISPOSITION,writeDisposition=$WRITE_DISPOSITION,useAutoSharding=$USE_AUTO_SHARDING,numStorageWriteApiStreams=$NUM_STORAGE_WRITE_API_STREAMS,storageWriteApiTriggeringFrequencySec=$STORAGE_WRITE_API_TRIGGERING_FREQUENCY_SEC,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE,enableCommitOffsets=$ENABLE_COMMIT_OFFSETS,consumerGroupId=$CONSUMER_GROUP_ID,kafkaReadOffset=$KAFKA_READ_OFFSET,kafkaReadAuthenticationMode=$KAFKA_READ_AUTHENTICATION_MODE,kafkaReadUsernameSecretId=$KAFKA_READ_USERNAME_SECRET_ID,kafkaReadPasswordSecretId=$KAFKA_READ_PASSWORD_SECRET_ID,kafkaReadKeystoreLocation=$KAFKA_READ_KEYSTORE_LOCATION,kafkaReadTruststoreLocation=$KAFKA_READ_TRUSTSTORE_LOCATION,kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID,kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID,kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID,kafkaReadSaslScramUsernameSecretId=$KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID,kafkaReadSaslScramPasswordSecretId=$KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID,kafkaReadSaslScramTruststoreLocation=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION,kafkaReadSaslScramTruststorePasswordSecretId=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID,messageFormat=$MESSAGE_FORMAT,schemaFormat=$SCHEMA_FORMAT,confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH,schemaRegistryConnectionUrl=$SCHEMA_REGISTRY_CONNECTION_URL,binaryAvroSchemaPath=$BINARY_AVRO_SCHEMA_PATH,schemaRegistryAuthenticationMode=$SCHEMA_REGISTRY_AUTHENTICATION_MODE,schemaRegistryTruststoreLocation=$SCHEMA_REGISTRY_TRUSTSTORE_LOCATION,schemaRegistryTruststorePasswordSecretId=$SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD_SECRET_ID,schemaRegistryKeystoreL
ocation=$SCHEMA_REGISTRY_KEYSTORE_LOCATION,schemaRegistryKeystorePasswordSecretId=$SCHEMA_REGISTRY_KEYSTORE_PASSWORD_SECRET_ID,schemaRegistryKeyPasswordSecretId=$SCHEMA_REGISTRY_KEY_PASSWORD_SECRET_ID,schemaRegistryOauthClientId=$SCHEMA_REGISTRY_OAUTH_CLIENT_ID,schemaRegistryOauthClientSecretId=$SCHEMA_REGISTRY_OAUTH_CLIENT_SECRET_ID,schemaRegistryOauthScope=$SCHEMA_REGISTRY_OAUTH_SCOPE,schemaRegistryOauthTokenEndpointUrl=$SCHEMA_REGISTRY_OAUTH_TOKEN_ENDPOINT_URL,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,useBigQueryDLQ=$USE_BIG_QUERY_DLQ,javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH,javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME,javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES" \
-Dparameters="readBootstrapServerAndTopic=$READ_BOOTSTRAP_SERVER_AND_TOPIC,outputTableSpec=$OUTPUT_TABLE_SPEC,persistKafkaKey=$PERSIST_KAFKA_KEY,persistKafkaTopic=$PERSIST_KAFKA_TOPIC,writeMode=$WRITE_MODE,outputProject=$OUTPUT_PROJECT,outputDataset=$OUTPUT_DATASET,bqTableNamePrefix=$BQ_TABLE_NAME_PREFIX,createDisposition=$CREATE_DISPOSITION,writeDisposition=$WRITE_DISPOSITION,useAutoSharding=$USE_AUTO_SHARDING,numStorageWriteApiStreams=$NUM_STORAGE_WRITE_API_STREAMS,storageWriteApiTriggeringFrequencySec=$STORAGE_WRITE_API_TRIGGERING_FREQUENCY_SEC,useStorageWriteApiAtLeastOnce=$USE_STORAGE_WRITE_API_AT_LEAST_ONCE,enableCommitOffsets=$ENABLE_COMMIT_OFFSETS,consumerGroupId=$CONSUMER_GROUP_ID,kafkaReadOffset=$KAFKA_READ_OFFSET,kafkaReadAuthenticationMode=$KAFKA_READ_AUTHENTICATION_MODE,kafkaReadUsernameSecretId=$KAFKA_READ_USERNAME_SECRET_ID,kafkaReadPasswordSecretId=$KAFKA_READ_PASSWORD_SECRET_ID,kafkaReadKeystoreLocation=$KAFKA_READ_KEYSTORE_LOCATION,kafkaReadTruststoreLocation=$KAFKA_READ_TRUSTSTORE_LOCATION,kafkaReadTruststorePasswordSecretId=$KAFKA_READ_TRUSTSTORE_PASSWORD_SECRET_ID,kafkaReadKeystorePasswordSecretId=$KAFKA_READ_KEYSTORE_PASSWORD_SECRET_ID,kafkaReadKeyPasswordSecretId=$KAFKA_READ_KEY_PASSWORD_SECRET_ID,kafkaReadSaslScramUsernameSecretId=$KAFKA_READ_SASL_SCRAM_USERNAME_SECRET_ID,kafkaReadSaslScramPasswordSecretId=$KAFKA_READ_SASL_SCRAM_PASSWORD_SECRET_ID,kafkaReadSaslScramTruststoreLocation=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_LOCATION,kafkaReadSaslScramTruststorePasswordSecretId=$KAFKA_READ_SASL_SCRAM_TRUSTSTORE_PASSWORD_SECRET_ID,messageFormat=$MESSAGE_FORMAT,schemaFormat=$SCHEMA_FORMAT,confluentAvroSchemaPath=$CONFLUENT_AVRO_SCHEMA_PATH,schemaRegistryConnectionUrl=$SCHEMA_REGISTRY_CONNECTION_URL,binaryAvroSchemaPath=$BINARY_AVRO_SCHEMA_PATH,schemaRegistryAuthenticationMode=$SCHEMA_REGISTRY_AUTHENTICATION_MODE,schemaRegistryTruststoreLocation=$SCHEMA_REGISTRY_TRUSTSTORE_LOCATION,schemaRegistryTruststorePasswordSecretId=$SCHEMA_REGISTRY_TRUSTSTORE_PAS
SWORD_SECRET_ID,schemaRegistryKeystoreLocation=$SCHEMA_REGISTRY_KEYSTORE_LOCATION,schemaRegistryKeystorePasswordSecretId=$SCHEMA_REGISTRY_KEYSTORE_PASSWORD_SECRET_ID,schemaRegistryKeyPasswordSecretId=$SCHEMA_REGISTRY_KEY_PASSWORD_SECRET_ID,schemaRegistryOauthClientId=$SCHEMA_REGISTRY_OAUTH_CLIENT_ID,schemaRegistryOauthClientSecretId=$SCHEMA_REGISTRY_OAUTH_CLIENT_SECRET_ID,schemaRegistryOauthScope=$SCHEMA_REGISTRY_OAUTH_SCOPE,schemaRegistryOauthTokenEndpointUrl=$SCHEMA_REGISTRY_OAUTH_TOKEN_ENDPOINT_URL,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,useBigQueryDLQ=$USE_BIG_QUERY_DLQ,javascriptTextTransformGcsPath=$JAVASCRIPT_TEXT_TRANSFORM_GCS_PATH,javascriptTextTransformFunctionName=$JAVASCRIPT_TEXT_TRANSFORM_FUNCTION_NAME,javascriptTextTransformReloadIntervalMinutes=$JAVASCRIPT_TEXT_TRANSFORM_RELOAD_INTERVAL_MINUTES" \
-f v2/kafka-to-bigquery
```

Expand Down Expand Up @@ -393,6 +397,7 @@ resource "google_dataflow_flex_template_job" "kafka_to_bigquery_flex" {
useBigQueryDLQ = "false"
# outputTableSpec = "<outputTableSpec>"
# persistKafkaKey = "false"
# persistKafkaTopic = "false"
# outputProject = ""
# outputDataset = ""
# bqTableNamePrefix = ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public interface KafkaToBigQueryFlexOptions
void setReadBootstrapServerAndTopic(String value);

@TemplateParameter.Boolean(
order = 3,
order = 2,
groupName = "Source",
optional = true,
description = "Persist the Kafka Message Key to the BigQuery table",
Expand All @@ -58,6 +58,18 @@ public interface KafkaToBigQueryFlexOptions

void setPersistKafkaKey(Boolean value);

/**
 * Whether to persist each Kafka message's topic into a {@code _topic} field of type
 * {@code STRING} on the output BigQuery table. Defaults to {@code false} (topic is ignored).
 */
@TemplateParameter.Boolean(
order = 3,
groupName = "Source",
optional = true,
description = "Persist the Kafka Message Topic to the BigQuery table",
helpText =
"If true, the pipeline will persist the Kafka message topic in the BigQuery table, in a `_topic` field of type `STRING`. Default is `false` (Topic is ignored).")
@Default.Boolean(false)
Boolean getPersistKafkaTopic();

void setPersistKafkaTopic(Boolean value);

@TemplateParameter.Enum(
order = 4,
name = "writeMode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
Expand Down Expand Up @@ -241,7 +242,7 @@
List<String> bootstrapServerAndTopicList =
KafkaTopicUtils.getBootstrapServerAndTopic(
options.getReadBootstrapServerAndTopic(), options.getProject());
topicsList = List.of(bootstrapServerAndTopicList.get(1));
topicsList = bootstrapServerAndTopicList.stream().skip(1).collect(Collectors.toList());

Check warning on line 245 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java#L245

Added line #L245 was not covered by tests
bootstrapServers = bootstrapServerAndTopicList.get(0);
} else {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -313,6 +314,7 @@
options.getNumStorageWriteApiStreams(),
options.getStorageWriteApiTriggeringFrequencySec(),
options.getPersistKafkaKey(),
options.getPersistKafkaTopic(),

Check warning on line 317 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java#L317

Added line #L317 was not covered by tests
options.getUseAutoSharding(),
errorHandler);
} else {
Expand All @@ -325,6 +327,7 @@
options.getNumStorageWriteApiStreams(),
options.getStorageWriteApiTriggeringFrequencySec(),
options.getPersistKafkaKey(),
options.getPersistKafkaTopic(),

Check warning on line 330 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java#L330

Added line #L330 was not covered by tests
options.getUseAutoSharding());
}
writeResult =
Expand Down Expand Up @@ -369,6 +372,7 @@
options.getNumStorageWriteApiStreams(),
options.getStorageWriteApiTriggeringFrequencySec(),
options.getPersistKafkaKey(),
options.getPersistKafkaTopic(),

Check warning on line 375 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java#L375

Added line #L375 was not covered by tests
options.getUseAutoSharding(),
errorHandler);
} else {
Expand All @@ -381,6 +385,7 @@
options.getNumStorageWriteApiStreams(),
options.getStorageWriteApiTriggeringFrequencySec(),
options.getPersistKafkaKey(),
options.getPersistKafkaTopic(),

Check warning on line 388 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java#L388

Added line #L388 was not covered by tests
options.getUseAutoSharding());
}
writeResult =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@

private Boolean persistKafkaKey;

private Boolean persistKafkaTopic;

private String writeDisposition;

private String createDisposition;
Expand All @@ -82,6 +84,7 @@
Integer numStorageWriteApiStreams,
Integer storageWriteApiTriggeringFrequencySec,
Boolean persistKafkaKey,
Boolean persistKafkaTopic,
Boolean useAutoSharding) {
this.avroSchema = avroSchema;
this.outputTableSpec = outputTableSpec;
Expand All @@ -90,6 +93,7 @@
this.numStorageWriteApiStreams = numStorageWriteApiStreams;
this.storageWriteApiTriggeringFrequencySec = storageWriteApiTriggeringFrequencySec;
this.persistKafkaKey = persistKafkaKey;
this.persistKafkaTopic = persistKafkaTopic;

Check warning on line 96 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java#L96

Added line #L96 was not covered by tests
this.useAutoSharding = useAutoSharding;
this.errorHandler = new ErrorHandler.DefaultErrorHandler<>();
}
Expand All @@ -103,6 +107,7 @@
Integer numStorageWriteApiStreams,
Integer storageWriteApiTriggeringFrequencySec,
Boolean persistKafkaKey,
Boolean persistKafkaTopic,
Boolean useAutoSharding,
ErrorHandler<BadRecord, ?> errorHandler) {
this.avroSchema = avroSchema;
Expand All @@ -112,6 +117,7 @@
this.numStorageWriteApiStreams = numStorageWriteApiStreams;
this.storageWriteApiTriggeringFrequencySec = storageWriteApiTriggeringFrequencySec;
this.persistKafkaKey = persistKafkaKey;
this.persistKafkaTopic = persistKafkaTopic;

Check warning on line 120 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java#L120

Added line #L120 was not covered by tests
this.useAutoSharding = useAutoSharding;
this.errorHandler = errorHandler;
}
Expand All @@ -124,6 +130,7 @@
Integer numStorageWriteApiStreams,
Integer storageWriteApiTriggeringFrequencySec,
Boolean persistKafkaKey,
Boolean persistKafkaTopic,
Boolean useAutoSharding) {
return new BigQueryWriteUtils.BigQueryWrite(
avroSchema,
Expand All @@ -133,6 +140,7 @@
numStorageWriteApiStreams,
storageWriteApiTriggeringFrequencySec,
persistKafkaKey,
persistKafkaTopic,
useAutoSharding);
}

Expand All @@ -144,6 +152,7 @@
Integer numStorageWriteApiStreams,
Integer storageWriteApiTriggeringFrequencySec,
Boolean persistKafkaKey,
Boolean persistKafkaTopic,
Boolean useAutoSharding,
ErrorHandler<BadRecord, ?> errorHandler) {
return new BigQueryWriteUtils.BigQueryWrite(
Expand All @@ -154,6 +163,7 @@
numStorageWriteApiStreams,
storageWriteApiTriggeringFrequencySec,
persistKafkaKey,
persistKafkaTopic,
useAutoSharding,
errorHandler);
}
Expand All @@ -166,8 +176,11 @@

private boolean persistKafkaKey;

GenericRecordToTableRowFn(boolean persistKafkaKey) {
private boolean persistKafkaTopic;

GenericRecordToTableRowFn(boolean persistKafkaKey, boolean persistKafkaTopic) {

Check warning on line 181 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java#L181

Added line #L181 was not covered by tests
this.persistKafkaKey = persistKafkaKey;
this.persistKafkaTopic = persistKafkaTopic;

Check warning on line 183 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java#L183

Added line #L183 was not covered by tests
}

@ProcessElement
Expand All @@ -181,6 +194,9 @@
if (this.persistKafkaKey) {
row.set(BigQueryConstants.KAFKA_KEY_FIELD, element.getOriginalPayload().getKV().getKey());
}
if (this.persistKafkaTopic) {
row.set(BigQueryConstants.KAFKA_TOPIC_FIELD, element.getOriginalPayload().getTopic());

Check warning on line 198 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java#L198

Added line #L198 was not covered by tests
}
context.output(FailsafeElement.of(element.getOriginalPayload(), row));
}
}
Expand Down Expand Up @@ -229,7 +245,8 @@
input
.apply(
"ConvertGenericRecordToTableRow",
ParDo.of(new GenericRecordToTableRowFn(this.persistKafkaKey)))
ParDo.of(
new GenericRecordToTableRowFn(this.persistKafkaKey, this.persistKafkaTopic)))

Check warning on line 249 in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/transforms/BigQueryWriteUtils.java#L248-L249

Added lines #L248 - L249 were not covered by tests
.setCoder(
FailsafeElementCoder.of(
KafkaRecordCoder.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ public class BigQueryConstants {
private BigQueryConstants() {} // Constants holder — prevent instantiation.

// BigQuery field name used to persist the Kafka message key (field type BYTES).
public static final String KAFKA_KEY_FIELD = "_key";
// BigQuery field name used to persist the Kafka message topic (field type STRING).
public static final String KAFKA_TOPIC_FIELD = "_topic";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
Expand Down Expand Up @@ -135,7 +136,7 @@
List<String> bootstrapServerAndTopicList =
KafkaTopicUtils.getBootstrapServerAndTopic(
options.getReadBootstrapServerAndTopic(), options.getProject());
topicsList = List.of(bootstrapServerAndTopicList.get(1));
topicsList = bootstrapServerAndTopicList.stream().skip(1).collect(Collectors.toList());

Check warning on line 139 in v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java

View check run for this annotation

Codecov / codecov/patch

v2/kafka-to-gcs/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToGcsFlex.java#L139

Added line #L139 was not covered by tests
bootstrapServes = bootstrapServerAndTopicList.get(0);
} else {
throw new IllegalArgumentException(
Expand Down
Loading
Loading