Skip to content

Commit 394d1ea

Browse files
committed
got custom arrow encoder to work with 200x20 record batches
1 parent 3b9755f commit 394d1ea

File tree

2 files changed

+58
-24
lines changed

2 files changed

+58
-24
lines changed

Telemetry-SD-Card/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ FetchContent_MakeAvailable(nanoarrow)
3131
set(CMAKE_CXX_STANDARD 20)
3232
add_executable(${PROJECT_NAME} main.cpp)
3333

34-
target_include_directories(${PROJECT_NAME} PUBLIC include)
34+
# "SYSTEM" should remove warnings when compiling the generated flatbuffers code
35+
target_include_directories(${PROJECT_NAME} SYSTEM PUBLIC include)
3536
target_link_libraries(${PROJECT_NAME} mbed-os mbed-storage-sd mbed-storage-fat nanoarrow nanoarrow_ipc)
3637
# target_link_libraries(${PROJECT_NAME} mbed-baremetal mbed-storage-sd mbed-storage-fat)
3738
# target_link_libraries(${PROJECT_NAME} mbed-baremetal mbed-storage-sd mbed-storage-fat nanoarrow nanoarrow_ipc)

Telemetry-SD-Card/main.cpp

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ const char *col_names[] = {
8080
"198", "199",
8181
};
8282
#define COLS 200
83-
#define ROWS 5
83+
#define ROWS 20
8484

