Skip to content

Commit 7a61d6f

Browse files
authored by joeyutong
feat: Add date_partition_column_format for spark source (feast-dev#5273)
* feat: Add date_partition_column_format for spark source
  Signed-off-by: joeyutong <[email protected]>
* feat: Add date_partition_column_format for spark source
  Signed-off-by: joeyutong <[email protected]>
* feat: Add date_partition_column_format for spark source
  Signed-off-by: joeyutong <[email protected]>
---------
Signed-off-by: joeyutong <[email protected]>
1 parent e416d01 commit 7a61d6f

File tree

6 files changed: +172 −21 lines changed

6 files changed

+172
-21
lines changed

protos/feast/core/DataSource.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@ message DataSource {
226226

227227
// Format of files at `path` (e.g. parquet, avro, etc)
228228
string file_format = 4;
229+
230+
// Date Format of date partition column (e.g. %Y-%m-%d)
231+
string date_partition_column_format = 5;
229232
}
230233

231234
// Defines configuration for custom third-party data sources.

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
import tempfile
33
import uuid
44
import warnings
5+
from dataclasses import asdict, dataclass
56
from datetime import datetime, timezone
6-
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
7+
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
78

89
import numpy as np
910
import pandas
@@ -55,6 +56,12 @@ class SparkOfflineStoreConfig(FeastConfigBaseModel):
5556
""" AWS Region if applicable for s3-based staging locations"""
5657

5758

59+
@dataclass(frozen=True)
60+
class SparkFeatureViewQueryContext(offline_utils.FeatureViewQueryContext):
61+
min_date_partition: Optional[str]
62+
max_date_partition: str
63+
64+
5865
class SparkOfflineStore(OfflineStore):
5966
@staticmethod
6067
def pull_latest_from_table_or_query(
@@ -101,6 +108,7 @@ def pull_latest_from_table_or_query(
101108
aliases_as_string = ", ".join(aliases)
102109

103110
date_partition_column = data_source.date_partition_column
111+
date_partition_column_format = data_source.date_partition_column_format
104112

105113
start_date_str = _format_datetime(start_date)
106114
end_date_str = _format_datetime(end_date)
@@ -112,7 +120,7 @@ def pull_latest_from_table_or_query(
112120
SELECT {fields_as_string},
113121
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS feast_row_
114122
FROM {from_expression} t1
115-
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){" AND " + date_partition_column + " >= '" + start_date.strftime("%Y-%m-%d") + "' AND " + date_partition_column + " <= '" + end_date.strftime("%Y-%m-%d") + "' " if date_partition_column != "" and date_partition_column is not None else ""}
123+
WHERE {timestamp_field} BETWEEN TIMESTAMP('{start_date_str}') AND TIMESTAMP('{end_date_str}'){" AND " + date_partition_column + " >= '" + start_date.strftime(date_partition_column_format) + "' AND " + date_partition_column + " <= '" + end_date.strftime(date_partition_column_format) + "' " if date_partition_column != "" and date_partition_column is not None else ""}
116124
) t2
117125
WHERE feast_row_ = 1
118126
"""
@@ -136,8 +144,12 @@ def get_historical_features(
136144
full_feature_names: bool = False,
137145
) -> RetrievalJob:
138146
assert isinstance(config.offline_store, SparkOfflineStoreConfig)
147+
date_partition_column_formats = []
139148
for fv in feature_views:
140149
assert isinstance(fv.batch_source, SparkSource)
150+
date_partition_column_formats.append(
151+
fv.batch_source.date_partition_column_format
152+
)
141153

142154
warnings.warn(
143155
"The spark offline store is an experimental feature in alpha development. "
@@ -186,8 +198,27 @@ def get_historical_features(
186198
entity_df_event_timestamp_range,
187199
)
188200

201+
spark_query_context = [
202+
SparkFeatureViewQueryContext(
203+
**asdict(context),
204+
min_date_partition=datetime.fromisoformat(
205+
context.min_event_timestamp
206+
).strftime(date_format)
207+
if context.min_event_timestamp is not None
208+
else None,
209+
max_date_partition=datetime.fromisoformat(
210+
context.max_event_timestamp
211+
).strftime(date_format),
212+
)
213+
for date_format, context in zip(
214+
date_partition_column_formats, query_context
215+
)
216+
]
217+
189218
query = offline_utils.build_point_in_time_query(
190-
feature_view_query_contexts=query_context,
219+
feature_view_query_contexts=cast(
220+
List[offline_utils.FeatureViewQueryContext], spark_query_context
221+
),
191222
left_table_query_string=tmp_entity_df_table_name,
192223
entity_df_event_timestamp_col=event_timestamp_col,
193224
entity_df_columns=entity_schema.keys(),
@@ -651,13 +682,13 @@ def _cast_data_frame(
651682
FROM {{ featureview.table_subquery }}
652683
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
653684
{% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %}
654-
AND {{ featureview.date_partition_column }} <= '{{ featureview.max_event_timestamp[:10] }}'
685+
AND {{ featureview.date_partition_column }} <= '{{ featureview.max_date_partition }}'
655686
{% endif %}
656687
657688
{% if featureview.ttl == 0 %}{% else %}
658689
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
659690
{% if featureview.date_partition_column != "" and featureview.date_partition_column is not none %}
660-
AND {{ featureview.date_partition_column }} >= '{{ featureview.min_event_timestamp[:10] }}'
691+
AND {{ featureview.date_partition_column }} >= '{{ featureview.min_date_partition }}'
661692
{% endif %}
662693
{% endif %}
663694
),

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def __init__(
4646
owner: Optional[str] = "",
4747
timestamp_field: Optional[str] = None,
4848
date_partition_column: Optional[str] = None,
49+
date_partition_column_format: Optional[str] = "%Y-%m-%d",
4950
):
5051
"""Creates a SparkSource object.
5152
@@ -97,6 +98,7 @@ def __init__(
9798
query=query,
9899
path=path,
99100
file_format=file_format,
101+
date_partition_column_format=date_partition_column_format,
100102
)
101103

102104
@property
@@ -127,6 +129,13 @@ def file_format(self):
127129
"""
128130
return self.spark_options.file_format
129131

132+
@property
133+
def date_partition_column_format(self):
134+
"""
135+
Returns the date partition column format of this feature data source.
136+
"""
137+
return self.spark_options.date_partition_column_format
138+
130139
@staticmethod
131140
def from_proto(data_source: DataSourceProto) -> Any:
132141
assert data_source.HasField("spark_options")
@@ -139,6 +148,7 @@ def from_proto(data_source: DataSourceProto) -> Any:
139148
query=spark_options.query,
140149
path=spark_options.path,
141150
file_format=spark_options.file_format,
151+
date_partition_column_format=spark_options.date_partition_column_format,
142152
date_partition_column=data_source.date_partition_column,
143153
timestamp_field=data_source.timestamp_field,
144154
created_timestamp_column=data_source.created_timestamp_column,
@@ -240,6 +250,7 @@ def __init__(
240250
query: Optional[str],
241251
path: Optional[str],
242252
file_format: Optional[str],
253+
date_partition_column_format: Optional[str] = "%Y-%m-%d",
243254
):
244255
# Check that only one of the ways to load a spark dataframe can be used. We have
245256
# to treat empty string and null the same due to proto (de)serialization.
@@ -261,6 +272,7 @@ def __init__(
261272
self._query = query
262273
self._path = path
263274
self._file_format = file_format
275+
self._date_partition_column_format = date_partition_column_format
264276

265277
@property
266278
def table(self):
@@ -294,6 +306,14 @@ def file_format(self):
294306
def file_format(self, file_format):
295307
self._file_format = file_format
296308

309+
@property
310+
def date_partition_column_format(self):
311+
return self._date_partition_column_format
312+
313+
@date_partition_column_format.setter
314+
def date_partition_column_format(self, date_partition_column_format):
315+
self._date_partition_column_format = date_partition_column_format
316+
297317
@classmethod
298318
def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions):
299319
"""
@@ -308,6 +328,7 @@ def from_proto(cls, spark_options_proto: DataSourceProto.SparkOptions):
308328
query=spark_options_proto.query,
309329
path=spark_options_proto.path,
310330
file_format=spark_options_proto.file_format,
331+
date_partition_column_format=spark_options_proto.date_partition_column_format,
311332
)
312333

313334
return spark_options
@@ -323,6 +344,7 @@ def to_proto(self) -> DataSourceProto.SparkOptions:
323344
query=self.query,
324345
path=self.path,
325346
file_format=self.file_format,
347+
date_partition_column_format=self.date_partition_column_format,
326348
)
327349

328350
return spark_options_proto

0 commit comments

Comments (0)