Commit 470a751

Remove accidentally committed python modules
1 parent 43d731c commit 470a751

5 files changed: +45 -103 lines

5 files changed

+45
-103
lines changed

sdks/java/extensions/kafka-factories/src/test/java/org/apache/beam/sdk/io/kafka/file/aware/factories/FileAwareFactoryFnTest.java

Whitespace-only changes.

sdks/java/extensions/kafka-factories/src/test/java/org/apache/beam/sdk/io/kafka/file/aware/factories/KerberosConsumerFactoryFnTest.java

Whitespace-only changes.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 23 additions & 15 deletions
@@ -27,7 +27,6 @@
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,6 +35,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -96,7 +96,6 @@
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.MonotonicallyIncreasing;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.WallTime;
 import org.apache.beam.sdk.util.InstanceBuilder;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.construction.PTransformMatchers;
 import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -937,18 +936,27 @@ static <K, V> void setupExternalBuilder(
     if (config.consumerFactoryFnClass != null) {
       if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
         try {
-          if (!config.consumerFactoryFnClass.contains("krb5Location")) {
-            throw new IllegalArgumentException("The KerberosConsumerFactoryFn requires a location for the krb5.conf file. " +
-                "Please provide either a GCS location or Google Secret Manager location for this file.");
+          if (!config.consumerFactoryFnParams.containsKey("krb5Location")) {
+            throw new IllegalArgumentException(
+                "The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
+                    + "Please provide either a GCS location or Google Secret Manager location for this file.");
           }
-          builder.setConsumerFactoryFn(InstanceBuilder.ofType(new TypeDescriptor<SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>>() {})
-              .fromClassName(config.consumerFactoryFnClass)
-              .withArg(
-                  String.class,
-                  config.consumerFactoryFnParams.get("krb5Location"))
-              .build());
+          String krb5Location = config.consumerFactoryFnParams.get("krb5Location");
+          builder.setConsumerFactoryFn(
+              InstanceBuilder.ofType(
+                      new TypeDescriptor<
+                          SerializableFunction<
+                              Map<String, Object>, Consumer<byte[], byte[]>>>() {})
+                  .fromClassName(config.consumerFactoryFnClass)
+                  .withArg(String.class, Objects.requireNonNull(krb5Location))
+                  .build());
         } catch (Exception e) {
-          throw new RuntimeException("Unable to construct FactoryFn " + config.consumerFactoryFnClass + ": " + e.getMessage(), e);
+          throw new RuntimeException(
+              "Unable to construct FactoryFn "
+                  + config.consumerFactoryFnClass
+                  + ": "
+                  + e.getMessage(),
+              e);
         }
       }
     }
@@ -1094,12 +1102,12 @@ public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSecond
     }

     public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
-      this.consumerFactoryFnClass = consumerFactoryFnClass;
+      this.consumerFactoryFnClass = consumerFactoryFnClass;
     }

     public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
-      this.consumerFactoryFnParams = consumerFactoryFnParams;
-    }
+      this.consumerFactoryFnParams = consumerFactoryFnParams;
+    }
   }
 }

(The final hunk is a whitespace-only re-indent; the text of those lines is otherwise unchanged.)

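For reference, the krb5Location check above validates parameters forwarded from the Python SDK's cross-language Kafka read. A minimal sketch of how the Python-side options on this branch line up with that check; the broker, topic, and bucket names below are placeholders, not real endpoints, and applying the transform requires an expansion service with the kafka-factories classes on its classpath:

# Sketch only: constructing the transform makes no network calls; applying it
# in a pipeline triggers Java-side expansion, where the check above runs.
from apache_beam.io.kafka import ReadFromKafka

read = ReadFromKafka(
    consumer_config={
        'bootstrap.servers': 'broker.example.internal:9092',  # placeholder
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
    },
    topics=['example_topic'],  # placeholder
    consumer_factory_fn_class=(
        'org.apache.beam.sdk.extensions.kafka.factories.'
        'KerberosConsumerFactoryFn'),
    # Omitting 'krb5Location' now fails at expansion time with the
    # IllegalArgumentException added in the hunk above.
    consumer_factory_fn_params={
        'krb5Location': 'gs://example-bucket/kerberos/krb5.conf'})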
sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py

Lines changed: 1 addition & 61 deletions
@@ -41,10 +41,6 @@
 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.utils import subprocess_server
-from apache_beam.options.pipeline_options import (
-    PipelineOptions,
-    GoogleCloudOptions
-)

 NUM_RECORDS = 1000

@@ -72,7 +68,7 @@ def process(

 class CrossLanguageKafkaIO(object):
   def __init__(
-      self, bootstrap_servers=None, topic=None, null_key=None, expansion_service=None):
+      self, bootstrap_servers, topic, null_key, expansion_service=None):
     self.bootstrap_servers = bootstrap_servers
     self.topic = topic
     self.null_key = null_key
@@ -111,34 +107,6 @@ def build_read_pipeline(self, pipeline, max_num_records=None):
         | 'CalculateSum' >> beam.ParDo(CollectingFn())
         | 'SetSumCounter' >> beam.Map(self.sum_counter.inc))

-  def build_read_pipeline_with_kerberos(self, p, max_num_records=None):
-
-    jaas_config = (
-        f'com.sun.security.auth.module.Krb5LoginModule required '
-        f'useKeyTab=true storeKey=true '
-        f'keyTab="secretValue:projects/dataflow-testing-311516/secrets/kafka-client-keytab/versions/latest" '
-        f'principal="[email protected]";'
-    )
-
-    kafka_records = (
-        p
-        | 'ReadFromKafka' >> ReadFromKafka(
-            consumer_config={
-                'bootstrap.servers': 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092',
-                'auto.offset.reset': 'earliest',
-                'max_num_records': max_num_records,
-                'security.protocol': 'SASL_PLAINTEXT',
-                'sasl.mechanism': 'GSSAPI',
-                'sasl.kerberos.service.name': 'kafka',
-                'sasl.jaas.config': jaas_config
-            },
-            topics=['fozzie_test_kerberos_topic'],
-            key_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
-            value_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
-            consumer_factory_fn_class='org.apache.beam.sdk.extensions.kafka.factories.KerberosConsumerFactoryFn',
-            consumer_factory_fn_params={'krb5Location': 'gs://fozzie_testing_bucket/kerberos/krb5.conf'}))
-    return kafka_records
-
   def run_xlang_kafkaio(self, pipeline):
     self.build_write_pipeline(pipeline)
     self.build_read_pipeline(pipeline)
@@ -214,17 +182,6 @@ def test_hosted_kafkaio_null_key(self):
     self.run_kafka_write(pipeline_creator)
     self.run_kafka_read(pipeline_creator, None)

-  def test_hosted_kafkaio_null_key_kerberos(self):
-    kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
-    bootstrap_servers = 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092'
-    pipeline_creator = CrossLanguageKafkaIO(
-        bootstrap_servers,
-        kafka_topic,
-        True,
-        'localhost:%s' % os.environ.get('EXPANSION_PORT'))
-
-    self.run_kafka_read_with_kerberos(pipeline_creator)
-
   def run_kafka_write(self, pipeline_creator):
     with TestPipeline() as pipeline:
       pipeline.not_use_test_runner_api = True
@@ -239,23 +196,6 @@ def run_kafka_read(self, pipeline_creator, expected_key):
           equal_to([(expected_key, str(i).encode())
                     for i in range(NUM_RECORDS)]))

-  def run_kafka_read_with_kerberos(self, pipeline_creator):
-    options_dict = {
-        'runner': 'DataflowRunner',
-        'project': 'dataflow-testing-311516',
-        'region': 'us-central1',
-        'streaming': False
-    }
-    options = PipelineOptions.from_dictionary(options_dict)
-    expected_records = [f'test{i}' for i in range(1, 12)]
-    with beam.Pipeline(options=options) as p:
-      pipeline.not_use_test_runner_api = True
-      result = pipeline_creator.build_read_pipeline_with_kerberos(p, max_num_records=11)
-      assert_that(
-          result,
-          equal_to(expected_records)
-      )
-
   def get_platform_localhost(self):
     if sys.platform == 'darwin':
       return 'host.docker.internal'

sdks/python/apache_beam/io/kafka.py

Lines changed: 21 additions & 27 deletions
@@ -110,24 +110,19 @@

 ReadFromKafkaSchema = typing.NamedTuple(
     'ReadFromKafkaSchema',
-    [
-        ('consumer_config', typing.Mapping[str, str]),
-        ('topics', typing.List[str]),
-        ('key_deserializer', str),
-        ('value_deserializer', str),
-        ('start_read_time', typing.Optional[int]),
-        ('max_num_records', typing.Optional[int]),
-        ('max_read_time', typing.Optional[int]),
-        ('commit_offset_in_finalize', bool),
-        ('timestamp_policy', str),
-        ('consumer_polling_timeout', typing.Optional[int]),
-        ('redistribute', typing.Optional[bool]),
-        ('redistribute_num_keys', typing.Optional[np.int32]),
-        ('allow_duplicates', typing.Optional[bool]),
-        ('dynamic_read_poll_interval_seconds', typing.Optional[int])
-        ('consumer_factory_fn', typing.Optional[str]),
-        ('consumer_factory_fn_params', typing.Optional[typing.Mapping[str, str]])
-    ])
+    [('consumer_config', typing.Mapping[str, str]),
+     ('topics', typing.List[str]), ('key_deserializer', str),
+     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
+     ('max_num_records', typing.Optional[int]),
+     ('max_read_time', typing.Optional[int]),
+     ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
+     ('consumer_polling_timeout', typing.Optional[int]),
+     ('redistribute', typing.Optional[bool]),
+     ('redistribute_num_keys', typing.Optional[np.int32]),
+     ('allow_duplicates', typing.Optional[bool]),
+     ('dynamic_read_poll_interval_seconds', typing.Optional[int]),
+     ('consumer_factory_fn_class', typing.Optional[str]),
+     ('consumer_factory_fn_params', typing.Optional[typing.Mapping[str, str]])])


 def default_io_expansion_service(append_args=None):
@@ -176,8 +171,7 @@ def __init__(
       allow_duplicates=False,
       dynamic_read_poll_interval_seconds: typing.Optional[int] = None,
       consumer_factory_fn_class=None,
-      consumer_factory_fn_params=None
-  ):
+      consumer_factory_fn_params=None):
     """
     Initializes a read operation from Kafka.

@@ -220,13 +214,13 @@ def __init__(
     :param dynamic_read_poll_interval_seconds: The interval in seconds at which
       to check for new partitions. If not None, dynamic partition discovery
       is enabled.
-    :param consumer_factory_fn_class: A fully qualified classpath to an existing provided
-      consumerFactoryFn. If not None, this will construct Kafka consumers with a custom
-      configuration.
-    :param consumer_factory_fn_params: A map which specifies the parameters for the provided
-      consumer_factory_fn_class. IF not None, the values in this map will be used when
-      constructing the consumer_factory_fn_class object. This cannot be null
-      if the consumer_factory_fn_class is not null.
+    :param consumer_factory_fn_class: A fully qualified classpath to an
+      existing provided consumerFactoryFn. If not None, this will construct
+      Kafka consumers with a custom configuration.
+    :param consumer_factory_fn_params: A map which specifies the parameters for
+      the provided consumer_factory_fn_class. IF not None, the values in this
+      map will be used when constructing the consumer_factory_fn_class object.
+      This cannot be null if the consumer_factory_fn_class is not null.
     """
     if timestamp_policy not in [ReadFromKafka.processing_time_policy,
                                 ReadFromKafka.create_time_policy,
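
Besides renaming consumer_factory_fn to consumer_factory_fn_class, the reflowed field list above fixes a bug in the removed version: the entry before ('consumer_factory_fn', ...) had no trailing comma, so the two adjacent tuples parsed as a call expression and raised at import time. A standalone reproduction of that pitfall (illustrative only, not Beam code):

# Two adjacent tuples with no comma between them parse as a call:
# ('a', int)('b', str) attempts to call the first tuple.
try:
    fields = [
        ('dynamic_read_poll_interval_seconds', int)  # missing comma here
        ('consumer_factory_fn', str)  # parsed as a call on the tuple above
    ]
except TypeError as err:
    print(err)  # 'tuple' object is not callable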
