[Data] - Port over changes from lance-ray into Ray Data#60497
[Data] - Port over changes from lance-ray into Ray Data#60497myandpr wants to merge 1 commit intoray-project:masterfrom
Conversation
There was a problem hiding this comment.
Code Review
The pull request introduces support for Lance namespaces, write retry mechanisms, and driver-side commit flow to Ray Data's LanceDatasink. The changes involve modifying the LanceDatasink and _write_fragment functions to handle new parameters related to namespaces and retries, and adding a new utility module lance_utils.py for namespace management. The pydoclint-baseline.txt file was updated to remove previously reported docstring issues, indicating improved documentation. A new test case was added to verify the correct passing of namespace arguments. Overall, the changes are well-structured and align with the described features. The introduction of call_with_retry and namespace handling enhances the robustness and flexibility of Lance integration.
|
Hey @goutamvenkat-anyscale , I implemented the #60147 migration (retry/namespace/driver commit). Maybe you can take a quick look at the PR’s approach when you have a moment—would love your review and whether it aligns with your expectations. Thanks! |
3d21e0b to
1c5363e
Compare
|
note: CI failure looks infra-related (Docker client/daemon API mismatch) and likely tied to recent infra updates, not this PR’s changes. |
46f1ffd to
096ba61
Compare
096ba61 to
a7e60e7
Compare
goutamvenkat-anyscale
left a comment
There was a problem hiding this comment.
Thanks for the change. Left a few comments.
| uri: str, | ||
| uri: Optional[str] = None, | ||
| schema: Optional[pa.Schema] = None, | ||
| mode: Literal["create", "append", "overwrite"] = "create", |
There was a problem hiding this comment.
Let's use SaveMode enum
| describe_request = DescribeTableRequest(id=table_id) | ||
| describe_response = namespace.describe_table(describe_request) | ||
| self.uri = describe_response.location | ||
| if describe_response.storage_options: | ||
| merged_storage_options.update(describe_response.storage_options) |
There was a problem hiding this comment.
Append and overwrite seem to be functionally the same?
There was a problem hiding this comment.
Thanks for calling this out. This branch was ported from lance-ray and keeps the same mode semantics (source: lance_ray/datasink.py, _BaseLanceDatasink.__init__, append/overwrite handling: https://github.com/lance-format/lance-ray/blob/342949e6ee0f7cfe2355951addfccaae57e39301/lance_ray/datasink.py#L79). They are similar when the table already exists, but behavior differs when it does not: append should fail, while overwrite falls back to _declare_table_with_fallback; commit behavior also differs (LanceOperation.Append vs LanceOperation.Overwrite).
Now mode handling branch has been removed.
| captured = {} | ||
|
|
||
| class _FakeLanceDatasink: | ||
| def __init__(self, path, **kwargs): | ||
| captured["path"] = path | ||
| captured["kwargs"] = kwargs | ||
|
|
||
| def _fake_write_datasink(self, datasink, **kwargs): | ||
| captured["datasink"] = datasink | ||
| captured["write_kwargs"] = kwargs | ||
|
|
||
| monkeypatch.setattr(ray.data.dataset, "LanceDatasink", _FakeLanceDatasink) | ||
| monkeypatch.setattr(ray.data.Dataset, "write_datasink", _fake_write_datasink) |
There was a problem hiding this comment.
Let's create a test fixture to create a fake lancedb
There was a problem hiding this comment.
Great point. I updated the test to use a fixture-backed fake namespace/LanceDB setup.
| self.table_id = table_id | ||
| has_namespace_storage_options = True | ||
|
|
||
| if mode == "append": |
There was a problem hiding this comment.
Can we separate the different mode handling in a different PR?
| table_id: Optional[List[str]] = None, | ||
| *args: Any, | ||
| schema: Optional[pa.Schema] = None, | ||
| mode: Literal["create", "append", "overwrite"] = "create", |
There was a problem hiding this comment.
The default parameter conflict with mode: The write mode. Default is 'append'. Choices are 'append', 'create', 'overwrite'.
https://github.com/ray-project/ray/pull/60497/changes#diff-79935e3b17cc6e14906191f95168a2caa7a4aaf4d5c50064a5bd75b4138e9afcR290
| assert captured["kwargs"]["namespace_properties"] == namespace_properties | ||
| assert isinstance(captured["datasink"], _FakeLanceDatasink) | ||
|
|
||
|
|
There was a problem hiding this comment.
There was a problem hiding this comment.
Thanks for sharing this, it’s very helpful. For this PR, I likely need to lean toward a fixture-based fake LanceDB testing style, but I’ll use your approach as a base and adapt/refactor it accordingly.
a786aac to
6e68942
Compare
| describe_response = namespace.describe_table(describe_request) | ||
| self.uri = describe_response.location | ||
| if describe_response.storage_options: | ||
| merged_storage_options.update(describe_response.storage_options) |
There was a problem hiding this comment.
OVERWRITE mode lacks fallback when table doesn't exist
Medium Severity
When using namespaces, SaveMode.OVERWRITE is treated the same as SaveMode.APPEND - both require the table to exist via describe_table. According to the PR discussion, OVERWRITE mode should fall back to _declare_table_with_fallback when the table doesn't exist, allowing it to create the table. Currently, the else branch handles both APPEND and OVERWRITE identically, so OVERWRITE on a non-existent table would fail instead of creating it.
6e68942 to
7c2d623
Compare
Signed-off-by: yaommen <myanstu@163.com>
7c2d623 to
26d5a54
Compare
|
note: CI failure looks not related to this PR (https://app.readthedocs.com/projects/anyscale-ray/builds/3732139/) |


Description
Port lance-ray datasink features into Ray Data LanceDatasink: write retry, Lance namespaces, and driver-side commit flow.
Related issues
Additional information
implementation details
Testing
Notes