Skip to content

Commit da41cb4

Browse files
committed
Fix nullable column writer to use sparse value encoding
1 parent d93e4ed commit da41cb4

File tree

8 files changed

+90
-177
lines changed

8 files changed

+90
-177
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
cmake_minimum_required(VERSION 3.16)
2-
project(carquet VERSION 0.1.0 LANGUAGES C)
2+
project(carquet VERSION 0.1.1 LANGUAGES C)
33

44
set(CMAKE_C_STANDARD 11)
55
set(CMAKE_C_STANDARD_REQUIRED ON)

benchmark/results.csv

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
library,dataset,compression,rows,write_ms,read_ms,file_bytes
2-
carquet,small,none,100000,0.90,0.61,2000366
3-
carquet,small,snappy,100000,1.98,0.72,1933903
4-
carquet,small,zstd,100000,4.63,1.23,1121063
5-
carquet,medium,none,1000000,11.00,2.98,20000380
6-
carquet,medium,snappy,1000000,21.87,3.75,19933920
7-
carquet,medium,zstd,1000000,54.50,9.80,11259205
8-
carquet,large,none,10000000,117.91,20.99,200000392
9-
carquet,large,snappy,10000000,224.98,28.52,199933935
10-
carquet,large,zstd,10000000,538.75,92.85,112635043
11-
pyarrow,small,none,100000,5.97,1.01,2109803
12-
pyarrow,small,snappy,100000,6.68,1.28,1829413
13-
pyarrow,small,zstd,100000,8.05,1.54,1574730
14-
pyarrow,medium,none,1000000,59.03,4.91,21097309
15-
pyarrow,medium,snappy,1000000,70.05,4.64,18292050
16-
pyarrow,medium,zstd,1000000,80.96,6.69,15742253
17-
pyarrow,large,none,10000000,595.91,27.37,210969394
18-
pyarrow,large,snappy,10000000,674.78,32.60,182922756
19-
pyarrow,large,zstd,10000000,806.42,50.59,157429767
2+
carquet,small,none,100000,1.77,0.61,2000366
3+
carquet,small,snappy,100000,2.79,0.76,1933903
4+
carquet,small,zstd,100000,4.71,1.14,1121063
5+
carquet,medium,none,1000000,12.20,5.29,20000380
6+
carquet,medium,snappy,1000000,23.39,4.32,19933920
7+
carquet,medium,zstd,1000000,55.23,9.78,11259205
8+
carquet,large,none,10000000,165.01,24.85,200000392
9+
carquet,large,snappy,10000000,249.13,40.25,199933935
10+
carquet,large,zstd,10000000,552.52,100.29,112635043
11+
pyarrow,small,none,100000,5.98,1.08,2109803
12+
pyarrow,small,snappy,100000,6.71,1.33,1829413
13+
pyarrow,small,zstd,100000,8.13,1.59,1574730
14+
pyarrow,medium,none,1000000,60.50,5.03,21097309
15+
pyarrow,medium,snappy,1000000,68.17,4.82,18292050
16+
pyarrow,medium,zstd,1000000,80.98,7.16,15742253
17+
pyarrow,large,none,10000000,622.26,29.03,210969394
18+
pyarrow,large,snappy,10000000,680.52,34.45,182922756
19+
pyarrow,large,zstd,10000000,813.11,54.20,157429767

