Skip to content

Commit 7038a01

Browse files
committed
symbolize join approach
1 parent cd170a9 commit 7038a01

File tree

8 files changed

+1292
-384
lines changed

8 files changed

+1292
-384
lines changed

datafusion-cli/src/debuginfo.rs

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions};
2+
use arrow::util::data_gen::create_random_array;
3+
use async_trait::async_trait;
4+
use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema};
5+
use datafusion::common::DataFusionError;
6+
use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig};
7+
use datafusion::datasource::TableType;
8+
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
9+
use datafusion::physical_plan::ExecutionPlan;
10+
use datafusion_common::create_array;
11+
use std::any::Any;
12+
use std::sync::Arc;
13+
14+
/// Stateless catalog-provider list that fronts a single synthetic catalog.
///
/// It never stores anything: `register_catalog` is a no-op and `catalog`
/// resolves every name to the same synthetic [`CatalogProvider`].
#[derive(Debug, Default)]
pub struct CatalogProviderList {}

impl CatalogProviderList {
    /// Creates a new, stateless provider list.
    ///
    /// Equivalent to `Default::default()`; the manual `Default` impl was
    /// replaced by `#[derive(Default)]` since the struct has no fields.
    pub fn new() -> Self {
        Self::default()
    }
}
28+
29+
impl datafusion::catalog::CatalogProviderList for CatalogProviderList {
30+
fn as_any(&self) -> &dyn Any {
31+
self
32+
}
33+
34+
// Register catalog not implemented since catalogs can't be created in our
35+
// system.
36+
fn register_catalog(
37+
&self,
38+
_: String,
39+
_: Arc<dyn datafusion::catalog::CatalogProvider>,
40+
) -> Option<Arc<dyn datafusion::catalog::CatalogProvider>> {
41+
None
42+
}
43+
44+
fn catalog_names(&self) -> Vec<String> {
45+
vec![]
46+
}
47+
48+
fn catalog(
49+
&self,
50+
_name: &str,
51+
) -> Option<Arc<dyn datafusion::catalog::CatalogProvider>> {
52+
Some(Arc::new(CatalogProvider {}))
53+
}
54+
}
55+
56+
#[derive(Debug)]
57+
pub struct CatalogProvider {}
58+
59+
impl datafusion::catalog::CatalogProvider for CatalogProvider {
60+
fn as_any(&self) -> &dyn Any {
61+
self
62+
}
63+
64+
fn schema_names(&self) -> Vec<String> {
65+
vec![]
66+
}
67+
68+
fn schema(&self, _: &str) -> Option<Arc<dyn datafusion::catalog::SchemaProvider>> {
69+
Some(Arc::new(SchemaProvider {}))
70+
}
71+
72+
fn register_schema(
73+
&self,
74+
_name: &str,
75+
_schema: Arc<dyn datafusion::catalog::SchemaProvider>,
76+
) -> datafusion::error::Result<Option<Arc<dyn datafusion::catalog::SchemaProvider>>>
77+
{
78+
Err(DataFusionError::NotImplemented(
79+
"register_schema".to_string(),
80+
))
81+
}
82+
83+
fn deregister_schema(
84+
&self,
85+
_name: &str,
86+
_cascade: bool,
87+
) -> datafusion::error::Result<Option<Arc<dyn datafusion::catalog::SchemaProvider>>>
88+
{
89+
Err(DataFusionError::NotImplemented(
90+
"deregister_schema".to_string(),
91+
))
92+
}
93+
}
94+
95+
// Name of the only table exposed by this catalog hierarchy; used by
// SchemaProvider::{table_names, table, table_exist} below.
const TABLE_NAME: &str = "debuginfo";
96+
97+
#[derive(Debug, Clone)]
98+
pub struct SchemaProvider {}
99+
100+
#[async_trait]
101+
impl datafusion::catalog::SchemaProvider for SchemaProvider {
102+
fn as_any(&self) -> &dyn Any {
103+
self
104+
}
105+
106+
fn table_names(&self) -> Vec<String> {
107+
vec![TABLE_NAME.to_string()]
108+
}
109+
110+
async fn table(
111+
&self,
112+
name: &str,
113+
) -> Result<Option<Arc<dyn datafusion::datasource::TableProvider>>, DataFusionError>
114+
{
115+
if name != TABLE_NAME {
116+
return Ok(None);
117+
}
118+
Ok(Some(Arc::new(TableProvider {})))
119+
}
120+
121+
fn register_table(
122+
&self,
123+
_name: String,
124+
_table: Arc<dyn datafusion::datasource::TableProvider>,
125+
) -> Result<Option<Arc<dyn datafusion::datasource::TableProvider>>, DataFusionError>
126+
{
127+
Err(DataFusionError::NotImplemented(
128+
"cannot register other tables".to_string(),
129+
))
130+
}
131+
132+
fn deregister_table(
133+
&self,
134+
__name: &str,
135+
) -> Result<Option<Arc<dyn datafusion::datasource::TableProvider>>, DataFusionError>
136+
{
137+
Err(DataFusionError::NotImplemented(
138+
"cannot deregister tables".to_string(),
139+
))
140+
}
141+
142+
fn table_exist(&self, name: &str) -> bool {
143+
if name == TABLE_NAME {
144+
return true;
145+
}
146+
false
147+
}
148+
}
149+
150+
#[derive(Debug)]
151+
pub struct TableProvider {}
152+
153+
#[async_trait]
154+
impl datafusion::datasource::TableProvider for TableProvider {
155+
fn as_any(&self) -> &dyn Any {
156+
unimplemented!()
157+
}
158+
159+
fn schema(&self) -> Arc<Schema> {
160+
// TODO(asubiotto): This is a simplified schema to work with mock data.
161+
// The real schema includes run end encoding and dictionary types.
162+
Arc::new(Schema::new(vec![
163+
Field::new("mapping_build_id", DataType::Utf8, false),
164+
Field::new("address", DataType::UInt64, false),
165+
Field::new(
166+
"lines",
167+
DataType::List(Arc::new(Field::new(
168+
"item",
169+
DataType::Struct(Fields::from(vec![
170+
Field::new("line", DataType::Int64, false),
171+
Field::new("function_name", DataType::Utf8, false),
172+
Field::new("function_system_name", DataType::Utf8, false),
173+
Field::new("function_filename", DataType::Utf8, false),
174+
Field::new("function_start_line", DataType::Int64, false),
175+
])),
176+
false,
177+
))),
178+
true,
179+
),
180+
]))
181+
}
182+
183+
fn table_type(&self) -> TableType {
184+
TableType::Base
185+
}
186+
187+
async fn scan(
188+
&self,
189+
_state: &dyn datafusion::catalog::Session,
190+
projections: Option<&Vec<usize>>,
191+
filters: &[Expr],
192+
limit: Option<usize>,
193+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
194+
println!("scan called projections: {projections:?} filters: {filters:?} limit: {limit:?}");
195+
let schema = self.schema();
196+
let columns = schema
197+
.fields()
198+
.iter()
199+
.map(|field| match field.name().as_str() {
200+
"address" => create_array!(UInt64, vec![4385521; 10]) as ArrayRef,
201+
"mapping_build_id" => create_array!(
202+
Utf8,
203+
vec!["87987728412ffaff58e302177248f3fd6436d132"; 10]
204+
) as ArrayRef,
205+
_ => create_random_array(field, 10, 0.0, 0.0).unwrap(),
206+
})
207+
.collect::<Vec<ArrayRef>>();
208+
209+
let batch = RecordBatch::try_new_with_options(
210+
schema.clone(),
211+
columns,
212+
&RecordBatchOptions::new().with_match_field_names(false),
213+
)?;
214+
Ok(Arc::new(DataSourceExec::new(Arc::new(
215+
MemorySourceConfig::try_new(&[vec![batch]], schema, projections.cloned())?,
216+
))))
217+
}
218+
219+
fn supports_filters_pushdown(
220+
&self,
221+
filters: &[&Expr],
222+
) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
223+
// Inexact filtering because it's using it to perform large-grained file pruning.
224+
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
225+
}
226+
}
227+
228+
/*#[cfg(test)]
229+
mod tests {
230+
use super::*;
231+
use datafusion::prelude::SessionContext;
232+
233+
#[tokio::test]
234+
async fn test_catalog_provider_list() -> Result<(), anyhow::Error> {
235+
let cpl = Arc::new(CatalogProviderList::new());
236+
let ctx = SessionContext::new();
237+
ctx.register_catalog_list(cpl);
238+
let result = ctx.sql("select * from debuginfo").await?.collect().await?;
239+
println!("{result:?}");
240+
Ok(())
241+
}
242+
}
243+
*/

