[Data] Add map namespace support for expression operations #59879
Merged
Changes from 3 commits
Commits
27 commits
1bd4269
[Data] Add map namespace support for expression operations
ryankert01 2e157bd
Merge branch 'master' into map-expression
ryankert01 68bef64
address ai review
ryankert01 fe2642b
fix cursor bot suggestions
ryankert01 843cac1
Merge branch 'master' into map-expression
ryankert01 f16bfd1
Merge remote-tracking branch 'origin/master' into map-expression
ryankert01 df1fe8c
refactor tests
ryankert01 6461062
Merge branch 'master' into map-expression
ryankert01 fcd3652
Merge branch 'master' into map-expression
ryankert01 202a652
Merge branch 'master' into map-expression
owenowenisme 50a2e64
Update python/ray/data/namespace_expressions/map_namespace.py
ryankert01 e613cfa
address commits
ryankert01 70a3760
Merge branch 'master' into map-expression
ryankert01 49268ec
Merge branch 'master' into map-expression
ryankert01 c390a24
create 3 helper functions to make the intent clearer
ryankert01 5e024c8
use numpy.repeat()
ryankert01 10e4b7c
text extractioon on empty chunkedArray
ryankert01 f9d53b8
Merge branch 'master' into map-expression
ryankert01 978132e
lint
ryankert01 2eff519
Merge remote-tracking branch 'origin/map-expression' into map-expression
ryankert01 7a11478
Merge branch 'master' into map-expression
goutamvenkat-anyscale dae4645
Merge branch 'master' into map-expression
ryankert01 59f8047
address comments
ryankert01 ef6f899
Merge branch 'master' into map-expression
ryankert01 5df9b8f
[fix] Update type hint for _get_child_array return value to Optional[…
ryankert01 183aec1
Merge branch 'master' into map-expression
ryankert01 2aebeb6
[fix] Refactor _rebuild_list_array and _get_result_type to support La…
ryankert01
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,158 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from dataclasses import dataclass | ||
| from enum import Enum | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| import pyarrow | ||
| import pyarrow.compute as pc | ||
|
|
||
| from ray.data.datatype import DataType | ||
| from ray.data.expressions import pyarrow_udf | ||
|
|
||
| if TYPE_CHECKING: | ||
| from ray.data.expressions import Expr, UDFExpr | ||
|
|
||
|
|
||
| class MapComponent(str, Enum): | ||
| KEYS = "keys" | ||
| VALUES = "values" | ||
|
|
||
|
|
||
| def _extract_map_component( | ||
| arr: pyarrow.Array, component: MapComponent | ||
| ) -> pyarrow.Array: | ||
| """ | ||
| Extracts keys or values from a MapArray or ListArray<Struct>. | ||
|
|
||
| This serves as the primary implementation since PyArrow does not yet | ||
| expose dedicated compute kernels for map projection in the Python API. | ||
| """ | ||
| # 1. Handle Chunked Arrays (Recursion) | ||
| if isinstance(arr, pyarrow.ChunkedArray): | ||
| return pyarrow.chunked_array( | ||
| [_extract_map_component(chunk, component) for chunk in arr.chunks] | ||
| ) | ||
|
||
|
|
||
| child_array = None | ||
|
|
||
| # Case 1: MapArray | ||
| if isinstance(arr, pyarrow.MapArray): | ||
| if component == MapComponent.KEYS: | ||
| child_array = arr.keys | ||
| else: | ||
| child_array = arr.items | ||
|
|
||
| # Case 2: ListArray<Struct<Key, Value>> | ||
| elif isinstance(arr, (pyarrow.ListArray, pyarrow.LargeListArray)): | ||
| flat_values = arr.values | ||
| if ( | ||
|
||
| isinstance(flat_values, pyarrow.StructArray) | ||
| and flat_values.type.num_fields >= 2 | ||
| ): | ||
| idx = 0 if component == MapComponent.KEYS else 1 | ||
| child_array = flat_values.field(idx) | ||
|
|
||
| if child_array is None: | ||
| # This can happen if the input array is not a supported map type. | ||
| # We allow this to proceed only if the array is empty or all-nulls, | ||
| # in which case we'll produce an empty or all-nulls output. | ||
| if len(arr) > 0 and arr.null_count < len(arr): | ||
| raise TypeError( | ||
| f"Expression is not a map type. .map.{component.value}() can only be " | ||
| f"called on MapArray or List<Struct<key, value>> types, but got {arr.type}." | ||
| ) | ||
|
||
| child_array = pyarrow.array([], type=pyarrow.null()) | ||
|
|
||
| # Reconstruct ListArray & Normalize Offsets | ||
| offsets = arr.offsets | ||
|
||
| if len(offsets) > 0: # Rebase offsets when arr is a slice (first offset != 0) | ||
| start_offset = offsets[0] | ||
| if start_offset.as_py() != 0: | ||
| # Slice child_array to match normalized offsets | ||
| end_offset = offsets[-1] | ||
| child_array = child_array.slice( | ||
| offset=start_offset.as_py(), length=(end_offset - start_offset).as_py() | ||
| ) | ||
| offsets = pc.subtract(offsets, start_offset) | ||
|
|
||
| return pyarrow.ListArray.from_arrays( | ||
| offsets=offsets, values=child_array, mask=arr.is_null() | ||
| ) | ||
|
||
|
|
||
|
|
||
| @dataclass | ||
| class _MapNamespace: | ||
| """Namespace for map operations on expression columns. | ||
|
|
||
| This namespace provides methods for operating on map-typed columns | ||
| (including MapArrays and ListArrays of Structs) using PyArrow UDFs. | ||
|
|
||
| Example: | ||
| >>> from ray.data.expressions import col | ||
| >>> # Get keys from map column | ||
| >>> expr = col("headers").map.keys() | ||
| >>> # Get values from map column | ||
| >>> expr = col("headers").map.values() | ||
| """ | ||
|
|
||
| _expr: "Expr" | ||
|
|
||
| def keys(self) -> "UDFExpr": | ||
| """Returns a list expression containing the keys of the map. | ||
|
|
||
| Example: | ||
| >>> from ray.data.expressions import col | ||
| >>> # Get keys from map column | ||
| >>> expr = col("headers").map.keys() | ||
|
|
||
| Returns: | ||
| A list expression containing the keys. | ||
| """ | ||
| return self._create_projection_udf(MapComponent.KEYS) | ||
|
|
||
| def values(self) -> "UDFExpr": | ||
| """Returns a list expression containing the values of the map. | ||
|
|
||
| Example: | ||
| >>> from ray.data.expressions import col | ||
| >>> # Get values from map column | ||
| >>> expr = col("headers").map.values() | ||
|
|
||
| Returns: | ||
| A list expression containing the values. | ||
| """ | ||
| return self._create_projection_udf(MapComponent.VALUES) | ||
|
|
||
| def _create_projection_udf(self, component: MapComponent) -> "UDFExpr": | ||
| """Helper to generate UDFs for map projections.""" | ||
|
|
||
| return_dtype = DataType(object) | ||
| if self._expr.data_type.is_arrow_type(): | ||
| arrow_type = self._expr.data_type.to_arrow_dtype() | ||
|
|
||
| is_physical_map = ( | ||
| pyarrow.types.is_list(arrow_type) | ||
| and pyarrow.types.is_struct(arrow_type.value_type) | ||
| and arrow_type.value_type.num_fields >= 2 | ||
| ) | ||
|
||
|
|
||
| inner_arrow_type = None | ||
| if pyarrow.types.is_map(arrow_type): | ||
| inner_arrow_type = ( | ||
| arrow_type.key_type | ||
| if component == MapComponent.KEYS | ||
| else arrow_type.item_type | ||
| ) | ||
| elif is_physical_map: | ||
| idx = 0 if component == MapComponent.KEYS else 1 | ||
| inner_arrow_type = arrow_type.value_type.field(idx).type | ||
|
|
||
| if inner_arrow_type: | ||
| return_dtype = DataType.list(DataType.from_arrow(inner_arrow_type)) | ||
|
||
|
|
||
| @pyarrow_udf(return_dtype=return_dtype) | ||
| def _project_map(arr: pyarrow.Array) -> pyarrow.Array: | ||
| return _extract_map_component(arr, component) | ||
|
|
||
| return _project_map(self._expr) | ||
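
The offset handling in `_extract_map_component` can be hard to follow from the diff alone. Below is a minimal sketch of the same idea in plain Python, with lists standing in for Arrow buffers. It is not PyArrow code and not the PR's implementation: `normalize_offsets`, `to_lists`, and the sample data are hypothetical names chosen for illustration. The assumption it models is that a sliced list or map array keeps the full child values and only moves its offsets, so the first offset may be non-zero and has to be rebased before a new list array is built.

```python
from typing import List, Optional, Sequence, Tuple


def normalize_offsets(
    offsets: Sequence[int], child: Sequence[object]
) -> Tuple[List[int], List[object]]:
    """Rebase list offsets so they start at 0 and trim the child to match.

    Plain-Python model of the offset step in the diff (hypothetical helper).
    """
    if not offsets:
        return [], list(child)
    start, end = offsets[0], offsets[-1]
    if start == 0:
        # Mirrors the diff: nothing to do when the offsets already start at 0.
        return list(offsets), list(child)
    # Keep only the child values that the offsets actually reference.
    trimmed = list(child[start:end])
    # Shift every offset so that the first one becomes 0.
    rebased = [o - start for o in offsets]
    return rebased, trimmed


def to_lists(
    offsets: Sequence[int], child: Sequence[object], is_null: Sequence[bool]
) -> List[Optional[List[object]]]:
    """Materialize rows: row i spans child[offsets[i]:offsets[i + 1]]."""
    return [
        None if is_null[i] else list(child[offsets[i]:offsets[i + 1]])
        for i in range(len(offsets) - 1)
    ]


# Hypothetical data: the flat keys of three map rows, viewed through a
# slice that covers only the last two rows.
all_keys = ["a", "b", "c", "d", "e"]
sliced_offsets = [2, 3, 5]  # row 0 -> ["c"], row 1 -> ["d", "e"]

offsets, keys = normalize_offsets(sliced_offsets, all_keys)
rows = to_lists(offsets, keys, is_null=[False, False])
print(offsets, keys, rows)
```

Without the rebase, offsets `[2, 3, 5]` paired with a child that had been trimmed to three values would point past the end of the data, which is the failure the normalization step guards against.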