Commit a7e60e7

[Data] - Port over changes from lance-ray into Ray Data
Signed-off-by: yaommen <myanstu@163.com>
1 parent fd8b107 · commit a7e60e7

8 files changed (+309, −48 lines)

ci/lint/pydoclint-baseline.txt

Lines changed: 0 additions & 4 deletions
@@ -1002,10 +1002,6 @@ python/ray/data/_internal/block_batching/util.py
     DOC402: Function `finalize_batches` has "yield" statements, but the docstring does not have a "Yields" section
     DOC404: Function `finalize_batches` yield type(s) in docstring not consistent with the return annotation. Return annotation exists, but docstring "yields" section does not exist or has 0 type(s).
 --------------------
-python/ray/data/_internal/datasource/lance_datasink.py
-    DOC101: Method `LanceDatasink.__init__`: Docstring contains fewer arguments than in function signature.
-    DOC103: Method `LanceDatasink.__init__`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**kwargs: , *args: , max_rows_per_file: int, min_rows_per_file: int, mode: Literal['create', 'append', 'overwrite'], schema: Optional[pa.Schema], storage_options: Optional[Dict[str, Any]], uri: str]. Arguments in the docstring but not in the function signature: [max_rows_per_file : , min_rows_per_file : , mode : , schema : , storage_options : , uri : ].
---------------------
 python/ray/data/_internal/datasource/sql_datasource.py
     DOC101: Method `SQLDatasource.supports_sharding`: Docstring contains fewer arguments than in function signature.
     DOC103: Method `SQLDatasource.supports_sharding`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [parallelism: int].

python/ray/data/_internal/datasource/lance_datasink.py

Lines changed: 191 additions & 40 deletions
@@ -1,5 +1,4 @@
 import pickle
-from itertools import chain
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -14,15 +13,40 @@

 import pyarrow as pa

+from ray._common.retry import call_with_retry
+from ray.data._internal.datasource.lance_utils import (
+    create_storage_options_provider,
+    get_or_create_namespace,
+)
 from ray.data._internal.util import _check_import
 from ray.data.block import BlockAccessor
+from ray.data.context import DataContext
 from ray.data.datasource.datasink import Datasink

 if TYPE_CHECKING:
     import pandas as pd
     from lance.fragment import FragmentMetadata


+def _declare_table_with_fallback(
+    namespace, table_id: List[str]
+) -> Tuple[str, Optional[Dict[str, str]]]:
+    """Declare a table using declare_table, falling back to create_empty_table."""
+    try:
+        from lance_namespace import DeclareTableRequest
+
+        declare_request = DeclareTableRequest(id=table_id, location=None)
+        declare_response = namespace.declare_table(declare_request)
+        return declare_response.location, declare_response.storage_options
+    except (AttributeError, NotImplementedError):
+        # Fallback for older namespace implementations without declare_table.
+        from lance_namespace import CreateEmptyTableRequest
+
+        create_request = CreateEmptyTableRequest(id=table_id)
+        create_response = namespace.create_empty_table(create_request)
+        return create_response.location, create_response.storage_options
+
+
 def _write_fragment(
     stream: Iterable[Union["pa.Table", "pd.DataFrame"]],
     uri: str,
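For illustration, the new helper resolves a table location through a lance_namespace implementation, preferring declare_table and falling back to create_empty_table on older servers. A minimal sketch of exercising it directly, where the "dir" implementation name and the "root" property key are assumptions about the configured namespace, not part of this diff:

```python
from ray.data._internal.datasource.lance_datasink import _declare_table_with_fallback
from ray.data._internal.datasource.lance_utils import get_or_create_namespace

# "dir" and {"root": ...} are illustrative; valid values depend on the
# lance_namespace implementation available in your environment.
namespace = get_or_create_namespace("dir", {"root": "/tmp/lance_ns"})
if namespace is not None:
    # location is the resolved table URI; vended_options may carry
    # namespace-vended storage credentials.
    location, vended_options = _declare_table_with_fallback(namespace, ["my_table"])
    print(location, vended_options)
```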
@@ -33,12 +57,20 @@ def _write_fragment(
     max_rows_per_group: int = 1024,  # Only useful for v1 writer.
     data_storage_version: Optional[str] = None,
     storage_options: Optional[Dict[str, Any]] = None,
+    namespace_impl: Optional[str] = None,
+    namespace_properties: Optional[Dict[str, str]] = None,
+    table_id: Optional[List[str]] = None,
+    retry_params: Optional[Dict[str, Any]] = None,
 ) -> List[Tuple["FragmentMetadata", "pa.Schema"]]:
     import pandas as pd
     from lance.fragment import DEFAULT_MAX_BYTES_PER_FILE, write_fragments

+    stream = list(stream)
+    if not stream:
+        return []
+
     if schema is None:
-        first = next(stream)
+        first = stream[0]
         if isinstance(first, pd.DataFrame):
             schema = pa.Schema.from_pandas(first).remove_metadata()
         else:
@@ -47,8 +79,6 @@ def _write_fragment(
             # Empty table.
             schema = None

-        stream = chain([first], stream)
-
     def record_batch_converter():
         for block in stream:
             tbl = BlockAccessor.for_block(block).to_arrow()
@@ -58,17 +88,35 @@ def record_batch_converter():
         DEFAULT_MAX_BYTES_PER_FILE if max_bytes_per_file is None else max_bytes_per_file
     )

-    reader = pa.RecordBatchReader.from_batches(schema, record_batch_converter())
-    fragments = write_fragments(
-        reader,
-        uri,
-        schema=schema,
-        max_rows_per_file=max_rows_per_file,
-        max_rows_per_group=max_rows_per_group,
-        max_bytes_per_file=max_bytes_per_file,
-        data_storage_version=data_storage_version,
-        storage_options=storage_options,
+    if retry_params is None:
+        retry_params = {
+            "description": "write lance fragments",
+            "match": [],
+            "max_attempts": 1,
+            "max_backoff_s": 0,
+        }
+
+    storage_options_provider = create_storage_options_provider(
+        namespace_impl,
+        namespace_properties,
+        table_id,
     )
+
+    def _write_once():
+        reader = pa.RecordBatchReader.from_batches(schema, record_batch_converter())
+        return write_fragments(
+            reader,
+            uri,
+            schema=schema,
+            max_rows_per_file=max_rows_per_file,
+            max_rows_per_group=max_rows_per_group,
+            max_bytes_per_file=max_bytes_per_file,
+            data_storage_version=data_storage_version,
+            storage_options=storage_options,
+            storage_options_provider=storage_options_provider,
+        )
+
+    fragments = call_with_retry(_write_once, **retry_params)
     return [(fragment, schema) for fragment in fragments]


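The retry semantics come from ray._common.retry.call_with_retry, whose parameters match the keys of retry_params above. As a rough, non-authoritative sketch of that behavior (the real helper lives in Ray; the exact backoff and jitter details here are assumptions):

```python
import random
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")

def call_with_retry_sketch(
    f: Callable[[], T],
    description: str,
    match: List[str],
    max_attempts: int,
    max_backoff_s: int,
) -> T:
    """Illustrative approximation of ray._common.retry.call_with_retry."""
    for attempt in range(max_attempts):
        try:
            return f()
        except Exception as e:
            # Retry only errors whose message matches one of the substrings.
            retryable = any(pattern in str(e) for pattern in match)
            if not retryable or attempt + 1 == max_attempts:
                raise
            # Exponential backoff with jitter, capped at max_backoff_s.
            backoff = min((2**attempt) * random.random(), max_backoff_s)
            print(f"Retrying {description} after: {e}")
            time.sleep(backoff)
```

Note that _write_once rebuilds the RecordBatchReader on every attempt, which is what makes the write retryable at all: a partially consumed reader could not simply be replayed.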
@@ -77,21 +125,97 @@ class _BaseLanceDatasink(Datasink):

     def __init__(
         self,
-        uri: str,
+        uri: Optional[str] = None,
         schema: Optional[pa.Schema] = None,
         mode: Literal["create", "append", "overwrite"] = "create",
         storage_options: Optional[Dict[str, Any]] = None,
-        *args,
-        **kwargs,
+        table_id: Optional[List[str]] = None,
+        namespace_impl: Optional[str] = None,
+        namespace_properties: Optional[Dict[str, str]] = None,
+        *args: Any,
+        **kwargs: Any,
     ):
         super().__init__(*args, **kwargs)

-        self.uri = uri
+        merged_storage_options: Dict[str, Any] = {}
+        if storage_options:
+            merged_storage_options.update(storage_options)
+
+        self._namespace_impl = namespace_impl
+        self._namespace_properties = namespace_properties
+
+        namespace = get_or_create_namespace(namespace_impl, namespace_properties)
+
+        if namespace is not None and table_id is not None:
+            if uri is not None:
+                import warnings
+
+                warnings.warn(
+                    "The 'uri' argument is ignored when namespace parameters are "
+                    "provided. The resolved namespace location will be used instead.",
+                    UserWarning,
+                    stacklevel=2,
+                )
+            from lance_namespace import DescribeTableRequest
+            from lance_namespace.errors import TableNotFoundError
+
+            self.table_id = table_id
+            has_namespace_storage_options = True
+
+            if mode == "append":
+                describe_request = DescribeTableRequest(id=table_id)
+                describe_response = namespace.describe_table(describe_request)
+                self.uri = describe_response.location
+                if describe_response.storage_options:
+                    merged_storage_options.update(describe_response.storage_options)
+            elif mode == "overwrite":
+                try:
+                    describe_request = DescribeTableRequest(id=table_id)
+                    describe_response = namespace.describe_table(describe_request)
+                    self.uri = describe_response.location
+                    if describe_response.storage_options:
+                        merged_storage_options.update(describe_response.storage_options)
+                except TableNotFoundError:
+                    uri, ns_storage_options = _declare_table_with_fallback(
+                        namespace, table_id
+                    )
+                    self.uri = uri
+                    if ns_storage_options:
+                        merged_storage_options.update(ns_storage_options)
+            else:
+                uri, ns_storage_options = _declare_table_with_fallback(
+                    namespace, table_id
+                )
+                self.uri = uri
+                if ns_storage_options:
+                    merged_storage_options.update(ns_storage_options)
+
+            self._has_namespace_storage_options = has_namespace_storage_options
+        else:
+            self.table_id = None
+            if uri is None:
+                raise ValueError(
+                    "Must provide either 'uri' or ('namespace_impl' and 'table_id')."
+                )
+            self.uri = uri
+            self._has_namespace_storage_options = False
+
         self.schema = schema
         self.mode = mode

         self.read_version: Optional[int] = None
-        self.storage_options = storage_options
+        self.storage_options = merged_storage_options
+
+    @property
+    def storage_options_provider(self):
+        """Lazily create storage options provider using namespace_impl/properties."""
+        if not self._has_namespace_storage_options:
+            return None
+        return create_storage_options_provider(
+            self._namespace_impl,
+            self._namespace_properties,
+            self.table_id,
+        )

     @property
     def supports_distributed_writes(self) -> bool:
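With this change a sink can be pointed at a table either directly by URI or indirectly through a namespace that vends the location and credentials. A hedged sketch of both construction paths; the "rest" implementation name, endpoint, and property keys are illustrative, not taken from this diff:

```python
from ray.data._internal.datasource.lance_datasink import LanceDatasink

# Path 1: direct URI, unchanged from before.
sink = LanceDatasink(uri="/tmp/demo.lance", mode="create")

# Path 2: namespace-resolved. 'uri' is omitted; the location and any vended
# storage options come from describe_table / declare_table on the namespace.
sink = LanceDatasink(
    mode="create",
    table_id=["my_db", "my_table"],
    namespace_impl="rest",
    namespace_properties={"uri": "https://lance-namespace.example.com"},
)
```

Passing both uri and the namespace parameters raises the UserWarning shown above and the namespace location wins; passing neither raises ValueError.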
@@ -103,7 +227,11 @@ def on_write_start(self, schema: Optional["pa.Schema"] = None) -> None:
103227
import lance
104228

105229
if self.mode == "append":
106-
ds = lance.LanceDataset(self.uri, storage_options=self.storage_options)
230+
ds = lance.LanceDataset(
231+
self.uri,
232+
storage_options=self.storage_options,
233+
storage_options_provider=self.storage_options_provider,
234+
)
107235
self.read_version = ds.version
108236
if self.schema is None:
109237
self.schema = ds.schema
@@ -152,6 +280,7 @@ def on_write_complete(
             op,
             read_version=self.read_version,
             storage_options=self.storage_options,
+            storage_options_provider=self.storage_options_provider,
         )


@@ -164,44 +293,52 @@ class LanceDatasink(_BaseLanceDatasink):
     we can use `LanceFragmentWriter` and `LanceCommitter`.

     Args:
-        uri : the base URI of the dataset.
-        schema : pyarrow.Schema, optional.
-            The schema of the dataset.
-        mode : str, optional
-            The write mode. Default is 'append'.
-            Choices are 'append', 'create', 'overwrite'.
-        min_rows_per_file : int, optional
-            The minimum number of rows per file. Default is 1024 * 1024.
-        max_rows_per_file : int, optional
-            The maximum number of rows per file. Default is 64 * 1024 * 1024.
-        data_storage_version: optional, str, default None
-            The version of the data storage format to use. Newer versions are more
-            efficient but require newer versions of lance to read. The default is
-            "legacy" which will use the legacy v1 version. See the user guide
-            for more details.
-        storage_options : Dict[str, Any], optional
-            The storage options for the writer. Default is None.
+        uri: The base URI of the dataset.
+        schema: The schema of the dataset.
+        mode: The write mode. Default is 'append'. Choices are 'append', 'create', 'overwrite'.
+        min_rows_per_file: The minimum number of rows per file. Default is 1024 * 1024.
+        max_rows_per_file: The maximum number of rows per file. Default is 64 * 1024 * 1024.
+        data_storage_version: The version of the data storage format to use. Newer versions are more
+            efficient but require newer versions of lance to read. The default is "legacy",
+            which will use the legacy v1 version. See the user guide for more details.
+        storage_options: The storage options for the writer. Default is None.
+        table_id: The table identifier as a list of strings, used with namespace params.
+        namespace_impl: The namespace implementation type (e.g., "rest", "dir").
+            Used together with namespace_properties and table_id for credentials vending.
+        namespace_properties: Properties for connecting to the namespace.
+            Used together with namespace_impl and table_id for credentials vending.
+        *args: Additional positional arguments forwarded to the base class.
+        **kwargs: Additional keyword arguments forwarded to the base class.
     """

     NAME = "Lance"
+    WRITE_FRAGMENTS_ERRORS_TO_RETRY = ["LanceError(IO)"]
+    WRITE_FRAGMENTS_MAX_ATTEMPTS = 10
+    WRITE_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS = 32

     def __init__(
         self,
-        uri: str,
+        uri: Optional[str] = None,
         schema: Optional[pa.Schema] = None,
         mode: Literal["create", "append", "overwrite"] = "create",
         min_rows_per_file: int = 1024 * 1024,
         max_rows_per_file: int = 64 * 1024 * 1024,
         data_storage_version: Optional[str] = None,
         storage_options: Optional[Dict[str, Any]] = None,
-        *args,
-        **kwargs,
+        table_id: Optional[List[str]] = None,
+        namespace_impl: Optional[str] = None,
+        namespace_properties: Optional[Dict[str, str]] = None,
+        *args: Any,
+        **kwargs: Any,
     ):
         super().__init__(
             uri,
             schema=schema,
             mode=mode,
             storage_options=storage_options,
+            table_id=table_id,
+            namespace_impl=namespace_impl,
+            namespace_properties=namespace_properties,
             *args,
             **kwargs,
         )
@@ -212,6 +349,16 @@ def __init__(
         # if mode is append, read_version is read from existing dataset.
         self.read_version: Optional[int] = None

+        match = []
+        match.extend(self.WRITE_FRAGMENTS_ERRORS_TO_RETRY)
+        match.extend(DataContext.get_current().retried_io_errors)
+        self._retry_params = {
+            "description": "write lance fragments",
+            "match": match,
+            "max_attempts": self.WRITE_FRAGMENTS_MAX_ATTEMPTS,
+            "max_backoff_s": self.WRITE_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS,
+        }
+
     @property
     def min_rows_per_write(self) -> int:
         return self.min_rows_per_file
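Because the match list extends DataContext.get_current().retried_io_errors, additional error substrings can be opted into the fragment-write retry without subclassing the sink. A minimal sketch; the substring is only an example:

```python
from ray.data.context import DataContext

# Messages containing this substring become retryable for Lance fragment
# writes, alongside the built-in "LanceError(IO)" pattern. This must run
# before the LanceDatasink is constructed, since the match list is captured
# in __init__.
DataContext.get_current().retried_io_errors.append("Connection reset by peer")
```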
@@ -231,6 +378,10 @@ def write(
             max_rows_per_file=self.max_rows_per_file,
             data_storage_version=self.data_storage_version,
             storage_options=self.storage_options,
+            namespace_impl=self._namespace_impl,
+            namespace_properties=self._namespace_properties,
+            table_id=self.table_id,
+            retry_params=self._retry_params,
         )
         return [
             (pickle.dumps(fragment), pickle.dumps(schema))
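End to end, the sink plugs into the standard Datasink write path. A minimal local sketch, assuming direct construction of this internal class (in practice Ray Data's public Lance write API drives it):

```python
import ray
from ray.data._internal.datasource.lance_datasink import LanceDatasink

# Write 100 rows to a local Lance dataset; fragment-write retries and
# namespace resolution happen inside the sink as shown in the diff above.
sink = LanceDatasink(uri="/tmp/demo.lance", mode="create")
ray.data.range(100).write_datasink(sink)
```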
