
Commit f3bac2e

zxqfd555 authored and Manul from Pathway committed
make kafka message keys accessible (#9512)
GitOrigin-RevId: bf3966813216d6842546b3330f5abee39e781e53
1 parent 6295656 commit f3bac2e

9 files changed: +158 -50 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 
 ### Changed
 - The MCP server `tool` method now allows passing an optional `description`; the default value is kept as the handler's docstring.
+- `pw.io.kafka.read` and `pw.io.redpanda.read` now create a `key` column storing the contents of the message keys.
 
 ## [0.27.0] - 2025-11-13
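As a quick illustration of the changelog entry, here is a minimal sketch of reading a topic and seeing the new column (the broker address, group id, and topic name are placeholders, not part of this commit):

```python
import pathway as pw

# Placeholder connection settings; substitute your own broker.
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "auto.offset.reset": "earliest",
}

# With format="plaintext", the resulting table now carries two columns:
# `key` (str) and `data` (str), the message key and its payload.
table = pw.io.kafka.read(
    rdkafka_settings=rdkafka_settings,
    topic="example-topic",
    format="plaintext",
    mode="static",
)

pw.debug.compute_and_print(table)
```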

integration_tests/kafka/test_simple.py

Lines changed: 26 additions & 0 deletions

@@ -8,6 +8,7 @@
 import time
 import uuid
 
+import pandas as pd
 import pytest
 
 import pathway as pw
@@ -55,6 +56,31 @@ def test_kafka_raw(with_metadata, tmp_path, kafka_context):
     )
 
 
+@pytest.mark.parametrize("with_metadata", [False, True])
+@pytest.mark.parametrize("input_format", ["plaintext", "raw"])
+@pytest.mark.flaky(reruns=3)
+def test_kafka_key_parsing(input_format, with_metadata, tmp_path, kafka_context):
+    context = [("1", "one"), ("2", "two"), ("3", "three")]
+    kafka_context.fill(context)
+
+    table = pw.io.kafka.read(
+        rdkafka_settings=kafka_context.default_rdkafka_settings(),
+        topic=kafka_context.input_topic,
+        format=input_format,
+        autocommit_duration_ms=100,
+        with_metadata=with_metadata,
+        mode="static",
+    )
+
+    pandas_table = pw.debug.table_to_pandas(table)
+    for key, value in context:
+        if input_format != "plaintext":
+            key = key.encode("utf-8")  # type: ignore
+            value = value.encode("utf-8")  # type: ignore
+        row = pandas_table.loc[pandas_table["key"] == key, ["data"]].iloc[0]
+        assert (row == pd.Series({"data": value})).all()
+
+
 @pytest.mark.flaky(reruns=3)
 def test_kafka_static_mode(tmp_path, kafka_context):
     kafka_context.fill(["foo", "bar"])
python/pathway/io/_utils.py

Lines changed: 35 additions & 7 deletions

@@ -30,6 +30,7 @@
 SNAPSHOT_MODE_NAME = "streaming_with_deletions"  # deprecated
 
 METADATA_COLUMN_NAME = "_metadata"
+MESSAGE_QUEUE_KEY_COLUMN_NAME = "key"
 
 STATUS_SIZE_LIMIT_EXCEEDED = "size_limit_exceeded"
 STATUS_DOWNLOADED = "downloaded"
@@ -206,6 +207,31 @@ def assert_schema_not_none(
     return schema
 
 
+class PlaintextKeySchema(pw.Schema):
+    key: str
+
+
+class RawKeySchema(pw.Schema):
+    key: bytes
+
+
+def construct_raw_data_schema_by_flags(
+    *, with_native_record_key: bool, parse_utf8: bool, with_metadata: bool
+) -> type[pw.Schema]:
+    Schema: Any
+    if parse_utf8:
+        Schema = PlaintextDataSchema
+        if with_native_record_key:
+            Schema = Schema | PlaintextKeySchema
+    else:
+        Schema = RawDataSchema
+        if with_native_record_key:
+            Schema = Schema | RawKeySchema
+    if with_metadata:
+        Schema = Schema | MetadataSchema
+    return Schema
+
+
 def construct_schema_and_data_format(
     format: str,
     *,
@@ -215,6 +241,7 @@ def construct_schema_and_data_format(
     csv_settings: CsvParserSettings | None = None,
     json_field_paths: dict[str, str] | None = None,
     schema_registry_settings: SchemaRegistrySettings | None = None,
+    with_native_record_key: bool = False,
     _stacklevel: int = 1,
 ) -> tuple[type[Schema], api.DataFormat]:
     data_format_type = get_data_format_type(format, SUPPORTED_INPUT_FORMATS)
@@ -231,13 +258,11 @@ def construct_schema_and_data_format(
             raise ValueError(f"Unexpected argument for plaintext format: {param}")
 
         parse_utf8 = format not in ("binary", "only_metadata")
-        if parse_utf8:
-            schema = PlaintextDataSchema
-        else:
-            schema = RawDataSchema
-
-        if with_metadata:
-            schema |= MetadataSchema
+        schema = construct_raw_data_schema_by_flags(
+            with_native_record_key=with_native_record_key,
+            parse_utf8=parse_utf8,
+            with_metadata=with_metadata,
+        )
         schema, api_schema = read_schema(schema)
 
         return schema, api.DataFormat(
@@ -252,6 +277,9 @@ def construct_schema_and_data_format(
             schema_registry_settings=maybe_schema_registry_settings(
                 schema_registry_settings
             ),
+            message_queue_key_field=(
+                MESSAGE_QUEUE_KEY_COLUMN_NAME if with_native_record_key else None
+            ),
         )
 
     schema = assert_schema_not_none(schema, data_format_type)
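The helper composes the final schema by unioning small schema blocks with the `|` operator, which `pw.Schema` supports (the removed code relied on it too, via `schema |= MetadataSchema`). A standalone sketch of the same composition pattern, with the block classes redefined locally as stand-ins:

```python
import pathway as pw

# Local stand-ins for the schema blocks used in the diff.
class RawDataSchema(pw.Schema):
    data: bytes

class RawKeySchema(pw.Schema):
    key: bytes

# Unioning schemas concatenates their columns, so each flag in
# construct_raw_data_schema_by_flags simply toggles one block on or off.
CombinedSchema = RawDataSchema | RawKeySchema
print(CombinedSchema.column_names())  # expected: ['data', 'key']
```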

python/pathway/io/kafka/__init__.py

Lines changed: 10 additions & 5 deletions

@@ -52,7 +52,8 @@ def read(
     If the ``"raw"`` format is chosen, the key and the payload are read from the topic as raw
     bytes and used in the table "as is". If you choose the ``"plaintext"`` option, however,
     they are parsed from UTF-8 into plaintext entries. In both cases, the
-    table consists of a primary key and a single column ``"data"``, denoting the payload read.
+    table consists of a primary key and two columns ``"key"`` and ``"data"``,
+    denoting the key and the payload read.
 
     If ``"json"`` is chosen, the connector first parses the payload of the message
     according to the JSON format and then creates the columns corresponding to the
@@ -108,9 +109,11 @@ def read(
     Returns:
         Table: The table read.
 
-        When using the format "raw", the connector will produce a single-column table:
-        all the data is saved into a column named ``data``.
-        For other formats, the argument value_column is required and defines the columns.
+        When using the format ``"raw"`` or ``"plaintext"``, the connector will produce a
+        two-column table: all the payloads are saved into a column named ``data``, while the
+        keys are saved into a column ``key``.
+
+        For other formats, the schema is required and defines the columns.
 
     Example:
 
@@ -140,7 +143,8 @@ def read(
     ...     format="raw",
     ... )
 
-    All the data will be accessible in the column data.
+    All the payload data will be accessible in the column ``data``, while the keys of the
+    messages will be stored in the column ``key``.
 
     JSON version:
 
@@ -236,6 +240,7 @@ def read(
         schema=schema,
        json_field_paths=json_field_paths,
        schema_registry_settings=schema_registry_settings,
+        with_native_record_key=True,
        _stacklevel=5,
    )
    data_source_options = datasource.DataSourceOptions(
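Since the key now lands in an ordinary column, it can take part in downstream operations directly. A hedged sketch of one such use (the broker settings, topic name, and `b"orders"` routing key below are illustrative, not from this commit):

```python
import pathway as pw

# Illustrative settings; replace with your broker configuration.
rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "example-group",
    "auto.offset.reset": "earliest",
}

# With format="raw", both `key` and `data` are bytes columns.
messages = pw.io.kafka.read(
    rdkafka_settings=rdkafka_settings,
    topic="example-topic",
    format="raw",
)

# Keep only the messages published under a particular routing key.
orders = messages.filter(pw.this.key == b"orders")

pw.io.csv.write(orders, "./orders.csv")
pw.run()
```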

python/pathway/io/redpanda/__init__.py

Lines changed: 18 additions & 5 deletions

@@ -33,7 +33,18 @@ def read(
     **kwargs,
 ) -> Table:
     """Reads table from a set of topics in Redpanda.
-    There are three formats currently supported: "raw", "csv", and "json".
+
+    There are three formats currently supported: ``"plaintext"``, ``"raw"``, and ``"json"``.
+    If the ``"raw"`` format is chosen, the key and the payload are read from the topic as raw
+    bytes and used in the table "as is". If you choose the ``"plaintext"`` option, however,
+    they are parsed from UTF-8 into plaintext entries. In both cases, the
+    table consists of a primary key and two columns ``"key"`` and ``"data"``,
+    denoting the key and the payload read.
+
+    If ``"json"`` is chosen, the connector first parses the payload of the message
+    according to the JSON format and then creates the columns corresponding to the
+    schema defined by the ``schema`` parameter. The values of these columns are
+    taken from the respective parsed JSON fields.
 
     Args:
         rdkafka_settings: Connection settings in the format of
@@ -45,7 +56,7 @@ def read(
         process them as they arrive, and send them into the engine. Alternatively,
         if set to ``"static"``, the engine will only read and process the data that
         is already available at the time of execution.
-        format: format of the input data, "raw", "csv", or "json"
+        format: format of the input data, ``"raw"``, ``"plaintext"``, or ``"json"``.
         schema_registry_settings: settings for connecting to the Confluent Schema Registry,
             if this type of registry is used.
         debug_data: Static data replacing original one when debug mode is active.
@@ -73,9 +84,11 @@ def read(
     Returns:
         Table: The table read.
 
-        When using the format "raw", the connector will produce a single-column table:
-        all the data is saved into a column named `data`.
-        For other formats, the argument value_column is required and defines the columns.
+        When using the format ``"raw"`` or ``"plaintext"``, the connector will produce a
+        two-column table: all the payloads are saved into a column named ``data``, while the
+        keys are saved into a column ``key``.
+
+        For other formats, the schema is required and defines the columns.
 
     Example:
src/connectors/data_format.rs

Lines changed: 56 additions & 29 deletions

@@ -878,49 +878,87 @@ impl KeyGenerationPolicy {
 }
 
 pub struct IdentityParser {
-    value_fields: Vec<String>,
     parse_utf8: bool,
     metadata_column_value: Value,
     session_type: SessionType,
     key_generation_policy: KeyGenerationPolicy,
+
+    n_value_fields: usize,
+    key_field_index: Option<usize>,
+    metadata_field_index: Option<usize>,
+    value_field_index: usize,
 }
 
 impl IdentityParser {
     pub fn new(
-        value_fields: Vec<String>,
+        value_fields: &[String],
         parse_utf8: bool,
+        message_queue_key_field: Option<&String>,
         key_generation_policy: KeyGenerationPolicy,
         session_type: SessionType,
     ) -> IdentityParser {
+        let mut key_field_index = None;
+        let mut metadata_field_index = None;
+        let mut value_field_index = None;
+        for (index, value_field) in value_fields.iter().enumerate() {
+            if value_field == METADATA_FIELD_NAME {
+                assert!(metadata_field_index.is_none());
+                metadata_field_index = Some(index);
+            } else if Some(value_field) == message_queue_key_field {
+                assert!(key_field_index.is_none());
+                key_field_index = Some(index);
+            } else {
+                assert!(value_field_index.is_none());
+                value_field_index = Some(index);
+            }
+        }
+
         Self {
-            value_fields,
+            n_value_fields: value_fields.len(),
             parse_utf8,
             metadata_column_value: Value::None,
             key_generation_policy,
             session_type,
+            key_field_index,
+            metadata_field_index,
+            value_field_index: value_field_index
+                .expect("value field must be present in the schema"),
         }
     }
 }
 
 impl Parser for IdentityParser {
     fn parse(&mut self, data: &ReaderContext) -> ParseResult {
+        let mut values = Vec::with_capacity(self.n_value_fields);
+        for _ in 0..self.n_value_fields {
+            // clone isn't available for the array element type, hence constructing manually
+            values.push(Ok(Value::None));
+        }
+
         let (event, key, value, metadata) = match data {
             RawBytes(event, raw_bytes) => (
                 *event,
                 None,
                 value_from_bytes(raw_bytes, self.parse_utf8),
                 Ok(None),
             ),
-            KeyValue((key, value)) => match value {
-                Some(bytes) => (
-                    DataEventType::Insert,
-                    self.key_generation_policy
-                        .generate(key.as_ref(), self.parse_utf8),
-                    value_from_bytes(bytes, self.parse_utf8),
-                    Ok(None),
-                ),
-                None => return Err(ParseError::EmptyKafkaPayload.into()),
-            },
+            KeyValue((key, value)) => {
+                if let Some(key_field_index) = self.key_field_index {
+                    values[key_field_index] = key
+                        .as_ref()
+                        .map_or_else(|| Ok(Value::None), |k| value_from_bytes(k, self.parse_utf8));
+                }
+                match value {
+                    Some(bytes) => (
+                        DataEventType::Insert,
+                        self.key_generation_policy
+                            .generate(key.as_ref(), self.parse_utf8),
+                        value_from_bytes(bytes, self.parse_utf8),
+                        Ok(None),
+                    ),
+                    None => return Err(ParseError::EmptyKafkaPayload.into()),
+                }
+            }
             Diff(_) | TokenizedEntries(_, _) => {
                 return Err(ParseError::UnsupportedReaderContext.into())
             }
@@ -932,22 +970,11 @@ impl Parser for IdentityParser {
         let event = if is_commit {
             ParsedEventWithErrors::AdvanceTime
         } else {
-            let mut values = Vec::new();
-            let mut metadata = Some(metadata);
-            let mut value = Some(value);
-            for field in &self.value_fields {
-                let to_insert = if field == METADATA_FIELD_NAME {
-                    metadata
-                        .take()
-                        .expect("metadata column should be used exactly once in IdentityParser")
-                        .map(|metadata| metadata.unwrap_or(self.metadata_column_value.clone()))
-                } else {
-                    value
-                        .take()
-                        .expect("value column should be used exactly once in IdentityParser")
-                };
-                values.push(to_insert);
+            if let Some(metadata_field_index) = self.metadata_field_index {
+                values[metadata_field_index] =
+                    metadata.map(|metadata| metadata.unwrap_or(self.metadata_column_value.clone()));
             }
+            values[self.value_field_index] = value;
             ParsedEventWithErrors::new(self.session_type(), event, key, values)
         };
 
@@ -960,7 +987,7 @@ impl Parser for IdentityParser {
     }
 
     fn column_count(&self) -> usize {
-        self.value_fields.len()
+        self.n_value_fields
     }
 
     fn session_type(&self) -> SessionType {
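The gist of the `IdentityParser` restructuring: field names are resolved to positional indices once, in the constructor, and each message then fills a preallocated row by index instead of re-comparing field names per message. A loose Python analogue of the pattern (an illustration of the idea, not the engine code; all names here are hypothetical):

```python
class IdentityParserSketch:
    # Field names are resolved to positions once, at construction time.
    def __init__(self, value_fields, key_field=None, metadata_field="_metadata"):
        self.n_value_fields = len(value_fields)
        self.key_field_index = None
        self.metadata_field_index = None
        self.value_field_index = None
        for index, field in enumerate(value_fields):
            if field == metadata_field:
                self.metadata_field_index = index
            elif field == key_field:
                self.key_field_index = index
            else:
                self.value_field_index = index
        assert self.value_field_index is not None, "value field must be present"

    # Per message, slots are filled by index; no name comparisons remain.
    def parse(self, key, payload, metadata=None):
        values = [None] * self.n_value_fields
        if self.key_field_index is not None:
            values[self.key_field_index] = key
        if self.metadata_field_index is not None:
            values[self.metadata_field_index] = metadata
        values[self.value_field_index] = payload
        return values

# Example: schema ["key", "data"] with the message-queue key enabled.
parser = IdentityParserSketch(["key", "data"], key_field="key")
print(parser.parse(b"1", b"one"))  # [b'1', b'one']
```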

src/python_api.rs

Lines changed: 6 additions & 1 deletion

@@ -4959,6 +4959,7 @@ pub struct DataFormat {
     designated_timestamp_policy: Option<String>,
     external_diff_column_index: Option<usize>,
     timestamp_unit: Option<String>,
+    message_queue_key_field: Option<String>,
 }
 
 #[pymethods]
@@ -5180,6 +5181,7 @@ impl DataFormat {
         designated_timestamp_policy = None,
         external_diff_column_index = None,
         timestamp_unit = None,
+        message_queue_key_field = None,
     ))]
     #[allow(clippy::too_many_arguments)]
     fn new(
@@ -5200,6 +5202,7 @@ impl DataFormat {
         designated_timestamp_policy: Option<String>,
         external_diff_column_index: Option<usize>,
         timestamp_unit: Option<String>,
+        message_queue_key_field: Option<String>,
     ) -> Self {
         DataFormat {
             format_type,
@@ -5219,6 +5222,7 @@ impl DataFormat {
             designated_timestamp_policy,
             external_diff_column_index,
             timestamp_unit,
+            message_queue_key_field,
         }
     }
 
@@ -6658,8 +6662,9 @@ impl DataFormat {
                 Ok(Box::new(parser))
             }
             "identity" => Ok(Box::new(IdentityParser::new(
-                self.value_field_names(py),
+                self.value_field_names(py).as_slice(),
                 self.parse_utf8,
+                self.message_queue_key_field.as_ref(),
                 self.key_generation_policy,
                 self.session_type,
             ))),
