-
Notifications
You must be signed in to change notification settings - Fork 7.2k
[Data] Add map namespace support for expression operations #59879
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ryankert01
wants to merge
23
commits into
ray-project:master
Choose a base branch
from
ryankert01:map-expression
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+348
−0
Open
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
1bd4269
[Data] Add map namespace support for expression operations
ryankert01 2e157bd
Merge branch 'master' into map-expression
ryankert01 68bef64
address ai review
ryankert01 fe2642b
fix cursor bot suggestions
ryankert01 843cac1
Merge branch 'master' into map-expression
ryankert01 f16bfd1
Merge remote-tracking branch 'origin/master' into map-expression
ryankert01 df1fe8c
refactor tests
ryankert01 6461062
Merge branch 'master' into map-expression
ryankert01 fcd3652
Merge branch 'master' into map-expression
ryankert01 202a652
Merge branch 'master' into map-expression
owenowenisme 50a2e64
Update python/ray/data/namespace_expressions/map_namespace.py
ryankert01 e613cfa
address commits
ryankert01 70a3760
Merge branch 'master' into map-expression
ryankert01 49268ec
Merge branch 'master' into map-expression
ryankert01 c390a24
create 3 helper functions to make the intent clearer
ryankert01 5e024c8
use numpy.repeat()
ryankert01 10e4b7c
text extractioon on empty chunkedArray
ryankert01 f9d53b8
Merge branch 'master' into map-expression
ryankert01 978132e
lint
ryankert01 2eff519
Merge remote-tracking branch 'origin/map-expression' into map-expression
ryankert01 7a11478
Merge branch 'master' into map-expression
goutamvenkat-anyscale dae4645
Merge branch 'master' into map-expression
ryankert01 59f8047
address comments
ryankert01 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from enum import Enum | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| import numpy as np | ||
| import pyarrow | ||
| import pyarrow.compute as pc | ||
|
|
||
| from ray.data.datatype import DataType | ||
| from ray.data.expressions import pyarrow_udf | ||
|
|
||
| if TYPE_CHECKING: | ||
| from ray.data.expressions import Expr, UDFExpr | ||
|
|
||
|
|
||
| class MapComponent(str, Enum): | ||
| KEYS = "keys" | ||
| VALUES = "values" | ||
|
|
||
|
|
||
| def _get_child_array( | ||
| arr: pyarrow.Array, component: MapComponent | ||
| ) -> pyarrow.Array | None: | ||
| """Extract the flat keys or values array from a map-like array. | ||
|
|
||
| Example: MapArray [{"a": 1}, {"b": 2}] -> keys ["a", "b"] or values [1, 2] | ||
| """ | ||
| if isinstance(arr, pyarrow.MapArray): | ||
| if component == MapComponent.KEYS: | ||
| return arr.keys | ||
| else: | ||
| return arr.items | ||
|
|
||
| if isinstance(arr, (pyarrow.ListArray, pyarrow.LargeListArray)): | ||
| flat_values = arr.values | ||
| if ( | ||
| isinstance(flat_values, pyarrow.StructArray) | ||
| and flat_values.type.num_fields >= 2 | ||
| ): | ||
| idx = 0 if component == MapComponent.KEYS else 1 | ||
| return flat_values.field(idx) | ||
|
|
||
| return None | ||
|
|
||
|
|
||
| def _make_empty_list_array( | ||
| arr: pyarrow.Array, component: MapComponent | ||
| ) -> pyarrow.Array: | ||
| """Create an all-null ListArray matching the input length. | ||
|
|
||
| Example: arr of length 3 -> ListArray [null, null, null] | ||
| """ | ||
| if len(arr) > 0 and arr.null_count < len(arr): | ||
| raise TypeError( | ||
| f"Expression is not a valid map type. .map.{component.value}() requires " | ||
| f"pyarrow.MapArray or pyarrow.ListArray<Struct> with at least 2 fields " | ||
| f"(key and value), but got: {arr.type}." | ||
| ) | ||
| return pyarrow.ListArray.from_arrays( | ||
| offsets=np.repeat(0, len(arr) + 1), | ||
| values=pyarrow.array([], type=pyarrow.null()), | ||
| mask=pyarrow.array(np.repeat(True, len(arr))), | ||
| ) | ||
|
|
||
|
|
||
| def _rebuild_list_array( | ||
| arr: pyarrow.Array, child_array: pyarrow.Array | ||
| ) -> pyarrow.Array: | ||
| """Rebuild a ListArray from parent offsets and child values, normalizing sliced offsets. | ||
|
|
||
| Example: offsets [5, 7, 10] -> slice child to [5:10], normalize offsets to [0, 2, 5] | ||
| """ | ||
| offsets = arr.offsets | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if len(offsets) > 0: | ||
| start_offset = offsets[0] | ||
| if start_offset.as_py() != 0: | ||
| end_offset = offsets[-1].as_py() | ||
| child_array = child_array.slice( | ||
| offset=start_offset.as_py(), length=end_offset - start_offset.as_py() | ||
|
||
| ) | ||
| offsets = pc.subtract(offsets, start_offset) | ||
|
|
||
| return pyarrow.ListArray.from_arrays( | ||
| offsets=offsets, values=child_array, mask=arr.is_null() | ||
| ) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
|
|
||
| def _get_result_type( | ||
| arr_type: pyarrow.DataType, component: MapComponent | ||
| ) -> pyarrow.DataType: | ||
| """Infer the result list type from the input map type.""" | ||
| if pyarrow.types.is_map(arr_type): | ||
| inner = ( | ||
| arr_type.key_type if component == MapComponent.KEYS else arr_type.item_type | ||
| ) | ||
| return pyarrow.list_(inner) | ||
| if pyarrow.types.is_list(arr_type) or pyarrow.types.is_large_list(arr_type): | ||
| struct_type = arr_type.value_type | ||
| if pyarrow.types.is_struct(struct_type) and struct_type.num_fields >= 2: | ||
| idx = 0 if component == MapComponent.KEYS else 1 | ||
| return pyarrow.list_(struct_type.field(idx).type) | ||
| return pyarrow.list_(pyarrow.null()) | ||
|
|
||
|
|
||
| def _extract_map_component( | ||
| arr: pyarrow.Array, component: MapComponent | ||
| ) -> pyarrow.Array: | ||
| """Extract keys or values from a MapArray or ListArray<Struct>. | ||
|
|
||
| This serves as the primary implementation since PyArrow does not yet | ||
| expose dedicated compute kernels for map projection in the Python API. | ||
| """ | ||
| if isinstance(arr, pyarrow.ChunkedArray): | ||
| chunks = [_extract_map_component(chunk, component) for chunk in arr.chunks] | ||
| if not chunks: | ||
| return pyarrow.chunked_array([], type=_get_result_type(arr.type, component)) | ||
| return pyarrow.chunked_array(chunks) | ||
|
|
||
| child_array = _get_child_array(arr, component) | ||
|
|
||
| if child_array is None: | ||
| return _make_empty_list_array(arr, component) | ||
|
|
||
| return _rebuild_list_array(arr, child_array) | ||
|
|
||
|
|
||
| @dataclass | ||
| class _MapNamespace: | ||
| """Namespace for map operations on expression columns. | ||
|
|
||
| This namespace provides methods for operating on map-typed columns | ||
| (including MapArrays and ListArrays of Structs) using PyArrow UDFs. | ||
|
|
||
| Example: | ||
| >>> from ray.data.expressions import col | ||
| >>> # Get keys from map column | ||
| >>> expr = col("headers").map.keys() | ||
| >>> # Get values from map column | ||
| >>> expr = col("headers").map.values() | ||
| """ | ||
|
|
||
| _expr: "Expr" | ||
|
|
||
| def keys(self) -> "UDFExpr": | ||
| """Returns a list expression containing the keys of the map. | ||
|
|
||
| Example: | ||
| >>> from ray.data.expressions import col | ||
| >>> # Get keys from map column | ||
| >>> expr = col("headers").map.keys() | ||
|
|
||
| Returns: | ||
| A list expression containing the keys. | ||
| """ | ||
| return self._create_projection_udf(MapComponent.KEYS) | ||
|
|
||
| def values(self) -> "UDFExpr": | ||
| """Returns a list expression containing the values of the map. | ||
|
|
||
| Example: | ||
| >>> from ray.data.expressions import col | ||
| >>> # Get values from map column | ||
| >>> expr = col("headers").map.values() | ||
|
|
||
| Returns: | ||
| A list expression containing the values. | ||
| """ | ||
| return self._create_projection_udf(MapComponent.VALUES) | ||
|
|
||
| def _create_projection_udf(self, component: MapComponent) -> "UDFExpr": | ||
| """Helper to generate UDFs for map projections.""" | ||
|
|
||
| return_dtype = DataType(object) | ||
| if self._expr.data_type.is_arrow_type(): | ||
| arrow_type = self._expr.data_type.to_arrow_dtype() | ||
|
|
||
| is_physical_map = ( | ||
| ( | ||
| pyarrow.types.is_list(arrow_type) | ||
| or pyarrow.types.is_large_list(arrow_type) | ||
| ) | ||
| and pyarrow.types.is_struct(arrow_type.value_type) | ||
| and arrow_type.value_type.num_fields >= 2 | ||
| ) | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| inner_arrow_type = None | ||
| if pyarrow.types.is_map(arrow_type): | ||
| inner_arrow_type = ( | ||
| arrow_type.key_type | ||
| if component == MapComponent.KEYS | ||
| else arrow_type.item_type | ||
| ) | ||
| elif is_physical_map: | ||
| # List<Struct> map representation: idx 0 is key, idx 1 is value. | ||
| idx = 0 if component == MapComponent.KEYS else 1 | ||
| inner_arrow_type = arrow_type.value_type.field(idx).type | ||
|
|
||
| if inner_arrow_type: | ||
| return_dtype = DataType.list(DataType.from_arrow(inner_arrow_type)) | ||
cursor[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| @pyarrow_udf(return_dtype=return_dtype) | ||
| def _project_map(arr: pyarrow.Array) -> pyarrow.Array: | ||
| return _extract_map_component(arr, component) | ||
|
|
||
| return _project_map(self._expr) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.