|
| 1 | +use arrow::array::{ArrayRef, RecordBatch, RecordBatchOptions}; |
| 2 | +use arrow::util::data_gen::create_random_array; |
| 3 | +use async_trait::async_trait; |
| 4 | +use datafusion::arrow::datatypes::{DataType, Field, Fields, Schema}; |
| 5 | +use datafusion::common::DataFusionError; |
| 6 | +use datafusion::datasource::memory::{DataSourceExec, MemorySourceConfig}; |
| 7 | +use datafusion::datasource::TableType; |
| 8 | +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; |
| 9 | +use datafusion::physical_plan::ExecutionPlan; |
| 10 | +use datafusion_common::create_array; |
| 11 | +use std::any::Any; |
| 12 | +use std::sync::Arc; |
| 13 | + |
/// Stateless catalog list; it implements DataFusion's catalog-list interface
/// further down in this file.
///
/// The struct has no fields, so all instances are interchangeable.
#[derive(Debug, Default)]
pub struct CatalogProviderList {}

impl CatalogProviderList {
    /// Creates a new catalog list. Equivalent to [`Default::default`].
    pub fn new() -> Self {
        Self {}
    }
}
| 28 | + |
| 29 | +impl datafusion::catalog::CatalogProviderList for CatalogProviderList { |
| 30 | + fn as_any(&self) -> &dyn Any { |
| 31 | + self |
| 32 | + } |
| 33 | + |
| 34 | + // Register catalog not implemented since catalogs can't be created in our |
| 35 | + // system. |
| 36 | + fn register_catalog( |
| 37 | + &self, |
| 38 | + _: String, |
| 39 | + _: Arc<dyn datafusion::catalog::CatalogProvider>, |
| 40 | + ) -> Option<Arc<dyn datafusion::catalog::CatalogProvider>> { |
| 41 | + None |
| 42 | + } |
| 43 | + |
| 44 | + fn catalog_names(&self) -> Vec<String> { |
| 45 | + vec![] |
| 46 | + } |
| 47 | + |
| 48 | + fn catalog( |
| 49 | + &self, |
| 50 | + _name: &str, |
| 51 | + ) -> Option<Arc<dyn datafusion::catalog::CatalogProvider>> { |
| 52 | + Some(Arc::new(CatalogProvider {})) |
| 53 | + } |
| 54 | +} |
| 55 | + |
| 56 | +#[derive(Debug)] |
| 57 | +pub struct CatalogProvider {} |
| 58 | + |
| 59 | +impl datafusion::catalog::CatalogProvider for CatalogProvider { |
| 60 | + fn as_any(&self) -> &dyn Any { |
| 61 | + self |
| 62 | + } |
| 63 | + |
| 64 | + fn schema_names(&self) -> Vec<String> { |
| 65 | + vec![] |
| 66 | + } |
| 67 | + |
| 68 | + fn schema(&self, _: &str) -> Option<Arc<dyn datafusion::catalog::SchemaProvider>> { |
| 69 | + Some(Arc::new(SchemaProvider {})) |
| 70 | + } |
| 71 | + |
| 72 | + fn register_schema( |
| 73 | + &self, |
| 74 | + _name: &str, |
| 75 | + _schema: Arc<dyn datafusion::catalog::SchemaProvider>, |
| 76 | + ) -> datafusion::error::Result<Option<Arc<dyn datafusion::catalog::SchemaProvider>>> |
| 77 | + { |
| 78 | + Err(DataFusionError::NotImplemented( |
| 79 | + "register_schema".to_string(), |
| 80 | + )) |
| 81 | + } |
| 82 | + |
| 83 | + fn deregister_schema( |
| 84 | + &self, |
| 85 | + _name: &str, |
| 86 | + _cascade: bool, |
| 87 | + ) -> datafusion::error::Result<Option<Arc<dyn datafusion::catalog::SchemaProvider>>> |
| 88 | + { |
| 89 | + Err(DataFusionError::NotImplemented( |
| 90 | + "deregister_schema".to_string(), |
| 91 | + )) |
| 92 | + } |
| 93 | +} |
| 94 | + |
| 95 | +const TABLE_NAME: &str = "debuginfo"; |
| 96 | + |
| 97 | +#[derive(Debug, Clone)] |
| 98 | +pub struct SchemaProvider {} |
| 99 | + |
| 100 | +#[async_trait] |
| 101 | +impl datafusion::catalog::SchemaProvider for SchemaProvider { |
| 102 | + fn as_any(&self) -> &dyn Any { |
| 103 | + self |
| 104 | + } |
| 105 | + |
| 106 | + fn table_names(&self) -> Vec<String> { |
| 107 | + vec![TABLE_NAME.to_string()] |
| 108 | + } |
| 109 | + |
| 110 | + async fn table( |
| 111 | + &self, |
| 112 | + name: &str, |
| 113 | + ) -> Result<Option<Arc<dyn datafusion::datasource::TableProvider>>, DataFusionError> |
| 114 | + { |
| 115 | + if name != TABLE_NAME { |
| 116 | + return Ok(None); |
| 117 | + } |
| 118 | + Ok(Some(Arc::new(TableProvider {}))) |
| 119 | + } |
| 120 | + |
| 121 | + fn register_table( |
| 122 | + &self, |
| 123 | + _name: String, |
| 124 | + _table: Arc<dyn datafusion::datasource::TableProvider>, |
| 125 | + ) -> Result<Option<Arc<dyn datafusion::datasource::TableProvider>>, DataFusionError> |
| 126 | + { |
| 127 | + Err(DataFusionError::NotImplemented( |
| 128 | + "cannot register other tables".to_string(), |
| 129 | + )) |
| 130 | + } |
| 131 | + |
| 132 | + fn deregister_table( |
| 133 | + &self, |
| 134 | + __name: &str, |
| 135 | + ) -> Result<Option<Arc<dyn datafusion::datasource::TableProvider>>, DataFusionError> |
| 136 | + { |
| 137 | + Err(DataFusionError::NotImplemented( |
| 138 | + "cannot deregister tables".to_string(), |
| 139 | + )) |
| 140 | + } |
| 141 | + |
| 142 | + fn table_exist(&self, name: &str) -> bool { |
| 143 | + if name == TABLE_NAME { |
| 144 | + return true; |
| 145 | + } |
| 146 | + false |
| 147 | + } |
| 148 | +} |
| 149 | + |
| 150 | +#[derive(Debug)] |
| 151 | +pub struct TableProvider {} |
| 152 | + |
| 153 | +#[async_trait] |
| 154 | +impl datafusion::datasource::TableProvider for TableProvider { |
| 155 | + fn as_any(&self) -> &dyn Any { |
| 156 | + unimplemented!() |
| 157 | + } |
| 158 | + |
| 159 | + fn schema(&self) -> Arc<Schema> { |
| 160 | + // TODO(asubiotto): This is a simplified schema to work with mock data. |
| 161 | + // The real schema includes run end encoding and dictionary types. |
| 162 | + Arc::new(Schema::new(vec![ |
| 163 | + Field::new("mapping_build_id", DataType::Utf8, false), |
| 164 | + Field::new("address", DataType::UInt64, false), |
| 165 | + Field::new( |
| 166 | + "lines", |
| 167 | + DataType::List(Arc::new(Field::new( |
| 168 | + "item", |
| 169 | + DataType::Struct(Fields::from(vec![ |
| 170 | + Field::new("line", DataType::Int64, false), |
| 171 | + Field::new("function_name", DataType::Utf8, false), |
| 172 | + Field::new("function_system_name", DataType::Utf8, false), |
| 173 | + Field::new("function_filename", DataType::Utf8, false), |
| 174 | + Field::new("function_start_line", DataType::Int64, false), |
| 175 | + ])), |
| 176 | + false, |
| 177 | + ))), |
| 178 | + true, |
| 179 | + ), |
| 180 | + ])) |
| 181 | + } |
| 182 | + |
| 183 | + fn table_type(&self) -> TableType { |
| 184 | + TableType::Base |
| 185 | + } |
| 186 | + |
| 187 | + async fn scan( |
| 188 | + &self, |
| 189 | + _state: &dyn datafusion::catalog::Session, |
| 190 | + projections: Option<&Vec<usize>>, |
| 191 | + filters: &[Expr], |
| 192 | + limit: Option<usize>, |
| 193 | + ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> { |
| 194 | + println!("scan called projections: {projections:?} filters: {filters:?} limit: {limit:?}"); |
| 195 | + let schema = self.schema(); |
| 196 | + let columns = schema |
| 197 | + .fields() |
| 198 | + .iter() |
| 199 | + .map(|field| match field.name().as_str() { |
| 200 | + "address" => create_array!(UInt64, vec![4385521; 10]) as ArrayRef, |
| 201 | + "mapping_build_id" => create_array!( |
| 202 | + Utf8, |
| 203 | + vec!["87987728412ffaff58e302177248f3fd6436d132"; 10] |
| 204 | + ) as ArrayRef, |
| 205 | + _ => create_random_array(field, 10, 0.0, 0.0).unwrap(), |
| 206 | + }) |
| 207 | + .collect::<Vec<ArrayRef>>(); |
| 208 | + |
| 209 | + let batch = RecordBatch::try_new_with_options( |
| 210 | + schema.clone(), |
| 211 | + columns, |
| 212 | + &RecordBatchOptions::new().with_match_field_names(false), |
| 213 | + )?; |
| 214 | + Ok(Arc::new(DataSourceExec::new(Arc::new( |
| 215 | + MemorySourceConfig::try_new(&[vec![batch]], schema, projections.cloned())?, |
| 216 | + )))) |
| 217 | + } |
| 218 | + |
| 219 | + fn supports_filters_pushdown( |
| 220 | + &self, |
| 221 | + filters: &[&Expr], |
| 222 | + ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> { |
| 223 | + // Inexact filtering because it's using it to perform large-grained file pruning. |
| 224 | + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) |
| 225 | + } |
| 226 | +} |
| 227 | + |
| 228 | +/*#[cfg(test)] |
| 229 | +mod tests { |
| 230 | + use super::*; |
| 231 | + use datafusion::prelude::SessionContext; |
| 232 | +
|
| 233 | + #[tokio::test] |
| 234 | + async fn test_catalog_provider_list() -> Result<(), anyhow::Error> { |
| 235 | + let cpl = Arc::new(CatalogProviderList::new()); |
| 236 | + let ctx = SessionContext::new(); |
| 237 | + ctx.register_catalog_list(cpl); |
| 238 | + let result = ctx.sql("select * from debuginfo").await?.collect().await?; |
| 239 | + println!("{result:?}"); |
| 240 | + Ok(()) |
| 241 | + } |
| 242 | +} |
| 243 | +*/ |
0 commit comments