diff --git a/examples/global_order_writes.py b/examples/global_order_writes.py
new file mode 100644
index 0000000000..7d3570176a
--- /dev/null
+++ b/examples/global_order_writes.py
@@ -0,0 +1,166 @@
+# global_order_writes.py
+#
+# LICENSE
+#
+# The MIT License
+#
+# Copyright (c) 2025 TileDB, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+# DESCRIPTION
+#
+# This example demonstrates writing to TileDB arrays in global order using
+# multiple submit() calls before finalize(). This is useful for writing large
+# datasets in batches while ensuring only a single fragment is created.
+#
+
+import numpy as np
+
+import tiledb
+
+# Name of the arrays to create
+sparse_array_name = "global_order_sparse"
+dense_array_name = "global_order_dense"
+
+
+def create_sparse_array():
+    """Create a simple 1D sparse array."""
+    dim = tiledb.Dim("d1", domain=(1, 1000), tile=100, dtype=np.int32)
+    dom = tiledb.Domain(dim)
+    att = tiledb.Attr("a1", dtype=np.int64)
+    schema = tiledb.ArraySchema(domain=dom, attrs=(att,), sparse=True)
+    tiledb.Array.create(sparse_array_name, schema)
+
+
+def create_dense_array():
+    """Create a simple 1D dense array."""
+    dim = tiledb.Dim("d1", domain=(1, 1000), tile=100, dtype=np.int32)
+    dom = tiledb.Domain(dim)
+    att = tiledb.Attr("a1", dtype=np.int64)
+    schema = tiledb.ArraySchema(domain=dom, attrs=(att,), sparse=False)
+    tiledb.Array.create(dense_array_name, schema)
+
+
+def write_sparse_global_order():
+    """Write to sparse array in global order with multiple submits."""
+    print("Writing sparse array in global order with multiple submits...")
+
+    # Count the fragments that already exist so that re-running this example
+    # (which appends to the existing array) still reports only the fragments
+    # produced by this write.
+    fragments_before = len(tiledb.array_fragments(sparse_array_name))
+
+    with tiledb.open(sparse_array_name, "w") as A:
+        # Create a query with global order layout
+        q = tiledb.Query(A, order="G")
+
+        # First batch of data
+        coords_batch1 = np.array([1, 5, 10, 20, 50], dtype=np.int32)
+        data_batch1 = np.array([100, 200, 300, 400, 500], dtype=np.int64)
+        q.set_data({"d1": coords_batch1, "a1": data_batch1})
+        q.submit()
+
+        # Second batch of data (coordinates must be in global order relative to first batch)
+        coords_batch2 = np.array([100, 200, 500], dtype=np.int32)
+        data_batch2 = np.array([600, 700, 800], dtype=np.int64)
+        q.set_data({"d1": coords_batch2, "a1": data_batch2})
+        q.submit()
+
+        # Finalize to complete the write
+        q.finalize()
+
+    # Verify only one fragment was created by this write
+    fragments_after = len(tiledb.array_fragments(sparse_array_name))
+    print(f"Number of fragments created: {fragments_after - fragments_before}")
+    print("Sparse array written successfully")
+
+
+def write_dense_global_order():
+    """Write to dense array in global order with multiple submits."""
+    print("\nWriting dense array in global order with multiple submits...")
+
+    # Count pre-existing fragments so a re-run reports only this write's fragments
+    fragments_before = len(tiledb.array_fragments(dense_array_name))
+
+    with tiledb.open(dense_array_name, "w") as A:
+        # Create a query with global order layout
+        q = tiledb.Query(A, order="G")
+
+        # Set the subarray to cover the full range we're writing
+        start_coord = 1
+        end_coord = 100
+        q.set_subarray_ranges([(start_coord, end_coord)])
+
+        # First batch of data (cells 1-50)
+        data_batch1 = np.arange(1, 51, dtype=np.int64)
+        q.set_data({"a1": data_batch1})
+        q.submit()
+
+        # Second batch of data (cells 51-100)
+        data_batch2 = np.arange(51, 101, dtype=np.int64)
+        q.set_data({"a1": data_batch2})
+        q.submit()
+
+        # Finalize to complete the write
+        q.finalize()
+
+    # Verify only one fragment was created by this write
+    fragments_after = len(tiledb.array_fragments(dense_array_name))
+    print(f"Number of fragments created: {fragments_after - fragments_before}")
+    print("Dense array written successfully")
+
+
+def read_and_verify():
+    """Read back the data to verify it was written correctly."""
+    print("\nReading and verifying data...")
+
+    # Read sparse array
+    with tiledb.open(sparse_array_name, "r") as A:
+        result = A[:]
+        print(f"Sparse array has {len(result['a1'])} cells")
+        print(f"First 5 values: {result['a1'][:5]}")
+
+    # Read dense array
+    with tiledb.open(dense_array_name, "r") as A:
+        result = A[1:20]  # Read cells 1-19
+        print(f"Dense array cells 1-19: {result['a1'][:5]}...{result['a1'][-3:]}")
+
+
+if __name__ == "__main__":
+    print("=" * 60)
+    print("TileDB Global Order Writes Example")
+    print("=" * 60)
+
+    # Check if arrays exist, create if not
+    if tiledb.object_type(sparse_array_name) != "array":
+        create_sparse_array()
+    if tiledb.object_type(dense_array_name) != "array":
+        create_dense_array()
+
+    # Write data
+    write_sparse_global_order()
+    write_dense_global_order()
+
+    # Read and verify
+    read_and_verify()
+
+    print("\n" + "=" * 60)
+    print("Example completed successfully!")
+    print("=" * 60)
diff --git a/tiledb/query.py b/tiledb/query.py
index 9775b62f91..5a37e7121d 100644
--- a/tiledb/query.py
+++ b/tiledb/query.py
@@ -2,11 +2,13 @@
 from json import loads as json_loads
 from typing import Optional, Sequence, Union
 
