Skip to content

Commit 6e68942

Browse files
committed
[Data] - Port over changes from lance-ray into Ray Data
Signed-off-by: yaommen <myanstu@163.com>
1 parent fd8b107 commit 6e68942

File tree

9 files changed

+335
-59
lines changed

9 files changed

+335
-59
lines changed

ci/lint/pydoclint-baseline.txt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,10 +1002,6 @@ python/ray/data/_internal/block_batching/util.py
10021002
DOC402: Function `finalize_batches` has "yield" statements, but the docstring does not have a "Yields" section
10031003
DOC404: Function `finalize_batches` yield type(s) in docstring not consistent with the return annotation. Return annotation exists, but docstring "yields" section does not exist or has 0 type(s).
10041004
--------------------
1005-
python/ray/data/_internal/datasource/lance_datasink.py
1006-
DOC101: Method `LanceDatasink.__init__`: Docstring contains fewer arguments than in function signature.
1007-
DOC103: Method `LanceDatasink.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: , *args: , max_rows_per_file: int, min_rows_per_file: int, mode: Literal['create', 'append', 'overwrite'], schema: Optional[pa.Schema], storage_options: Optional[Dict[str, Any]], uri: str]. Arguments in the docstring but not in the function signature: [max_rows_per_file : , min_rows_per_file : , mode : , schema : , storage_options : , uri : ].
1008-
--------------------
10091005
python/ray/data/_internal/datasource/sql_datasource.py
10101006
DOC101: Method `SQLDatasource.supports_sharding`: Docstring contains fewer arguments than in function signature.
10111007
DOC103: Method `SQLDatasource.supports_sharding`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [parallelism: int].

python/ray/data/_internal/datasource/lance_datasink.py

Lines changed: 185 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,52 @@
11
import pickle
2-
from itertools import chain
32
from typing import (
43
TYPE_CHECKING,
54
Any,
65
Dict,
76
Iterable,
87
List,
9-
Literal,
108
Optional,
119
Tuple,
1210
Union,
1311
)
1412

1513
import pyarrow as pa
1614

15+
from ray._common.retry import call_with_retry
16+
from ray.data._internal.datasource.lance_utils import (
17+
create_storage_options_provider,
18+
get_or_create_namespace,
19+
)
20+
from ray.data._internal.savemode import SaveMode
1721
from ray.data._internal.util import _check_import
1822
from ray.data.block import BlockAccessor
23+
from ray.data.context import DataContext
1924
from ray.data.datasource.datasink import Datasink
2025

2126
if TYPE_CHECKING:
2227
import pandas as pd
2328
from lance.fragment import FragmentMetadata
2429

2530

31+
def _declare_table_with_fallback(
32+
namespace, table_id: List[str]
33+
) -> Tuple[str, Optional[Dict[str, str]]]:
34+
"""Declare a table using declare_table, falling back to create_empty_table."""
35+
try:
36+
from lance_namespace import DeclareTableRequest
37+
38+
declare_request = DeclareTableRequest(id=table_id, location=None)
39+
declare_response = namespace.declare_table(declare_request)
40+
return declare_response.location, declare_response.storage_options
41+
except (AttributeError, NotImplementedError):
42+
# Fallback for older namespace implementations without declare_table.
43+
from lance_namespace import CreateEmptyTableRequest
44+
45+
create_request = CreateEmptyTableRequest(id=table_id)
46+
create_response = namespace.create_empty_table(create_request)
47+
return create_response.location, create_response.storage_options
48+
49+
2650
def _write_fragment(
2751
stream: Iterable[Union["pa.Table", "pd.DataFrame"]],
2852
uri: str,
@@ -33,12 +57,20 @@ def _write_fragment(
3357
max_rows_per_group: int = 1024, # Only useful for v1 writer.
3458
data_storage_version: Optional[str] = None,
3559
storage_options: Optional[Dict[str, Any]] = None,
60+
namespace_impl: Optional[str] = None,
61+
namespace_properties: Optional[Dict[str, str]] = None,
62+
table_id: Optional[List[str]] = None,
63+
retry_params: Optional[Dict[str, Any]] = None,
3664
) -> List[Tuple["FragmentMetadata", "pa.Schema"]]:
3765
import pandas as pd
3866
from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, write_fragments
3967

68+
stream = list(stream)
69+
if not stream:
70+
return []
71+
4072
if schema is None:
41-
first = next(stream)
73+
first = stream[0]
4274
if isinstance(first, pd.DataFrame):
4375
schema = pa.Schema.from_pandas(first).remove_metadata()
4476
else:
@@ -47,8 +79,6 @@ def _write_fragment(
4779
# Empty table.
4880
schema = None
4981

50-
stream = chain([first], stream)
51-
5282
def record_batch_converter():
5383
for block in stream:
5484
tbl = BlockAccessor.for_block(block).to_arrow()
@@ -58,17 +88,35 @@ def record_batch_converter():
5888
DEFAULT_MAX_BYTES_PER_FILE if max_bytes_per_file is None else max_bytes_per_file
5989
)
6090

61-
reader = pa.RecordBatchReader.from_batches(schema, record_batch_converter())
62-
fragments = write_fragments(
63-
reader,
64-
uri,
65-
schema=schema,
66-
max_rows_per_file=max_rows_per_file,
67-
max_rows_per_group=max_rows_per_group,
68-
max_bytes_per_file=max_bytes_per_file,
69-
data_storage_version=data_storage_version,
70-
storage_options=storage_options,
91+
if retry_params is None:
92+
retry_params = {
93+
"description": "write lance fragments",
94+
"match": [],
95+
"max_attempts": 1,
96+
"max_backoff_s": 0,
97+
}
98+
99+
storage_options_provider = create_storage_options_provider(
100+
namespace_impl,
101+
namespace_properties,
102+
table_id,
71103
)
104+
105+
def _write_once():
106+
reader = pa.RecordBatchReader.from_batches(schema, record_batch_converter())
107+
return write_fragments(
108+
reader,
109+
uri,
110+
schema=schema,
111+
max_rows_per_file=max_rows_per_file,
112+
max_rows_per_group=max_rows_per_group,
113+
max_bytes_per_file=max_bytes_per_file,
114+
data_storage_version=data_storage_version,
115+
storage_options=storage_options,
116+
storage_options_provider=storage_options_provider,
117+
)
118+
119+
fragments = call_with_retry(_write_once, **retry_params)
72120
return [(fragment, schema) for fragment in fragments]
73121

74122

@@ -77,21 +125,84 @@ class _BaseLanceDatasink(Datasink):
77125

78126
def __init__(
79127
self,
80-
uri: str,
128+
uri: Optional[str] = None,
81129
schema: Optional[pa.Schema] = None,
82-
mode: Literal["create", "append", "overwrite"] = "create",
130+
mode: SaveMode = SaveMode.CREATE,
83131
storage_options: Optional[Dict[str, Any]] = None,
84-
*args,
85-
**kwargs,
132+
table_id: Optional[List[str]] = None,
133+
namespace_impl: Optional[str] = None,
134+
namespace_properties: Optional[Dict[str, str]] = None,
135+
*args: Any,
136+
**kwargs: Any,
86137
):
87138
super().__init__(*args, **kwargs)
88139

89-
self.uri = uri
140+
merged_storage_options: Dict[str, Any] = {}
141+
if storage_options:
142+
merged_storage_options.update(storage_options)
143+
144+
self._namespace_impl = namespace_impl
145+
self._namespace_properties = namespace_properties
146+
if mode not in {SaveMode.CREATE, SaveMode.APPEND, SaveMode.OVERWRITE}:
147+
raise ValueError(f"Unsupported mode: {mode}")
148+
149+
namespace = get_or_create_namespace(namespace_impl, namespace_properties)
150+
151+
if namespace is not None and table_id is not None:
152+
if uri is not None:
153+
import warnings
154+
155+
warnings.warn(
156+
"The 'uri' argument is ignored when namespace parameters are "
157+
"provided. The resolved namespace location will be used instead.",
158+
UserWarning,
159+
stacklevel=2,
160+
)
161+
from lance_namespace import DescribeTableRequest
162+
163+
self.table_id = table_id
164+
has_namespace_storage_options = True
165+
166+
if mode == SaveMode.CREATE:
167+
uri, ns_storage_options = _declare_table_with_fallback(
168+
namespace, table_id
169+
)
170+
self.uri = uri
171+
if ns_storage_options:
172+
merged_storage_options.update(ns_storage_options)
173+
else:
174+
describe_request = DescribeTableRequest(id=table_id)
175+
describe_response = namespace.describe_table(describe_request)
176+
self.uri = describe_response.location
177+
if describe_response.storage_options:
178+
merged_storage_options.update(describe_response.storage_options)
179+
180+
self._has_namespace_storage_options = has_namespace_storage_options
181+
else:
182+
self.table_id = None
183+
if uri is None:
184+
raise ValueError(
185+
"Must provide either 'uri' or ('namespace_impl' and 'table_id')."
186+
)
187+
self.uri = uri
188+
self._has_namespace_storage_options = False
189+
90190
self.schema = schema
91191
self.mode = mode
92192

93193
self.read_version: Optional[int] = None
94-
self.storage_options = storage_options
194+
self.storage_options = merged_storage_options
195+
196+
@property
197+
def storage_options_provider(self):
198+
"""Lazily create storage options provider using namespace_impl/properties."""
199+
if not self._has_namespace_storage_options:
200+
return None
201+
return create_storage_options_provider(
202+
self._namespace_impl,
203+
self._namespace_properties,
204+
self.table_id,
205+
)
95206

96207
@property
97208
def supports_distributed_writes(self) -> bool:
@@ -102,8 +213,12 @@ def on_write_start(self, schema: Optional["pa.Schema"] = None) -> None:
102213

103214
import lance
104215

105-
if self.mode == "append":
106-
ds = lance.LanceDataset(self.uri, storage_options=self.storage_options)
216+
if self.mode == SaveMode.APPEND:
217+
ds = lance.LanceDataset(
218+
self.uri,
219+
storage_options=self.storage_options,
220+
storage_options_provider=self.storage_options_provider,
221+
)
107222
self.read_version = ds.version
108223
if self.schema is None:
109224
self.schema = ds.schema
@@ -143,15 +258,16 @@ def on_write_complete(
143258
# Skip commit when there are no fragments.
144259
if not schema:
145260
return
146-
if self.mode in {"create", "overwrite"}:
261+
if self.mode in {SaveMode.CREATE, SaveMode.OVERWRITE}:
147262
op = lance.LanceOperation.Overwrite(schema, fragments)
148-
elif self.mode == "append":
263+
elif self.mode == SaveMode.APPEND:
149264
op = lance.LanceOperation.Append(fragments)
150265
lance.LanceDataset.commit(
151266
self.uri,
152267
op,
153268
read_version=self.read_version,
154269
storage_options=self.storage_options,
270+
storage_options_provider=self.storage_options_provider,
155271
)
156272

157273

@@ -164,44 +280,53 @@ class LanceDatasink(_BaseLanceDatasink):
164280
we can use `LanceFragmentWriter` and `LanceCommitter`.
165281
166282
Args:
167-
uri : the base URI of the dataset.
168-
schema : pyarrow.Schema, optional.
169-
The schema of the dataset.
170-
mode : str, optional
171-
The write mode. Default is 'append'.
172-
Choices are 'append', 'create', 'overwrite'.
173-
min_rows_per_file : int, optional
174-
The minimum number of rows per file. Default is 1024 * 1024.
175-
max_rows_per_file : int, optional
176-
The maximum number of rows per file. Default is 64 * 1024 * 1024.
177-
data_storage_version: optional, str, default None
178-
The version of the data storage format to use. Newer versions are more
179-
efficient but require newer versions of lance to read. The default is
180-
"legacy" which will use the legacy v1 version. See the user guide
181-
for more details.
182-
storage_options : Dict[str, Any], optional
183-
The storage options for the writer. Default is None.
283+
uri: The base URI of the dataset.
284+
schema: The schema of the dataset.
285+
mode: The write mode. Supports ``SaveMode.CREATE``, ``SaveMode.APPEND``, and
286+
``SaveMode.OVERWRITE``.
287+
min_rows_per_file: The minimum number of rows per file. Default is 1024 * 1024.
288+
max_rows_per_file: The maximum number of rows per file. Default is 64 * 1024 * 1024.
289+
data_storage_version: The version of the data storage format to use. Newer versions are more
290+
efficient but require newer versions of lance to read. The default is "legacy",
291+
which will use the legacy v1 version. See the user guide for more details.
292+
storage_options: The storage options for the writer. Default is None.
293+
table_id: The table identifier as a list of strings, used with namespace params.
294+
namespace_impl: The namespace implementation type (e.g., "rest", "dir").
295+
Used together with namespace_properties and table_id for credentials vending.
296+
namespace_properties: Properties for connecting to the namespace.
297+
Used together with namespace_impl and table_id for credentials vending.
298+
*args: Additional positional arguments forwarded to the base class.
299+
**kwargs: Additional keyword arguments forwarded to the base class.
184300
"""
185301

186302
NAME = "Lance"
303+
WRITE_FRAGMENTS_ERRORS_TO_RETRY = ["LanceError(IO)"]
304+
WRITE_FRAGMENTS_MAX_ATTEMPTS = 10
305+
WRITE_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS = 32
187306

188307
def __init__(
189308
self,
190-
uri: str,
309+
uri: Optional[str] = None,
191310
schema: Optional[pa.Schema] = None,
192-
mode: Literal["create", "append", "overwrite"] = "create",
311+
mode: SaveMode = SaveMode.CREATE,
193312
min_rows_per_file: int = 1024 * 1024,
194313
max_rows_per_file: int = 64 * 1024 * 1024,
195314
data_storage_version: Optional[str] = None,
196315
storage_options: Optional[Dict[str, Any]] = None,
197-
*args,
198-
**kwargs,
316+
table_id: Optional[List[str]] = None,
317+
namespace_impl: Optional[str] = None,
318+
namespace_properties: Optional[Dict[str, str]] = None,
319+
*args: Any,
320+
**kwargs: Any,
199321
):
200322
super().__init__(
201323
uri,
202324
schema=schema,
203325
mode=mode,
204326
storage_options=storage_options,
327+
table_id=table_id,
328+
namespace_impl=namespace_impl,
329+
namespace_properties=namespace_properties,
205330
*args,
206331
**kwargs,
207332
)
@@ -212,6 +337,16 @@ def __init__(
212337
# if mode is append, read_version is read from existing dataset.
213338
self.read_version: Optional[int] = None
214339

340+
match = []
341+
match.extend(self.WRITE_FRAGMENTS_ERRORS_TO_RETRY)
342+
match.extend(DataContext.get_current().retried_io_errors)
343+
self._retry_params = {
344+
"description": "write lance fragments",
345+
"match": match,
346+
"max_attempts": self.WRITE_FRAGMENTS_MAX_ATTEMPTS,
347+
"max_backoff_s": self.WRITE_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS,
348+
}
349+
215350
@property
216351
def min_rows_per_write(self) -> int:
217352
return self.min_rows_per_file
@@ -231,6 +366,10 @@ def write(
231366
max_rows_per_file=self.max_rows_per_file,
232367
data_storage_version=self.data_storage_version,
233368
storage_options=self.storage_options,
369+
namespace_impl=self._namespace_impl,
370+
namespace_properties=self._namespace_properties,
371+
table_id=self.table_id,
372+
retry_params=self._retry_params,
234373
)
235374
return [
236375
(pickle.dumps(fragment), pickle.dumps(schema))

0 commit comments

Comments
 (0)