Skip to content

Commit f5c12f1

Browse files
authored
feat(mlop-2760): Create metadata module (feature, feature_set, reader, writer and pipeline) (#418)
# Why This change introduces a robust metadata generation system for Butterfree components. The primary motivation is to create a standardized and automated way to: - Document the schema of feature set pipelines and their components. - Facilitate serialization of pipeline metadata for various use cases, such as cataloging, lineage tracking, and integration with other tools. **This will improve the maintainability, understandability, and interoperability of feature set pipelines within the Butterfree ecosystem.** # What ## New `metadata` module: - Introduced a new `butterfree/metadata` directory to house all Pydantic models for metadata. ## Dependencies - `Pydantic` added to requirements ## Tests - Unit tests for EVERY behavior of new class - Integration tests for pipeline with `FeatureSet` and with `AggregatedFeatureSet` # How The solution refactors and centralizes metadata definition using Pydantic models within a dedicated `butterfree/metadata` directory. 1. **Hierarchical Metadata Construction:** * Individual components (`Feature`, `KeyFeature`, `TimestampFeature`, `Window`, `Reader` subclasses, `Writer`) are responsible for building their own metadata Pydantic models (`FeatureMetadata`, `ReaderMetadata` variants, `WriterMetadata`). * Container classes (`FeatureSet`, `AggregatedFeatureSet`, `FeatureSetPipeline`) aggregate metadata from their constituent parts. For instance, `FeatureSet.build_metadata()` calls `build_metadata()` on its keys, timestamp, and then constructs metadata for its transformed features before packaging it all into a `FeatureSetMetadata` object. 2. **Pydantic Model Usage:** Each metadata type (e.g., for a feature, a reader, a feature set) has a corresponding Pydantic model ensuring structure and validation.
1 parent a411f10 commit f5c12f1

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1484
-67
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,5 @@ init/
121121

122122
# integration tests artifacts
123123
metastore_db/
124+
125+
.DS_Store

butterfree/extract/readers/file_reader.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from butterfree.clients import SparkClient
99
from butterfree.extract.readers.reader import Reader
10+
from butterfree.metadata.reader_metadata import FileReaderMetadata
1011

1112

1213
class FileReader(Reader):
@@ -117,3 +118,11 @@ def consume(self, client: SparkClient) -> DataFrame:
117118
path=self.path,
118119
**self.options,
119120
)
121+
122+
def build_metadata(self) -> FileReaderMetadata:
    """Assemble this file reader's metadata model.

    Returns:
        A FileReaderMetadata carrying the source path, file format and
        whether an incremental strategy is configured.
    """
    has_incremental_strategy = self.incremental_strategy is not None
    metadata = FileReaderMetadata(
        path=self.path,
        format=self.format,
        incremental_strategy=has_incremental_strategy,
    )
    return metadata

butterfree/extract/readers/kafka_reader.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from butterfree.configs import environment
1010
from butterfree.extract.pre_processing import explode_json_column
1111
from butterfree.extract.readers.reader import Reader
12+
from butterfree.metadata.reader_metadata import KafkaReaderMetadata
1213

1314

1415
class KafkaReader(Reader):
@@ -182,3 +183,9 @@ def consume(self, client: SparkClient) -> DataFrame:
182183

183184
# apply schema defined in self.value_schema
184185
return self._struct_df(raw_df)
186+
187+
def build_metadata(self) -> KafkaReaderMetadata:
    """Assemble this kafka reader's metadata model.

    Returns:
        A KafkaReaderMetadata carrying the consumed topic name.
    """
    metadata = KafkaReaderMetadata(topic=self.topic)
    return metadata

butterfree/extract/readers/reader.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,25 @@
22

33
from abc import ABC, abstractmethod
44
from functools import reduce
5-
from typing import Any, Callable, Dict, List, Optional
5+
from typing import Any, Callable, Dict, List, Optional, Union
66

77
from pyspark.sql import DataFrame
88

99
from butterfree.clients import SparkClient
1010
from butterfree.dataframe_service import IncrementalStrategy
1111
from butterfree.hooks import HookableComponent
12+
from butterfree.metadata.reader_metadata import (
13+
FileReaderMetadata,
14+
KafkaReaderMetadata,
15+
TableReaderMetadata,
16+
)
1217

1318