8585
// We use alignas(8) because Arrow mandates all buffers to be 8-byte aligned
8686
struct RecordBatchBody {
@@ -300,19 +300,19 @@ void arrow_flatcc_build_schema(flatcc_builder_t *b) {
300300
org_apache_arrow_flatbuf_Field_nullable_add(b, false);
301301
org_apache_arrow_flatbuf_Field_type_Int_create(b, 32, true);
302302

303-
// org_apache_arrow_flatbuf_Field_children_start(b);
304303
// // Our columns are super simple (primitives only); no children
304+
// org_apache_arrow_flatbuf_Field_children_start(b);
305305
// org_apache_arrow_flatbuf_Field_children_end(b);
306-
// org_apache_arrow_flatbuf_Field_custom_metadata_start(b);
307306
// // Our fields don't have any custom metadata afaik
307+
// org_apache_arrow_flatbuf_Field_custom_metadata_start(b);
308308
// org_apache_arrow_flatbuf_Field_custom_metadata_end(b);
309309

310310
org_apache_arrow_flatbuf_Schema_fields_push_end(b);
311311
}
312312

313313
org_apache_arrow_flatbuf_Schema_fields_end(b);
314-
// org_apache_arrow_flatbuf_Schema_custom_metadata_start(b);
315314
// // we don't have any custom metadata afaik
315+
// org_apache_arrow_flatbuf_Schema_custom_metadata_start(b);
316316
// org_apache_arrow_flatbuf_Schema_custom_metadata_end(b);
317317
org_apache_arrow_flatbuf_Schema_features_start(b);
318318
org_apache_arrow_flatbuf_Schema_features_end(b);
@@ -330,22 +330,25 @@ void arrow_flatcc_encode_schema_message(flatcc_builder_t *b) {
330330
org_apache_arrow_flatbuf_Message_end_as_root(b);
331331
}
332332

333-
// Each column defined in the schema is represented by one Node struct here,
334-
// giving it's length and null count
335-
org_apache_arrow_flatbuf_FieldNode rb_nodes[COLS] = {};
336-
// Each column has 1-3+ buffers depending on its type; eg. data, validity,
337-
// offsets, etc. For primitives (which we're using exclusively), those are
338-
// validity followed by data. See buffer orders for different types here:
339-
// https://arrow.apache.org/docs/format/Columnar.html#buffer-listing-for-each-layout
340-
org_apache_arrow_flatbuf_Buffer rb_buffers[COLS * 2] = {};
341-
342333
// Note that the actual data buffers come after this flatbuffer message, in the
343334
// messageBody. This is contains only the metadata (lenghts, offsets, etc)
344335
void arrow_flatcc_encode_record_batch_message(flatcc_builder_t *b) {
345336
org_apache_arrow_flatbuf_Message_start_as_root(b);
346337
org_apache_arrow_flatbuf_Message_version_add(b, org_apache_arrow_flatbuf_MetadataVersion_V5);
347338
org_apache_arrow_flatbuf_Message_header_RecordBatch_start(b);
348339

340+
// We use heap allocated lists below as mbed-os RTOS threads are heavily
341+
// stack-limited. These are cleaned up immediately after writing the schema.
342+
343+
// Each column defined in the schema is represented by one Node struct here,
344+
// giving it's length and null count
345+
org_apache_arrow_flatbuf_FieldNode *rb_nodes = new org_apache_arrow_flatbuf_FieldNode[COLS];
346+
// Each column has 1-3+ buffers depending on its type; eg. data, validity,
347+
// offsets, etc. For primitives (which we're using exclusively atm), those
348+
// are validity followed by data. Buffer orders for different types:
349+
// https://arrow.apache.org/docs/format/Columnar.html#buffer-listing-for-each-layout
350+
org_apache_arrow_flatbuf_Buffer *rb_buffers = new org_apache_arrow_flatbuf_Buffer[COLS*2];
351+
349352
int64_t offset = 0;
350353
int buffer_index = 0;
351354
for (uint i = 0; i < COLS; i++) {
@@ -358,7 +361,7 @@ void arrow_flatcc_encode_record_batch_message(flatcc_builder_t *b) {
358361

359362
// Since we're omitting validity buffers (enforcing non-null values)
360363
// entirely for now, we set each validity buffer metadata to say
361-
// length=0 so Arrow decoders know to not look for them
364+
// length=0 so Arrow decoders know not to look for them
362365
const uint validity_buf_size = 0; // (int)(ceil(ROWS / 8.0f));
363366
rb_buffers[buffer_index++] = {.offset = offset, .length = 0};
364367
if (rb_nodes[i].null_count > 0) // NOTE: ATM this will never happen
@@ -378,6 +381,10 @@ void arrow_flatcc_encode_record_batch_message(flatcc_builder_t *b) {
378381
// metadata that contain the actual data buffers.
379382
org_apache_arrow_flatbuf_Message_bodyLength_add(b, offset);
380383
org_apache_arrow_flatbuf_Message_end_as_root(b);
384+
385+
// These take a significant amount of space, lets clean them up!
386+
delete[] rb_nodes;
387+
delete[] rb_buffers;
381388
}
382389

383390

@@ -410,7 +417,22 @@ void arrow_stream_write_message_to_file(void *flatbuf, size_t flatbuf_size, void
410417
}
411418
}
412419

413-
flatcc_builder_t b;
420+
// TODO: experiment with a custom emitter that writes directly to a FILE*
421+
int dbg_emitter(void *emit_context, const flatcc_iovec_t *iov, int iov_count, flatbuffers_soffset_t offset, size_t len) {
422+
printf("dbg: emit: iov_count: %d, offset: %d, len: %d\n", iov_count, offset, len);
423+
424+
for (int i = 0; i < iov_count; ++i) {
425+
if (iov[i].iov_base == flatcc_builder_padding_base) {
426+
printf("dbg: padding at: %d, len: %d\n", offset, iov[i].iov_len);
427+
}
428+
if (iov[i].iov_base == 0) {
429+
printf("dbg: null vector reserved at: %d, len: %d\n", offset, iov[i].iov_len);
430+
}
431+
offset += (flatbuffers_soffset_t)iov[i].iov_len;
432+
}
433+
return 0;
434+
}
435+
414436
RecordBatchBody values;
415437
int32_t *cols[] = {
416438
values.col0, values.col1, values.col2, values.col3,
@@ -495,37 +517,48 @@ int main(int argc, char *argv[]) {
495517
if (file == NULL)
496518
error_quit("Error opening file!");
497519

498-
flatcc_builder_init(&b);
520+
flatcc_builder_t b, *B;
521+
B = &b;
522+
523+
flatcc_builder_init(B);
524+
// flatcc_builder_custom_init(B, dbg_emitter, 0, 0, 0);
499525

500526
print_mem_usage();
501527

502-
arrow_flatcc_encode_schema_message(&b);
528+
arrow_flatcc_encode_schema_message(B);
503529
size_t schema_size;
504-
void *schema = flatcc_builder_finalize_buffer(&b, &schema_size);
530+
void *schema = flatcc_builder_finalize_buffer(B, &schema_size);
531+
printf("schema flatbuf size: %d", schema_size);
505532
MBED_ASSERT(schema_size != 0 && schema);
506533

534+
free(schema);
535+
507536
print_mem_usage();
508537

509-
flatcc_builder_reset(&b);
538+
// A full clear/init cycle seems to free much more memory than just a reset.
539+
flatcc_builder_clear(B);
540+
flatcc_builder_init(B);
510541

511542
print_mem_usage();
512543

513544
arrow_stream_write_message_to_file(schema, schema_size, nullptr, 0, file);
514545

515546
print_mem_usage();
516547

517-
arrow_flatcc_encode_record_batch_message(&b);
548+
arrow_flatcc_encode_record_batch_message(B);
518549

519550
print_mem_usage();
520551

521552
size_t record_batch_flatbuf_size;
522-
// void *record_batch_flatbuf = flatcc_builder_finalize_buffer(&b, &record_batch_flatbuf_size);
523-
void *record_batch_flatbuf = flatcc_builder_get_direct_buffer(&b, &record_batch_flatbuf_size);
553+
void *record_batch_flatbuf = flatcc_builder_finalize_buffer(B, &record_batch_flatbuf_size);
554+
// void *record_batch_flatbuf = flatcc_builder_get_direct_buffer(B, &record_batch_flatbuf_size);
555+
556+
printf("record batch flatbuf size: %d", record_batch_flatbuf_size);
524557
MBED_ASSERT(record_batch_flatbuf_size != 0 && record_batch_flatbuf);
525558

526559
print_mem_usage();
527560

528-
flatcc_builder_reset(&b);
561+
flatcc_builder_clear(B);
529562

530563
int val = 0;
531564
for (int i = 0; i < COLS; i++) {

0 commit comments

Comments
 (0)