Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions examples/global_order_writes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# global_order_writes.py
#
# LICENSE
#
# The MIT License
#
# Copyright (c) 2025 TileDB, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# DESCRIPTION
#
# This example demonstrates writing to TileDB arrays in global order using
# multiple submit() calls before finalize(). This is useful for writing large
# datasets in batches while ensuring only a single fragment is created.
#

import numpy as np

import tiledb

# Name of the arrays to create
sparse_array_name = "global_order_sparse"
dense_array_name = "global_order_dense"


def create_sparse_array():
    """Create a 1D sparse array with one int64 attribute ``a1``."""
    domain = tiledb.Domain(
        tiledb.Dim("d1", domain=(1, 1000), tile=100, dtype=np.int32)
    )
    schema = tiledb.ArraySchema(
        domain=domain,
        attrs=(tiledb.Attr("a1", dtype=np.int64),),
        sparse=True,
    )
    tiledb.Array.create(sparse_array_name, schema)


def create_dense_array():
    """Create a 1D dense array with one int64 attribute ``a1``."""
    domain = tiledb.Domain(
        tiledb.Dim("d1", domain=(1, 1000), tile=100, dtype=np.int32)
    )
    schema = tiledb.ArraySchema(
        domain=domain,
        attrs=(tiledb.Attr("a1", dtype=np.int64),),
        sparse=False,
    )
    tiledb.Array.create(dense_array_name, schema)


def write_sparse_global_order():
    """Write to sparse array in global order with multiple submits."""
    print("Writing sparse array in global order with multiple submits...")

    # Each batch is (coordinates, attribute values). Coordinates must be
    # ascending across batches so the whole sequence is in global order.
    batches = [
        (
            np.array([1, 5, 10, 20, 50], dtype=np.int32),
            np.array([100, 200, 300, 400, 500], dtype=np.int64),
        ),
        (
            np.array([100, 200, 500], dtype=np.int32),
            np.array([600, 700, 800], dtype=np.int64),
        ),
    ]

    with tiledb.open(sparse_array_name, "w") as array:
        # Global-order layout lets us submit repeatedly before finalizing.
        query = tiledb.Query(array, order="G")

        for coords, values in batches:
            query.set_data({"d1": coords, "a1": values})
            query.submit()

        # A single finalize() completes the write.
        query.finalize()

    # All submits before finalize() should land in one fragment.
    frags = tiledb.array_fragments(sparse_array_name)
    print(f"Number of fragments created: {len(frags)}")
    print("Sparse array written successfully")


def write_dense_global_order():
    """Write to dense array in global order with multiple submits."""
    print("\nWriting dense array in global order with multiple submits...")

    with tiledb.open(dense_array_name, "w") as array:
        # Global-order layout lets us submit repeatedly before finalizing.
        query = tiledb.Query(array, order="G")

        # The subarray must span every cell written across all submits.
        query.set_subarray_ranges([(1, 100)])

        # Write cells 1-100 as two consecutive halves: [1, 51) then [51, 101).
        for lo, hi in ((1, 51), (51, 101)):
            query.set_data({"a1": np.arange(lo, hi, dtype=np.int64)})
            query.submit()

        # A single finalize() completes the write.
        query.finalize()

    # All submits before finalize() should land in one fragment.
    frags = tiledb.array_fragments(dense_array_name)
    print(f"Number of fragments created: {len(frags)}")
    print("Dense array written successfully")


def read_and_verify():
    """Read back the data to verify it was written correctly."""
    print("\nReading and verifying data...")

    # Sparse array: read everything and report the cell count.
    with tiledb.open(sparse_array_name, "r") as sparse:
        values = sparse[:]["a1"]
        print(f"Sparse array has {len(values)} cells")
        print(f"First 5 values: {values[:5]}")

    # Dense array: read a prefix slice (cells 1-19).
    with tiledb.open(dense_array_name, "r") as dense:
        values = dense[1:20]["a1"]
        print(f"Dense array cells 1-19: {values[:5]}...{values[-3:]}")


if __name__ == "__main__":
    banner = "=" * 60
    print(banner)
    print("TileDB Global Order Writes Example")
    print(banner)

    # Create each array only if it does not already exist on disk.
    for uri, create in (
        (sparse_array_name, create_sparse_array),
        (dense_array_name, create_dense_array),
    ):
        if tiledb.object_type(uri) != "array":
            create()

    # Write data
    write_sparse_global_order()
    write_dense_global_order()

    # Read and verify
    read_and_verify()

    print("\n" + banner)
    print("Example completed successfully!")
    print(banner)
145 changes: 124 additions & 21 deletions tiledb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
from json import loads as json_loads
from typing import Optional, Sequence, Union

import numpy as np

import tiledb.libtiledb as lt

from .array import Array
from .ctx import Ctx, CtxMixin, default_ctx
from .domain_indexer import DomainIndexer
from .datatypes import DataType
from .subarray import Subarray


