Skip to content

Commit 745897e

Browse files
authored
[Enhancement](parquet-orc)add column size check for debug & stable . (#59780)
### What problem does this PR solve? Problem Summary: Add code to the Parquet/ORC reader for checking block column size, to debug the code and improve its stability.
1 parent 1bbb464 commit 745897e

File tree

3 files changed

+119
-2
lines changed

3 files changed

+119
-2
lines changed

be/src/vec/exec/format/orc/vorc_reader.cpp

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2306,6 +2306,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23062306
col_name, column_ptr, column_type,
23072307
_table_info_node_ptr->get_children_node(col_name), _type_map[file_column_name],
23082308
batch_vec[orc_col_idx->second], _batch->numElements));
2309+
#ifndef NDEBUG
2310+
column_ptr->sanity_check();
2311+
#endif
23092312
}
23102313

23112314
RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
@@ -2322,8 +2325,25 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23222325
return Status::OK();
23232326
}
23242327
{
2328+
#ifndef NDEBUG
2329+
for (auto col : *block) {
2330+
col.column->sanity_check();
2331+
2332+
DCHECK(block->rows() == col.column->size())
2333+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
2334+
block->rows(), col.column->size(), col.name);
2335+
}
2336+
#endif
23252337
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
23262338
_execute_filter_position_delete_rowids(*_filter);
2339+
#ifndef NDEBUG
2340+
for (auto col : *block) {
2341+
col.column->sanity_check();
2342+
DCHECK(block->rows() == col.column->size())
2343+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
2344+
block->rows(), col.column->size(), col.name);
2345+
}
2346+
#endif
23272347
{
23282348
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
23292349
RETURN_IF_CATCH_EXCEPTION(
@@ -2332,6 +2352,14 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
23322352
Block::erase_useless_column(block, column_to_keep);
23332353
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
23342354
*read_rows = block->rows();
2355+
#ifndef NDEBUG
2356+
for (auto col : *block) {
2357+
col.column->sanity_check();
2358+
DCHECK(block->rows() == col.column->size())
2359+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
2360+
block->rows(), col.column->size(), col.name);
2361+
}
2362+
#endif
23352363
}
23362364
} else {
23372365
uint64_t rr;
@@ -2403,6 +2431,9 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
24032431
col_name, column_ptr, column_type,
24042432
_table_info_node_ptr->get_children_node(col_name), _type_map[file_column_name],
24052433
batch_vec[orc_col_idx->second], _batch->numElements));
2434+
#ifndef NDEBUG
2435+
column_ptr->sanity_check();
2436+
#endif
24062437
}
24072438

24082439
RETURN_IF_ERROR(_fill_partition_columns(block, _batch->numElements,
@@ -2419,6 +2450,14 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
24192450
return Status::OK();
24202451
}
24212452

2453+
#ifndef NDEBUG
2454+
for (auto col : *block) {
2455+
col.column->sanity_check();
2456+
DCHECK(block->rows() == col.column->size())
2457+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
2458+
block->rows(), col.column->size(), col.name);
2459+
}
2460+
#endif
24222461
{
24232462
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
24242463
_build_delete_row_filter(block, _batch->numElements);
@@ -2472,7 +2511,23 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
24722511
Block::erase_useless_column(block, column_to_keep);
24732512
}
24742513
}
2514+
#ifndef NDEBUG
2515+
for (auto col : *block) {
2516+
col.column->sanity_check();
2517+
DCHECK(block->rows() == col.column->size())
2518+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
2519+
block->rows(), col.column->size(), col.name);
2520+
}
2521+
#endif
24752522
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
2523+
#ifndef NDEBUG
2524+
for (auto col : *block) {
2525+
col.column->sanity_check();
2526+
DCHECK(block->rows() == col.column->size())
2527+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
2528+
block->rows(), col.column->size(), col.name);
2529+
}
2530+
#endif
24762531
*read_rows = block->rows();
24772532
}
24782533
return Status::OK();
@@ -2582,6 +2637,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
25822637
table_col_name, column_ptr, column_type,
25832638
_table_info_node_ptr->get_children_node(table_col_name),
25842639
_type_map[file_column_name], batch_vec[orc_col_idx->second], data.numElements));
2640+
#ifndef NDEBUG
2641+
column_ptr->sanity_check();
2642+
#endif
25852643
}
25862644
RETURN_IF_ERROR(
25872645
_fill_partition_columns(block, size, _lazy_read_ctx.predicate_partition_columns));

