diff --git a/sdks/java/io/expansion-service/build.gradle b/sdks/java/io/expansion-service/build.gradle
index f1366817db22..dbd6e279846b 100644
--- a/sdks/java/io/expansion-service/build.gradle
+++ b/sdks/java/io/expansion-service/build.gradle
@@ -76,6 +76,8 @@ dependencies {
   permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
   implementation project(":sdks:java:io:kafka:upgrade")
   permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
+  implementation project(":sdks:java:extensions:kafka-factories")
+  permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
   if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0
       && project.findProperty('testJavaVersion') != '8') {
     // iceberg ended support for Java 8 in 1.7.0
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 568fe49217b3..48e4ae2317ac 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -35,6 +35,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -94,6 +95,7 @@
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.PTransformMatchers;
 import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -930,6 +932,34 @@ static void setupExternalBuilder(
         builder.setOffsetDeduplication(false);
         builder.setRedistributeByRecordKey(false);
       }
+
+      if (config.consumerFactoryFnClass != null) {
+        if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
+          try {
+            if (!config.consumerFactoryFnParams.containsKey("krb5Location")) {
+              throw new IllegalArgumentException(
+                  "The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
+                      + "Please provide either a GCS location or Google Secret Manager location for this file.");
+            }
+            String krb5Location = config.consumerFactoryFnParams.get("krb5Location");
+            builder.setConsumerFactoryFn(
+                InstanceBuilder.ofType(
+                        new TypeDescriptor<
+                            SerializableFunction<
+                                Map<String, Object>, Consumer<byte[], byte[]>>>() {})
+                    .fromClassName(config.consumerFactoryFnClass)
+                    .withArg(String.class, Objects.requireNonNull(krb5Location))
+                    .build());
+          } catch (Exception e) {
+            throw new RuntimeException(
+                "Unable to construct FactoryFn "
+                    + config.consumerFactoryFnClass
+                    + ": "
+                    + e.getMessage(),
+                e);
+          }
+        }
+      }
     }
 
     private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -1000,6 +1030,8 @@ public static class Configuration {
       private Boolean offsetDeduplication;
       private Boolean redistributeByRecordKey;
       private Long dynamicReadPollIntervalSeconds;
+      private String consumerFactoryFnClass;
+      private Map<String, String> consumerFactoryFnParams;
 
       public void setConsumerConfig(Map<String, String> consumerConfig) {
         this.consumerConfig = consumerConfig;
       }
@@ -1068,6 +1100,14 @@ public void setRedistributeByRecordKey(Boolean redistributeByRecordKey) {
       public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
         this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
       }
+
+      public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
+        this.consumerFactoryFnClass = consumerFactoryFnClass;
+      }
+
+      public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
+        this.consumerFactoryFnParams = consumerFactoryFnParams;
+      }
     }
   }
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index f3e6c39cfda4..09063fb0828f 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -100,6 +100,7 @@
 
 # pytype: skip-file
 
+import collections
 import typing
 
 import numpy as np
@@ -110,22 +111,21 @@
 
 ReadFromKafkaSchema = typing.NamedTuple(
     'ReadFromKafkaSchema',
-    [
-        ('consumer_config', typing.Mapping[str, str]),
-        ('topics', typing.List[str]),
-        ('key_deserializer', str),
-        ('value_deserializer', str),
-        ('start_read_time', typing.Optional[int]),
-        ('max_num_records', typing.Optional[int]),
-        ('max_read_time', typing.Optional[int]),
-        ('commit_offset_in_finalize', bool),
-        ('timestamp_policy', str),
-        ('consumer_polling_timeout', typing.Optional[int]),
-        ('redistribute', typing.Optional[bool]),
-        ('redistribute_num_keys', typing.Optional[np.int32]),
-        ('allow_duplicates', typing.Optional[bool]),
-        ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
-    ])
+    [('consumer_config', typing.Mapping[str, str]),
+     ('topics', typing.List[str]), ('key_deserializer', str),
+     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
+     ('max_num_records', typing.Optional[int]),
+     ('max_read_time', typing.Optional[int]),
+     ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+     ('consumer_polling_timeout', typing.Optional[int]),
+     ('redistribute', typing.Optional[bool]),
+     ('redistribute_num_keys', typing.Optional[np.int32]),
+     ('allow_duplicates', typing.Optional[bool]),
+     ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
+     ('consumer_factory_fn_class', typing.Optional[str]),
+     (
+         'consumer_factory_fn_params',
+         typing.Optional[collections.abc.Mapping[str, str]])])
 
 
 def default_io_expansion_service(append_args=None):
@@ -173,7 +173,9 @@ def __init__(
       redistribute_num_keys=np.int32(0),
       allow_duplicates=False,
       dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
-  ):
+      consumer_factory_fn_class: typing.Optional[str] = None,
+      consumer_factory_fn_params: typing.Optional[
+          collections.abc.Mapping] = None):
     """
     Initializes a read operation from Kafka.
 
@@ -216,6 +218,13 @@ def __init__(
     :param dynamic_read_poll_interval_seconds: The interval in seconds at which
       to check for new partitions. If not None, dynamic partition discovery is
      enabled.
+    :param consumer_factory_fn_class: A fully qualified class name for an
+      existing provided consumerFactoryFn. If not None, this will construct
+      Kafka consumers with a custom configuration.
+    :param consumer_factory_fn_params: A map which specifies the parameters for
+      the provided consumer_factory_fn_class. If not None, the values in this
+      map will be used when constructing the consumer_factory_fn_class object.
+      This cannot be None if consumer_factory_fn_class is not None.
     """
     if timestamp_policy not in [ReadFromKafka.processing_time_policy,
                                 ReadFromKafka.create_time_policy,
@@ -242,7 +251,9 @@
             redistribute_num_keys=redistribute_num_keys,
             allow_duplicates=allow_duplicates,
             dynamic_read_poll_interval_seconds=
-            dynamic_read_poll_interval_seconds)),
+            dynamic_read_poll_interval_seconds,
+            consumer_factory_fn_class=consumer_factory_fn_class,
+            consumer_factory_fn_params=consumer_factory_fn_params)),
         expansion_service or default_io_expansion_service())
 
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 97facd1e3918..b83782a29e3f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -18,14 +18,14 @@ import com.gradle.enterprise.gradleplugin.internal.extension.BuildScanExtensionWithHiddenFeatures
 
 pluginManagement {
-  plugins {
-    id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator
-  }
+    plugins {
+        id("org.javacc.javacc") version "3.0.3" // enable the JavaCC parser generator
+    }
 }
 
 plugins {
-  id("com.gradle.develocity") version "3.19"
-  id("com.gradle.common-custom-user-data-gradle-plugin") version "2.4.0"
+    id("com.gradle.develocity") version "3.19"
+    id("com.gradle.common-custom-user-data-gradle-plugin") version "2.2.1"
 }
@@ -36,32 +36,32 @@ val isGithubActionsBuild = arrayOf("GITHUB_REPOSITORY", "GITHUB_RUN_ID").all { System.getenv(it) != null }
 val isCi = isJenkinsBuild || isGithubActionsBuild
 
 develocity {
-  server = "https://develocity.apache.org"
-  projectId = "beam"
-
-  buildScan {
-    uploadInBackground = !isCi
-    publishing.onlyIf { it.isAuthenticated }
-    obfuscation {
-      ipAddresses { addresses -> addresses.map { "0.0.0.0" } }
+    server = "https://develocity.apache.org"
+    projectId = "beam"
+
+    buildScan {
+        uploadInBackground = !isCi
+        publishing.onlyIf { it.isAuthenticated }
+        obfuscation {
+            ipAddresses { addresses -> addresses.map { "0.0.0.0" } }
+        }
     }
-  }
 }
 
 buildCache {
-  local {
-    isEnabled = true
-  }
-  remote {
-    url = uri("https://beam-cache.apache.org/cache/")
-    isAllowUntrustedServer = false
-    credentials {
-      username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME")
-      password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD")
+    local {
+        isEnabled = true
+    }
+    remote {
+        url = uri("https://beam-cache.apache.org/cache/")
+        isAllowUntrustedServer = false
+        credentials {
+            username = System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME")
+            password = System.getenv("GRADLE_ENTERPRISE_CACHE_PASSWORD")
+        }
+        isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
+        isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
     }
-    isEnabled = !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
-    isPush = isCi && !System.getenv("GRADLE_ENTERPRISE_CACHE_USERNAME").isNullOrBlank()
-  }
 }
 
 rootProject.name = "beam"
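
For reference, a minimal usage sketch of the new Python-side parameters introduced by this change. The fully qualified factory class name shown is an assumed placeholder (only the KerberosConsumerFactoryFn name, the krb5Location key, and the krb5.conf requirement come from the diff above); the broker address, topic, and GCS path are likewise placeholders.

# Hypothetical usage of the new ReadFromKafka arguments added by this change.
# NOTE: the fully qualified factory class name below is an assumed placeholder;
# only "KerberosConsumerFactoryFn" and the "krb5Location" key appear in the diff.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as pipeline:
  records = (
      pipeline
      | 'ReadKerberizedKafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'broker-1:9092'},
          topics=['my_topic'],
          # Fully qualified class name of a provided consumer factory.
          consumer_factory_fn_class=(
              'org.apache.beam.sdk.extensions.kafkafactories.'
              'KerberosConsumerFactoryFn'),
          # The Kerberos factory requires a krb5.conf location, either a GCS
          # path or a Google Secret Manager resource name.
          consumer_factory_fn_params={
              'krb5Location': 'gs://my-bucket/krb5.conf'}))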