Skip to content

Commit ad26a9c

Browse files
committed
Remove accidentally committed python modules
1 parent d18218a commit ad26a9c

File tree

5 files changed

+27
-76
lines changed

5 files changed

+27
-76
lines changed

sdks/java/extensions/kafka-factories/src/test/java/org/apache/beam/sdk/io/kafka/file/aware/factories/FileAwareFactoryFnTest.java

Whitespace-only changes.

sdks/java/extensions/kafka-factories/src/test/java/org/apache/beam/sdk/io/kafka/file/aware/factories/KerberosConsumerFactoryFnTest.java

Whitespace-only changes.

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.HashSet;
3737
import java.util.List;
3838
import java.util.Map;
39+
import java.util.Objects;
3940
import java.util.Optional;
4041
import java.util.Set;
4142
import java.util.regex.Pattern;
@@ -114,6 +115,7 @@
114115
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
115116
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
116117
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
118+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
117119
import org.apache.kafka.clients.CommonClientConfigs;
118120
import org.apache.kafka.clients.consumer.Consumer;
119121
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -937,18 +939,27 @@ static <K, V> void setupExternalBuilder(
937939
if (config.consumerFactoryFnClass != null) {
938940
if (config.consumerFactoryFnClass.contains("KerberosConsumerFactoryFn")) {
939941
try {
940-
if (!config.consumerFactoryFnClass.contains("krb5Location")) {
941-
throw new IllegalArgumentException("The KerberosConsumerFactoryFn requires a location for the krb5.conf file. " +
942-
"Please provide either a GCS location or Google Secret Manager location for this file.");
942+
if (!config.consumerFactoryFnParams.containsKey("krb5Location")) {
943+
throw new IllegalArgumentException(
944+
"The KerberosConsumerFactoryFn requires a location for the krb5.conf file. "
945+
+ "Please provide either a GCS location or Google Secret Manager location for this file.");
943946
}
944-
builder.setConsumerFactoryFn(InstanceBuilder.ofType(new TypeDescriptor<SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>>() {})
945-
.fromClassName(config.consumerFactoryFnClass)
946-
.withArg(
947-
String.class,
948-
config.consumerFactoryFnParams.get("krb5Location"))
949-
.build());
947+
String krb5Location = config.consumerFactoryFnParams.get("krb5Location");
948+
builder.setConsumerFactoryFn(
949+
InstanceBuilder.ofType(
950+
new TypeDescriptor<
951+
SerializableFunction<
952+
Map<String, Object>, Consumer<byte[], byte[]>>>() {})
953+
.fromClassName(config.consumerFactoryFnClass)
954+
.withArg(String.class, Objects.requireNonNull(krb5Location))
955+
.build());
950956
} catch (Exception e) {
951-
throw new RuntimeException("Unable to construct FactoryFn " + config.consumerFactoryFnClass + ": " + e.getMessage(), e);
957+
throw new RuntimeException(
958+
"Unable to construct FactoryFn "
959+
+ config.consumerFactoryFnClass
960+
+ ": "
961+
+ e.getMessage(),
962+
e);
952963
}
953964
}
954965
}
@@ -1094,12 +1105,12 @@ public void setDynamicReadPollIntervalSeconds(Long dynamicReadPollIntervalSecond
10941105
}
10951106

10961107
public void setConsumerFactoryFnClass(String consumerFactoryFnClass) {
1097-
this.consumerFactoryFnClass = consumerFactoryFnClass;
1108+
this.consumerFactoryFnClass = consumerFactoryFnClass;
10981109
}
10991110

11001111
public void setConsumerFactoryFnParams(Map<String, String> consumerFactoryFnParams) {
1101-
this.consumerFactoryFnParams = consumerFactoryFnParams;
1102-
}
1112+
this.consumerFactoryFnParams = consumerFactoryFnParams;
1113+
}
11031114
}
11041115
}
11051116

sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py

Lines changed: 1 addition & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@
4141
from apache_beam.transforms.userstate import BagStateSpec
4242
from apache_beam.transforms.userstate import CombiningValueStateSpec
4343
from apache_beam.utils import subprocess_server
44-
from apache_beam.options.pipeline_options import (
45-
PipelineOptions,
46-
GoogleCloudOptions
47-
)
4844

4945
NUM_RECORDS = 1000
5046

