Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions deltacat/storage/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,80 @@ def list_partition_deltas(
raise NotImplementedError("list_partition_deltas not implemented")


def list_partition_deltas_by_timestamp(
partition_like: Union[Partition, PartitionLocator],
first_stream_position: Optional[int] = None,
last_stream_position: Optional[int] = None,
ascending_order: bool = False,
include_manifest: bool = False,
*args,
**kwargs,
) -> ListResult[Delta]:
"""
Lists a page of deltas committed to the given partition, sorted by stream
timestamp (with stream position as tiebreaker), and sliced by 1-based positions.

This method is intended for use with unordered (ADD) deltas where temporal
ordering based on commit time is needed. Unlike `list_partition_deltas`
which sorts by stream position, this method sorts by the timestamp when
each delta was committed.

The deltas are sorted by (stream_timestamp, stream_position) in the requested
order, then sliced using 1-based positions.

Example:
If deltas by timestamp are [A(oldest), B, C, D, E(newest)]:
- Descending (default): sorted as [E, D, C, B, A]
positions 1-3 -> [E, D, C] (3 most recent)
- Ascending: sorted as [A, B, C, D, E]
positions 1-3 -> [A, B, C] (3 oldest)

Args:
partition_like: The partition or partition locator to list deltas from.
first_stream_position: Start position for slicing (1-based, inclusive).
Position 1 is the first item in the sorted order. If None, starts from 1.
last_stream_position: End position for slicing (1-based, inclusive).
If None, includes all remaining items.
ascending_order: If True, sort ascending (oldest first).
If False (default), sort descending (newest first).
include_manifest: If True, include manifests in the returned deltas.

To conserve memory, the deltas returned do not include manifests by
default. The manifests can either be optionally retrieved as part of this
call or lazily loaded via subsequent calls to `get_delta_manifest`.
"""
raise NotImplementedError("list_partition_deltas_by_timestamp not implemented")


def get_latest_delta_by_timestamp(
partition_like: Union[Partition, PartitionLocator],
include_manifest: bool = False,
*args,
**kwargs,
) -> Optional[Delta]:
"""
Gets the latest delta by timestamp for the given partition.

This method returns the most recently committed delta based on stream_timestamp,
regardless of whether it's an ordered or unordered (ADD) delta. Unlike
`get_latest_delta` which only returns the latest ordered delta based on
stream_position, this method considers all deltas and uses commit timestamp
for ordering.

Args:
partition_like: The partition or partition locator to get the latest delta from.
include_manifest: If True, include the manifest in the returned delta.

Returns:
The latest delta by timestamp, or None if the partition has no deltas.

To conserve memory, the delta returned does not include a manifest by
default. The manifest can either be optionally retrieved as part of this
call or lazily loaded via a subsequent call to `get_delta_manifest`.
"""
raise NotImplementedError("get_latest_delta_by_timestamp not implemented")


def get_delta(
namespace: str,
table_name: str,
Expand Down
150 changes: 150 additions & 0 deletions deltacat/storage/main/impl.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time
import uuid
import posixpath

Expand Down Expand Up @@ -738,6 +739,152 @@ def list_partition_deltas(
)


def list_partition_deltas_by_timestamp(
partition_like: Union[Partition, PartitionLocator],
first_stream_position: Optional[int] = None,
last_stream_position: Optional[int] = None,
ascending_order: bool = False,
include_manifest: bool = False,
*args,
**kwargs,
) -> ListResult[Delta]:
"""
Lists a page of deltas committed to the given partition, sorted by stream
timestamp (with stream position as tiebreaker), and sliced by 1-based positions.

This method is intended for use with unordered (ADD) deltas where temporal
ordering based on commit time is needed. Unlike `list_partition_deltas`
which sorts by stream position, this method sorts by the timestamp when
each delta was committed.

The deltas are sorted by (stream_timestamp, stream_position) in the requested
order, then sliced using 1-based positions.

Example:
If deltas by timestamp are [A(oldest), B, C, D, E(newest)]:
- Descending (default): sorted as [E, D, C, B, A]
positions 1-3 -> [E, D, C] (3 most recent)
- Ascending: sorted as [A, B, C, D, E]
positions 1-3 -> [A, B, C] (3 oldest)

Args:
partition_like: The partition or partition locator to list deltas from.
first_stream_position: Start position for slicing (1-based, inclusive).
Position 1 is the first item in the sorted order. If None, starts from 1.
last_stream_position: End position for slicing (1-based, inclusive).
If None, includes all remaining items.
ascending_order: If True, sort ascending (oldest first).
If False (default), sort descending (newest first).
include_manifest: If True, include manifests in the returned deltas.

To conserve memory, the deltas returned do not include manifests by
default. The manifests can either be optionally retrieved as part of this
call or lazily loaded via subsequent calls to `get_delta_manifest`.
"""
if first_stream_position is not None and last_stream_position is not None:
if first_stream_position > last_stream_position:
raise ValueError(
f"first_stream_position must be less than or equal to last_stream_position. "
f"first_stream_position: {first_stream_position}, "
f"last_stream_position: {last_stream_position}"
)
if first_stream_position is not None and first_stream_position < 1:
raise ValueError(
f"first_stream_position must be >= 1 (1-based). "
f"Got: {first_stream_position}"
)
if last_stream_position is not None and last_stream_position < 1:
raise ValueError(
f"last_stream_position must be >= 1 (1-based). "
f"Got: {last_stream_position}"
)

locator = DeltaLocator.of(
partition_locator=partition_like
if isinstance(partition_like, PartitionLocator)
else partition_like.locator,
stream_position=None,
)
delta = Delta.of(
locator=locator,
delta_type=None,
meta=None,
properties=None,
manifest=None,
)
try:
all_deltas_list_result: ListResult[Delta] = _list(
metafile=delta,
txn_op_type=TransactionOperationType.READ_SIBLINGS,
*args,
**kwargs,
)
except ObjectNotFoundError as e:
raise PartitionNotFoundError(f"Partition {partition_like} not found") from e
all_deltas = all_deltas_list_result.all_items()

# Sort deltas by stream timestamp with stream position as tiebreaker
# Use 0 as default for None timestamps to maintain consistent sorting
# Descending (default): newest first; Ascending: oldest first
all_deltas.sort(
reverse=(not ascending_order),
key=lambda d: (d.stream_timestamp or 0, d.stream_position),
)

# Slice the sorted list using 1-based positions
# Position 1 = index 0, Position N = index N-1
start = (first_stream_position - 1) if first_stream_position is not None else 0
end = last_stream_position if last_stream_position is not None else None
sliced_deltas = all_deltas[start:end]

return ListResult.of(
items=sliced_deltas,
pagination_key=None,
next_page_provider=None,
)


def get_latest_delta_by_timestamp(
partition_like: Union[Partition, PartitionLocator],
include_manifest: bool = False,
*args,
**kwargs,
) -> Optional[Delta]:
"""
Gets the latest delta by timestamp for the given partition.

This method returns the most recently committed delta based on stream_timestamp,
regardless of whether it's an ordered or unordered (ADD) delta. Unlike
`get_latest_delta` which only returns the latest ordered delta based on
stream_position, this method considers all deltas and uses commit timestamp
for ordering.

Args:
partition_like: The partition or partition locator to get the latest delta from.
include_manifest: If True, include the manifest in the returned delta.

Returns:
The latest delta by timestamp, or None if the partition has no deltas.

To conserve memory, the delta returned does not include a manifest by
default. The manifest can either be optionally retrieved as part of this
call or lazily loaded via a subsequent call to `get_delta_manifest`.
"""
# Use list_partition_deltas_by_timestamp with descending order (newest first)
# and get only the first item (position 1)
result = list_partition_deltas_by_timestamp(
partition_like=partition_like,
first_stream_position=1,
last_stream_position=1,
ascending_order=False, # Descending = newest first
include_manifest=include_manifest,
*args,
**kwargs,
)
deltas = result.all_items()
return deltas[0] if deltas else None


def get_delta(
namespace: str,
table_name: str,
Expand Down Expand Up @@ -2806,6 +2953,9 @@ def commit_delta(
while delta.locator.stream_position <= UNSIGNED_INT32_MAX_VALUE:
delta.locator.stream_position = uuid.uuid4().int & (1 << 63) - 1

# Add stream timestamp in milliseconds
delta.locator.stream_timestamp = int(time.time() * 1000)

# Add operations to the transaction
# the 1st operation creates the delta
try:
Expand Down
29 changes: 27 additions & 2 deletions deltacat/storage/model/delta.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,13 @@ def stream_position(self) -> Optional[int]:
return delta_locator.stream_position
return None

@property
def stream_timestamp(self) -> Optional[int]:
delta_locator = self.locator
if delta_locator:
return delta_locator.stream_timestamp
return None

def url(
self,
catalog_name: Optional[str] = None,
Expand Down Expand Up @@ -375,14 +382,18 @@ class DeltaLocator(Locator, dict):
def of(
partition_locator: Optional[PartitionLocator] = None,
stream_position: Optional[int] = None,
stream_timestamp: Optional[int] = None,
) -> DeltaLocator:
"""
Creates a partition delta locator. Stream Position, if provided, should
be greater than that of any prior delta in the partition.
Creates a partition delta locator.

Stream position, if provided, should be greater than that of any prior delta
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking back on it, this comment has been incorrect since ADD deltas were introduced:

Suggested change
Stream position, if provided, should be greater than that of any prior delta
For APPEND, UPSERT, and DELETE deltas the stream position should be greater than that of any prior delta

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it’s a good idea to introduce CHRONO-type deltas with the corresponding write mode. Since CHRONO deltas will follow the same behavior as ADD deltas, this shouldn’t add implementation complexity or interfere with the existing delta types.

The key difference should be that CHRONO deltas must have their previous stream position set, unlike ADD deltas. This shouldn’t interfere with writing, and it should still allow delta writes without concurrency issues. Eventually, this delta type could behave like ordered deltas and be used for upsert and delete operations.

What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... I'm not sure how we set a consistent/correct previous delta stream position without also requiring serialized writes of CHRONO deltas. APPEND deltas currently get it from the parent partition's latest stream position (https://github.com/ray-project/deltacat/blob/2.0/deltacat/storage/main/impl.py#L2787), which then requires synchronous updates to the partition on each append to keep accurate (https://github.com/ray-project/deltacat/blob/2.0/deltacat/storage/main/impl.py#L2799-L2801). I think this would make the CHRONO delta behave more like an APPEND, and result in either (1) serial/ordered writes of each CHRONO delta or (2) an eventually consistent reconciliation mechanism to "backfill" previous stream positions after CHRONO deltas have been appended. Maybe there's another way of doing this that I'm missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I were thinking for setting it just like the append, but probably I'm missing a key point there. I will investigate deeper.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that I’ve revisited the conversation, I realize I misunderstood — I was probably in some kind of tunnel vision. Nevertheless, I now understand the full picture.

I will be able to implement the CHRONO type delta, for which list_partition_delta should work and return an ordered result. From there, it will be easy if someone wants the latest delta of a stream — it will be accessible through the list manually. Sadly, get_latest_delta won’t work for CHRONO.

In the future, it might be possible to create some kind of backfill job to populate previousStreamPosition for CHRONO. I have some ideas, but I need to distill them a bit more.

I will close this pull request since it is no longer relevant.

in the partition. Stream timestamp is a unix millisecond timestamp.
"""
delta_locator = DeltaLocator()
delta_locator.partition_locator = partition_locator
delta_locator.stream_position = stream_position
delta_locator.stream_timestamp = stream_timestamp
return delta_locator

@staticmethod
Expand Down Expand Up @@ -451,6 +462,20 @@ def stream_position(self) -> Optional[int]:
def stream_position(self, stream_position: Optional[int]) -> None:
self["streamPosition"] = stream_position

@property
def stream_timestamp(self) -> Optional[int]:
return self.get("streamTimestamp")

@stream_timestamp.setter
def stream_timestamp(self, stream_timestamp: Optional[int]) -> None:
if stream_timestamp is not None and not (
1_000_000_000_000 <= stream_timestamp <= 9_999_999_999_999
):
raise ValueError(
f"stream_timestamp must be unix milliseconds, got {stream_timestamp}"
)
self["streamTimestamp"] = stream_timestamp

@property
def namespace_locator(self) -> Optional[NamespaceLocator]:
partition_locator = self.partition_locator
Expand Down
1 change: 1 addition & 0 deletions deltacat/storage/model/metafile.py
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ def equivalent_to(self, other: Metafile) -> bool:
"partitionLocator",
"deltaLocator",
"compactionRoundCompletionInfo",
"streamTimestamp",
}
return Metafile._equivalent_minus_exclusions(self, other, identifiers)

Expand Down
Loading
Loading