Skip to content

Commit 0164d89

Browse files
authored
Merge branch 'main' into feat-list-func
2 parents 7bbb47d + 844ecdf commit 0164d89

File tree

28 files changed

+583
-58
lines changed

28 files changed

+583
-58
lines changed

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/ast/src/ast/format/ast_format.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor {
692692
ExplainKind::Raw => "Raw",
693693
ExplainKind::Plan => "Plan",
694694
ExplainKind::Memo(_) => "Memo",
695+
ExplainKind::JOIN => "JOIN",
695696
ExplainKind::AnalyzePlan => "Analyze",
696697
});
697698
let format_ctx = AstFormatContext::with_children(name, 1);

src/query/ast/src/ast/statements/explain.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub enum ExplainKind {
2525
Raw,
2626
Plan,
2727

28+
JOIN,
29+
2830
// Explain analyze plan
2931
AnalyzePlan,
3032
}

src/query/ast/src/ast/statements/statement.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ impl Display for Statement {
253253
ExplainKind::Raw => write!(f, " RAW")?,
254254
ExplainKind::Plan => (),
255255
ExplainKind::AnalyzePlan => write!(f, " ANALYZE")?,
256-
ExplainKind::Memo(_) => write!(f, "MEMO")?,
256+
ExplainKind::JOIN => write!(f, " JOIN")?,
257+
ExplainKind::Memo(_) => write!(f, " MEMO")?,
257258
}
258259
write!(f, " {query}")?;
259260
}

