798 changes: 375 additions & 423 deletions Cargo.lock

Large diffs are not rendered by default.

66 changes: 33 additions & 33 deletions Cargo.toml
@@ -99,12 +99,12 @@ rust.unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
# For more details, see: https://github.com/rust-lang/cargo/issues/11329
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.6"
arrow = { version = "56.0", features = ["prettyprint"] }
arrow-array = { version = "56.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.0"
arrow-flight = "56.0"
arrow-ipc = { version = "56.0", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.0", features = ["serde"] }
arrow = { version = "56.2", features = ["prettyprint"] }
arrow-array = { version = "56.2", default-features = false, features = ["chrono-tz"] }
arrow-buffer = "56.2"
arrow-flight = "56.2"
arrow-ipc = { version = "56.2", default-features = false, features = ["lz4", "zstd"] }
arrow-schema = { version = "56.2", features = ["serde"] }
async-stream = "0.3"
async-trait = "0.1"
# Remember to update axum-extra, axum-macros when updating axum
@@ -123,18 +123,18 @@ clap = { version = "4.4", features = ["derive"] }
config = "0.13.0"
crossbeam-utils = "0.8"
dashmap = "6.1"
datafusion = "49"
datafusion-common = "49"
datafusion-expr = "49"
datafusion-functions = "49"
datafusion-functions-aggregate-common = "49"
datafusion-optimizer = "49"
datafusion-orc = { git = "https://github.com/GreptimeTeam/datafusion-orc", rev = "a0a5f902158f153119316eaeec868cff3fc8a99d" }
datafusion-pg-catalog = { git = "https://github.com/datafusion-contrib/datafusion-postgres", rev = "3d1b7c7d5b82dd49bafc2803259365e633f654fa" }
datafusion-physical-expr = "49"
datafusion-physical-plan = "49"
datafusion-sql = "49"
datafusion-substrait = "49"
datafusion = "50"
datafusion-common = "50"
datafusion-expr = "50"
datafusion-functions = "50"
datafusion-functions-aggregate-common = "50"
datafusion-optimizer = "50"
datafusion-orc = { git = "https://github.com/datafusion-contrib/datafusion-orc", tag = "v0.5.0" }
datafusion-pg-catalog = "0.11"
datafusion-physical-expr = "50"
datafusion-physical-plan = "50"
datafusion-sql = "50"
datafusion-substrait = "50"
deadpool = "0.12"
deadpool-postgres = "0.14"
derive_builder = "0.20"
@@ -180,7 +180,7 @@ otel-arrow-rust = { git = "https://github.com/GreptimeTeam/otel-arrow", rev = "2
"server",
] }
parking_lot = "0.12"
parquet = { version = "56.0", default-features = false, features = ["arrow", "async", "object_store"] }
parquet = { version = "56.2", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
pretty_assertions = "1.4.0"
@@ -216,10 +216,7 @@ simd-json = "0.15"
similar-asserts = "1.6.0"
smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "39e4fc94c3c741981f77e9d63b5ce8c02e0a27ea", features = [
"visitor",
"serde",
] } # branch = "v0.55.x"
sqlparser = { version = "0.58.0", default-features = false, features = ["std", "visitor", "serde"] }
sqlx = { version = "0.8", features = [
"runtime-tokio-rustls",
"mysql",
@@ -321,16 +318,19 @@ git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "5618e779cf2bb4755b499c630fba4c35e91898cb"

[patch.crates-io]
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "7d5214512740b4dfb742b6b3d91ed9affcc2c9d0" }
datafusion = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-functions-aggregate-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "fd4b2abcf3c3e43e94951bda452c9fd35243aab0" }
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "4b519a5caa95472cc3988f5556813a583dd35af1" } # branch = "v0.58.x"

[profile.release]
debug = 1
4 changes: 3 additions & 1 deletion src/catalog/src/system_schema/pg_catalog.rs
@@ -27,6 +27,7 @@ use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
use datafusion_pg_catalog::pg_catalog::catalog_info::CatalogInfo;
use datafusion_pg_catalog::pg_catalog::context::EmptyContextProvider;
use datafusion_pg_catalog::pg_catalog::{
PG_CATALOG_TABLES, PgCatalogSchemaProvider, PgCatalogStaticTables, PgCatalogTable,
};
@@ -44,7 +45,7 @@ use crate::system_schema::{
/// [`PGCatalogProvider`] is the provider for a schema named `pg_catalog`; it is not a catalog itself.
pub struct PGCatalogProvider {
catalog_name: String,
inner: PgCatalogSchemaProvider<CatalogManagerWrapper>,
inner: PgCatalogSchemaProvider<CatalogManagerWrapper, EmptyContextProvider>,
tables: HashMap<String, TableRef>,
table_ids: HashMap<&'static str, u32>,
}
@@ -69,6 +70,7 @@ impl PGCatalogProvider {
catalog_manager,
},
Arc::new(static_tables),
EmptyContextProvider,
)
.expect("Failed to initialize PgCatalogSchemaProvider");

4 changes: 2 additions & 2 deletions src/common/datasource/src/file_format.rs
@@ -33,7 +33,7 @@ use bytes::{Buf, Bytes};
use datafusion::datasource::physical_plan::FileOpenFuture;
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -179,7 +179,7 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
Poll::Ready(decoder.flush().transpose())
});

Ok(stream.boxed())
Ok(stream.map_err(Into::into).boxed())
}))
}

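The `map_err(Into::into)` call is the substantive change here: the decoder stream yields `Result<RecordBatch, ArrowError>`, while the DataFusion 50 signature evidently expects the stream's error type to be `DataFusionError`. A minimal sketch of the same trick, with hypothetical stand-in error types (`ArrowishError` and `DfError` are illustrations, not the real types):

use futures::stream::{self, StreamExt, TryStreamExt};

// Hypothetical stand-ins for the two error types being bridged.
#[derive(Debug)]
struct ArrowishError(String);
#[derive(Debug)]
struct DfError(String);

impl From<ArrowishError> for DfError {
    fn from(e: ArrowishError) -> Self {
        DfError(e.0)
    }
}

fn main() {
    // A stream that fails with ArrowishError...
    let s = stream::iter(vec![Ok::<i32, ArrowishError>(1), Err(ArrowishError("boom".into()))]);
    // ...rewritten into a stream that fails with DfError. map_err only
    // touches the Err variant; Ok items pass through untouched, so the
    // boxed stream satisfies a signature demanding the wider error type.
    let _converted = s.map_err(DfError::from).boxed();
}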
17 changes: 16 additions & 1 deletion src/common/function/src/aggrs/aggr_wrapper.rs
@@ -22,6 +22,7 @@
//! `foo_merge`'s input arg is the same as `foo_state`'s output, and its output is the same as `foo`'s output.
//!

use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::array::StructArray;
@@ -272,7 +273,7 @@ impl StateMergeHelper {
}

/// Wrapper to make an aggregate function out of a state function.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StateWrapper {
inner: AggregateUDF,
name: String,
@@ -616,6 +617,20 @@ }
}
}

impl PartialEq for MergeWrapper {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner
}
}

impl Eq for MergeWrapper {}

impl Hash for MergeWrapper {
fn hash<H: Hasher>(&self, state: &mut H) {
self.inner.hash(state);
}
}

/// The merge accumulator, which modifies `update_batch`'s behavior to accept one struct array
/// containing the state fields of the original aggregate function and merges those states into
/// the original accumulator; the output is the same as the original aggregate function's.
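The doc comment above describes the contract that makes distributed aggregation work: `foo_state` emits a partial state, and `foo_merge` folds such states into the value `foo` would have produced. A self-contained toy sketch of that contract in plain Rust (not the actual wrapper types):

// Toy average aggregate split into state + merge:
// avg_state(input) -> (sum, count); avg_merge(states) -> avg(input).
#[derive(Clone, Copy, Default)]
struct AvgState {
    sum: f64,
    count: u64,
}

impl AvgState {
    fn update(&mut self, v: f64) {
        self.sum += v;
        self.count += 1;
    }
    // What the merge step consumes is exactly what the state step emits.
    fn merge(&mut self, other: AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }
    fn evaluate(&self) -> f64 {
        self.sum / self.count as f64
    }
}

fn main() {
    // Two partitions accumulate partial states independently...
    let mut p1 = AvgState::default();
    let mut p2 = AvgState::default();
    [1.0f64, 2.0].iter().for_each(|v| p1.update(*v));
    [3.0f64, 4.0].iter().for_each(|v| p2.update(*v));
    // ...and merging them yields the same answer as one global pass.
    let mut total = AvgState::default();
    total.merge(p1);
    total.merge(p2);
    assert_eq!(total.evaluate(), 2.5);
}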
3 changes: 1 addition & 2 deletions src/common/function/src/aggrs/aggr_wrapper/tests.rs
@@ -39,8 +39,7 @@ use datafusion::prelude::SessionContext;
use datafusion_common::arrow::array::AsArray;
use datafusion_common::arrow::datatypes::{Float64Type, UInt64Type};
use datafusion_common::{Column, TableReference};
use datafusion_expr::expr::AggregateFunction;
use datafusion_expr::sqlparser::ast::NullTreatment;
use datafusion_expr::expr::{AggregateFunction, NullTreatment};
use datafusion_expr::{
Aggregate, ColumnarValue, Expr, LogicalPlan, ScalarFunctionArgs, SortExpr, TableScan, lit,
};
2 changes: 1 addition & 1 deletion src/common/function/src/aggrs/count_hash.rs
@@ -68,7 +68,7 @@ impl CountHash {
}
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct CountHash {
signature: Signature,
}
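This derive, the manual impls added to `MergeWrapper` above, the `ScalarUdf` impls below, and the macro-generated impls in `admin_fn.rs` all answer the same apparent DataFusion 50 requirement: UDF implementation types must be comparable and hashable, typically by delegating to their `Signature`. A sketch of the pattern with a stand-in signature type (the real `Signature` also implements `Hash`, as the `admin_fn.rs` hunk relies on):

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for datafusion's Signature.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SignatureLike(Vec<&'static str>);

// Deriving works whenever every field is itself Eq + Hash, which is the
// count_hash.rs case; MergeWrapper and ScalarUdf instead write the impls
// by hand and delegate to one comparable field.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct CountHashLike {
    signature: SignatureLike,
}

fn main() {
    let a = CountHashLike { signature: SignatureLike(vec!["int64"]) };
    let b = a.clone();
    // Equal UDFs must hash equally so plans containing them can be
    // compared and deduplicated structurally.
    assert_eq!(a, b);
    let (mut ha, mut hb) = (DefaultHasher::new(), DefaultHasher::new());
    a.hash(&mut ha);
    b.hash(&mut hb);
    assert_eq!(ha.finish(), hb.finish());
}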
4 changes: 2 additions & 2 deletions src/common/function/src/scalars/geo/geohash.rs
@@ -76,7 +76,7 @@ impl Function for GeohashFunction {
}

fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Utf8)
Ok(DataType::Utf8View)
}

fn signature(&self) -> &Signature {
@@ -176,7 +176,7 @@ impl Function for GeohashNeighboursFunction {
Ok(DataType::List(Arc::new(Field::new(
"item",
DataType::Utf8View,
false,
true,
))))
}

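The return type moves from `Utf8` to `Utf8View`, presumably matching the array the function now builds. For context, a minimal sketch of the arrow-rs side of `Utf8View` (stock APIs only, unrelated to the geohash code itself):

use arrow::array::{Array, StringViewArray};
use arrow::datatypes::DataType;

fn main() {
    // Utf8View is backed by StringViewArray: short strings are inlined in
    // the 16-byte view struct, longer ones point into shared data buffers.
    let hashes = StringViewArray::from(vec!["9q8yy", "u4pruydqqvj8"]);
    assert_eq!(hashes.data_type(), &DataType::Utf8View);
    assert_eq!(hashes.value(1), "u4pruydqqvj8");
}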
4 changes: 2 additions & 2 deletions src/common/function/src/scalars/geo/h3.rs
@@ -355,9 +355,9 @@ impl Function for H3CellCenterLatLng {

fn return_type(&self, _: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::List(Arc::new(Field::new(
"x",
"item",
DataType::Float64,
false,
true,
))))
}

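Both list-typed fixes above (the element field renamed to "item" in h3.rs, nullability flipped to true in geohash.rs) line up with Arrow's canonical list element field, which `Field::new_list_field` encodes. A small sketch, assuming stock arrow-rs:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field};

fn main() {
    // Arrow's conventional list element field is named "item" and nullable.
    let elem = Field::new_list_field(DataType::Float64, true);
    assert_eq!(elem.name(), "item");
    assert!(elem.is_nullable());
    // Equivalent to Field::new("item", DataType::Float64, true), as the
    // hunks above now spell out by hand.
    let list_ty = DataType::List(Arc::new(elem));
    println!("{list_ty:?}");
}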
15 changes: 15 additions & 0 deletions src/common/function/src/scalars/udf.rs
@@ -14,6 +14,7 @@

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::hash::{Hash, Hasher};

use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::{ScalarFunctionArgs, ScalarUDFImpl};
@@ -33,6 +34,20 @@ impl Debug for ScalarUdf {
}
}

impl PartialEq for ScalarUdf {
fn eq(&self, other: &Self) -> bool {
self.function.signature() == other.function.signature()
}
}

impl Eq for ScalarUdf {}

impl Hash for ScalarUdf {
fn hash<H: Hasher>(&self, state: &mut H) {
self.function.signature().hash(state)
}
}

impl ScalarUDFImpl for ScalarUdf {
fn as_any(&self) -> &dyn Any {
self
14 changes: 14 additions & 0 deletions src/common/macro/src/admin_fn.rs
@@ -345,6 +345,20 @@ fn build_struct(
Ok(datafusion_expr::ColumnarValue::Array(result_vector.to_arrow_array()))
}
}

impl PartialEq for #name {
fn eq(&self, other: &Self) -> bool {
self.signature == other.signature
}
}

impl Eq for #name {}

impl std::hash::Hash for #name {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.signature.hash(state)
}
}
}
.into()
}
3 changes: 1 addition & 2 deletions src/datatypes/src/schema.rs
@@ -368,8 +368,7 @@ impl TryFrom<DFSchemaRef> for Schema {
type Error = Error;

fn try_from(value: DFSchemaRef) -> Result<Self> {
let s: ArrowSchema = value.as_ref().into();
s.try_into()
value.inner().clone().try_into()
}
}

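The rewritten conversion leans on `DFSchema::inner()`, which exposes the Arrow `SchemaRef` the `DFSchema` already carries (the promql changes below use it the same way), so the `TryFrom` clones an `Arc` instead of rebuilding the schema field by field. A hedged sketch, assuming `DFSchema::try_from(SchemaRef)` keeps the wrapped `Arc` intact:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{DFSchema, Result};

fn main() -> Result<()> {
    let arrow_schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int64, true)]));
    let df_schema = DFSchema::try_from(arrow_schema.clone())?;
    // inner() hands back the SchemaRef wrapped by DFSchema; cloning the Arc
    // is cheap compared with the old `as_ref().into()` rebuild.
    assert_eq!(df_schema.inner(), &arrow_schema);
    Ok(())
}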
4 changes: 3 additions & 1 deletion src/datatypes/src/value.rs
@@ -1131,7 +1131,9 @@ impl TryFrom<ScalarValue> for Value {
.collect::<Result<Vec<Value>>>()?;
Value::Struct(StructValue::try_new(items, struct_type)?)
}
ScalarValue::Decimal256(_, _, _)
ScalarValue::Decimal32(_, _, _)
| ScalarValue::Decimal64(_, _, _)
| ScalarValue::Decimal256(_, _, _)
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
4 changes: 3 additions & 1 deletion src/datatypes/src/vectors/helper.rs
@@ -245,7 +245,9 @@ impl Helper {
length,
)
}
ScalarValue::Decimal256(_, _, _)
ScalarValue::Decimal32(_, _, _)
| ScalarValue::Decimal64(_, _, _)
| ScalarValue::Decimal256(_, _, _)
| ScalarValue::FixedSizeList(_)
| ScalarValue::LargeList(_)
| ScalarValue::Dictionary(_, _)
2 changes: 1 addition & 1 deletion src/flow/src/df_optimizer.rs
@@ -427,7 +427,7 @@ fn expand_tumble_analyzer(

/// This is a placeholder for the tumble_start and tumble_end functions, so that datafusion can
/// recognize them as scalar functions
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TumbleExpand {
signature: Signature,
name: String,
17 changes: 9 additions & 8 deletions src/operator/src/expr_helper.rs
@@ -939,11 +939,10 @@ pub fn to_create_flow_task_expr(
query_ctx: &QueryContextRef,
) -> Result<CreateFlowExpr> {
// retrieve sink table name
let sink_table_ref =
object_name_to_table_reference(create_flow.sink_table_name.clone().into(), true)
.with_context(|_| ConvertIdentifierSnafu {
ident: create_flow.sink_table_name.to_string(),
})?;
let sink_table_ref = object_name_to_table_reference(create_flow.sink_table_name.clone(), true)
.with_context(|_| ConvertIdentifierSnafu {
ident: create_flow.sink_table_name.to_string(),
})?;
let catalog = sink_table_ref
.catalog()
.unwrap_or(query_ctx.current_catalog())
Expand All @@ -961,9 +960,11 @@ pub fn to_create_flow_task_expr(

let source_table_names = extract_tables_from_query(&create_flow.query)
.map(|name| {
let reference = object_name_to_table_reference(name.clone().into(), true)
.with_context(|_| ConvertIdentifierSnafu {
ident: name.to_string(),
let reference =
object_name_to_table_reference(name.clone(), true).with_context(|_| {
ConvertIdentifierSnafu {
ident: name.to_string(),
}
})?;
let catalog = reference
.catalog()
4 changes: 2 additions & 2 deletions src/promql/src/extension_plan/empty_metric.rs
@@ -123,7 +123,7 @@ impl EmptyMetric {
physical_planner.create_physical_expr(expr, &self.time_index_schema, session_state)
})
.transpose()?;
let result_schema: SchemaRef = Arc::new(self.result_schema.as_ref().into());
let result_schema: SchemaRef = self.result_schema.inner().clone();
let properties = Arc::new(PlanProperties::new(
EquivalenceProperties::new(result_schema.clone()),
Partitioning::UnknownPartitioning(1),
@@ -134,7 +134,7 @@
start: self.start,
end: self.end,
interval: self.interval,
time_index_schema: Arc::new(self.time_index_schema.as_ref().into()),
time_index_schema: self.time_index_schema.inner().clone(),
result_schema,
expr: physical_expr,
properties,
9 changes: 4 additions & 5 deletions src/promql/src/extension_plan/histogram_fold.rs
@@ -181,7 +181,7 @@ impl HistogramFold {
.index_of_column_by_name(None, &self.ts_column)
.unwrap();

let output_schema: SchemaRef = Arc::new(self.output_schema.as_ref().into());
let output_schema: SchemaRef = self.output_schema.inner().clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),
Partitioning::UnknownPartitioning(1),
@@ -805,14 +805,13 @@ mod test {
async fn fold_overall() {
let memory_exec = Arc::new(prepare_test_data());
let output_schema: SchemaRef = Arc::new(
(*HistogramFold::convert_schema(
HistogramFold::convert_schema(
&Arc::new(memory_exec.schema().to_dfschema().unwrap()),
"le",
)
.unwrap()
.as_ref())
.clone()
.into(),
.as_arrow()
.clone(),
);
let properties = PlanProperties::new(
EquivalenceProperties::new(output_schema.clone()),