Skip to content

Commit a713fad

Browse files
lewymatiManul from Pathway
authored andcommitted
Add Table.add_update_timestamp_utc (#9468)
GitOrigin-RevId: 8085403dca86639b300b5004a3e3426322239d48
1 parent 224db3c commit a713fad

File tree

6 files changed

+87
-2
lines changed

6 files changed

+87
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
88
### Added
99
- JetStream extension is now supported in both NATS read and write connectors.
1010
- The Iceberg connectors now support Glue as a catalog backend.
11+
- New `Table.add_update_timestamp_utc` function for tracking update time of rows in the table
1112

1213
### Changed
1314
- **BREAKING** The API for the Iceberg connectors has changed. The `catalog` parameter is now required in both `pw.io.iceberg.read` and `pw.io.iceberg.write`. This parameter can be either of type `pw.io.iceberg.RestCatalog` or `pw.io.iceberg.GlueCatalog`, and it must contain the connection parameters.

python/pathway/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,4 @@
211211
Table._repr_mimebundle_ = viz._repr_mimebundle_
212212

213213
Table.inactivity_detection = temporal.inactivity_detection
214+
Table.add_update_timestamp_utc = temporal.add_update_timestamp_utc

python/pathway/internals/table.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class Table(
7575
from pathway.stdlib.ordered import diff # type: ignore[misc]
7676
from pathway.stdlib.statistical import interpolate # type: ignore[misc]
7777
from pathway.stdlib.temporal import ( # type: ignore[misc]
78+
add_update_timestamp_utc,
7879
asof_join,
7980
asof_join_left,
8081
asof_join_outer,

python/pathway/stdlib/temporal/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
common_behavior,
4141
exactly_once_behavior,
4242
)
43-
from .time_utils import inactivity_detection, utc_now
43+
from .time_utils import add_update_timestamp_utc, inactivity_detection, utc_now
4444

4545
__all__ = [
4646
"AsofJoinResult",
@@ -79,4 +79,5 @@
7979
"exactly_once_behavior",
8080
"utc_now",
8181
"inactivity_detection",
82+
"add_update_timestamp_utc",
8283
]

python/pathway/stdlib/temporal/time_utils.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,46 @@ def get_now_timestamp_utc(for_test_only: pw.Pointer) -> pw.DateTimeUtc:
184184
inactivities = inactivities.without(pw.this.instance)
185185

186186
return inactivities
187+
188+
189+
@check_arg_types
190+
@trace_user_frame
191+
def add_update_timestamp_utc(
192+
self: pw.Table,
193+
refresh_rate: pw.Duration = pw.Duration(seconds=1),
194+
update_timestamp_column_name: str = "updated_timestamp_utc",
195+
) -> pw.Table:
196+
"""Adds a column with the UTC timestamp of the last row update
197+
198+
Args:
199+
refresh_rate (pw.Duration, optional): The interval at which the UTC
200+
timestamp is refreshed. Defaults to 1 second.
201+
update_timestamp_column_name (str, optional): The name of the column to
202+
store the update timestamp. Defaults to "updated_timestamp_utc".
203+
204+
Returns:
205+
pw.Table: A new table with an additional column containing the UTC
206+
timestamp of the last update for each row. The id column is preserved.
207+
"""
208+
utc_now_single_row = utc_now(refresh_rate=refresh_rate).reduce(
209+
timestamp_utc=pw.reducers.latest(pw.this.timestamp_utc)
210+
)
211+
212+
stream = self.with_columns(_id=pw.this.id).to_stream()
213+
214+
stream_joined = stream.asof_now_join_left(utc_now_single_row)
215+
new_cols = {
216+
update_timestamp_column_name: pw.coalesce(
217+
pw.right.timestamp_utc,
218+
pw.DateTimeUtc(datetime.datetime.now(tz=datetime.timezone.utc)),
219+
)
220+
}
221+
stream_with_update_time = stream_joined.select(*pw.left, **new_cols)
222+
223+
result = (
224+
stream_with_update_time.stream_to_table(pw.this.is_upsert)
225+
.without(pw.this.is_upsert)
226+
.with_id(pw.this._id)
227+
.without(pw.this._id)
228+
)
229+
return result

python/pathway/tests/temporal/test_time_utils.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@
99
import pandas as pd
1010

1111
import pathway as pw
12-
from pathway.tests.utils import T, assert_stream_equality_wo_index
12+
from pathway.tests.utils import (
13+
T,
14+
assert_stream_equality,
15+
assert_stream_equality_wo_index,
16+
)
1317

1418

1519
def fake_utc_now(total_duration_ms: int) -> Callable[[pw.Duration], pw.Table]:
@@ -233,3 +237,37 @@ def test_inactivity_detection_empty_instance(utc_now_mock, get_now_timestamp_utc
233237
)
234238

235239
assert_stream_equality_wo_index(inactivities, expected_inactivities)
240+
241+
242+
@patch("pathway.stdlib.temporal.time_utils.utc_now")
243+
def test_add_update_timestamp_utc(utc_now_mock):
244+
utc_now_mock.side_effect = fake_utc_now(500)
245+
t = T(
246+
"""
247+
| value | __time__ | __diff__
248+
1 | 10 | 0 | 1
249+
2 | 20 | 0 | 1
250+
3 | 30 | 0 | 1
251+
1 | 10 | 130 | -1
252+
1 | 110 | 130 | 1
253+
3 | 30 | 230 | -1
254+
"""
255+
)
256+
result = t.add_update_timestamp_utc(refresh_rate=pw.Duration(milliseconds=100))
257+
258+
expected = T(
259+
"""
260+
| value | updated_timestamp_utc | __time__ | __diff__
261+
1 | 10 | 0 | 0 | 1
262+
2 | 20 | 0 | 0 | 1
263+
3 | 30 | 0 | 0 | 1
264+
1 | 10 | 0 | 130 | -1
265+
1 | 110 | 100 | 130 | 1
266+
3 | 30 | 0 | 230 | -1
267+
"""
268+
).with_columns(
269+
updated_timestamp_utc=pw.this.updated_timestamp_utc.dt.utc_from_timestamp(
270+
unit="ms"
271+
)
272+
)
273+
assert_stream_equality(result, expected)

0 commit comments

Comments
 (0)