Skip to content

Commit 2569ae0

Browse files
authored
Merge branch 'duckdb:master' into master
2 parents 99108d8 + eb8206a commit 2569ae0

40 files changed

+502
-149
lines changed
38.6 KB
Binary file not shown.

data/parquet-testing/enum.parquet

3.84 KB
Binary file not shown.

extension/parquet/parquet_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -174,6 +174,7 @@ LogicalType ParquetReader::DeriveLogicalType(const SchemaElement &s_ele, bool bi
174174
"DECIMAL converted type can only be set for value of Type::(FIXED_LEN_)BYTE_ARRAY/INT32/INT64");
175175
}
176176
case ConvertedType::UTF8:
177+
case ConvertedType::ENUM:
177178
switch (s_ele.type) {
178179
case Type::BYTE_ARRAY:
179180
case Type::FIXED_LEN_BYTE_ARRAY:
@@ -193,7 +194,6 @@ LogicalType ParquetReader::DeriveLogicalType(const SchemaElement &s_ele, bool bi
193194
case ConvertedType::MAP:
194195
case ConvertedType::MAP_KEY_VALUE:
195196
case ConvertedType::LIST:
196-
case ConvertedType::ENUM:
197197
case ConvertedType::JSON:
198198
case ConvertedType::BSON:
199199
default:

src/common/progress_bar.cpp

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,9 @@
44

55
namespace duckdb {
66

7-
ProgressBar::ProgressBar(Executor &executor, idx_t show_progress_after)
8-
: executor(executor), show_progress_after(show_progress_after), current_percentage(-1) {
7+
ProgressBar::ProgressBar(Executor &executor, idx_t show_progress_after, bool print_progress)
8+
: executor(executor), show_progress_after(show_progress_after), current_percentage(-1),
9+
print_progress(print_progress) {
910
}
1011

1112
double ProgressBar::GetCurrentPercentage() {
@@ -28,7 +29,6 @@ void ProgressBar::Update(bool final) {
2829
return;
2930
}
3031
auto sufficient_time_elapsed = profiler.Elapsed() > show_progress_after / 1000.0;
31-
auto print_progress = ClientConfig::GetConfig(executor.context).print_progress_bar;
3232
if (new_percentage > current_percentage) {
3333
current_percentage = new_percentage;
3434
}

src/execution/aggregate_hashtable.cpp

Lines changed: 12 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -304,28 +304,23 @@ idx_t GroupedAggregateHashTable::AddChunk(DataChunk &groups, Vector &group_hashe
304304
// value have not been seen yet
305305
idx_t new_group_count =
306306
distinct_hashes[aggr_idx]->FindOrCreateGroups(probe_chunk, dummy_addresses, new_groups);
307-
308-
// now fix up the payload and addresses accordingly by creating
309-
// a selection vector
310307
if (new_group_count > 0) {
308+
// now fix up the payload and addresses accordingly by creating
309+
// a selection vector
310+
DataChunk distinct_payload;
311+
distinct_payload.Initialize(payload.GetTypes());
312+
distinct_payload.Slice(payload, new_groups, new_group_count);
313+
distinct_payload.Verify();
314+
315+
Vector distinct_addresses(addresses, new_groups, new_group_count);
316+
distinct_addresses.Verify(new_group_count);
317+
311318
if (aggr.filter) {
312-
Vector distinct_addresses(addresses, new_groups, new_group_count);
313-
DataChunk distinct_payload;
314-
auto pay_types = payload.GetTypes();
315-
distinct_payload.Initialize(pay_types);
316-
distinct_payload.Slice(payload, new_groups, new_group_count);
317-
distinct_addresses.Verify(new_group_count);
318319
distinct_addresses.Normalify(new_group_count);
319320
RowOperations::UpdateFilteredStates(aggr, distinct_addresses, distinct_payload, payload_idx);
320321
} else {
321-
Vector distinct_addresses(addresses, new_groups, new_group_count);
322-
for (idx_t i = 0; i < aggr.child_count; i++) {
323-
payload.data[payload_idx + i].Slice(new_groups, new_group_count);
324-
payload.data[payload_idx + i].Verify(new_group_count);
325-
}
326-
distinct_addresses.Verify(new_group_count);
327-
328-
RowOperations::UpdateStates(aggr, distinct_addresses, payload, payload_idx, new_group_count);
322+
RowOperations::UpdateStates(aggr, distinct_addresses, distinct_payload, payload_idx,
323+
new_group_count);
329324
}
330325
}
331326
} else if (aggr.filter) {

src/execution/operator/aggregate/physical_hash_aggregate.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -161,6 +161,7 @@ SinkResultType PhysicalHashAggregate::Sink(ExecutionContext &context, GlobalSink
161161
for (auto &child_expr : aggr.children) {
162162
D_ASSERT(child_expr->type == ExpressionType::BOUND_REF);
163163
auto &bound_ref_expr = (BoundReferenceExpression &)*child_expr;
164+
D_ASSERT(bound_ref_expr.index < input.data.size());
164165
aggregate_input_chunk.data[aggregate_input_idx++].Reference(input.data[bound_ref_expr.index]);
165166
}
166167
}
@@ -169,6 +170,7 @@ SinkResultType PhysicalHashAggregate::Sink(ExecutionContext &context, GlobalSink
169170
if (aggr.filter) {
170171
auto it = filter_indexes.find(aggr.filter.get());
171172
D_ASSERT(it != filter_indexes.end());
173+
D_ASSERT(it->second < input.data.size());
172174
aggregate_input_chunk.data[aggregate_input_idx++].Reference(input.data[it->second]);
173175
}
174176
}

src/execution/operator/aggregate/physical_window.cpp

Lines changed: 20 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -901,6 +901,25 @@ static void ComputeWindowExpression(BoundWindowExpression *wexpr, ChunkCollectio
901901
}
902902
}
903903

904+
// evaluate the FILTER clause and stuff it into a large mask for compactness and reuse
905+
ValidityMask filter_mask;
906+
vector<validity_t> filter_bits;
907+
if (wexpr->filter_expr) {
908+
// Start with all invalid and set the ones that pass
909+
filter_bits.resize(ValidityMask::ValidityMaskSize(input.Count()), 0);
910+
filter_mask.Initialize(filter_bits.data());
911+
ExpressionExecutor filter_execution(*wexpr->filter_expr);
912+
SelectionVector true_sel(STANDARD_VECTOR_SIZE);
913+
idx_t base_idx = 0;
914+
for (auto &chunk : input.Chunks()) {
915+
const auto filtered = filter_execution.SelectExpression(*chunk, true_sel);
916+
for (idx_t f = 0; f < filtered; ++f) {
917+
filter_mask.SetValid(base_idx + true_sel[f]);
918+
}
919+
base_idx += chunk->size();
920+
}
921+
}
922+
904923
// evaluate boundaries if present. Parser has checked boundary types.
905924
ChunkCollection boundary_start_collection;
906925
if (wexpr->start_expr) {
@@ -954,7 +973,7 @@ static void ComputeWindowExpression(BoundWindowExpression *wexpr, ChunkCollectio
954973

955974
if (wexpr->aggregate) {
956975
segment_tree = make_unique<WindowSegmentTree>(*(wexpr->aggregate), wexpr->bind_info.get(), wexpr->return_type,
957-
&payload_collection, mode);
976+
&payload_collection, filter_mask, mode);
958977
}
959978

960979
WindowBoundariesState bounds(wexpr);

src/execution/operator/scan/physical_table_scan.cpp

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -113,6 +113,14 @@ void PhysicalTableScan::GetData(ExecutionContext &context, DataChunk &chunk, Glo
113113
}
114114
}
115115

116+
double PhysicalTableScan::GetProgress(ClientContext &context, GlobalSourceState &gstate_p) const {
117+
if (function.table_scan_progress) {
118+
return function.table_scan_progress(context, bind_data.get());
119+
}
120+
// if table_scan_progress is not implemented we don't support this function yet in the progress bar
121+
return -1;
122+
}
123+
116124
idx_t PhysicalTableScan::GetBatchIndex(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate_p,
117125
LocalSourceState &lstate) const {
118126
D_ASSERT(SupportsBatchIndex());

src/execution/physical_operator.cpp

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,10 @@ idx_t PhysicalOperator::GetBatchIndex(ExecutionContext &context, DataChunk &chun
7474
LocalSourceState &lstate) const {
7575
throw InternalException("Calling GetBatchIndex on a node that does not support it");
7676
}
77+
78+
double PhysicalOperator::GetProgress(ClientContext &context, GlobalSourceState &gstate) const {
79+
return -1;
80+
}
7781
// LCOV_EXCL_STOP
7882

7983
//===--------------------------------------------------------------------===//

src/execution/window_segment_tree.cpp

Lines changed: 29 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -8,17 +8,19 @@ namespace duckdb {
88

99
WindowSegmentTree::WindowSegmentTree(AggregateFunction &aggregate, FunctionData *bind_info,
1010
const LogicalType &result_type_p, ChunkCollection *input,
11-
WindowAggregationMode mode_p)
11+
const ValidityMask &filter_mask_p, WindowAggregationMode mode_p)
1212
: aggregate(aggregate), bind_info(bind_info), result_type(result_type_p), state(aggregate.state_size()),
1313
statep(Value::POINTER((idx_t)state.data())), frame(0, 0), active(0, 1),
14-
statev(Value::POINTER((idx_t)state.data())), internal_nodes(0), input_ref(input), mode(mode_p) {
14+
statev(Value::POINTER((idx_t)state.data())), internal_nodes(0), input_ref(input), filter_mask(filter_mask_p),
15+
mode(mode_p) {
1516
#if STANDARD_VECTOR_SIZE < 512
1617
throw NotImplementedException("Window functions are not supported for vector sizes < 512");
1718
#endif
1819
statep.Normalify(STANDARD_VECTOR_SIZE);
1920
statev.SetVectorType(VectorType::FLAT_VECTOR); // Prevent conversion of results to constants
2021

2122
if (input_ref && input_ref->ColumnCount() > 0) {
23+
filter_sel.Initialize(STANDARD_VECTOR_SIZE);
2224
inputs.Initialize(input_ref->Types());
2325
// if we have a frame-by-frame method, share the single state
2426
if (aggregate.window && UseWindowAPI()) {
@@ -99,6 +101,19 @@ void WindowSegmentTree::ExtractFrame(idx_t begin, idx_t end) {
99101
VectorOperations::Copy(chunk_b.data[i], v, chunk_b_count, 0, chunk_a_count);
100102
}
101103
}
104+
105+
// Slice to any filtered rows
106+
if (!filter_mask.AllValid()) {
107+
idx_t filtered = 0;
108+
for (idx_t i = begin; i < end; ++i) {
109+
if (filter_mask.RowIsValid(i)) {
110+
filter_sel.set_index(filtered++, i - begin);
111+
}
112+
}
113+
if (filtered != inputs.size()) {
114+
inputs.Slice(filter_sel, filtered);
115+
}
116+
}
102117
}
103118

104119
void WindowSegmentTree::WindowSegmentValue(idx_t l_idx, idx_t begin, idx_t end) {
@@ -179,7 +194,16 @@ void WindowSegmentTree::Compute(Vector &result, idx_t rid, idx_t begin, idx_t en
179194
if (inputs.ColumnCount() == 0) {
180195
D_ASSERT(GetTypeIdSize(result_type.InternalType()) == sizeof(idx_t));
181196
auto data = FlatVector::GetData<idx_t>(result);
182-
data[rid] = end - begin;
197+
// Slice to any filtered rows
198+
if (!filter_mask.AllValid()) {
199+
idx_t filtered = 0;
200+
for (idx_t i = begin; i < end; ++i) {
201+
filtered += filter_mask.RowIsValid(i);
202+
}
203+
data[rid] = filtered;
204+
} else {
205+
data[rid] = end - begin;
206+
}
183207
return;
184208
}
185209

@@ -220,8 +244,8 @@ void WindowSegmentTree::Compute(Vector &result, idx_t rid, idx_t begin, idx_t en
220244
active = FrameBounds(active_chunks.first * STANDARD_VECTOR_SIZE,
221245
MinValue((active_chunks.second + 1) * STANDARD_VECTOR_SIZE, coll.Count()));
222246

223-
aggregate.window(inputs.data.data(), bind_info, inputs.ColumnCount(), state.data(), frame, prev, result, rid,
224-
active.first);
247+
aggregate.window(inputs.data.data(), filter_mask, bind_info, inputs.ColumnCount(), state.data(), frame, prev,
248+
result, rid, active.first);
225249
return;
226250
}
227251

0 commit comments

Comments (0)