Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 172 additions & 0 deletions tiledb/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import tiledb.cc as lt

from .ctx import Config, Ctx, default_ctx
from .datatypes import DataType
from .domain_indexer import DomainIndexer
from .enumeration import Enumeration
from .metadata import Metadata
Expand Down Expand Up @@ -769,6 +770,177 @@ def domain_index(self):
def dindex(self):
return self.domain_index

def _write_array(
    tiledb_array,
    subarray,
    coordinates: list,
    buffer_names: list,
    values: list,
    labels: dict,
    nullmaps: dict,
    issparse: bool,
):
    """Convert attribute/dimension/label values to buffers and submit a
    WRITE query against ``tiledb_array``.

    :param tiledb_array: the open array object (plays the role of ``self``;
        must expose ``schema``, ``ctx``, ``array`` and ``last_fragment_info``)
    :param subarray: subarray to write into; used for dense arrays only
    :param coordinates: per-dimension coordinate arrays (sparse writes only)
    :param buffer_names: attribute names being written; NOTE: extended in
        place with dimension names (sparse) and label names as their
        buffers are appended
    :param values: attribute value arrays, parallel to ``buffer_names``
    :param labels: mapping of dimension-label name -> label value array
    :param nullmaps: mapping of buffer name -> validity bitmap
    :param issparse: True for sparse (unordered) writes, False for dense
    :raises tiledb.TileDBError: if an ASCII attribute receives data that
        cannot be encoded as bytes
    :raises ValueError: on mixed C/Fortran input layouts, or if
        ``last_fragment_info`` is enabled but is not a dict
    """
    # local import to avoid a circular dependency with .main
    from .main import array_to_buffer

    isfortran = False
    nattr = len(buffer_names)
    nlabel = len(labels)

    # One size slot per attribute and per label, plus one per dimension
    # when writing coordinates for a sparse array.
    nbuffer = nattr + nlabel
    if issparse:
        nbuffer += tiledb_array.schema.ndim
    buffer_sizes = np.zeros((nbuffer,), dtype=np.uint64)

    # Parallel lists of data buffers and (var-length only) offset buffers.
    output_values = list()
    output_offsets = list()

    # --- attribute buffers -------------------------------------------------
    for i in range(nattr):
        attr = tiledb_array.schema.attr(i)

        # If dtype is ASCII, ensure all characters are valid.
        if attr.isascii:
            try:
                values[i] = np.asarray(values[i], dtype=np.bytes_)
            except Exception as exc:
                # BUGFIX: chain the underlying conversion error instead of
                # discarding it (previously `exc` was bound but unused).
                raise tiledb.TileDBError(
                    f'dtype of attr {attr.name} is "ascii" but attr_val contains invalid ASCII characters'
                ) from exc

        if attr.isvar:
            try:
                if attr.isnullable:
                    if np.issubdtype(attr.dtype, np.str_) or np.issubdtype(
                        attr.dtype, np.bytes_
                    ):
                        # Nullable var-length strings: write None as "".
                        attr_val = np.array(
                            ["" if v is None else v for v in values[i]]
                        )
                    else:
                        # Nullable numeric: replace NaN placeholders.
                        attr_val = np.nan_to_num(values[i])
                else:
                    attr_val = values[i]
                buffer, offsets = array_to_buffer(attr_val, True, False)
            except Exception as exc:
                raise type(exc)(
                    f"Failed to convert buffer for attribute: '{attr.name}'"
                ) from exc
        else:
            # Fixed-length attributes are written as-is, with no offsets.
            buffer, offsets = values[i], None

        # Element count; `or 1` guards a zero itemsize (flexible dtypes).
        buffer_sizes[i] = buffer.nbytes // (attr.dtype.itemsize or 1)
        output_values.append(buffer)
        output_offsets.append(offsets)

    # --- value-layout check ------------------------------------------------
    # Reject a mix of C- and Fortran-ordered inputs; a Fortran-ordered first
    # buffer switches the dense write layout to column-major below.
    if len(values) and nattr > 1:
        value = output_values[0]
        isfortran = value.ndim > 1 and value.flags.f_contiguous
        for value in values:
            if value.ndim > 1 and value.flags.f_contiguous and not isfortran:
                raise ValueError("mixed C and Fortran array layouts")

    # --- dimension buffers (sparse arrays only) ----------------------------
    ibuffer = nattr
    if issparse:
        for dim_idx, coords in enumerate(coordinates):
            dim = tiledb_array.schema.domain.dim(dim_idx)
            if dim.isvar:
                buffer, offsets = array_to_buffer(coords, True, False)
            else:
                buffer, offsets = coords, None
            # Hoisted out of the if/else: both branches computed the same
            # element count.
            buffer_sizes[ibuffer] = buffer.nbytes // (dim.dtype.itemsize or 1)
            output_values.append(buffer)
            output_offsets.append(offsets)

            buffer_names.append(dim.name)

            ibuffer = ibuffer + 1

    # --- dimension-label buffers --------------------------------------------
    for label_name, label_values in labels.items():
        # Append buffer name
        buffer_names.append(label_name)
        # Get label data buffer and offsets buffer for the labels
        dim_label = tiledb_array.schema.dim_label(label_name)
        if dim_label.isvar:
            buffer, offsets = array_to_buffer(label_values, True, False)
        else:
            buffer, offsets = label_values, None
        buffer_sizes[ibuffer] = buffer.nbytes // (dim_label.dtype.itemsize or 1)
        # Append the buffers
        output_values.append(buffer)
        output_offsets.append(offsets)

        ibuffer = ibuffer + 1

    # --- build and submit the query -----------------------------------------
    ctx = lt.Context(tiledb_array.ctx)
    q = lt.Query(ctx, tiledb_array.array, lt.QueryType.WRITE)

    # Sparse writes are unordered; dense writes follow the input layout.
    layout = (
        lt.LayoutType.UNORDERED
        if issparse
        else (lt.LayoutType.COL_MAJOR if isfortran else lt.LayoutType.ROW_MAJOR)
    )
    q.layout = layout

    # Create and set the subarray for the query (dense arrays only).
    if not issparse:
        q.set_subarray(subarray)

    # Set buffers on the query.
    for i, buffer_name in enumerate(buffer_names):
        # Data buffer size is passed in values, not cells: scale by the
        # number of values per cell for this dtype.
        ncells = DataType.from_numpy(output_values[i].dtype).ncells
        q.set_data_buffer(
            buffer_name,
            output_values[i],
            buffer_sizes[i] * ncells,
        )

        # Offsets buffer (var-length buffers only); core expects uint64.
        if output_offsets[i] is not None:
            output_offsets[i] = output_offsets[i].astype(np.uint64)
            q.set_offsets_buffer(
                buffer_name, output_offsets[i], output_offsets[i].size
            )

        # Validity buffer for nullable attributes.
        if buffer_name in nullmaps:
            nullmap = nullmaps[buffer_name]
            q.set_validity_buffer(buffer_name, nullmap, nullmap.size)

    q._submit()
    q.finalize()

    # --- record fragment info -----------------------------------------------
    # ``last_fragment_info`` is False when tracking is disabled; otherwise it
    # must be a dict, which is cleared and refilled with this write's
    # fragment URIs -> (t1, t2) timestamp ranges.
    fragment_info = tiledb_array.last_fragment_info
    if fragment_info is not False:
        if not isinstance(fragment_info, dict):
            raise ValueError(
                f"Expected fragment_info to be a dict, got {type(fragment_info)}"
            )
        fragment_info.clear()

        result = dict()
        num_fragments = q.fragment_num()

        if num_fragments < 1:
            # Nothing was written; leave fragment_info empty.
            return result

        for fragment_idx in range(0, num_fragments):
            fragment_uri = q.fragment_uri(fragment_idx)
            fragment_t1, fragment_t2 = q.fragment_timestamp_range(fragment_idx)
            result[fragment_uri] = (fragment_t1, fragment_t2)

        fragment_info.update(result)