src/query/ast/src/parser/statement.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub enum CreateDatabaseOption {
5656
pub fn statement(i: Input) -> IResult<StatementMsg> {
5757
let explain = map_res(
5858
rule! {
59-
EXPLAIN ~ ( AST | SYNTAX | PIPELINE | GRAPH | FRAGMENTS | RAW | MEMO )? ~ #statement
59+
EXPLAIN ~ ( AST | SYNTAX | PIPELINE | JOIN | GRAPH | FRAGMENTS | RAW | MEMO )? ~ #statement
6060
},
6161
|(_, opt_kind, statement)| {
6262
Ok(Statement::Explain {
@@ -72,6 +72,7 @@ pub fn statement(i: Input) -> IResult<StatementMsg> {
7272
ExplainKind::Syntax(pretty_stmt)
7373
}
7474
Some(TokenKind::PIPELINE) => ExplainKind::Pipeline,
75+
Some(TokenKind::JOIN) => ExplainKind::JOIN,
7576
Some(TokenKind::GRAPH) => ExplainKind::Graph,
7677
Some(TokenKind::FRAGMENTS) => ExplainKind::Fragments,
7778
Some(TokenKind::RAW) => ExplainKind::Raw,

src/query/functions/src/aggregates/aggregate_quantile_cont.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,15 @@ where T: Number + AsPrimitive<f64>
230230
let state = place.get::<QuantileState>();
231231
state.merge_result(builder, self.levels.clone())
232232
}
233+
234+
fn need_manual_drop_state(&self) -> bool {
235+
true
236+
}
237+
238+
unsafe fn drop_state(&self, place: StateAddr) {
239+
let state = place.get::<QuantileState>();
240+
std::ptr::drop_in_place(state);
241+
}
233242
}
234243

235244
impl<T> AggregateQuantileContFunction<T>

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ common-arrow = { path = "../../common/arrow" }
4646
common-ast = { path = "../ast" }
4747
common-auth = { path = "../../common/auth" }
4848
common-base = { path = "../../common/base" }
49+
common-cache = { path = "../../common/cache" }
4950
common-catalog = { path = "../catalog" }
5051
common-compress = { path = "../../common/compress" }
5152
common-config = { path = "../config" }
@@ -162,6 +163,7 @@ num = "0.4.0"
162163
p256 = "0.13"
163164
pretty_assertions = "1.3.0"
164165
reqwest = { workspace = true }
166+
sysinfo = "0.28.3"
165167
temp-env = "0.3.0"
166168
tempfile = "3.4.0"
167169
toml = { version = "0.7.3", default-features = false }

src/query/service/src/interpreters/interpreter_explain.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,28 @@ impl Interpreter for ExplainInterpreter {
8282
_ => self.explain_plan(&self.plan)?,
8383
},
8484

85+
ExplainKind::JOIN => match &self.plan {
86+
Plan::Query {
87+
s_expr, metadata, ..
88+
} => {
89+
let ctx = self.ctx.clone();
90+
let settings = ctx.get_settings();
91+
92+
let enable_distributed_eval_index =
93+
settings.get_enable_distributed_eval_index()?;
94+
settings.set_enable_distributed_eval_index(false)?;
95+
scopeguard::defer! {
96+
let _ = settings.set_enable_distributed_eval_index(enable_distributed_eval_index);
97+
}
98+
let mut builder = PhysicalPlanBuilder::new(metadata.clone(), ctx);
99+
let plan = builder.build(s_expr).await?;
100+
self.explain_join_order(&plan, metadata)?
101+
}
102+
_ => Err(ErrorCode::Unimplemented(
103+
"Unsupported EXPLAIN JOIN statement",
104+
))?,
105+
},
106+
85107
ExplainKind::AnalyzePlan => match &self.plan {
86108
Plan::Query {
87109
s_expr,
@@ -185,6 +207,17 @@ impl ExplainInterpreter {
185207
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
186208
}
187209

210+
pub fn explain_join_order(
211+
&self,
212+
plan: &PhysicalPlan,
213+
metadata: &MetadataRef,
214+
) -> Result<Vec<DataBlock>> {
215+
let result = plan.format_join(metadata)?.format_pretty()?;
216+
let line_split_result: Vec<&str> = result.lines().collect();
217+
let formatted_plan = StringType::from_data(line_split_result);
218+
Ok(vec![DataBlock::new_from_columns(vec![formatted_plan])])
219+
}
220+
188221
pub async fn explain_pipeline(
189222
&self,
190223
s_expr: SExpr,

src/query/service/tests/it/storages/fuse/block_writer.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use common_arrow::parquet::metadata::ThriftFileMetaData;
1516
use common_exception::Result;
1617
use common_expression::DataBlock;
1718
use common_expression::FunctionContext;
@@ -56,13 +57,13 @@ impl<'a> BlockWriter<'a> {
5657
block: DataBlock,
5758
col_stats: StatisticsOfColumns,
5859
cluster_stats: Option<ClusterStatistics>,
59-
) -> Result<BlockMeta> {
60+
) -> Result<(BlockMeta, Option<ThriftFileMetaData>)> {
6061
let (location, block_id) = self.location_generator.gen_block_location();
6162

6263
let data_accessor = &self.data_accessor;
6364
let row_count = block.num_rows() as u64;
6465
let block_size = block.memory_size() as u64;
65-
let (bloom_filter_index_size, bloom_filter_index_location) = self
66+
let (bloom_filter_index_size, bloom_filter_index_location, meta) = self
6667
.build_block_index(data_accessor, schema.clone(), &block, block_id)
6768
.await?;
6869

@@ -88,7 +89,7 @@ impl<'a> BlockWriter<'a> {
8889
bloom_filter_index_size,
8990
Compression::Lz4Raw,
9091
);
91-
Ok(block_meta)
92+
Ok((block_meta, meta))
9293
}
9394

9495
pub async fn build_block_index(
@@ -97,7 +98,7 @@ impl<'a> BlockWriter<'a> {
9798
schema: TableSchemaRef,
9899
block: &DataBlock,
99100
block_id: Uuid,
100-
) -> Result<(u64, Option<Location>)> {
101+
) -> Result<(u64, Option<Location>, Option<ThriftFileMetaData>)> {
101102
let location = self
102103
.location_generator
103104
.block_bloom_index_location(&block_id);
@@ -109,18 +110,16 @@ impl<'a> BlockWriter<'a> {
109110
let filter_schema = bloom_index.filter_schema;
110111
let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
111112
let index_block_schema = &filter_schema;
112-
let (size, _) = blocks_to_parquet(
113+
let (size, meta) = blocks_to_parquet(
113114
index_block_schema,
114115
vec![index_block],
115116
&mut data,
116117
TableCompression::None,
117118
)?;
118-
119119
data_accessor.write(&location.0, data).await?;
120-
121-
Ok((size, Some(location)))
120+
Ok((size, Some(location), Some(meta)))
122121
} else {
123-
Ok((0u64, None))
122+
Ok((0u64, None, None))
124123
}
125124
}
126125
}

0 commit comments

Comments
 (0)