+import numpy as np
+
 import tiledb.libtiledb as lt
 
 from .array import Array
 from .ctx import Ctx, CtxMixin, default_ctx
-from .domain_indexer import DomainIndexer
+from .datatypes import DataType
 from .subarray import Subarray
 
 
@@ -35,6 +37,9 @@ def __init__(
         across one or more attributes. Optionally subselect over attributes, return
         dense result coordinate values, and specify a layout a result layout / cell-order.
 
+        For write mode arrays, the Query can be used to write data with explicit control
+        over submit() and finalize() operations.
+
         :param array: the Array object to query.
         :param ctx: the TileDB context.
         :param attrs: the attributes to subselect over.
@@ -46,7 +51,7 @@ def __init__(
         :param has_coords: (deprecated) if True, return array of coordinate value (default False).
         :param index_col: For dataframe queries, override the saved index information,
             and only set specified index(es) in the final dataframe, or None.
-        :param order: 'C', 'F', or 'G' (row-major, col-major, tiledb global order)
+        :param order: 'C', 'F', 'G', or 'U' (row-major, col-major, global order, unordered).
         :param use_arrow: if True, return dataframes via PyArrow if applicable.
         :param return_arrow: if True, return results as a PyArrow Table if applicable.
         :param return_incomplete: if True, initialize and return an iterable Query object over the indexed range.
@@ -55,8 +60,8 @@ def __init__(
             resubmitting until query is complete.
         """
 
-        if array.mode not in ("r", "d"):
-            raise ValueError("array mode must be read or delete mode")
+        if array.mode not in ("r", "d", "w"):
+            raise ValueError("array mode must be read, delete, or write mode")
 
         if dims not in (False, None) and has_coords == True:
             raise ValueError("Cannot pass both dims and has_coords=True to Query")
@@ -68,10 +73,39 @@ def __init__(
 
         # reference to the array we are querying
         self._array = array
+
+        query_type_map = {
+            "r": lt.QueryType.READ,
+            "d": lt.QueryType.DELETE,
+            "w": lt.QueryType.WRITE,
+        }
+        query_type = query_type_map[array.mode]
+
         super().__init__(
-            ctx, lt.Array(ctx if ctx is not None else default_ctx(), array)
+            ctx, lt.Array(ctx if ctx is not None else default_ctx(), array), query_type
         )
 
+        if order is None:
+            if array.schema.sparse:
+                order = "U"  # unordered
+            else:
+                order = "C"  # row-major
+
+        layout_map = {
+            "C": lt.LayoutType.ROW_MAJOR,
+            "F": lt.LayoutType.COL_MAJOR,
+            "G": lt.LayoutType.GLOBAL_ORDER,
+            "U": lt.LayoutType.UNORDERED,
+        }
+
+        if order not in layout_map:
+            raise ValueError(
+                f"order must be one of {list(layout_map.keys())}, got '{order}'"
+            )
+
+        self.layout = layout_map[order]
+        self._order = order
+
         self._dims = dims
 
         if dims == True or has_coords == True:
@@ -94,29 +128,20 @@ def __init__(
                     raise lt.TileDBError(f"Selected attribute does not exist: '{name}'")
         self._attrs = attrs
         self._cond = cond
-
-        if order == None:
-            if array.schema.sparse:
-                self._order = "U"  # unordered
-            else:
-                self._order = "C"  # row-major
-        else:
-            self._order = order
-
         self._has_coords = has_coords
         self._index_col = index_col
         self._return_arrow = return_arrow
-        if return_arrow:
+        self._return_incomplete = return_incomplete
+
+        if array.mode in ("r", "d") and return_arrow:
             if use_arrow is None:
                 use_arrow = True
             if not use_arrow:
                 raise lt.TileDBError(
                     "Cannot initialize return_arrow with use_arrow=False"
                 )
+        # Set after the defaulting above: return_arrow=True implies use_arrow=True
         self._use_arrow = use_arrow
-
-        self._return_incomplete = return_incomplete
-        self._domain_index = DomainIndexer(array, query=self)
 
     def subarray(self) -> Subarray:
         """Subarray with the ranges this query is on.
@@ -312,5 +337,103 @@ def get_stats(self, print_out=True, json=False):
             return stats
 
     def submit(self):
-        """An alias for calling the regular indexer [:]"""
-        return self[:]
+        """Submit the query.
+
+        For read/delete queries: an alias for calling the regular indexer [:].
+        For write queries: submits the write query with current buffers.
+        """
+        if self._array.mode in ("r", "d"):
+            return self[:]
+        else:
+            # Write mode - submit the underlying query
+            return self._submit()
+
+    def finalize(self):
+        """Finalize a query.
+
+        Completes a global-order write after its last submit() and releases the
+        buffers retained by set_data().
+        """
+        super().finalize()
+        self._write_buffers = {}
+
+    def set_data(self, data):
+        """Set data buffers for write queries.
+
+        Each buffer is converted to a C-contiguous numpy array of the schema
+        datatype of its attribute/dimension before it is attached to the query.
+
+        :param data: Dictionary mapping attribute/dimension names to numpy arrays,
+            or a single numpy array if the array has a single attribute.
+        :raises ValueError: if array is not in write mode or invalid data provided
+
+        Example:
+            >>> import tiledb, numpy as np
+            >>> with tiledb.open(uri, 'w') as A:
+            ...     q = tiledb.Query(A, order='G')
+            ...     q.set_data({'d1': np.array([1, 5, 10]), 'a1': np.array([100, 200, 300])})
+            ...     q.submit()
+            ...     q.set_data({'d1': np.array([15, 20]), 'a1': np.array([400, 500])})
+            ...     q.submit()
+            ...     q.finalize()
+        """
+        if self._array.mode != "w":
+            raise ValueError("set_data() is only supported for arrays in write mode")
+
+        schema = self._array.schema
+
+        # Convert single array to dict
+        if isinstance(data, np.ndarray):
+            if schema.nattr != 1:
+                raise ValueError(
+                    "Single array provided but schema has multiple attributes"
+                )
+            data = {schema.attr(0).name: data}
+
+        if not isinstance(data, dict):
+            raise ValueError("data must be a dict or numpy array")
+
+        # Keep the attached arrays referenced from this query until finalize().
+        # NOTE(review): assumes set_data_buffer() only stores a pointer and does
+        # not keep the numpy array alive itself -- confirm against the binding.
+        held_buffers = getattr(self, "_write_buffers", None)
+        if held_buffers is None:
+            held_buffers = self._write_buffers = {}
+
+        # Set buffers for each attribute/dimension
+        for name, buffer in data.items():
+            # Look up the schema datatype of the target field
+            if schema.has_attr(name):
+                dtype = schema.attr(name).dtype
+            elif schema.domain.has_dim(name):
+                dtype = schema.domain.dim(name).dtype
+            else:
+                raise ValueError(f"Unknown attribute or dimension: {name}")
+
+            # The element count below is computed from the schema datatype, so
+            # convert the buffer to that datatype to keep the two consistent.
+            buffer = np.ascontiguousarray(buffer, dtype=dtype)
+
+            ncells = DataType.from_numpy(dtype).ncells
+            # .size counts all elements; len() would only count the first axis
+            buffer_size = np.uint64(buffer.size * ncells)
+
+            self.set_data_buffer(name, buffer, buffer_size)
+            held_buffers[name] = buffer
+
+    def set_subarray_ranges(self, ranges):
+        """Set subarray for dense array writes.
+
+        :param ranges: List of (start, end) tuples, one per dimension.
+        :raises ValueError: if the array is not in write mode
+        """
+
+        if self._array.mode != "w":
+            raise ValueError(
+                "set_subarray_ranges() is only supported for arrays in write mode"
+            )
+
+        subarray = Subarray(self._array, self._ctx)
+        dim_ranges = [[r] for r in ranges]
+        subarray.add_ranges(dim_ranges)
+        self.set_subarray(subarray)
diff --git a/tiledb/tests/test_query.py b/tiledb/tests/test_query.py
index efc6603961..debb8c7d99 100644
--- a/tiledb/tests/test_query.py
+++ b/tiledb/tests/test_query.py
@@ -38,3 +38,214 @@ def test_label_range_query(self):
         query._submit()
         output_subarray = query.subarray()
         assert output_subarray.num_dim_ranges(0) == 2
+
+    @pytest.mark.parametrize("sparse", [True, False])
+    def test_global_order_write_single_submit(self, sparse):
+        """Test writing in global order with a single submit and finalize."""
+        uri = self.path("test_global_order_single")
+
+        # Create schema
+        dim = tiledb.Dim("d1", domain=(1, 100), tile=10, dtype=np.int32)
+        dom = tiledb.Domain(dim)
+        att = tiledb.Attr("a1", dtype=np.int64)
+        schema = tiledb.ArraySchema(domain=dom, attrs=(att,), sparse=sparse)
+        tiledb.Array.create(uri, schema)
+
+        # Write using Query with global order
+        with tiledb.open(uri, "w") as A:
+            q = tiledb.Query(A, order="G")
+
+            if sparse:
+                coords = np.array([1, 5, 10, 15, 20], dtype=np.int32)
+                data = np.array([100, 200, 300, 400, 500], dtype=np.int64)
+                q.set_data({"d1": coords, "a1": data})
+            else:
+                start_coord = 1
+                end_coord = 20
+                data = np.arange(
+                    100, 100 + (end_coord - start_coord + 1), dtype=np.int64
+                )
+                q.set_subarray_ranges([(start_coord, end_coord)])
+                q.set_data({"a1": data})
+
+            q.submit()
+            q.finalize()
+
+        # Verify only one fragment was created
+        fragments_info = tiledb.array_fragments(uri)
+        assert len(fragments_info) == 1
+
+        # Verify data
+        with tiledb.open(uri, "r") as A:
+            if sparse:
+                result = A[:]
+                np.testing.assert_array_equal(result["a1"], data)
+                np.testing.assert_array_equal(result["d1"], coords)
+            else:
+                result = A[start_coord : end_coord + 1]
+                np.testing.assert_array_equal(result["a1"], data)
+
+    @pytest.mark.parametrize("sparse", [True, False])
+    def test_global_order_write_multiple_submits(self, sparse):
+        """Test writing in global order with multiple submits before finalize."""
+        uri = self.path("test_global_order_multiple")
+
+        # Create schema
+        dim = tiledb.Dim("d1", domain=(1, 100), tile=10, dtype=np.int32)
+        dom = tiledb.Domain(dim)
+        att = tiledb.Attr("a1", dtype=np.int64)
+        schema = tiledb.ArraySchema(domain=dom, attrs=(att,), sparse=sparse)
+        tiledb.Array.create(uri, schema)
+
+        # Write using Query with global order and multiple submits
+        with tiledb.open(uri, "w") as A:
+            q = tiledb.Query(A, order="G")
+
+            if sparse:
+                # First submit
+                coords_batch1 = np.array([1, 5, 10], dtype=np.int32)
+                data_batch1 = np.array([100, 200, 300], dtype=np.int64)
+                q.set_data({"d1": coords_batch1, "a1": data_batch1})
+                q.submit()
+
+                # Second submit
+                coords_batch2 = np.array([15, 20], dtype=np.int32)
+                data_batch2 = np.array([400, 500], dtype=np.int64)
+                q.set_data({"d1": coords_batch2, "a1": data_batch2})
+                q.submit()
+            else:
+                # For dense arrays, set subarray once to cover full range
+                start_coord = 1
+                end_coord = 20
+                q.set_subarray_ranges([(start_coord, end_coord)])
+
+                # First submit - first batch of cells
+                mid_point = 10
+                data_batch1 = np.arange(100, 100 + mid_point, dtype=np.int64)
+                q.set_data({"a1": data_batch1})
+                q.submit()
+
+                # Second submit - second batch of cells
+                data_batch2 = np.arange(
+                    100 + mid_point, 100 + (end_coord - start_coord + 1), dtype=np.int64
+                )
+                q.set_data({"a1": data_batch2})
+                q.submit()
+
+            q.finalize()
+
+        # Verify only one fragment was created
+        fragments_info = tiledb.array_fragments(uri)
+        assert len(fragments_info) == 1
+
+        # Verify data
+        with tiledb.open(uri, "r") as A:
+            if sparse:
+                result = A[:]
+                expected_data = np.array([100, 200, 300, 400, 500], dtype=np.int64)
+                expected_coords = np.array([1, 5, 10, 15, 20], dtype=np.int32)
+                np.testing.assert_array_equal(result["a1"], expected_data)
+                np.testing.assert_array_equal(result["d1"], expected_coords)
+            else:
+                result = A[1:21]
+                expected_data = np.arange(100, 120, dtype=np.int64)
+                np.testing.assert_array_equal(result["a1"], expected_data)
+
+    @pytest.mark.parametrize(
+        "sparse,order",
+        [
+            (True, "U"),  # Sparse arrays support unordered
+            (False, "C"),  # Dense arrays support row-major
+            (False, "F"),  # Dense arrays support col-major
+        ],
+    )
+    def test_non_global_order_writes(self, sparse, order):
+        """Test writing in non-global-order modes where each submit creates a new fragment.
+
+        For row-major (C), col-major (F), and unordered (U) modes, finalize is not needed.
+        Each submit creates a separate fragment - incremental writes are not possible.
+
+        Note: C/F (row/col-major) only work for dense arrays, U (unordered) only for sparse.
+        """
+        uri = self.path(
+            f"test_{order}_order_multiple_{'sparse' if sparse else 'dense'}"
+        )
+
+        # Create schema
+        dim = tiledb.Dim("d1", domain=(1, 100), tile=10, dtype=np.int32)
+        dom = tiledb.Domain(dim)
+        att = tiledb.Attr("a1", dtype=np.int64)
+        schema = tiledb.ArraySchema(domain=dom, attrs=(att,), sparse=sparse)
+        tiledb.Array.create(uri, schema)
+
+        # Write using Query with multiple submits (each creates a fragment)
+        with tiledb.open(uri, "w") as A:
+            if sparse:
+                # First submit
+                q1 = tiledb.Query(A, order=order)
+                coords_batch1 = np.array([1, 5, 10], dtype=np.int32)
+                data_batch1 = np.array([100, 200, 300], dtype=np.int64)
+                q1.set_data({"d1": coords_batch1, "a1": data_batch1})
+                q1.submit()
+
+                # Second submit - creates a new fragment
+                q2 = tiledb.Query(A, order=order)
+                coords_batch2 = np.array([15, 20], dtype=np.int32)
+                data_batch2 = np.array([400, 500], dtype=np.int64)
+                q2.set_data({"d1": coords_batch2, "a1": data_batch2})
+                q2.submit()
+            else:
+                # First submit
+                q1 = tiledb.Query(A, order=order)
+                q1.set_subarray_ranges([(1, 10)])
+                data_batch1 = np.arange(100, 110, dtype=np.int64)
+                q1.set_data({"a1": data_batch1})
+                q1.submit()
+
+                # Second submit - creates a new fragment
+                q2 = tiledb.Query(A, order=order)
+                q2.set_subarray_ranges([(11, 20)])
+                data_batch2 = np.arange(110, 120, dtype=np.int64)
+                q2.set_data({"a1": data_batch2})
+                q2.submit()
+
+        # Verify two fragments were created (one per submit)
+        fragments_info = tiledb.array_fragments(uri)
+        assert len(fragments_info) == 2
+
+        # Verify data
+        with tiledb.open(uri, "r") as A:
+            if sparse:
+                result = A[:]
+                expected_data = np.array([100, 200, 300, 400, 500], dtype=np.int64)
+                expected_coords = np.array([1, 5, 10, 15, 20], dtype=np.int32)
+                np.testing.assert_array_equal(result["a1"], expected_data)
+                np.testing.assert_array_equal(result["d1"], expected_coords)
+            else:
+                result = A[1:21]
+                expected_data = np.arange(100, 120, dtype=np.int64)
+                np.testing.assert_array_equal(result["a1"], expected_data)
+
+    def test_global_order_write_coerces_buffers(self):
+        """Test that set_data() converts plain sequences to the schema datatypes."""
+        uri = self.path("test_global_order_coerce")
+
+        # Create schema
+        dim = tiledb.Dim("d1", domain=(1, 100), tile=10, dtype=np.int32)
+        dom = tiledb.Domain(dim)
+        att = tiledb.Attr("a1", dtype=np.int64)
+        schema = tiledb.ArraySchema(domain=dom, attrs=(att,), sparse=True)
+        tiledb.Array.create(uri, schema)
+
+        # Plain lists carry no dtype; set_data() must convert them (d1 is int32)
+        with tiledb.open(uri, "w") as A:
+            q = tiledb.Query(A, order="G")
+            q.set_data({"d1": [1, 5, 10], "a1": [100, 200, 300]})
+            q.submit()
+            q.finalize()
+
+        # Verify data
+        with tiledb.open(uri, "r") as A:
+            result = A[:]
+            np.testing.assert_array_equal(result["a1"], [100, 200, 300])
+            np.testing.assert_array_equal(result["d1"], [1, 5, 10])