
Commit d18218a

Add plumbing for python use case.
1 parent 68284b8

4 files changed: +110 −3 lines

sdks/java/io/expansion-service/build.gradle

Lines changed: 2 additions & 0 deletions
@@ -76,6 +76,8 @@ dependencies {
   permitUnusedDeclared project(":sdks:java:io:kafka") // BEAM-11761
   implementation project(":sdks:java:io:kafka:upgrade")
   permitUnusedDeclared project(":sdks:java:io:kafka:upgrade") // BEAM-11761
+  implementation project(":sdks:java:extensions:kafka-factories")
+  permitUnusedDeclared project(":sdks:java:extensions:kafka-factories")
 
   if (JavaVersion.current().compareTo(JavaVersion.VERSION_11) >= 0 && project.findProperty('testJavaVersion') != '8') {
     // iceberg ended support for Java 8 in 1.7.0

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 32 additions & 0 deletions
@@ -27,6 +27,7 @@
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -94,6 +95,8 @@
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.PTransformMatchers;
 import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -930,6 +933,25 @@ static <K, V> void setupExternalBuilder(
       builder.setOffsetDeduplication(false);
       builder.setRedistributeByRecordKey(false);
     }
+
+    if (config.consumerFactoryFnClass != null) {
+      if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
+        try {
+          // The Kerberos factory needs a krb5.conf location, passed via the params map.
+          if (config.consumerFactoryFnParams == null
+              || !config.consumerFactoryFnParams.containsKey("krb5Location")) {
+            throw new IllegalArgumentException(
+                "The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
+                    + "Please provide either a GCS location or Google Secret Manager location for this file.");
+          }
+          builder.setConsumerFactoryFn(
+              InstanceBuilder.ofType(
+                      new TypeDescriptor<
+                          SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>>() {})
+                  .fromClassName(config.consumerFactoryFnClass)
+                  .withArg(String.class, config.consumerFactoryFnParams.get("krb5Location"))
+                  .build());
+        } catch (Exception e) {
+          throw new RuntimeException(
+              "Unable to construct FactoryFn " + config.consumerFactoryFnClass + ": " + e.getMessage(),
+              e);
+        }
+      }
+    }
   }
 
   private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -1000,6 +1022,8 @@ public static class Configuration {
     private Boolean offsetDeduplication;
     private Boolean redistributeByRecordKey;
     private Long dynamicReadPollIntervalSeconds;
+    private String consumerFactoryFnClass;
+    private Map<String, String> consumerFactoryFnParams;
 
     public void setConsumerConfig(Map<String, String> consumerConfig) {
       this.consumerConfig = consumerConfig;
@@ -1068,6 +1092,14 @@ public void setRedistributeByRecordKey(Boolean redistributeByRecordKey) {
     public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSeconds) {
       this.dynamicReadPollIntervalSeconds = dynamicReadPollIntervalSeconds;
     }
+
+    public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
+      this.consumerFactoryFnClass = consumerFactoryFnClass;
+    }
+
+    public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
+      this.consumerFactoryFnParams = consumerFactoryFnParams;
+    }
   }
 }
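Note: setupExternalBuilder only wires in a factory when the configured class name contains "KerberosConsumerFactoryFn"; the factory class itself lives in the new :sdks:java:extensions:kafka-factories module and is not shown in this commit. A minimal sketch of the Python call that reaches this branch (broker, topic, and bucket names are illustrative; only the class path and the 'krb5Location' key come from this commit):

    # Sketch only: placeholder values except the factory class path and the
    # 'krb5Location' parameter key, both introduced by this commit.
    from apache_beam.io.kafka import ReadFromKafka

    read = ReadFromKafka(
        consumer_config={'bootstrap.servers': 'broker:9092'},
        topics=['my_topic'],
        consumer_factory_fn_class=(
            'org.apache.beam.sdk.extensions.kafka.factories.KerberosConsumerFactoryFn'),
        consumer_factory_fn_params={'krb5Location': 'gs://my-bucket/kerberos/krb5.conf'})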

sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py

Lines changed: 61 additions & 1 deletion
@@ -41,6 +41,10 @@
 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.utils import subprocess_server
+from apache_beam.options.pipeline_options import (
+    PipelineOptions,
+    GoogleCloudOptions
+)
 
 NUM_RECORDS = 1000

@@ -68,7 +72,7 @@ def process(
 
 class CrossLanguageKafkaIO(object):
   def __init__(
-      self, bootstrap_servers, topic, null_key, expansion_service=None):
+      self, bootstrap_servers=None, topic=None, null_key=None, expansion_service=None):
     self.bootstrap_servers = bootstrap_servers
     self.topic = topic
     self.null_key = null_key
@@ -107,6 +111,34 @@ def build_read_pipeline(self, pipeline, max_num_records=None):
         | 'CalculateSum' >> beam.ParDo(CollectingFn())
         | 'SetSumCounter' >> beam.Map(self.sum_counter.inc))
 
+  def build_read_pipeline_with_kerberos(self, p, max_num_records=None):
+    jaas_config = (
+        'com.sun.security.auth.module.Krb5LoginModule required '
+        'useKeyTab=true storeKey=true '
+        'keyTab="secretValue:projects/dataflow-testing-311516/secrets/kafka-client-keytab/versions/latest" '
+        'principal="[email protected]";')
+
+    kafka_records = (
+        p
+        | 'ReadFromKafka' >> ReadFromKafka(
+            consumer_config={
+                'bootstrap.servers': 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092',
+                'auto.offset.reset': 'earliest',
+                'security.protocol': 'SASL_PLAINTEXT',
+                'sasl.mechanism': 'GSSAPI',
+                'sasl.kerberos.service.name': 'kafka',
+                'sasl.jaas.config': jaas_config
+            },
+            topics=['fozzie_test_kerberos_topic'],
+            key_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
+            value_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
+            # max_num_records is a ReadFromKafka argument, not a Kafka consumer
+            # property, so it is passed outside consumer_config.
+            max_num_records=max_num_records,
+            consumer_factory_fn_class='org.apache.beam.sdk.extensions.kafka.factories.KerberosConsumerFactoryFn',
+            consumer_factory_fn_params={'krb5Location': 'gs://fozzie_testing_bucket/kerberos/krb5.conf'}))
+    return kafka_records
+
   def run_xlang_kafkaio(self, pipeline):
     self.build_write_pipeline(pipeline)
     self.build_read_pipeline(pipeline)
@@ -182,6 +214,17 @@ def test_hosted_kafkaio_null_key(self):
     self.run_kafka_write(pipeline_creator)
     self.run_kafka_read(pipeline_creator, None)
 
+  def test_hosted_kafkaio_null_key_kerberos(self):
+    kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
+    bootstrap_servers = 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092'
+    pipeline_creator = CrossLanguageKafkaIO(
+        bootstrap_servers,
+        kafka_topic,
+        True,
+        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
+
+    self.run_kafka_read_with_kerberos(pipeline_creator)
+
   def run_kafka_write(self, pipeline_creator):
     with TestPipeline() as pipeline:
       pipeline.not_use_test_runner_api = True
@@ -196,6 +239,23 @@ def run_kafka_read(self, pipeline_creator, expected_key):
           equal_to([(expected_key, str(i).encode())
                     for i in range(NUM_RECORDS)]))
 
+  def run_kafka_read_with_kerberos(self, pipeline_creator):
+    options_dict = {
+        'runner': 'DataflowRunner',
+        'project': 'dataflow-testing-311516',
+        'region': 'us-central1',
+        'streaming': False
+    }
+    options = PipelineOptions.from_dictionary(options_dict)
+    expected_records = [f'test{i}' for i in range(1, 12)]
+    with beam.Pipeline(options=options) as p:
+      # The pipeline handle here is p, not the TestPipeline used elsewhere.
+      p.not_use_test_runner_api = True
+      result = pipeline_creator.build_read_pipeline_with_kerberos(
+          p, max_num_records=11)
+      assert_that(result, equal_to(expected_records))
+
   def get_platform_localhost(self):
     if sys.platform == 'darwin':
       return 'host.docker.internal'

sdks/python/apache_beam/io/kafka.py

Lines changed: 15 additions & 2 deletions
@@ -124,7 +124,9 @@
     ('redistribute', typing.Optional[bool]),
     ('redistribute_num_keys', typing.Optional[np.int32]),
     ('allow_duplicates', typing.Optional[bool]),
     ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
+    ('consumer_factory_fn_class', typing.Optional[str]),
+    ('consumer_factory_fn_params', typing.Optional[typing.Mapping[str, str]])
 ])
@@ -173,6 +175,8 @@ def __init__(
       redistribute_num_keys=np.int32(0),
       allow_duplicates=False,
       dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
+      consumer_factory_fn_class=None,
+      consumer_factory_fn_params=None
   ):
     """
     Initializes a read operation from Kafka.
@@ -216,6 +220,13 @@ def __init__(
     :param dynamic_read_poll_interval_seconds: The interval in seconds at which
       to check for new partitions. If not None, dynamic partition discovery
       is enabled.
+    :param consumer_factory_fn_class: A fully qualified class path to a provided
+      consumerFactoryFn. If not None, Kafka consumers are constructed with this
+      custom configuration.
+    :param consumer_factory_fn_params: A map specifying the parameters for the
+      provided consumer_factory_fn_class. If not None, the values in this map
+      are used when constructing the consumer_factory_fn_class object. This
+      cannot be None if consumer_factory_fn_class is not None.
     """
     if timestamp_policy not in [ReadFromKafka.processing_time_policy,
                                 ReadFromKafka.create_time_policy,
@@ -242,7 +253,9 @@
             redistribute_num_keys=redistribute_num_keys,
             allow_duplicates=allow_duplicates,
-            dynamic_read_poll_interval_seconds=
-            dynamic_read_poll_interval_seconds)),
+            dynamic_read_poll_interval_seconds=
+            dynamic_read_poll_interval_seconds,
+            consumer_factory_fn_class=consumer_factory_fn_class,
+            consumer_factory_fn_params=consumer_factory_fn_params)),
         expansion_service or default_io_expansion_service())

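The snake_case schema field names here have to line up with the camelCase setters on the Java Configuration class, since the expansion service resolves each field to its setter by name (consumer_factory_fn_class -> setConsumerFactoryFnClass). A small sketch of that naming convention, assuming the usual snake-to-camel mapping:

    # Illustrative check of the snake_case -> camelCase setter mapping assumed above.
    def expected_setter(field: str) -> str:
        return 'set' + ''.join(part.capitalize() for part in field.split('_'))

    assert expected_setter('consumer_factory_fn_class') == 'setConsumerFactoryFnClass'
    assert expected_setter('consumer_factory_fn_params') == 'setConsumerFactoryFnParams'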