examples/nullable_columns.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,13 +156,13 @@ static int write_nullable_data(const char* filename, carquet_schema_t* schema) {
156156
if (age_def_levels[i] == 0) {
157157
printf("%-6s ", "NULL");
158158
} else {
159-
printf("%-6d ", ages[age_value_count - 1 - (i % 3 == 2 ? 0 : (i >= 2 ? (i - i % 3) / 3 : 0))]);
159+
printf("%-6d ", ages[age_value_count - 1]);
160160
}
161161

162162
if (score_def_levels[i] == 0) {
163163
printf("%-8s ", "NULL");
164164
} else {
165-
printf("%-8.1f ", scores[score_value_count - 1 - (i % 4 == 3 ? 0 : (i >= 3 ? (i - i % 4) / 4 : 0))]);
165+
printf("%-8.1f ", scores[score_value_count - 1]);
166166
}
167167

168168
if (email_def_levels[i] == 0) {
@@ -315,13 +315,15 @@ static void explain_definition_levels(void) {
315315
printf(" - def_level = 1: Outer struct present, inner field is NULL\n");
316316
printf(" - def_level = 2: Both outer struct and inner field are present\n\n");
317317

318-
printf("Important: When writing, only provide non-NULL values in the values array!\n");
319-
printf("The definition levels array should have one entry per logical row.\n\n");
318+
printf("When writing, only provide non-NULL values in the values array\n");
319+
printf("(sparse encoding). The definition levels array has one entry per\n");
320+
printf("logical row. num_values = number of logical rows.\n\n");
320321

321322
printf("Example:\n");
322323
printf(" Logical rows: [10, NULL, 20, NULL, 30]\n");
323-
printf(" Values array: [10, 20, 30] (only 3 values)\n");
324-
printf(" Def levels: [1, 0, 1, 0, 1] (5 entries, one per row)\n\n");
324+
printf(" Values array: [10, 20, 30] (3 non-null values, packed)\n");
325+
printf(" Def levels: [1, 0, 1, 0, 1] (5 entries, one per row)\n");
326+
printf(" num_values: 5 (logical row count)\n\n");
325327
}
326328

327329
int main(int argc, char* argv[]) {

include/carquet/carquet.h

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/**
22
* @file carquet.h
33
* @brief Carquet - High-Performance Pure C Parquet Library
4-
* @version 0.1.0
4+
* @version 0.1.1
55
*
66
* @copyright Copyright (c) 2025. All rights reserved.
77
* @license MIT License
@@ -196,10 +196,10 @@ extern "C" {
196196
#define CARQUET_VERSION_MINOR 1
197197

198198
/** @brief Patch version number */
199-
#define CARQUET_VERSION_PATCH 0
199+
#define CARQUET_VERSION_PATCH 1
200200

201201
/** @brief Version string in "MAJOR.MINOR.PATCH" format */
202-
#define CARQUET_VERSION_STRING "0.1.0"
202+
#define CARQUET_VERSION_STRING "0.1.1"
203203

204204
/** @brief Numeric version for compile-time comparisons: (MAJOR * 10000 + MINOR * 100 + PATCH) */
205205
#define CARQUET_VERSION_NUMBER (CARQUET_VERSION_MAJOR * 10000 + CARQUET_VERSION_MINOR * 100 + CARQUET_VERSION_PATCH)
@@ -1664,9 +1664,12 @@ carquet_writer_t* carquet_writer_create_file(
16641664
*
16651665
* @param[in] writer File writer
16661666
* @param[in] column_index Column index
1667-
* @param[in] values Input values (type must match column physical type)
1668-
* @param[in] num_values Number of values to write
1669-
* @param[in] def_levels Definition levels (NULL if all values defined)
1667+
* @param[in] values Input values (type must match column physical type).
1668+
* For nullable columns, this contains only the non-null
1669+
* values, packed contiguously (sparse encoding).
1670+
* @param[in] num_values Number of logical rows, nulls included (the length of def_levels when provided; may exceed the number of entries in values)
1671+
* @param[in] def_levels Definition levels (NULL if all values defined).
1672+
* One entry per logical row.
16701673
* @param[in] rep_levels Repetition levels (NULL if no repetition)
16711674
* @return CARQUET_OK on success
16721675
*
@@ -1677,14 +1680,19 @@ carquet_writer_t* carquet_writer_create_file(
16771680
* - def_level = max_def_level: value is present
16781681
* - def_level < max_def_level: value is null
16791682
*
1683+
* The values array uses sparse encoding: it contains only the non-null values,
1684+
* packed contiguously. The def_levels array has num_values entries (one per
1685+
* logical row). The number of entries in values must equal the number of
1686+
* entries in def_levels where def_level == max_def_level.
1687+
*
16801688
* @code{.c}
1681-
* // Write non-nullable column
1689+
* // Write non-nullable column (5 rows, all present)
16821690
* int64_t ids[] = {1, 2, 3, 4, 5};
16831691
* carquet_writer_write_batch(writer, 0, ids, 5, NULL, NULL);
16841692
*
1685-
* // Write nullable column with some nulls
1686-
* double values[] = {1.1, 0.0, 3.3, 0.0, 5.5}; // 0.0 will be null
1687-
* int16_t def_levels[] = {1, 0, 1, 0, 1}; // 0 = null, 1 = present
1693+
* // Write nullable column: logical rows [1.1, NULL, 3.3, NULL, 5.5]
1694+
* double values[] = {1.1, 3.3, 5.5}; // 3 non-null values only
1695+
* int16_t def_levels[] = {1, 0, 1, 0, 1}; // 5 entries, one per row
16881696
* carquet_writer_write_batch(writer, 1, values, 5, def_levels, NULL);
16891697
* @endcode
16901698
*/

interop/roundtrip_writer.c

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ static carquet_compression_t COMPRESSIONS[] = {
2727
CARQUET_COMPRESSION_ZSTD
2828
};
2929

30-
/* Test data generation */
30+
/* Test data generation (sparse encoding: values arrays contain only non-null values) */
3131
static void generate_test_data(
3232
uint8_t* bools, int32_t* int32s, int64_t* int64s,
3333
float* floats, double* doubles,
@@ -40,28 +40,35 @@ static void generate_test_data(
4040
"alpha", "beta", "gamma", "delta", "epsilon"
4141
};
4242

43+
int string_value_count = 0;
44+
int nullable_int_count = 0;
45+
4346
for (int i = 0; i < n; i++) {
4447
bools[i] = (i % 2 == 0) ? 1 : 0;
4548
int32s[i] = i * 10 - 5000; /* Negative and positive */
4649
int64s[i] = (int64_t)i * 1000000LL - 2500000000LL;
4750
floats[i] = (float)i * 0.5f - 1250.0f;
4851
doubles[i] = (double)i * 0.125 - 312.5;
4952

50-
/* Strings: every 7th is null */
53+
/* Strings: every 7th is null (sparse: only non-null in values array) */
5154
if (i % 7 == 0) {
5255
string_def_levels[i] = 0;
53-
strings[i].length = 0;
54-
strings[i].data = NULL;
5556
} else {
5657
string_def_levels[i] = 1;
5758
const char* s = sample_strings[i % 10];
58-
strings[i].length = (int32_t)strlen(s);
59-
strings[i].data = (uint8_t*)s;
59+
strings[string_value_count].length = (int32_t)strlen(s);
60+
strings[string_value_count].data = (uint8_t*)s;
61+
string_value_count++;
6062
}
6163

62-
/* Nullable ints: every 5th is null */
63-
nullable_ints[i] = i * 100;
64-
nullable_def_levels[i] = (i % 5 == 0) ? 0 : 1;
64+
/* Nullable ints: every 5th is null (sparse: only non-null in values array) */
65+
if (i % 5 == 0) {
66+
nullable_def_levels[i] = 0;
67+
} else {
68+
nullable_def_levels[i] = 1;
69+
nullable_ints[nullable_int_count] = i * 100;
70+
nullable_int_count++;
71+
}
6572
}
6673
}
6774

src/writer/page_writer.c

Lines changed: 25 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -319,171 +319,67 @@ carquet_status_t carquet_page_writer_add_values(
319319
&writer->rep_levels_buffer);
320320
}
321321

322-
/* Encode values using PLAIN encoding
323-
* When there are nulls (def_levels provided), we must only encode
324-
* the non-null values, skipping positions where def_level < max_def_level
322+
/* Encode values using PLAIN encoding.
323+
*
324+
* The values array uses sparse encoding: it contains only non-null values
325+
* (packed at the front), with num_non_null entries. The def_levels array
326+
* has num_values entries (one per logical row) indicating which rows are
327+
* null vs present.
325328
*/
326329
carquet_status_t status = CARQUET_OK;
327-
bool has_nulls = def_levels && writer->max_def_level > 0 && num_non_null < num_values;
328330

329331
switch (writer->type) {
330332
case CARQUET_PHYSICAL_BOOLEAN: {
331333
const uint8_t* bools = (const uint8_t*)values;
332-
if (has_nulls) {
333-
uint8_t* non_null_bools = malloc(num_non_null);
334-
if (!non_null_bools) return CARQUET_ERROR_OUT_OF_MEMORY;
335-
int64_t j = 0;
336-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
337-
if (def_levels[i] == writer->max_def_level) {
338-
non_null_bools[j++] = bools[i];
339-
}
340-
}
341-
status = carquet_encode_plain_boolean(non_null_bools, num_non_null,
342-
&writer->values_buffer);
343-
free(non_null_bools);
344-
} else {
345-
status = carquet_encode_plain_boolean(bools, num_non_null,
346-
&writer->values_buffer);
347-
}
334+
status = carquet_encode_plain_boolean(bools, num_non_null,
335+
&writer->values_buffer);
348336
break;
349337
}
350338

351339
case CARQUET_PHYSICAL_INT32: {
352340
const int32_t* ints = (const int32_t*)values;
353-
if (has_nulls) {
354-
int32_t* non_null_ints = malloc(num_non_null * sizeof(int32_t));
355-
if (!non_null_ints) return CARQUET_ERROR_OUT_OF_MEMORY;
356-
int64_t j = 0;
357-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
358-
if (def_levels[i] == writer->max_def_level) {
359-
non_null_ints[j++] = ints[i];
360-
}
361-
}
362-
status = carquet_encode_plain_int32(non_null_ints, num_non_null,
363-
&writer->values_buffer);
364-
update_statistics_i32(writer, non_null_ints, num_non_null);
365-
free(non_null_ints);
366-
} else {
367-
status = carquet_encode_plain_int32(ints, num_non_null,
368-
&writer->values_buffer);
369-
update_statistics_i32(writer, ints, num_non_null);
370-
}
341+
status = carquet_encode_plain_int32(ints, num_non_null,
342+
&writer->values_buffer);
343+
update_statistics_i32(writer, ints, num_non_null);
371344
break;
372345
}
373346

374347
case CARQUET_PHYSICAL_INT64: {
375348
const int64_t* ints = (const int64_t*)values;
376-
if (has_nulls) {
377-
int64_t* non_null_ints = malloc(num_non_null * sizeof(int64_t));
378-
if (!non_null_ints) return CARQUET_ERROR_OUT_OF_MEMORY;
379-
int64_t j = 0;
380-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
381-
if (def_levels[i] == writer->max_def_level) {
382-
non_null_ints[j++] = ints[i];
383-
}
384-
}
385-
status = carquet_encode_plain_int64(non_null_ints, num_non_null,
386-
&writer->values_buffer);
387-
update_statistics_i64(writer, non_null_ints, num_non_null);
388-
free(non_null_ints);
389-
} else {
390-
status = carquet_encode_plain_int64(ints, num_non_null,
391-
&writer->values_buffer);
392-
update_statistics_i64(writer, ints, num_non_null);
393-
}
349+
status = carquet_encode_plain_int64(ints, num_non_null,
350+
&writer->values_buffer);
351+
update_statistics_i64(writer, ints, num_non_null);
394352
break;
395353
}
396354

397355
case CARQUET_PHYSICAL_FLOAT: {
398356
const float* floats = (const float*)values;
399-
if (has_nulls) {
400-
float* non_null_floats = malloc(num_non_null * sizeof(float));
401-
if (!non_null_floats) return CARQUET_ERROR_OUT_OF_MEMORY;
402-
int64_t j = 0;
403-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
404-
if (def_levels[i] == writer->max_def_level) {
405-
non_null_floats[j++] = floats[i];
406-
}
407-
}
408-
status = carquet_encode_plain_float(non_null_floats, num_non_null,
409-
&writer->values_buffer);
410-
update_statistics_float(writer, non_null_floats, num_non_null);
411-
free(non_null_floats);
412-
} else {
413-
status = carquet_encode_plain_float(floats, num_non_null,
414-
&writer->values_buffer);
415-
update_statistics_float(writer, floats, num_non_null);
416-
}
357+
status = carquet_encode_plain_float(floats, num_non_null,
358+
&writer->values_buffer);
359+
update_statistics_float(writer, floats, num_non_null);
417360
break;
418361
}
419362

420363
case CARQUET_PHYSICAL_DOUBLE: {
421364
const double* doubles = (const double*)values;
422-
if (has_nulls) {
423-
double* non_null_doubles = malloc(num_non_null * sizeof(double));
424-
if (!non_null_doubles) return CARQUET_ERROR_OUT_OF_MEMORY;
425-
int64_t j = 0;
426-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
427-
if (def_levels[i] == writer->max_def_level) {
428-
non_null_doubles[j++] = doubles[i];
429-
}
430-
}
431-
status = carquet_encode_plain_double(non_null_doubles, num_non_null,
432-
&writer->values_buffer);
433-
update_statistics_double(writer, non_null_doubles, num_non_null);
434-
free(non_null_doubles);
435-
} else {
436-
status = carquet_encode_plain_double(doubles, num_non_null,
437-
&writer->values_buffer);
438-
update_statistics_double(writer, doubles, num_non_null);
439-
}
365+
status = carquet_encode_plain_double(doubles, num_non_null,
366+
&writer->values_buffer);
367+
update_statistics_double(writer, doubles, num_non_null);
440368
break;
441369
}
442370

