
Commit ecf0dd9

zxqfd555 authored and Manul from Pathway committed
add kafka key support in JSONLines format (#9519)
GitOrigin-RevId: 9027601933f583ab611cf1ffe289c70d8fcc7a4d
1 parent 803e413 commit ecf0dd9

File tree

19 files changed: +693 −214 lines


CHANGELOG.md

Lines changed: 3 additions & 0 deletions
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
 This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 ## [Unreleased]
 
+### Added
+- `pw.io.kafka.read` and `pw.io.redpanda.read` now allow each schema field to be specified as coming from either the message key or the message value.
+
 ## [0.27.1] - 2025-12-08
 
 ### Added
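
To illustrate the entry above, here is a minimal sketch of reading fields from both the message key and the message value. The broker address, topic, and column names are hypothetical; the source_component values mirror the integration tests below.

import pathway as pw

class InputSchema(pw.Schema):
    # Parsed from the Kafka message key.
    user_id: int = pw.column_definition(primary_key=True, source_component="key")
    # Parsed from the message value; "payload" is the default.
    amount: float = pw.column_definition(source_component="payload")

table = pw.io.kafka.read(
    rdkafka_settings={"bootstrap.servers": "localhost:9092", "group.id": "example"},
    topic="transactions",
    format="json",
    schema=InputSchema,
)
pw.io.jsonlines.write(table, "output.jsonl")
pw.run()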

integration_tests/kafka/test_simple.py

Lines changed: 202 additions & 10 deletions
@@ -8,7 +8,6 @@
 import time
 import uuid
 
-import pandas as pd
 import pytest
 
 import pathway as pw
@@ -60,9 +59,16 @@ def test_kafka_raw(with_metadata, tmp_path, kafka_context):
 @pytest.mark.parametrize("input_format", ["plaintext", "raw"])
 @pytest.mark.flaky(reruns=3)
 def test_kafka_key_parsing(input_format, with_metadata, tmp_path, kafka_context):
-    context = [("1", "one"), ("2", "two"), ("3", "three")]
+    context = [
+        ("1", "one"),
+        ("2", "two"),
+        ("3", "three"),
+        ("4", None),
+        (None, "five"),
+    ]
     kafka_context.fill(context)
 
+    output_path = tmp_path / "output.jsonl"
     table = pw.io.kafka.read(
         rdkafka_settings=kafka_context.default_rdkafka_settings(),
         topic=kafka_context.input_topic,
@@ -71,14 +77,23 @@ def test_kafka_key_parsing(input_format, with_metadata, tmp_path, kafka_context)
         with_metadata=with_metadata,
         mode="static",
     )
+    pw.io.jsonlines.write(table, output_path)
+    pw.run()
 
-    pandas_table = pw.debug.table_to_pandas(table)
-    for key, value in context:
-        if input_format != "plaintext":
-            key = key.encode("utf-8")  # type: ignore
-            value = value.encode("utf-8")  # type: ignore
-        row = pandas_table.loc[pandas_table["key"] == key, ["data"]].iloc[0]
-        assert (row == pd.Series({"data": value})).all()
+    parsed_values = []
+    with open(output_path, "r") as f:
+        for row in f:
+            data = json.loads(row)
+            key = data["key"]
+            value = data["data"]
+            if input_format == "raw" and key is not None:
+                key = base64.b64decode(key).decode("utf-8")
+            if input_format == "raw" and value is not None:
+                value = base64.b64decode(value).decode("utf-8")
+            parsed_values.append((key, value))
+    parsed_values.sort(key=lambda data: str(data[0]))
+    context.sort(key=lambda data: str(data[0]))
+    assert parsed_values == context
 
 
 @pytest.mark.flaky(reruns=3)
@@ -172,6 +187,133 @@ class InputSchema(pw.Schema):
     )
 
 
+@pytest.mark.parametrize("with_metadata", [False, True])
+@pytest.mark.flaky(reruns=3)
+def test_kafka_json_key_parsing(tmp_path, kafka_context, with_metadata):
+    context = [
+        (json.dumps({"k": 0}), json.dumps({"v": "foo"})),
+        (json.dumps({"k": 1}), json.dumps({"v": "bar"})),
+        (json.dumps({"k": 2}), json.dumps({"v": "baz"})),
+    ]
+    kafka_context.fill(context)
+
+    class InputSchema(pw.Schema):
+        k: int = pw.column_definition(primary_key=True, source_component="key")
+        v: str = pw.column_definition(primary_key=True, source_component="payload")
+
+    table = pw.io.kafka.read(
+        rdkafka_settings=kafka_context.default_rdkafka_settings(),
+        topic=kafka_context.input_topic,
+        format="json",
+        schema=InputSchema,
+        with_metadata=with_metadata,
+        autocommit_duration_ms=100,
+    )
+
+    pw.io.csv.write(table, tmp_path / "output.csv")
+
+    wait_result_with_checker(
+        expect_csv_checker(
+            """
+            k | v
+            0 | foo
+            1 | bar
+            2 | baz
+            """,
+            tmp_path / "output.csv",
+            usecols=["v"],
+            index_col=["k"],
+        ),
+        10,
+    )
+
+
+@pytest.mark.parametrize("with_metadata", [False, True])
+@pytest.mark.flaky(reruns=3)
+def test_kafka_json_key_jsonpaths(tmp_path, kafka_context, with_metadata):
+    context = [
+        (json.dumps({"k": {"l": 0, "m": 3}}), json.dumps({"v": {"vv": "foo"}})),
+        (json.dumps({"k": {"l": 1, "m": 4}}), json.dumps({"v": {"vv": "bar"}})),
+        (json.dumps({"k": {"l": 2, "m": 5}}), json.dumps({"v": {"vv": "baz"}})),
+    ]
+    kafka_context.fill(context)
+
+    class InputSchema(pw.Schema):
+        k: int = pw.column_definition(primary_key=True, source_component="key")
+        v: str = pw.column_definition(primary_key=True, source_component="payload")
+
+    table = pw.io.kafka.read(
+        rdkafka_settings=kafka_context.default_rdkafka_settings(),
+        topic=kafka_context.input_topic,
+        format="json",
+        schema=InputSchema,
+        with_metadata=with_metadata,
+        autocommit_duration_ms=100,
+        json_field_paths={"k": "/k/l", "v": "/v/vv"},
+    )
+
+    pw.io.csv.write(table, tmp_path / "output.csv")
+
+    wait_result_with_checker(
+        expect_csv_checker(
+            """
+            k | v
+            0 | foo
+            1 | bar
+            2 | baz
+            """,
+            tmp_path / "output.csv",
+            usecols=["v"],
+            index_col=["k"],
+        ),
+        10,
+    )
+
+
+@pytest.mark.parametrize("with_metadata", [False, True])
+@pytest.mark.parametrize("unparsable_value", ["abracadabra", None])
+@pytest.mark.flaky(reruns=3)
+def test_kafka_json_data_only_in_key(
+    tmp_path, unparsable_value, kafka_context, with_metadata
+):
+    context = [
+        (json.dumps({"k": 0, "v": "foo"}), unparsable_value),
+        (json.dumps({"k": 1, "v": "bar"}), unparsable_value),
+        (json.dumps({"k": 2, "v": "baz"}), unparsable_value),
+    ]
+    kafka_context.fill(context)
+
+    class InputSchema(pw.Schema):
+        k: int = pw.column_definition(primary_key=True, source_component="key")
+        v: str = pw.column_definition(primary_key=True, source_component="key")
+
+    table = pw.io.kafka.read(
+        rdkafka_settings=kafka_context.default_rdkafka_settings(),
+        topic=kafka_context.input_topic,
+        format="json",
+        schema=InputSchema,
+        with_metadata=with_metadata,
+        autocommit_duration_ms=100,
+    )
+
+    pw.io.csv.write(table, tmp_path / "output.csv")
+
+    wait_result_with_checker(
+        expect_csv_checker(
+            """
+            k | v
+            0 | foo
+            1 | bar
+            2 | baz
+            """,
+            tmp_path / "output.csv",
+            usecols=["v"],
+            index_col=["k"],
+        ),
+        10,
+    )
+
+
 @pytest.mark.flaky(reruns=3)
 def test_kafka_simple_wrapper_bytes_io(
     tmp_path: pathlib.Path, kafka_context: KafkaTestContext
@@ -704,6 +846,7 @@ def test_kafka_registry(tmp_path, kafka_context):
 
     input_path = tmp_path / "input.jsonl"
     output_path = tmp_path / "output.jsonl"
+    raw_output_path = tmp_path / "output_raw.jsonl"
    input_entries = [
        {"key": 1, "value": "one"},
        {"key": 2, "value": "two"},
@@ -743,9 +886,17 @@ class TableSchema(pw.Schema):
             timeout=datetime.timedelta(seconds=5),
         ),
     )
+    table_raw = pw.io.kafka.read(
+        rdkafka_settings=kafka_context.default_rdkafka_settings(),
+        topic=kafka_context.input_topic,
+        format="raw",
+    )
 
     pw.io.jsonlines.write(table_reread, output_path)
-    wait_result_with_checker(FileLinesNumberChecker(output_path, 2), 30)
+    pw.io.jsonlines.write(table_raw, raw_output_path)
+    wait_result_with_checker(
+        FileLinesNumberChecker(output_path, 2).add_path(raw_output_path, 2), 30
+    )
     output_entries = []
     with open(output_path, "r") as f:
         for line in f:
@@ -758,3 +909,44 @@ class TableSchema(pw.Schema):
         )
     output_entries.sort(key=lambda x: x["key"])
     assert output_entries == input_entries
+
+    # Send the data encoded by the registry as a key, while keeping the value as empty.
+    # Check that value parsing works.
+    additional_topic = kafka_context.create_additional_topic()
+    with open(raw_output_path, "r") as f:
+        for line in f:
+            data = json.loads(line)
+            encoded_message = base64.b64decode(data["data"])
+            kafka_context.send(message=(encoded_message, None), topic=additional_topic)
+
+    class KeyTableSchema(pw.Schema):
+        key: int = pw.column_definition(source_component="key")
+        value: str = pw.column_definition(source_component="key")
+
+    G.clear()
+    table = pw.io.kafka.read(
+        rdkafka_settings=kafka_context.default_rdkafka_settings(),
+        topic=additional_topic,
+        format="json",
+        schema=KeyTableSchema,
+        schema_registry_settings=pw.io.kafka.SchemaRegistrySettings(
+            urls=[SCHEMA_REGISTRY_BASE_ROUTE],
+            timeout=datetime.timedelta(seconds=5),
+        ),
+        mode="static",
+    )
+    pw.io.jsonlines.write(table, output_path)
+    pw.run(monitoring_level=pw.MonitoringLevel.NONE)
+
+    roundtrip_entries = []
+    with open(output_path, "r") as f:
+        for line in f:
+            data = json.loads(line)
+            roundtrip_entries.append(
+                {
+                    "key": data["key"],
+                    "value": data["value"],
+                }
+            )
+    roundtrip_entries.sort(key=lambda x: x["key"])
+    assert roundtrip_entries == input_entries
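
A note on the rewritten assertion in test_kafka_key_parsing above: with format="raw" the key and data columns hold bytes, which pw.io.jsonlines.write appears to emit as base64 strings, hence the b64decode calls before comparing. A self-contained sketch of that decoding step follows; the sample line is made up for illustration.

import base64
import json

# Hypothetical JSONLines row of the shape produced for format="raw";
# "MQ==" is base64 for b"1" and "b25l" is base64 for b"one".
line = '{"key": "MQ==", "data": "b25l", "diff": 1, "time": 0}'
record = json.loads(line)
key = base64.b64decode(record["key"]).decode("utf-8") if record["key"] is not None else None
value = base64.b64decode(record["data"]).decode("utf-8") if record["data"] is not None else None
assert (key, value) == ("1", "one")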

integration_tests/kafka/utils.py

Lines changed: 28 additions & 6 deletions
@@ -21,6 +21,10 @@
 KINESIS_ENDPOINT_URL = "http://kinesis:4567"
 
 
+def random_topic_name():
+    return f"integration-tests-{uuid4()}"
+
+
 class KafkaTestContext:
     _producer: KafkaProducer
     _admin: KafkaAdminClient
@@ -34,25 +38,43 @@ def __init__(self) -> None:
         self._admin = KafkaAdminClient(
             bootstrap_servers=KAFKA_SETTINGS["bootstrap_servers"],
         )
-        self._input_topic = f"integration-tests-{uuid4()}"
-        self._output_topic = f"integration-tests-{uuid4()}"
+        self._input_topic = random_topic_name()
+        self._output_topic = random_topic_name()
+        self._created_topics: set[str] = set()
+
         self._create_topic(self.input_topic)
         self._create_topic(self.output_topic)
 
+    def create_additional_topic(self) -> str:
+        topic_name = random_topic_name()
+        self._create_topic(topic_name)
+        return topic_name
+
     def _create_topic(self, name: str, num_partitions: int = 1) -> None:
         self._admin.create_topics(
             [NewTopic(name=name, num_partitions=num_partitions, replication_factor=1)]
         )
+        self._created_topics.add(name)
 
     def _delete_topic(self, name: str) -> None:
         self._admin.delete_topics(topics=[name])
 
-    def send(self, message: str | tuple[str, str]) -> None:
+    def send(
+        self, message: str | tuple[str | bytes | None, str | bytes | None], topic=None
+    ) -> None:
+        topic = topic or self._input_topic
+
         if isinstance(message, tuple):
             (key, value) = message
         else:
             (key, value) = str(uuid4()), message
-        self._producer.send(self.input_topic, key=key.encode(), value=value.encode())
+
+        if isinstance(key, str):
+            key = key.encode()
+        if isinstance(value, str):
+            value = value.encode()
+
+        self._producer.send(topic, key=key, value=value)
 
     def set_input_topic_partitions(self, num_partitions: int):
         self._delete_topic(self._input_topic)
@@ -97,8 +119,8 @@ def read_input_topic(self, poll_timeout_ms: int = 1000) -> list[ConsumerRecord]:
         return self.read_topic(self._input_topic, poll_timeout_ms)
 
     def teardown(self) -> None:
-        self._delete_topic(self.input_topic)
-        self._delete_topic(self.output_topic)
+        for topic in self._created_topics:
+            self._delete_topic(topic)
         self._producer.close()
         self._admin.close()
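
For reference, the extended send helper is what the registry round-trip test above relies on. A short usage sketch, with placeholder bytes and topic name:

# Key-only message: raw bytes go out as the Kafka key, the value stays empty.
kafka_context.send(message=(b"\x00\x01\x02", None), topic="some-additional-topic")

# Tuple of strings: both parts are utf-8 encoded and sent to the default input topic.
kafka_context.send(("user-1", "payload-1"))

# Bare string: a random UUID key is generated and the string becomes the value.
kafka_context.send("just-a-value")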

python/pathway/engine.pyi

Lines changed: 7 additions & 1 deletion
@@ -931,9 +931,15 @@ class AwsS3Settings:
 class AzureBlobStorageSettings:
     def __init__(self, *args, **kwargs): ...
 
+class FieldSource(Enum):
+    KEY: FieldSource
+    PAYLOAD: FieldSource
+
 class ValueField:
     name: str
-    def __init__(self, name: str, type_: PathwayType): ...
+    def __init__(
+        self, name: str, type_: PathwayType, source: FieldSource = FieldSource.PAYLOAD
+    ): ...
     def set_default(self, *args, **kwargs): ...
     def set_metadata(self, *args, **kwargs): ...
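
A minimal sketch of how the extended ValueField stub is constructed from Python, mirroring _form_value_fields below; the import path and field names are assumptions:

from pathway.internals import api

# A field read from the Kafka message key...
key_field = api.ValueField("k", api.PathwayType.INT, source=api.FieldSource.KEY)
# ...and one read from the message value; PAYLOAD is the default.
payload_field = api.ValueField("v", api.PathwayType.STRING)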

python/pathway/internals/_io_helpers.py

Lines changed: 15 additions & 4 deletions
@@ -195,6 +195,7 @@ def _format_output_value_fields(table: Table) -> list[api.ValueField]:
         value_field = api.ValueField(
             column_name,
             column_data.dtype.to_engine(),
+            source=column_data.engine_field_source,
         )
         value_field.set_metadata(
             json.dumps(column_data.to_json_serializable_dict(), sort_keys=True)
@@ -209,11 +210,21 @@ def _form_value_fields(schema: type[schema.Schema]) -> list[api.ValueField]:
     default_values = schema.default_values()
     result = []
 
-    types = {name: dtype.to_engine() for name, dtype in schema._dtypes().items()}
-
+    columns = schema.columns()
     for f in schema.column_names():
-        dtype = types.get(f, api.PathwayType.ANY)
-        value_field = api.ValueField(f, dtype)
+        item = columns.get(f)
+        if item is None:
+            value_field = api.ValueField(
+                f,
+                api.PathwayType.ANY,
+                source=api.FieldSource.PAYLOAD,
+            )
+        else:
+            value_field = api.ValueField(
+                f,
+                item.dtype.to_engine(),
+                source=item.engine_field_source,
+            )
         if f in default_values:
             value_field.set_default(default_values[f])
         result.append(value_field)
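
The engine_field_source property used above lives in schema/column code that is not part of this excerpt. Under the assumption that it simply maps the schema-level source_component string to the engine enum, a plausible standalone sketch looks like this:

def engine_field_source(source_component: str) -> api.FieldSource:
    # Hypothetical helper: columns declared with source_component="key"
    # are read from the Kafka message key; everything else comes from the
    # message value, matching the FieldSource.PAYLOAD default.
    if source_component == "key":
        return api.FieldSource.KEY
    return api.FieldSource.PAYLOAD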