be/src/vec/exec/format/parquet/vparquet_column_reader.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,9 @@ Status ArrayColumnReader::read_column_data(
688688
fill_array_offset(_field_schema, offsets_data, null_map_ptr, _element_reader->get_rep_level(),
689689
_element_reader->get_def_level());
690690
DCHECK_EQ(element_column->size(), offsets_data.back());
691-
691+
#ifndef NDEBUG
692+
doris_column->sanity_check();
693+
#endif
692694
return Status::OK();
693695
}
694696

@@ -764,7 +766,9 @@ Status MapColumnReader::read_column_data(
764766
fill_array_offset(_field_schema, map.get_offsets(), null_map_ptr, _key_reader->get_rep_level(),
765767
_key_reader->get_def_level());
766768
DCHECK_EQ(key_column->size(), map.get_offsets().back());
767-
769+
#ifndef NDEBUG
770+
doris_column->sanity_check();
771+
#endif
768772
return Status::OK();
769773
}
770774

@@ -988,6 +992,9 @@ Status StructColumnReader::read_column_data(
988992
fill_struct_null_map(_field_schema, *null_map_ptr, this->get_rep_level(),
989993
this->get_def_level());
990994
}
995+
#ifndef NDEBUG
996+
doris_column->sanity_check();
997+
#endif
991998
return Status::OK();
992999
}
9931000

be/src/vec/exec/format/parquet/vparquet_group_reader.cpp

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,9 +327,26 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
327327
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
328328
RETURN_IF_ERROR(_fill_row_id_columns(block, *read_rows, false));
329329

330+
#ifndef NDEBUG
331+
for (auto col : *block) {
332+
col.column->sanity_check();
333+
DCHECK(block->rows() == col.column->size())
334+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
335+
block->rows(), col.column->size(), col.name);
336+
}
337+
#endif
338+
330339
if (block->rows() == 0) {
331340
_convert_dict_cols_to_string_cols(block);
332341
*read_rows = block->rows();
342+
#ifndef NDEBUG
343+
for (auto col : *block) {
344+
col.column->sanity_check();
345+
DCHECK(block->rows() == col.column->size())
346+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
347+
block->rows(), col.column->size(), col.name);
348+
}
349+
#endif
333350
return Status::OK();
334351
}
335352
{
@@ -373,6 +390,14 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
373390
}
374391
_convert_dict_cols_to_string_cols(block);
375392
}
393+
#ifndef NDEBUG
394+
for (auto col : *block) {
395+
col.column->sanity_check();
396+
DCHECK(block->rows() == col.column->size())
397+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
398+
block->rows(), col.column->size(), col.name);
399+
}
400+
#endif
376401
*read_rows = block->rows();
377402
return Status::OK();
378403
}
@@ -441,6 +466,10 @@ Status RowGroupReader::_read_column_data(Block* block,
441466
return Status::Corruption("Can't read the same number of rows among parquet columns");
442467
}
443468
batch_read_rows = col_read_rows;
469+
470+
#ifndef NDEBUG
471+
column_ptr->sanity_check();
472+
#endif
444473
if (col_eof) {
445474
has_eof = true;
446475
}
@@ -485,6 +514,18 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
485514

486515
RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
487516

517+
#ifndef NDEBUG
518+
for (auto col : *block) {
519+
if (col.column->size() == 0) { // lazy read column.
520+
continue;
521+
}
522+
col.column->sanity_check();
523+
DCHECK(pre_read_rows == col.column->size())
524+
<< absl::Substitute("pre_read_rows = $0 , column rows = $1, col name = $2",
525+
pre_read_rows, col.column->size(), col.name);
526+
}
527+
#endif
528+
488529
bool can_filter_all = false;
489530
{
490531
SCOPED_RAW_TIMER(&_predicate_filter_time);
@@ -631,6 +672,14 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
631672
*batch_eof = pre_eof;
632673
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
633674
RETURN_IF_ERROR(_fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns));
675+
#ifndef NDEBUG
676+
for (auto col : *block) {
677+
col.column->sanity_check();
678+
DCHECK(block->rows() == col.column->size())
679+
<< absl::Substitute("block rows = $0 , column rows = $1, col name = $2",
680+
block->rows(), col.column->size(), col.name);
681+
}
682+
#endif
634683
return Status::OK();
635684
}
636685

@@ -902,6 +951,9 @@ Status RowGroupReader::_rewrite_dict_predicates() {
902951
bool has_dict = false;
903952
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
904953
dict_value_column, &has_dict));
954+
#ifndef NDEBUG
955+
dict_value_column->sanity_check();
956+
#endif
905957
size_t dict_value_column_size = dict_value_column->size();
906958
DCHECK(has_dict);
907959
// 2. Build a temp block from the dict string column, then execute conjuncts and filter block.

0 commit comments

Comments
 (0)