443371
case CARQUET_PHYSICAL_BYTE_ARRAY: {
444372
const carquet_byte_array_t* arrays = (const carquet_byte_array_t*)values;
445-
if (has_nulls) {
446-
carquet_byte_array_t* non_null_arrays = malloc(num_non_null * sizeof(carquet_byte_array_t));
447-
if (!non_null_arrays) return CARQUET_ERROR_OUT_OF_MEMORY;
448-
int64_t j = 0;
449-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
450-
if (def_levels[i] == writer->max_def_level) {
451-
non_null_arrays[j++] = arrays[i];
452-
}
453-
}
454-
status = carquet_encode_plain_byte_array(non_null_arrays, num_non_null,
455-
&writer->values_buffer);
456-
free(non_null_arrays);
457-
} else {
458-
status = carquet_encode_plain_byte_array(arrays, num_non_null,
459-
&writer->values_buffer);
460-
}
373+
status = carquet_encode_plain_byte_array(arrays, num_non_null,
374+
&writer->values_buffer);
461375
break;
462376
}
463377

464378
case CARQUET_PHYSICAL_FIXED_LEN_BYTE_ARRAY: {
465379
const uint8_t* fixed = (const uint8_t*)values;
466-
if (has_nulls) {
467-
uint8_t* non_null_fixed = malloc(num_non_null * writer->type_length);
468-
if (!non_null_fixed) return CARQUET_ERROR_OUT_OF_MEMORY;
469-
int64_t j = 0;
470-
for (int64_t i = 0; i < num_values && j < num_non_null; i++) {
471-
if (def_levels[i] == writer->max_def_level) {
472-
memcpy(non_null_fixed + j * writer->type_length,
473-
fixed + i * writer->type_length,
474-
writer->type_length);
475-
j++;
476-
}
477-
}
478-
status = carquet_encode_plain_fixed_byte_array(non_null_fixed, num_non_null,
479-
writer->type_length,
480-
&writer->values_buffer);
481-
free(non_null_fixed);
482-
} else {
483-
status = carquet_encode_plain_fixed_byte_array(fixed, num_non_null,
484-
writer->type_length,
485-
&writer->values_buffer);
486-
}
380+
status = carquet_encode_plain_fixed_byte_array(fixed, num_non_null,
381+
writer->type_length,
382+
&writer->values_buffer);
487383
break;
488384
}
489385

0 commit comments

Comments
 (0)