Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit a8074d6

Browse files
committed
Add TableStats to TableFragmentsInfo.
Signed-off-by: ienkovich <[email protected]>
1 parent f5839ff commit a8074d6

File tree

9 files changed

+179
-5
lines changed

9 files changed

+179
-5
lines changed

omniscidb/ArrowStorage/ArrowStorage.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,7 @@ TableFragmentsInfo ArrowStorage::getTableMetadata(int db_id, int table_id) const
319319
frag_info.setChunkMetadata(columnId(col_idx), frag.metadata[col_idx]);
320320
}
321321
}
322+
res.setTableStats(table.table_stats);
322323
return res;
323324
}
324325

@@ -656,10 +657,16 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
656657
(static_cast<size_t>(at->num_rows()) + table.fragment_size - 1 - first_frag_size) /
657658
table.fragment_size +
658659
1;
660+
// Pre-allocate fragment infos and table stats for each column for the following
661+
// parallel data import.
659662
fragments.resize(frag_count);
660663
for (auto& frag : fragments) {
661664
frag.metadata.resize(at->columns().size());
662665
}
666+
TableStats table_stats;
667+
for (int col_idx = 0; col_idx < static_cast<int>(at->columns().size()); ++col_idx) {
668+
table_stats.emplace(columnId(col_idx), ChunkStats{});
669+
}
663670

664671
mapd_shared_lock<mapd_shared_mutex> dict_lock(dict_mutex_);
665672
std::vector<bool> lazy_fetch_cols(at->columns().size(), false);
@@ -788,7 +795,17 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
788795
frag.metadata[col_idx] = meta;
789796
}
790797
}); // each fragment
798+
799+
// Merge fragment stats to the table stats.
800+
auto& column_stats = table_stats.at(col_info->column_id);
801+
column_stats = fragments[0].metadata[col_idx]->chunkStats();
802+
for (size_t frag_idx = 1; frag_idx < frag_count; ++frag_idx) {
803+
mergeStats(column_stats,
804+
fragments[frag_idx].metadata[col_idx]->chunkStats(),
805+
col_type);
806+
}
791807
} else {
808+
bool has_nulls = false;
792809
for (size_t frag_idx = 0; frag_idx < frag_count; ++frag_idx) {
793810
auto& frag = fragments[frag_idx];
794811
frag.offset =
@@ -804,9 +821,15 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
804821
frag.row_count);
805822
meta->fillStringChunkStats(
806823
col_arr->Slice(frag.offset, frag.row_count)->null_count());
824+
has_nulls = has_nulls || meta->chunkStats().has_nulls;
807825

808826
frag.metadata[col_idx] = meta;
809827
}
828+
829+
auto& column_stats = table_stats.at(col_info->column_id);
830+
column_stats.has_nulls = has_nulls;
831+
column_stats.min.stringval = nullptr;
832+
column_stats.max.stringval = nullptr;
810833
}
811834
}
812835
}); // each column
@@ -842,6 +865,13 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
842865
start_frag = 1;
843866
}
844867

