Skip to content

Commit b8771c9

Browse files
jatin5251franciscojavierarceo
authored andcommitted
fix(redis): Preserve millisecond timestamp precision for Redis online store (#5807)
* Update redis.py Add millisecond-precision timestamp support to Redis online store Signed-off-by: Jatin Kumar <jatink.5251@gmail.com> * Update redis.py sub-second precision when returning timestamps to client Signed-off-by: Jatin Kumar <jatink.5251@gmail.com> * Update redis.py fix(redis): preserve millisecond timestamp precision Signed-off-by: Jatin Kumar <jatink.5251@gmail.com> * Update redis.py fix: Remove whitespace on blank lines (W293) Signed-off-by: Jatin Kumar <jatink.5251@gmail.com> --------- Signed-off-by: Jatin Kumar <jatink.5251@gmail.com>
1 parent a058aac commit b8771c9

File tree

1 file changed

+16
-11
lines changed
  • sdk/python/feast/infra/online_stores

1 file changed

+16
-11
lines changed

sdk/python/feast/infra/online_stores/redis.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -304,22 +304,25 @@ def online_write_batch(
304304
for redis_key_bin, prev_event_time, (_, values, timestamp, _) in zip(
305305
keys, prev_event_timestamps, data
306306
):
307-
event_time_seconds = int(utils.make_tzaware(timestamp).timestamp())
308-
309-
# ignore if event_timestamp is before the event features that are currently in the feature store
307+
# Convert incoming timestamp to millisecond-aware datetime
308+
aware_ts = utils.make_tzaware(timestamp)
309+
# Build protobuf timestamp with nanos
310+
ts = Timestamp()
311+
ts.FromDatetime(aware_ts)
312+
# New timestamp in nanoseconds
313+
new_total_nanos = ts.seconds * 1_000_000_000 + ts.nanos
314+
# Compare against existing timestamp (nanosecond precision)
310315
if prev_event_time:
311316
prev_ts = Timestamp()
312317
prev_ts.ParseFromString(prev_event_time)
313-
if prev_ts.seconds and event_time_seconds <= prev_ts.seconds:
314-
# TODO: somehow signal that it's not overwriting the current record?
318+
prev_total_nanos = prev_ts.seconds * 1_000_000_000 + prev_ts.nanos
319+
# Skip only if older OR exact same instant
320+
if prev_total_nanos and new_total_nanos <= prev_total_nanos:
315321
if progress:
316322
progress(1)
317323
continue
318-
319-
ts = Timestamp()
320-
ts.seconds = event_time_seconds
321-
entity_hset = dict()
322-
entity_hset[ts_key] = ts.SerializeToString()
324+
# Store full timestamp (seconds + nanos)
325+
entity_hset = {ts_key: ts.SerializeToString()}
323326

324327
for feature_name, val in values.items():
325328
f_key = _mmh3(f"{feature_view}:{feature_name}")
@@ -456,5 +459,7 @@ def _get_features_for_entity(
456459
if not res:
457460
return None, None
458461
else:
459-
timestamp = datetime.fromtimestamp(res_ts.seconds, tz=timezone.utc)
462+
# reconstruct full timestamp including nanos
463+
total_seconds = res_ts.seconds + res_ts.nanos / 1_000_000_000.0
464+
timestamp = datetime.fromtimestamp(total_seconds, tz=timezone.utc)
460465
return timestamp, res

0 commit comments

Comments
 (0)