def label_index(self, labels):
"""Retrieve data cells with multi-range, domain-inclusive indexing by label.
Returns the cross-product of the ranges.
Expand Down
15 changes: 9 additions & 6 deletions tiledb/cc/query.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ void init_query(py::module &m) {

.def("fragment_uri", &Query::fragment_uri)

.def("fragment_timestamp_range", &Query::fragment_timestamp_range)

.def("query_status", &Query::query_status)

.def("set_condition", &Query::set_condition)
Expand All @@ -71,13 +73,14 @@ void init_query(py::module &m) {
// uint64_t))&Query::set_data_buffer);

.def("set_data_buffer",
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_data_buffer(name, const_cast<void *>(a.data()), buff_size);
[](Query &q, std::string name, py::array a, uint64_t nelements) {
QueryExperimental::set_data_buffer(
q, name, const_cast<void *>(a.data()), nelements);
})

.def("set_offsets_buffer",
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_offsets_buffer(name, (uint64_t *)(a.data()), buff_size);
[](Query &q, std::string name, py::array a, uint64_t nelements) {
q.set_offsets_buffer(name, (uint64_t *)(a.data()), nelements);
})

.def("set_subarray",
Expand All @@ -86,8 +89,8 @@ void init_query(py::module &m) {
})

.def("set_validity_buffer",
[](Query &q, std::string name, py::array a, uint32_t buff_size) {
q.set_validity_buffer(name, (uint8_t *)(a.data()), buff_size);
[](Query &q, std::string name, py::array a, uint64_t nelements) {
q.set_validity_buffer(name, (uint8_t *)(a.data()), nelements);
})

.def("_submit", &Query::submit, py::call_guard<py::gil_scoped_release>())
Expand Down
6 changes: 1 addition & 5 deletions tiledb/dense_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,11 +575,7 @@ def _setitem_impl(self, selection, val, nullmaps: dict):
f"validity bitmap, got {type(val)}"
)

from .libtiledb import _write_array_wrapper

_write_array_wrapper(
self, subarray, [], attributes, values, labels, nullmaps, False
)
self._write_array(subarray, [], attributes, values, labels, nullmaps, False)

def __array__(self, dtype=None, **kw):
"""Implementation of numpy __array__ protocol (internal).
Expand Down
Loading
Loading