@@ -72,7 +68,7 @@ def process(
7268

7369
class CrossLanguageKafkaIO(object):
7470
def __init__(
75-
self, bootstrap_servers=None, topic=None, null_key=None, expansion_service=None):
71+
self, bootstrap_servers, topic, null_key, expansion_service=None):
7672
self.bootstrap_servers = bootstrap_servers
7773
self.topic = topic
7874
self.null_key = null_key
@@ -111,34 +107,6 @@ def build_read_pipeline(self, pipeline, max_num_records=None):
111107
| 'CalculateSum' >> beam.ParDo(CollectingFn())
112108
| 'SetSumCounter' >> beam.Map(self.sum_counter.inc))
113109

114-
def build_read_pipeline_with_kerberos(self, p, max_num_records=None):
115-
116-
jaas_config = (
117-
f'com.sun.security.auth.module.Krb5LoginModule required '
118-
f'useKeyTab=true storeKey=true '
119-
f'keyTab="secretValue:projects/dataflow-testing-311516/secrets/kafka-client-keytab/versions/latest" '
120-
f'principal="[email protected]";'
121-
)
122-
123-
kafka_records = (
124-
p
125-
| 'ReadFromKafka' >> ReadFromKafka(
126-
consumer_config={
127-
'bootstrap.servers': 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092',
128-
'auto.offset.reset': 'earliest',
129-
'max_num_records': max_num_records,
130-
'security.protocol': 'SASL_PLAINTEXT',
131-
'sasl.mechanism': 'GSSAPI',
132-
'sasl.kerberos.service.name': 'kafka',
133-
'sasl.jaas.config': jaas_config
134-
},
135-
topics=['fozzie_test_kerberos_topic'],
136-
key_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
137-
value_deserializer='org.apache.kafka.common.serialization.StringDeserializer',
138-
consumer_factory_fn_class='org.apache.beam.sdk.extensions.kafka.factories.KerberosConsumerFactoryFn',
139-
consumer_factory_fn_params={'krb5Location': 'gs://fozzie_testing_bucket/kerberos/krb5.conf'}))
140-
return kafka_records
141-
142110
def run_xlang_kafkaio(self, pipeline):
143111
self.build_write_pipeline(pipeline)
144112
self.build_read_pipeline(pipeline)
@@ -214,17 +182,6 @@ def test_hosted_kafkaio_null_key(self):
214182
self.run_kafka_write(pipeline_creator)
215183
self.run_kafka_read(pipeline_creator, None)
216184

217-
def test_hosted_kafkaio_null_key_kerberos(self):
218-
kafka_topic = 'xlang_kafkaio_test_null_key_{}'.format(uuid.uuid4())
219-
bootstrap_servers = 'fozzie-test-kafka-broker.us-central1-c.c.dataflow-testing-311516.internal:9092'
220-
pipeline_creator = CrossLanguageKafkaIO(
221-
bootstrap_servers,
222-
kafka_topic,
223-
True,
224-
'localhost:%s' % os.environ.get('EXPANSION_PORT'))
225-
226-
self.run_kafka_read_with_kerberos(pipeline_creator)
227-
228185
def run_kafka_write(self, pipeline_creator):
229186
with TestPipeline() as pipeline:
230187
pipeline.not_use_test_runner_api = True
@@ -239,23 +196,6 @@ def run_kafka_read(self, pipeline_creator, expected_key):
239196
equal_to([(expected_key, str(i).encode())
240197
for i in range(NUM_RECORDS)]))
241198

242-
def run_kafka_read_with_kerberos(self, pipeline_creator):
243-
options_dict = {
244-
'runner': 'DataflowRunner',
245-
'project': 'dataflow-testing-311516',
246-
'region': 'us-central1',
247-
'streaming': False
248-
}
249-
options = PipelineOptions.from_dictionary(options_dict)
250-
expected_records = [f'test{i}' for i in range(1, 12)]
251-
with beam.Pipeline(options=options) as p:
252-
pipeline.not_use_test_runner_api = True
253-
result = pipeline_creator.build_read_pipeline_with_kerberos(p, max_num_records=11)
254-
assert_that(
255-
result,
256-
equal_to(expected_records)
257-
)
258-
259199
def get_platform_localhost(self):
260200
if sys.platform == 'darwin':
261201
return 'host.docker.internal'

sdks/python/apache_beam/io/kafka.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@
124124
('redistribute', typing.Optional[bool]),
125125
('redistribute_num_keys', typing.Optional[np.int32]),
126126
('allow_duplicates', typing.Optional[bool]),
127-
('dynamic_read_poll_interval_seconds', typing.Optional[int])
128-
('consumer_factory_fn', typing.Optional[str]),
127+
('dynamic_read_poll_interval_seconds', typing.Optional[int]),
128+
('consumer_factory_fn_class', typing.Optional[str]),
129129
('consumer_factory_fn_params', typing.Optional[typing.Mapping[str, str]])
130130
])
131131

0 commit comments

Comments
 (0)