868+
// Merge table stats.
869+
for (size_t col_idx = 0; col_idx < at->columns().size(); ++col_idx) {
870+
auto col_id = columnId(col_idx);
871+
auto col_type = getColumnInfo(db_id_, table_id, col_id)->type;
872+
mergeStats(table.table_stats.at(col_id), table_stats.at(col_id), col_type);
873+
}
874+
845875
// Copy the rest of fragments adjusting offset.
846876
table.fragments.reserve(table.fragments.size() + fragments.size() - start_frag);
847877
for (size_t frag_idx = start_frag; frag_idx < fragments.size(); ++frag_idx) {
@@ -855,7 +885,9 @@ void ArrowStorage::appendArrowTable(std::shared_ptr<arrow::Table> at, int table_
855885
table.col_data = std::move(col_data);
856886
table.fragments = std::move(fragments);
857887
table.row_count = at->num_rows();
888+
table.table_stats = std::move(table_stats);
858889
}
890+
CHECK_EQ(table.table_stats.size(), at->columns().size());
859891

860892
auto table_info = getTableInfo(db_id_, table_id);
861893
table_info->fragments = table.fragments.size();

omniscidb/ArrowStorage/ArrowStorage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ class ArrowStorage : public SimpleSchemaProvider, public AbstractDataProvider {
163163
std::vector<std::shared_ptr<arrow::ChunkedArray>> col_data;
164164
std::vector<DataFragment> fragments;
165165
size_t row_count = 0;
166+
TableStats table_stats;
166167
};
167168

168169
struct DictionaryData {

omniscidb/DataMgr/ChunkMetadata.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,11 @@ class ChunkMetadata {
190190
return chunk_stats_;
191191
}
192192

193+
// Check if there are pre-computed chunk stats available.
194+
// chunkStats can be used in any case but it can trigger
195+
// stats computation if there are no pre-computed stats.
196+
bool hasComputedChunkStats() const { return !stats_materialize_fn_; }
197+
193198
#ifndef __CUDACC__
194199
std::string dump() const {
195200
std::string res = "type: " + type_->toString() +

omniscidb/DataProvider/TableFragmentsInfo.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ class FragmentInfo {
7373
mutable ChunkMetadataMap chunkMetadataMap;
7474
};
7575

76+
using TableStats = std::map<int, ChunkStats>;
77+
using TableStatsMaterializeFn = std::function<void(TableStats&)>;
78+
7679
class TableFragmentsInfo {
7780
public:
7881
TableFragmentsInfo() : numTuples(0) {}
@@ -94,9 +97,37 @@ class TableFragmentsInfo {
9497
return fragment_num_tupples_upper_bound;
9598
}
9699

100+
const TableStats& getTableStats() const {
101+
maybeMaterializeTableStats();
102+
return table_stats_;
103+
}
104+
105+
// Check if there are pre-computed tables stats available.
106+
// getTableStats can be used in any case but it can be costly
107+
// if there are no pre-computed stats.
108+
bool hasComputedTableStats() const { return !table_stats_materialize_fn_; }
109+
110+
void setTableStats(const TableStats& stats) const {
111+
TableStatsMaterializeFn().swap(table_stats_materialize_fn_);
112+
table_stats_ = stats;
113+
}
114+
115+
void setTableStatsMaterializeFn(TableStatsMaterializeFn stats_materialize_fn) const {
116+
table_stats_materialize_fn_ = stats_materialize_fn;
117+
}
118+
97119
std::vector<int> chunkKeyPrefix;
98120
std::vector<FragmentInfo> fragments;
99121

100122
private:
123+
void maybeMaterializeTableStats() const {
124+
if (table_stats_materialize_fn_) {
125+
table_stats_materialize_fn_(table_stats_);
126+
TableStatsMaterializeFn().swap(table_stats_materialize_fn_);
127+
}
128+
}
129+
101130
mutable size_t numTuples;
131+
mutable TableStats table_stats_;
132+
mutable TableStatsMaterializeFn table_stats_materialize_fn_;
102133
};

omniscidb/QueryEngine/ExpressionRange.cpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,11 +587,29 @@ ExpressionRange getLeafColumnRange(const hdk::ir::ColumnVar* col_expr,
587587
}
588588
}
589589
}
590+
591+
auto& table_stats = query_info.getTableStats();
592+
auto col_stats_it = table_stats.find(col_id);
593+
CHECK(col_stats_it != table_stats.end())
594+
<< query_infos[*ti_idx].db_id << ":" << query_infos[*ti_idx].table_id << ":"
595+
<< col_id << " " << table_stats.size();
596+
if (col_stats_it == table_stats.end()) {
597+
return ExpressionRange::makeInvalidRange();
598+
}
599+
600+
auto& col_stats = col_stats_it->second;
601+
CHECK_EQ(col_stats.has_nulls || is_outer_join_proj, has_nulls);
602+
has_nulls = has_nulls || col_stats.has_nulls;
603+
590604
if (col_type->isFloatingPoint()) {
591605
const auto min_val =
592606
extract_min_stat_fp_type(min_it->second->chunkStats(), col_type);
593607
const auto max_val =
594608
extract_max_stat_fp_type(max_it->second->chunkStats(), col_type);
609+
const auto new_min_val = extract_min_stat_fp_type(col_stats, col_type);
610+
const auto new_max_val = extract_max_stat_fp_type(col_stats, col_type);
611+
CHECK_EQ(new_min_val, min_val);
612+
CHECK_EQ(new_max_val, max_val);
595613
return col_type->size() == 4
596614
? ExpressionRange::makeFloatRange(min_val, max_val, has_nulls)
597615
: ExpressionRange::makeDoubleRange(min_val, max_val, has_nulls);
@@ -600,6 +618,10 @@ ExpressionRange getLeafColumnRange(const hdk::ir::ColumnVar* col_expr,
600618
extract_min_stat_int_type(min_it->second->chunkStats(), col_type);
601619
const auto max_val =
602620
extract_max_stat_int_type(max_it->second->chunkStats(), col_type);
621+
const auto new_min_val = extract_min_stat_int_type(col_stats, col_type);
622+
const auto new_max_val = extract_max_stat_int_type(col_stats, col_type);
623+
CHECK_EQ(new_min_val, min_val);
624+
CHECK_EQ(new_max_val, max_val);
603625
if (max_val < min_val) {
604626
// The column doesn't contain any non-null values, synthesize an empty range.
605627
CHECK_GT(min_val, 0);

omniscidb/QueryEngine/InputMetadata.cpp

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,7 @@ InputTableInfoCache::InputTableInfoCache(Executor* executor) : executor_(executo
2222
namespace {
2323

2424
TableFragmentsInfo copy_table_info(const TableFragmentsInfo& table_info) {
25-
TableFragmentsInfo table_info_copy;
26-
table_info_copy.chunkKeyPrefix = table_info.chunkKeyPrefix;
27-
table_info_copy.fragments = table_info.fragments;
28-
table_info_copy.setPhysicalNumTuples(table_info.getPhysicalNumTuples());
29-
return table_info_copy;
25+
return table_info;
3026
}
3127

3228
} // namespace

omniscidb/ResultSetRegistry/ResultSetRegistry.cpp

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,59 @@ ChunkStats ResultSetRegistry::getChunkStats(int table_id,
292292
return frag.meta.at(columnId(col_idx))->chunkStats();
293293
}
294294

295+
TableStats ResultSetRegistry::getTableStats(int table_id) const {
296+
mapd_shared_lock<mapd_shared_mutex> data_lock(data_mutex_);
297+
CHECK_EQ(tables_.count(table_id), (size_t)1);
298+
auto& table = *tables_.at(table_id);
299+
mapd_shared_lock<mapd_shared_mutex> table_lock(table.mutex);
300+
data_lock.unlock();
301+
302+
if (!table.table_stats.empty()) {
303+
return table.table_stats;
304+
}
305+
306+
for (auto& frag : table.fragments) {
307+
mapd_shared_lock<mapd_shared_mutex> frag_read_lock(*frag.mutex);
308+
if (frag.meta.empty()) {
309+
frag_read_lock.unlock();
310+
mapd_unique_lock<mapd_shared_mutex> frag_write_lock(*frag.mutex);
311+
if (frag.meta.empty()) {
312+
frag.meta = synthesizeMetadata(frag.rs.get());
313+
}
314+
}
315+
}
316+
317+
auto table_stats = buildTableStatsNoLock(table_id);
318+
table_lock.unlock();
319+
mapd_unique_lock<mapd_shared_mutex> table_write_lock(table.mutex);
320+
if (table.table_stats.empty()) {
321+
table.table_stats = table_stats;
322+
}
323+
return table_stats;
324+
}
325+
326+
TableStats ResultSetRegistry::buildTableStatsNoLock(int table_id) const {
327+
// This method is only called when all fragments have computed metadata
328+
// and table is read-locked.
329+
CHECK(tables_.count(table_id));
330+
auto& table = *tables_.at(table_id);
331+
TableStats table_stats;
332+
{
333+
auto& first_frag = table.fragments.front();
334+
mapd_shared_lock<mapd_shared_mutex> frag_lock(*first_frag.mutex);
335+
for (auto& pr : first_frag.meta) {
336+
table_stats.emplace(pr.first, pr.second->chunkStats());
337+
}
338+
}
339+
for (size_t frag_idx = 1; frag_idx < table.fragments.size(); ++frag_idx) {
340+
mapd_shared_lock<mapd_shared_mutex> frag_lock(*table.fragments[frag_idx].mutex);
341+
for (auto& pr : table.fragments[frag_idx].meta) {
342+
mergeStats(table_stats.at(pr.first), pr.second->chunkStats(), pr.second->type());
343+
}
344+
}
345+
return table_stats;
346+
}
347+
295348
void ResultSetRegistry::fetchBuffer(const ChunkKey& key,
296349
Data_Namespace::AbstractBuffer* dest,
297350
const size_t num_bytes) {
@@ -386,6 +439,7 @@ TableFragmentsInfo ResultSetRegistry::getTableMetadata(int db_id, int table_id)
386439

387440
TableFragmentsInfo res;
388441
res.setPhysicalNumTuples(table.row_count);
442+
bool has_lazy_stats = false;
389443
for (size_t frag_idx = 0; frag_idx < table.fragments.size(); ++frag_idx) {
390444
auto& frag = table.fragments[frag_idx];
391445
auto& frag_info = res.fragments.emplace_back();
@@ -416,13 +470,34 @@ TableFragmentsInfo ResultSetRegistry::getTableMetadata(int db_id, int table_id)
416470
stats = this->getChunkStats(table_id, frag_idx, col_idx);
417471
});
418472
frag_info.setChunkMetadata(columnId(col_idx), meta);
473+
has_lazy_stats = true;
419474
}
420475
}
421476
} else {
422477
frag_info.setChunkMetadataMap(frag.meta);
423478
}
424479
}
425480

481+
if (table.table_stats.empty()) {
482+
if (has_lazy_stats) {
483+
res.setTableStatsMaterializeFn(
484+
[this, table_id](TableStats& stats) { stats = this->getTableStats(table_id); });
485+
} else {
486+
// We can get here if all stats were materialized in the loop above.
487+
// In this case, build and assigne table stats.
488+
TableStats table_stats = buildTableStatsNoLock(table_id);
489+
res.setTableStats(table_stats);
490+
491+
table_lock.unlock();
492+
mapd_unique_lock<mapd_shared_mutex> table_write_lock(table.mutex);
493+
if (table.table_stats.empty()) {
494+
table.table_stats = std::move(table_stats);
495+
}
496+
}
497+
} else {
498+
res.setTableStats(table.table_stats);
499+
}
500+
426501
return res;
427502
}
428503