datafusion-cli/src/main.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ use datafusion_cli::{
3939
use clap::Parser;
4040
use datafusion::catalog::CatalogProviderList;
4141
use datafusion::execution::SessionStateBuilder;
42-
use datafusion_cli::symbolize::{SymbolizeOptimizerRule, SymbolizeQueryPlanner};
42+
use datafusion_cli::symbolize::logical::{
43+
SymbolizeMarkerOptimizerRule, SymbolizeQueryPlanner,
44+
};
45+
use datafusion_cli::symbolize::udf::register_symbolize_udf;
4346
use mimalloc::MiMalloc;
4447

4548
#[global_allocator]
@@ -178,12 +181,12 @@ async fn main_inner() -> Result<()> {
178181
.with_runtime_env(runtime_env)
179182
.with_default_features()
180183
.with_query_planner(Arc::new(SymbolizeQueryPlanner {}))
181-
.with_optimizer_rule(Arc::new(SymbolizeOptimizerRule {}))
184+
.with_optimizer_rule(Arc::new(SymbolizeMarkerOptimizerRule {}))
182185
.build();
183186
let ctx = SessionContext::new_with_state(state).enable_url_table();
184187
ctx.refresh_catalogs().await?;
188+
register_symbolize_udf(&ctx);
185189
// install dynamic catalog provider that can register required object stores
186-
println!("catalog list {:?}", ctx.state().catalog_list());
187190
let dynamic_catalog = Arc::new(DynamicObjectStoreCatalog::new(
188191
ctx.state().catalog_list().clone(),
189192
ctx.state_weak_ref(),

0 commit comments

Comments
 (0)