Expand Down Expand Up @@ -35,6 +37,9 @@ def __init__(
across one or more attributes. Optionally subselect over attributes, return
dense result coordinate values, and specify a layout a result layout / cell-order.

For write mode arrays, the Query can be used to write data with explicit control
over submit() and finalize() operations.

:param array: the Array object to query.
:param ctx: the TileDB context.
:param attrs: the attributes to subselect over.
Expand All @@ -46,7 +51,7 @@ def __init__(
:param has_coords: (deprecated) if True, return array of coordinate value (default False).
:param index_col: For dataframe queries, override the saved index information,
and only set specified index(es) in the final dataframe, or None.
:param order: 'C', 'F', or 'G' (row-major, col-major, tiledb global order)
:param order: 'C', 'F', 'G', or 'U' (row-major, col-major, global order, unordered).
:param use_arrow: if True, return dataframes via PyArrow if applicable.
:param return_arrow: if True, return results as a PyArrow Table if applicable.
:param return_incomplete: if True, initialize and return an iterable Query object over the indexed range.
Expand All @@ -55,8 +60,8 @@ def __init__(
resubmitting until query is complete.
"""

if array.mode not in ("r", "d"):
raise ValueError("array mode must be read or delete mode")
if array.mode not in ("r", "d", "w"):
raise ValueError("array mode must be read, delete, or write mode")

if dims not in (False, None) and has_coords == True:
raise ValueError("Cannot pass both dims and has_coords=True to Query")
Expand All @@ -68,10 +73,39 @@ def __init__(

# reference to the array we are querying
self._array = array

query_type_map = {
"r": lt.QueryType.READ,
"d": lt.QueryType.DELETE,
"w": lt.QueryType.WRITE,
}
query_type = query_type_map[array.mode]

super().__init__(
ctx, lt.Array(ctx if ctx is not None else default_ctx(), array)
ctx, lt.Array(ctx if ctx is not None else default_ctx(), array), query_type
)

if order is None:
if array.schema.sparse:
order = "U" # unordered
else:
order = "C" # row-major

layout_map = {
"C": lt.LayoutType.ROW_MAJOR,
"F": lt.LayoutType.COL_MAJOR,
"G": lt.LayoutType.GLOBAL_ORDER,
"U": lt.LayoutType.UNORDERED,
}

if order not in layout_map:
raise ValueError(
f"order must be one of {list(layout_map.keys())}, got '{order}'"
)

self.layout = layout_map[order]
self._order = order

self._dims = dims

if dims == True or has_coords == True:
Expand All @@ -94,29 +128,19 @@ def __init__(
raise lt.TileDBError(f"Selected attribute does not exist: '{name}'")
self._attrs = attrs
self._cond = cond

if order == None:
if array.schema.sparse:
self._order = "U" # unordered
else:
self._order = "C" # row-major
else:
self._order = order

self._has_coords = has_coords
self._index_col = index_col
self._return_arrow = return_arrow
if return_arrow:
self._use_arrow = use_arrow
self._return_incomplete = return_incomplete

if array.mode in ("r", "d") and return_arrow:
if use_arrow is None:
use_arrow = True
if not use_arrow:
raise lt.TileDBError(
"Cannot initialize return_arrow with use_arrow=False"
)
self._use_arrow = use_arrow

self._return_incomplete = return_incomplete
self._domain_index = DomainIndexer(array, query=self)

def subarray(self) -> Subarray:
"""Subarray with the ranges this query is on.
Expand Down Expand Up @@ -312,5 +336,84 @@ def get_stats(self, print_out=True, json=False):
return stats

def submit(self):
"""An alias for calling the regular indexer [:]"""
return self[:]
"""Submit the query.

For read/delete queries: an alias for calling the regular indexer [:].
For write queries: submits the write query with current buffers.
"""
if self._array.mode in ("r", "d"):
return self[:]
else:
# Write mode - submit the underlying query
return self._submit()

def finalize(self):
"""Finalize a query."""
super().finalize()

def set_data(self, data):
"""Set data buffers for write queries.

:param data: Dictionary mapping attribute/dimension names to numpy arrays,
or a single numpy array if the array has a single attribute.
:raises ValueError: if array is not in write mode or invalid data provided

Example:
>>> import tiledb, numpy as np
>>> with tiledb.open(uri, 'w') as A:
... q = tiledb.Query(A, order='G')
... q.set_data({'d1': np.array([1, 5, 10]), 'a1': np.array([100, 200, 300])})
... q.submit()
... q.set_data({'d1': np.array([15, 20]), 'a1': np.array([400, 500])})
... q.submit()
... q.finalize()
"""
if self._array.mode != "w":
raise ValueError("set_data() is only supported for arrays in write mode")

schema = self._array.schema

# Convert single array to dict
if isinstance(data, np.ndarray):
if schema.nattr != 1:
raise ValueError(
"Single array provided but schema has multiple attributes"
)
data = {schema.attr(0).name: data}

if not isinstance(data, dict):
raise ValueError("data must be a dict or numpy array")

# Set buffers for each attribute/dimension
for name, buffer in data.items():
if not isinstance(buffer, np.ndarray):
buffer = np.array(buffer)

# Determine ncells based on datatype
if schema.has_attr(name):
dtype = schema.attr(name).dtype
elif schema.domain.has_dim(name):
dtype = schema.domain.dim(name).dtype
else:
raise ValueError(f"Unknown attribute or dimension: {name}")

ncells = DataType.from_numpy(dtype).ncells
buffer_size = np.uint64(len(buffer) * ncells)

self.set_data_buffer(name, buffer, buffer_size)

def set_subarray_ranges(self, ranges):
"""Set subarray for dense array writes.

:param ranges: List of (start, end) tuples, one per dimension.
"""

if self._array.mode != "w":
raise ValueError(
"set_subarray_ranges() is only supported for arrays in write mode"
)

subarray = Subarray(self._array, self._ctx)
dim_ranges = [[r] for r in ranges]
subarray.add_ranges(dim_ranges)
self.set_subarray(subarray)
Loading
Loading