Skip to content

Commit a35426a

Browse files
eddelbuettelKiterLuckounelisagisihnorton
authored
Extend Arrow support to cover nullable data. (#4049)
The `arrowio` header provides import export support from/to TileDB and Arrow with its interface of two `void*` pointers. This PR extends the support to cover 'nullable' aka 'validity map' data. The PR is in need of some tests but the existing tests is a little involved between Python, pybind11 and C++ so @ihnorton has kindly 'volunteered' to add this. The PR will remain a draft til we have tests. [sc-27472] --- TYPE: FEATURE DESC: Extend Arrow support to cover nullable data. --------- Co-authored-by: Luc Rancourt <[email protected]> Co-authored-by: Agisilaos Kounelis <[email protected]> Co-authored-by: Agisilaos Kounelis <[email protected]> Co-authored-by: Isaiah Norton <[email protected]>
1 parent 9730fee commit a35426a

File tree

3 files changed

+102
-20
lines changed

3 files changed

+102
-20
lines changed

test/src/unit-arrow.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ struct CPPArrayFx {
9090
str_attr.set_cell_val_num(TILEDB_VAR_NUM);
9191
attrs.push_back(str_attr);
9292
}
93+
{
94+
auto str_attr = Attribute(ctx, "utf_string3", TILEDB_STRING_UTF8);
95+
str_attr.set_cell_val_num(TILEDB_VAR_NUM);
96+
attrs.push_back(str_attr);
97+
}
9398
{
9499
auto str_attr = Attribute(ctx, "tiledb_char", TILEDB_CHAR);
95100
str_attr.set_cell_val_num(TILEDB_VAR_NUM);

test/src/unit_arrow.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ def create(self):
9292
utf_strings[np.random.randint(0, col_size, size=col_size//2)] = ''
9393
self.data['utf_string2'] = pa.array(utf_strings)
9494

95+
# another version with some cells set to NULL
96+
utf_strings[np.random.randint(0, col_size, size=col_size//2)] = None
97+
self.data['utf_string3'] = pa.array(utf_strings)
98+
9599
self.data['datetime_ns'] = pa.array(rand_datetime64_array(col_size))
96100

97101
##########################################################################

tiledb/sm/cpp_api/arrow_io_impl.h

Lines changed: 93 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
* source: https://arrow.apache.org/docs/format/CDataInterface.html
4141
*/
4242

43+
#ifndef ARROW_C_DATA_INTERFACE
44+
#define ARROW_C_DATA_INTERFACE
45+
4346
#define ARROW_FLAG_DICTIONARY_ORDERED 1
4447
#define ARROW_FLAG_NULLABLE 2
4548
#define ARROW_FLAG_MAP_KEYS_SORTED 4
@@ -76,6 +79,9 @@ struct ArrowArray {
7679
// Opaque producer-specific data
7780
void* private_data;
7881
};
82+
83+
#endif
84+
7985
/* End Arrow C API */
8086
/* ************************************************************************ */
8187

@@ -123,17 +129,21 @@ struct TypeInfo {
123129

124130
// is this represented as "Arrow large"
125131
bool arrow_large;
132+
133+
bool nullable;
126134
};
127135

128136
struct BufferInfo {
129137
TypeInfo tdbtype;
130138
bool is_var; // is var-length
139+
bool is_nullable; // is nullable
131140
uint64_t data_num; // number of data elements
132141
void* data; // data pointer
133142
uint64_t data_elem_size; // bytes per data element
134143
uint64_t offsets_num; // number of offsets
135144
void* offsets; // offsets pointer
136145
size_t offsets_elem_size; // bytes per offset element
146+
uint8_t* validity; // optional validity buffer (if is_nullable)
137147
};
138148

139149
/* ****************************** */
@@ -262,6 +272,7 @@ ArrowInfo tiledb_buffer_arrow_fmt(BufferInfo bufferinfo, bool use_list = true) {
262272

263273
TypeInfo arrow_type_to_tiledb(ArrowSchema* arw_schema) {
264274
auto fmt = std::string(arw_schema->format);
275+
bool nullable = arw_schema->flags & ARROW_FLAG_NULLABLE;
265276
bool large = false;
266277
if (fmt == "+l") {
267278
large = false;
@@ -274,36 +285,36 @@ TypeInfo arrow_type_to_tiledb(ArrowSchema* arw_schema) {
274285
}
275286

276287
if (fmt == "i")
277-
return {TILEDB_INT32, 4, 1, large};
288+
return {TILEDB_INT32, 4, 1, large, nullable};
278289
else if (fmt == "l")
279-
return {TILEDB_INT64, 8, 1, large};
290+
return {TILEDB_INT64, 8, 1, large, nullable};
280291
else if (fmt == "f")
281-
return {TILEDB_FLOAT32, 4, 1, large};
292+
return {TILEDB_FLOAT32, 4, 1, large, nullable};
282293
else if (fmt == "g")
283-
return {TILEDB_FLOAT64, 8, 1, large};
294+
return {TILEDB_FLOAT64, 8, 1, large, nullable};
284295
else if (fmt == "B")
285-
return {TILEDB_BLOB, 1, 1, large};
296+
return {TILEDB_BLOB, 1, 1, large, nullable};
286297
else if (fmt == "c")
287-
return {TILEDB_INT8, 1, 1, large};
298+
return {TILEDB_INT8, 1, 1, large, nullable};
288299
else if (fmt == "C")
289-
return {TILEDB_UINT8, 1, 1, large};
300+
return {TILEDB_UINT8, 1, 1, large, nullable};
290301
else if (fmt == "s")
291-
return {TILEDB_INT16, 2, 1, large};
302+
return {TILEDB_INT16, 2, 1, large, nullable};
292303
else if (fmt == "S")
293-
return {TILEDB_UINT16, 2, 1, large};
304+
return {TILEDB_UINT16, 2, 1, large, nullable};
294305
else if (fmt == "I")
295-
return {TILEDB_UINT32, 4, 1, large};
306+
return {TILEDB_UINT32, 4, 1, large, nullable};
296307
else if (fmt == "L")
297-
return {TILEDB_UINT64, 8, 1, large};
308+
return {TILEDB_UINT64, 8, 1, large, nullable};
298309
// this is kind of a hack
299310
// technically 'tsn:' is timezone-specific, which we don't support
300311
// however, the blank (no suffix) base is interconvertible w/ np.datetime64
301312
else if (fmt == "tsn:")
302-
return {TILEDB_DATETIME_NS, 8, 1, large};
313+
return {TILEDB_DATETIME_NS, 8, 1, large, nullable};
303314
else if (fmt == "z" || fmt == "Z")
304-
return {TILEDB_CHAR, 1, TILEDB_VAR_NUM, fmt == "Z"};
315+
return {TILEDB_CHAR, 1, TILEDB_VAR_NUM, fmt == "Z", nullable};
305316
else if (fmt == "u" || fmt == "U")
306-
return {TILEDB_STRING_UTF8, 1, TILEDB_VAR_NUM, fmt == "U"};
317+
return {TILEDB_STRING_UTF8, 1, TILEDB_VAR_NUM, fmt == "U", nullable};
307318
else
308319
throw tiledb::TileDBError(
309320
"[TileDB-Arrow]: Unknown or unsupported Arrow format string '" + fmt +
@@ -314,9 +325,11 @@ TypeInfo tiledb_dt_info(const ArraySchema& schema, const std::string& name) {
314325
if (schema.has_attribute(name)) {
315326
auto attr = schema.attribute(name);
316327
auto retval = TypeInfo();
317-
retval.type = attr.type(),
318-
retval.elem_size = tiledb::impl::type_size(attr.type()),
319-
retval.cell_val_num = attr.cell_val_num(), retval.arrow_large = false;
328+
retval.type = attr.type();
329+
retval.elem_size = tiledb::impl::type_size(attr.type());
330+
retval.cell_val_num = attr.cell_val_num();
331+
retval.arrow_large = false;
332+
retval.nullable = attr.nullable();
320333
return retval;
321334
} else if (schema.domain().has_dimension(name)) {
322335
auto dom = schema.domain();
@@ -327,6 +340,7 @@ TypeInfo tiledb_dt_info(const ArraySchema& schema, const std::string& name) {
327340
retval.elem_size = tiledb::impl::type_size(dim.type());
328341
retval.cell_val_num = dim.cell_val_num();
329342
retval.arrow_large = false;
343+
retval.nullable = false;
330344
return retval;
331345
} else {
332346
throw TDB_LERROR("Schema does not have attribute named '" + name + "'");
@@ -604,6 +618,18 @@ ArrowImporter::~ArrowImporter() {
604618
}
605619
}
606620

621+
static inline int8_t bitmap_get(const uint8_t* bits, int64_t i) {
622+
return (bits[i >> 3] >> (i & 0x07)) & 1;
623+
}
624+
625+
static void bitmap_to_bytemap(void* bitmap, int64_t n) {
626+
uint8_t* bmp = static_cast<uint8_t*>(bitmap);
627+
std::vector<uint8_t> valcpy(bmp, bmp + n); // we make as we will overwrite.
628+
for (auto i = 0; i < n; i++) {
629+
bmp[i] = bitmap_get(valcpy.data(), i);
630+
}
631+
}
632+
607633
void ArrowImporter::import_(
608634
std::string name, ArrowArray* arw_array, ArrowSchema* arw_schema) {
609635
auto typeinfo = arrow_type_to_tiledb(arw_schema);
@@ -630,6 +656,7 @@ void ArrowImporter::import_(
630656
query_->set_data_buffer(name, p_data, data_nbytes);
631657
query_->set_offsets_buffer(
632658
name, static_cast<uint64_t*>(p_offsets), num_offsets + 1);
659+
633660
} else {
634661
// fixed-size attribute (not TILEDB_VAR_NUM)
635662
assert(arw_array->n_buffers == 2);
@@ -639,6 +666,15 @@ void ArrowImporter::import_(
639666

640667
query_->set_data_buffer(name, static_cast<void*>(p_data), data_num);
641668
}
669+
670+
if (typeinfo.nullable && arw_array->buffers[0] != nullptr) {
671+
bitmap_to_bytemap(
672+
const_cast<void*>(arw_array->buffers[0]), arw_array->length);
673+
query_->set_validity_buffer(
674+
name,
675+
static_cast<uint8_t*>(const_cast<void*>(arw_array->buffers[0])),
676+
arw_array->length);
677+
}
642678
}
643679

644680
/* ****************************** */
@@ -670,6 +706,7 @@ BufferInfo ArrowExporter::buffer_info(const std::string& name) {
670706
uint64_t* offsets = nullptr;
671707
uint64_t offsets_nelem = 0;
672708
uint64_t elem_size = 0;
709+
uint8_t* validity = nullptr;
673710

674711
auto typeinfo = tiledb_dt_info(query_->array().schema(), name);
675712

@@ -714,6 +751,10 @@ BufferInfo ArrowExporter::buffer_info(const std::string& name) {
714751
query_->get_data_buffer(name, &data, &data_nelem, &elem_size);
715752
}
716753

754+
if (typeinfo.nullable) {
755+
query_->get_validity_buffer(name, &validity, &data_nelem);
756+
}
757+
717758
auto retval = BufferInfo();
718759
retval.tdbtype = typeinfo;
719760
retval.is_var = is_var;
@@ -723,6 +764,8 @@ BufferInfo ArrowExporter::buffer_info(const std::string& name) {
723764
retval.offsets_num = (is_var ? offsets_nelem : 1);
724765
retval.offsets = offsets;
725766
retval.offsets_elem_size = offsets_elem_nbytes;
767+
retval.is_nullable = typeinfo.nullable;
768+
retval.validity = validity;
726769

727770
return retval;
728771
}
@@ -733,8 +776,32 @@ int64_t flags_for_buffer(BufferInfo binfo) {
733776
#define ARROW_FLAG_NULLABLE 2
734777
#define ARROW_FLAG_MAP_KEYS_SORTED 4
735778
*/
736-
(void)binfo;
737-
return 0;
779+
int64_t val = 0;
780+
if (binfo.is_nullable)
781+
val |= ARROW_FLAG_NULLABLE;
782+
return val;
783+
}
784+
785+
int64_t bytemap_to_bitmap(uint8_t* bytemap, int64_t num) {
786+
// helper function from column_buffer class in libtiledbsoma
787+
// note that it transforms bytemap _in place_ by design, as we now own the
788+
// buffer added null count return for convenience
789+
int64_t nulls = 0;
790+
int i_dst = 0;
791+
for (unsigned int i_src = 0; i_src < num; i_src++) {
792+
nulls += bytemap[i_src] == 0;
793+
// Overwrite every 8 bytes with a one-byte bitmap
794+
if (i_src % 8 == 0) {
795+
// Each bit in the bitmap corresponds to one byte in the bytemap
796+
// Note: the bitmap must be byte-aligned (8 bits)
797+
int bitmap = 0;
798+
for (unsigned int i = i_src; i < i_src + 8 && i < num; i++) {
799+
bitmap |= bytemap[i] << (i % 8);
800+
}
801+
bytemap[i_dst++] = bitmap;
802+
}
803+
}
804+
return nulls;
738805
}
739806

740807
void ArrowExporter::export_(
@@ -763,6 +830,12 @@ void ArrowExporter::export_(
763830
}
764831
cpp_schema->export_ptr(schema);
765832

833+
int64_t null_num = 0;
834+
if (bufferinfo.is_nullable) {
835+
null_num = bytemap_to_bitmap(bufferinfo.validity, bufferinfo.data_num);
836+
buffers[0] = bufferinfo.validity;
837+
}
838+
766839
size_t elem_num = 0;
767840
if (bufferinfo.is_var) {
768841
// adjust for offset unless empty result
@@ -773,7 +846,7 @@ void ArrowExporter::export_(
773846

774847
auto cpp_arrow_array = new CPPArrowArray(
775848
elem_num, // elem_num
776-
0, // null_num
849+
null_num, // null_num
777850
0, // offset
778851
{}, // children
779852
buffers);

0 commit comments

Comments
 (0)