1419
class Reader(ABC, HookableComponent):
1520
"""Abstract base class for Readers.
1621
1722
Attributes:
18-
id: unique string id for register the reader as a view on the metastore.
23+
id: unique string id for register the reader as a view.
1924
transformations: list of methods that will be applied over the dataframe
2025
after the raw data is extracted.
2126
@@ -139,3 +144,10 @@ def _apply_transformations(self, df: DataFrame) -> DataFrame:
139144
self.transformations,
140145
df,
141146
)
147+
148+
@abstractmethod
def build_metadata(
    self,
) -> Union[FileReaderMetadata, KafkaReaderMetadata, TableReaderMetadata]:
    """Build the metadata model for this reader.

    Each concrete Reader subclass must return its own metadata variant
    (file, kafka or table reader metadata).
    """
    ...

butterfree/extract/readers/table_reader.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from butterfree.clients import SparkClient
88
from butterfree.extract.readers.reader import Reader
9+
from butterfree.metadata.reader_metadata import TableReaderMetadata
910

1011

1112
class TableReader(Reader):
@@ -66,3 +67,11 @@ def consume(self, client: SparkClient) -> DataFrame:
6667
6768
"""
6869
return client.read_table(self.table, self.database)
70+
71+
def build_metadata(self) -> TableReaderMetadata:
    """Assemble this table reader's metadata model.

    Returns:
        A TableReaderMetadata carrying the table name, database and
        whether an incremental strategy is configured.
    """
    has_incremental_strategy = self.incremental_strategy is not None
    return TableReaderMetadata(
        table=self.table,
        database=self.database,
        incremental_strategy=has_incremental_strategy,
    )

butterfree/load/writers/writer.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from butterfree.clients import SparkClient
1010
from butterfree.configs.db import AbstractWriteConfig
1111
from butterfree.hooks import HookableComponent
12+
from butterfree.metadata.writer_metadata import WriterMetadata
1213
from butterfree.transform import FeatureSet
1314

1415

@@ -122,3 +123,22 @@ def validate(
122123
AssertionError: if validation fails.
123124
124125
"""
126+
127+
def build_metadata(self) -> WriterMetadata:
    """Get the writer's metadata as a Pydantic model.

    Produces a standardized representation of the writer's configuration
    for documentation, validation, and serialization purposes.

    Returns:
        A WriterMetadata model describing this writer: its concrete class
        name, interval mode, entity-write flag, and db config class name.
    """
    return WriterMetadata(
        type=type(self).__name__,
        interval_mode=self.interval_mode,
        write_to_entity=self.write_to_entity,
        db_config=type(self.db_config).__name__,
    )

butterfree/metadata/__init__.py

Whitespace-only changes.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel, Field
4+
5+
6+
class FeatureMetadata(BaseModel):
    """Metadata describing a single column of a feature set.

    Captures the column's name, its Spark data type, a human-readable
    description, and whether the column is part of the primary key.
    """

    name: str = Field(..., description="The name of the column")
    data_type: str = Field(
        ...,
        description=(
            "The data type of the column (e.g., StringType, IntegerType) represented by pyspark.sql.types"  # noqa: E501
        ),
    )
    description: str = Field(
        ...,
        description="The description of the column",
    )
    primary_key: bool = Field(
        ...,
        description="Whether the column is a primary (or partition if it's a Cassandra table) key",  # noqa: E501
    )
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
from __future__ import annotations
2+
3+
from typing import List, Literal, Optional
4+
5+
from pydantic import BaseModel, Field
6+
7+
from butterfree.metadata.feature_metadata import FeatureMetadata
8+
9+
10+
class FeatureSetMetadata(BaseModel):
    """Catalog metadata for a feature set.

    Describes a feature set's entity, name, concrete kind, description,
    optional window definitions, and the metadata of each column it
    produces.
    """

    entity: str = Field(
        ..., description="The entity type associated with the feature set"
    )
    name: str = Field(..., description="The name of the Feature Set")
    # Restricted to the two concrete feature-set kinds the pipeline supports.
    type: Literal["FeatureSet", "AggregatedFeatureSet"] = Field(
        ..., description="The type of feature set"
    )
    description: str = Field(..., description="The description of the Feature Set")
    # Only populated for aggregated feature sets that define windows.
    windows_definition: Optional[List[str]] = Field(
        None, description="The definition of the windows for the feature set"
    )
    features: List[FeatureMetadata] = Field(
        ..., description="A list of column definitions"
    )
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from __future__ import annotations
2+
3+
from typing import List, Union
4+
5+
from pydantic import BaseModel, Field
6+
from typing_extensions import Annotated
7+
8+
from butterfree.load.writers.writer import WriterMetadata
9+
from butterfree.metadata.feature_set_metadata import FeatureSetMetadata
10+
from butterfree.metadata.reader_metadata import (
11+
FileReaderMetadata,
12+
KafkaReaderMetadata,
13+
TableReaderMetadata,
14+
)
15+
16+
17+
class FeatureSetPipelineMetadata(BaseModel):
18+
"""Metadata model for a feature set pipeline.
19+
20+
This model represents the complete metadata of a feature set pipeline,
21+
including its configuration, data sources, output schema, and processing details.
22+
"""
23+
24+
feature_set: FeatureSetMetadata = Field(
25+
..., description="Metadata about the feature set's output"
26+
)
27+
28+
# Required for correct serialization using Union
29+
readers: List[
30+
Annotated[
31+
Union[FileReaderMetadata, KafkaReaderMetadata, TableReaderMetadata],
32+
Field(discriminator="type"),
33+
]
34+
] = Field(
35+
...,
36+
description="A list of data sources required to generate the feature set",
37+
)
38+
39+
writers: List[WriterMetadata] = Field(
40+
..., description="The writers to be used for the feature set"
41+
)

0 commit comments

Comments
 (0)