omniscidb/ResultSetRegistry/ResultSetRegistry.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ class ResultSetRegistry : public SimpleSchemaProvider,
5858

5959
private:
6060
ChunkStats getChunkStats(int table_id, size_t frag_idx, size_t col_idx) const;
61+
TableStats getTableStats(int table_id) const;
62+
TableStats buildTableStatsNoLock(int table_id) const;
6163

6264
struct DataFragment {
6365
size_t offset = 0;
@@ -74,6 +76,7 @@ class ResultSetRegistry : public SimpleSchemaProvider,
7476
size_t row_count;
7577
bool use_columnar_res;
7678
bool has_varlen_col;
79+
TableStats table_stats;
7780
};
7881

7982
const int db_id_;

omniscidb/Tests/TestDataProvider.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,14 @@ class TestTableData {
8686

8787
auto& frag_info = info_.fragments[data_[col_id - 1].size() - 1];
8888
frag_info.setChunkMetadata(col_id, chunk_meta);
89+
90+
if (table_stats_.count(col_id)) {
91+
mergeStats(
92+
table_stats_.at(col_id), chunk_meta->chunkStats(), col_types_.at(col_id));
93+
} else {
94+
table_stats_.emplace(col_id, chunk_meta->chunkStats());
95+
}
96+
info_.setTableStats(table_stats_);
8997
}
9098

9199
void fetchData(int col_id, int frag_id, int8_t* dst, size_t size) {
@@ -102,6 +110,7 @@ class TestTableData {
102110
TableRef ref_;
103111
std::vector<std::vector<std::vector<int8_t>>> data_;
104112
TableFragmentsInfo info_;
113+
TableStats table_stats_;
105114
std::unordered_map<int, const hdk::ir::Type*> col_types_;
106115
};
107116

0 commit comments

Comments
 (0)