Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,14 +517,17 @@ pub fn parse_protobuf_file_scan_config(
// Remove partition columns from the schema after recreating table_partition_cols
// because the partition columns are not in the file. They are present to allow
// the partition column types to be reconstructed after serde.
let file_schema = Arc::new(Schema::new(
schema
.fields()
.iter()
.filter(|field| !table_partition_cols.contains(field))
.cloned()
.collect::<Vec<_>>(),
));
let file_schema = Arc::new(
Schema::new(
schema
.fields()
.iter()
.filter(|field| !table_partition_cols.contains(field))
.cloned()
.collect::<Vec<_>>(),
)
.with_metadata(schema.metadata.clone()),
);

let mut output_ordering = vec![];
for node_collection in &proto.output_ordering {
Expand Down
6 changes: 5 additions & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,11 @@ pub fn serialize_file_scan_config(
.cloned()
.collect::<Vec<_>>();
fields.extend(conf.table_partition_cols.iter().cloned());
let schema = Arc::new(arrow::datatypes::Schema::new(fields.clone()));

let schema = Arc::new(
arrow::datatypes::Schema::new(fields.clone())
.with_metadata(conf.file_schema.metadata.clone()),
);

Ok(protobuf::FileScanExecConf {
file_groups,
Expand Down
43 changes: 41 additions & 2 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Display, Formatter};

use std::sync::Arc;
Expand All @@ -42,9 +43,9 @@ use datafusion::arrow::compute::kernels::sort::SortOptions;
use datafusion::arrow::datatypes::{DataType, Field, IntervalUnit, Schema};
use datafusion::datasource::empty::EmptyTable;
use datafusion::datasource::file_format::csv::CsvSink;
use datafusion::datasource::file_format::json::JsonSink;
use datafusion::datasource::file_format::json::{JsonFormat, JsonSink};
use datafusion::datasource::file_format::parquet::ParquetSink;
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::datasource::physical_plan::{
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileGroup,
Expand Down Expand Up @@ -2221,3 +2222,41 @@ async fn roundtrip_memory_source() -> Result<()> {
.await?;
roundtrip_test(plan)
}

/// Verifies that schema metadata attached to a `ListingTable`'s file schema
/// survives a physical-plan proto serde roundtrip. This exercises the
/// `serialize_file_scan_config` / `parse_protobuf_file_scan_config` changes
/// that propagate `Schema::metadata` instead of dropping it.
#[tokio::test]
async fn roundtrip_listing_table_with_schema_metadata() -> Result<()> {
    let ctx = SessionContext::new();
    let file_format = JsonFormat::default();
    // Hive-style partitioning: the `part` column is encoded in the directory
    // path, not in the JSON files themselves.
    let table_partition_cols = vec![("part".to_owned(), DataType::Int64)];
    let data = "../core/tests/data/partitioned_table_json";
    let listing_table_url = ListingTableUrl::parse(data)?;
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_table_partition_cols(table_partition_cols);

    let config = ListingTableConfig::new(listing_table_url)
        .with_listing_options(listing_options)
        .infer_schema(&ctx.state())
        .await?;

    // Decorate metadata onto the inferred ListingTable schema. Borrow the
    // inferred schema via `as_ref()` rather than cloning the `Option`
    // wrapper; only the inner `Schema` needs an owned copy (for
    // `with_metadata`, which consumes `self`).
    let schema_with_meta = config
        .file_schema
        .as_ref()
        .map(|s| {
            let meta: HashMap<String, String> =
                HashMap::from([("foo.bar".to_string(), "baz".to_string())]);
            s.as_ref().clone().with_metadata(meta)
        })
        .expect("Must decorate metadata");

    let config = config.with_schema(Arc::new(schema_with_meta));
    ctx.register_table("hive_style", Arc::new(ListingTable::try_new(config)?))?;

    // Any plan that scans the table will carry a FileScanConfig; the
    // roundtrip helper asserts proto serialize/deserialize equality,
    // which now includes the schema metadata.
    let plan = ctx
        .sql("select * from hive_style limit 1")
        .await?
        .create_physical_plan()
        .await?;

    roundtrip_test(plan)
}
Loading