
[FLINK-37863] add DynamicKafkaSource interface #177

Draft: wants to merge 1 commit into base: main
153 changes: 153 additions & 0 deletions flink-python/pyflink/datastream/connectors/kafka.py
@@ -29,6 +29,8 @@
from pyflink.util.java_utils import to_jarray, get_field, get_field_value

__all__ = [
'DynamicKafkaSource',
'DynamicKafkaSourceBuilder',
'KafkaSource',
'KafkaSourceBuilder',
'KafkaSink',
@@ -41,6 +43,157 @@
'KafkaTopicSelector'
]

# ---- DynamicKafkaSource ----


class DynamicKafkaSource(Source):
"""Python wrapper of the Java DynamicKafkaSource.

A DynamicKafkaSource enables consuming records from dynamically discovered Kafka streams
(topics that may span multiple clusters) without restarting the Flink job.

Use :py:meth:`builder` to construct an instance, for example::

>>> source = DynamicKafkaSource.builder() \\
... .set_stream_ids({"stream-a", "stream-b"}) \\
... .set_kafka_metadata_service(metadata_service) \\
... .set_value_only_deserializer(SimpleStringSchema()) \\
... .set_property("group.id", "my_group") \\
... .build()

The builder methods closely mirror their Java counterparts defined on
``org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder``.
"""

def __init__(self, j_dynamic_kafka_source: JavaObject):
super().__init__(j_dynamic_kafka_source)

@staticmethod
def builder() -> 'DynamicKafkaSourceBuilder':
"""Create and return a new :class:`DynamicKafkaSourceBuilder`."""
return DynamicKafkaSourceBuilder()


class DynamicKafkaSourceBuilder(object):
"""Builder for :class:`DynamicKafkaSource`.

The builder is a thin Python wrapper delegating to the underlying Java builder defined in
``org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder``.
"""

def __init__(self):
self._gateway = get_gateway()
self._j_builder = (
self._gateway.jvm.org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource
.builder()
)

# ---------------------------------------------------------------------
# Build
# ---------------------------------------------------------------------

def build(self) -> 'DynamicKafkaSource':
"""Finalize the configuration and return a :class:`DynamicKafkaSource`."""
return DynamicKafkaSource(self._j_builder.build())

# ---------------------------------------------------------------------
# Stream subscription configuration
# ---------------------------------------------------------------------

def set_stream_ids(self, stream_ids: Set[str]) -> 'DynamicKafkaSourceBuilder':
"""Subscribe to a fixed set of stream IDs.

:param stream_ids: A Python ``set`` of stream IDs.
"""
j_set = self._gateway.jvm.java.util.HashSet()
for stream_id in stream_ids:
j_set.add(stream_id)
self._j_builder.setStreamIds(j_set)
return self

def set_stream_pattern(self, stream_pattern: str) -> 'DynamicKafkaSourceBuilder':
"""Subscribe to streams whose IDs match the given Java regex pattern."""
j_pattern = self._gateway.jvm.java.util.regex.Pattern.compile(stream_pattern)
self._j_builder.setStreamPattern(j_pattern)
return self

def set_kafka_stream_subscriber(self, kafka_stream_subscriber: JavaObject) -> \
'DynamicKafkaSourceBuilder':
"""Use a custom ``KafkaStreamSubscriber`` implementation."""
self._j_builder.setKafkaStreamSubscriber(kafka_stream_subscriber)
return self

# ---------------------------------------------------------------------
# Metadata service
# ---------------------------------------------------------------------

    def set_kafka_metadata_service(self, kafka_metadata_service: JavaObject) -> \
            'DynamicKafkaSourceBuilder':
        """Specify the Java ``KafkaMetadataService`` that resolves stream IDs to clusters."""
        self._j_builder.setKafkaMetadataService(kafka_metadata_service)
        return self

Review comment (Contributor):
What about also introducing some basic built-in KafkaMetadataService implementations, e.g. SingleClusterTopicMetadataService? Otherwise, it would be difficult for Python users to use.

Reply (Author):
Sounds good. I'll see what it takes to make that happen, although I have not yet looked at how the Java<->Python bindings work and whether anything special is needed for this. Any chance you have a pointer to an easy solution for packaging up an arbitrary JavaObject?

Reply (Contributor):
The implementation of the Python DynamicKafkaSource in this pull request could itself be seen as just a wrapper around an arbitrary JavaObject.
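
As a minimal sketch of the point discussed above, and mirroring what the tests in this pull request already do, a Python user could obtain the Java SingleClusterTopicMetadataService through the py4j gateway and hand the resulting JavaObject to set_kafka_metadata_service; the cluster name and bootstrap address below are placeholders:

    from pyflink.java_gateway import get_gateway

    # Build the Java-side metadata service directly via py4j; any KafkaMetadataService
    # implementation available on the classpath can be passed to the builder this way.
    jvm = get_gateway().jvm
    props = jvm.java.util.Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")  # placeholder address
    metadata_service = jvm.org.apache.flink.connector.kafka.dynamic.metadata \
        .SingleClusterTopicMetadataService("my-cluster", props)  # placeholder cluster name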

# ---------------------------------------------------------------------
# Offset initializers & boundedness
# ---------------------------------------------------------------------

def set_starting_offsets(self, starting_offsets_initializer: 'KafkaOffsetsInitializer') -> \
'DynamicKafkaSourceBuilder':
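        """Specify the starting offsets of the source via the given :class:`KafkaOffsetsInitializer`."""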
self._j_builder.setStartingOffsets(starting_offsets_initializer._j_initializer)
return self

def set_bounded(self, stopping_offsets_initializer: 'KafkaOffsetsInitializer') -> \
'DynamicKafkaSourceBuilder':
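        """Make the source bounded; consumption stops at the offsets produced by the given
        :class:`KafkaOffsetsInitializer`."""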
self._j_builder.setBounded(stopping_offsets_initializer._j_initializer)
return self

# ---------------------------------------------------------------------
# Deserializer helpers
# ---------------------------------------------------------------------

def set_deserializer(self, kafka_record_deserializer: JavaObject) -> \
'DynamicKafkaSourceBuilder':
"""Set a custom Java ``KafkaRecordDeserializationSchema`` instance."""
self._j_builder.setDeserializer(kafka_record_deserializer)
return self

def set_value_only_deserializer(self, deserialization_schema: DeserializationSchema) -> \
'DynamicKafkaSourceBuilder':
"""Convenience method to deserialize the *value* of each Kafka record using the provided
:class:`~pyflink.common.serialization.DeserializationSchema`. Other fields (key, headers,
etc.) are ignored.
"""
j_schema = deserialization_schema._j_deserialization_schema
j_value_only_wrapper = (
self._gateway.jvm.org.apache.flink.connector.kafka.source.reader.deserializer
.KafkaRecordDeserializationSchema.valueOnly(j_schema)
)
self._j_builder.setDeserializer(j_value_only_wrapper)
return self

# ---------------------------------------------------------------------
# Misc Kafka consumer properties
# ---------------------------------------------------------------------

def set_properties(self, props: Dict[str, str]) -> 'DynamicKafkaSourceBuilder':
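        """Set arbitrary Kafka consumer properties from a Python ``dict``."""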
j_props = self._gateway.jvm.java.util.Properties()
for k, v in props.items():
j_props.setProperty(k, v)
self._j_builder.setProperties(j_props)
return self

def set_property(self, key: str, value: str) -> 'DynamicKafkaSourceBuilder':
self._j_builder.setProperty(key, value)
return self

def set_group_id(self, group_id: str) -> 'DynamicKafkaSourceBuilder':
self._j_builder.setGroupId(group_id)
return self

def set_client_id_prefix(self, prefix: str) -> 'DynamicKafkaSourceBuilder':
self._j_builder.setClientIdPrefix(prefix)
return self


# ---- KafkaSource ----

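For orientation, a sketch of how the new source could be wired into a PyFlink job, mirroring the test_compiling case in the tests below; the stream ID, group ID, cluster name, and bootstrap address are illustrative:

    from pyflink.common import WatermarkStrategy
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import DynamicKafkaSource
    from pyflink.java_gateway import get_gateway

    env = StreamExecutionEnvironment.get_execution_environment()

    # Java-side metadata service, built through py4j as sketched earlier.
    jvm = get_gateway().jvm
    props = jvm.java.util.Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    metadata_service = jvm.org.apache.flink.connector.kafka.dynamic.metadata \
        .SingleClusterTopicMetadataService("my-cluster", props)

    source = DynamicKafkaSource.builder() \
        .set_stream_ids({"orders"}) \
        .set_kafka_metadata_service(metadata_service) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .set_group_id("orders_group") \
        .build()

    ds = env.from_source(
        source=source,
        watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
        source_name="dynamic kafka source")
    ds.print()
    env.execute("dynamic_kafka_source_example")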
112 changes: 112 additions & 0 deletions flink-python/pyflink/datastream/connectors/tests/test_kafka.py
@@ -28,6 +28,8 @@
from pyflink.datastream.connectors.base import DeliveryGuarantee
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaTopicPartition, \
KafkaOffsetsInitializer, KafkaOffsetResetStrategy, KafkaRecordSerializationSchema, KafkaSink
# Import newly added DynamicKafkaSource
from pyflink.datastream.connectors.kafka import DynamicKafkaSource
from pyflink.datastream.formats.avro import AvroRowDeserializationSchema, AvroRowSerializationSchema
from pyflink.datastream.formats.csv import CsvRowDeserializationSchema, CsvRowSerializationSchema
from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
@@ -380,6 +382,116 @@ def _get_kafka_source_configuration(source: KafkaSource):
return Configuration(j_configuration=j_configuration)


class DynamicKafkaSourceTests(PyFlinkStreamingTestCase):

def _create_metadata_service(self):
"""Create a simple metadata service pointing to a dummy cluster."""
jvm = get_gateway().jvm
props = jvm.java.util.Properties()
# Minimal bootstrap servers property to satisfy Kafka client validation.
props.setProperty("bootstrap.servers", "localhost:9092")
return jvm.org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService(
"dummy-cluster", props)

def _build_base_source(self):
return DynamicKafkaSource.builder() \
.set_stream_ids({"stream_test"}) \
.set_kafka_metadata_service(self._create_metadata_service()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.set_group_id("test_group") \
.build()

def test_compiling(self):
source = self._build_base_source()
ds = self.env.from_source(
source=source,
watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
source_name="dynamic kafka source")
ds.print()
plan = json.loads(self.env.get_execution_plan())
self.assertEqual('Source: dynamic kafka source', plan['nodes'][0]['type'])

def test_set_properties(self):
source = DynamicKafkaSource.builder() \
.set_stream_ids({"stream_test"}) \
.set_kafka_metadata_service(self._create_metadata_service()) \
.set_group_id("test_group_id") \
.set_client_id_prefix("test_client_id_prefix") \
.set_property("test_property", "test_value") \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()

# Extract the internal properties field for verification.
props = get_field_value(source.get_java_function(), 'properties')
self.assertEqual(props.getProperty('group.id'), 'test_group_id')
self.assertEqual(props.getProperty('client.id.prefix'), 'test_client_id_prefix')
self.assertEqual(props.getProperty('test_property'), 'test_value')

def test_set_stream_ids(self):
stream_ids = {"stream_a", "stream_b"}
source = DynamicKafkaSource.builder() \
.set_stream_ids(stream_ids) \
.set_kafka_metadata_service(self._create_metadata_service()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()

subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
self.assertEqual(
subscriber.getClass().getCanonicalName(),
'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber'
)

subscribed_ids = get_field_value(subscriber, 'streamIds')
self.assertTrue(is_instance_of(subscribed_ids, get_gateway().jvm.java.util.Set))
self.assertEqual(subscribed_ids.size(), len(stream_ids))
for s in stream_ids:
self.assertTrue(subscribed_ids.contains(s))

def test_set_stream_pattern(self):
pattern = 'stream_*'
source = DynamicKafkaSource.builder() \
.set_stream_pattern(pattern) \
.set_kafka_metadata_service(self._create_metadata_service()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()

subscriber = get_field_value(source.get_java_function(), 'kafkaStreamSubscriber')
self.assertEqual(
subscriber.getClass().getCanonicalName(),
'org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.StreamPatternSubscriber'
)
j_pattern = get_field_value(subscriber, 'streamPattern')
self.assertTrue(is_instance_of(j_pattern, get_gateway().jvm.java.util.regex.Pattern))
self.assertEqual(j_pattern.toString(), pattern)

def test_bounded(self):
source = DynamicKafkaSource.builder() \
.set_stream_ids({"stream_test"}) \
.set_kafka_metadata_service(self._create_metadata_service()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.set_bounded(KafkaOffsetsInitializer.latest()) \
.build()

self.assertEqual(
get_field_value(source.get_java_function(), 'boundedness').toString(), 'BOUNDED'
)

def test_starting_offsets(self):
source = DynamicKafkaSource.builder() \
.set_stream_ids({"stream_test"}) \
.set_kafka_metadata_service(self._create_metadata_service()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.build()

initializer = get_field_value(source.get_java_function(), 'startingOffsetsInitializer')
self.assertEqual(
initializer.getClass().getCanonicalName(),
'org.apache.flink.connector.kafka.source.enumerator.initializer.LatestOffsetsInitializer'
)



class KafkaSinkTests(PyFlinkStreamingTestCase):

def test_compile(self):