Skip to content

Commit d574fe7

Browse files
committed
check if schema should be inferred
1 parent 3e2667a commit d574fe7

File tree

1 file changed

+37
-1
lines changed

1 file changed

+37
-1
lines changed

datafusion/datasource-avro/src/source.rs

Lines changed: 37 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -33,6 +33,7 @@ use datafusion_datasource::TableSchema;
3333
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3434
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3535

36+
use crate::read_avro_schema_from_reader;
3637
use object_store::ObjectStore;
3738
use serde_json::Value;
3839

@@ -235,9 +236,44 @@ mod private {
235236

236237
impl FileOpener for AvroOpener {
237238
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
238-
let config = Arc::clone(&self.config);
239239
let object_store = Arc::clone(&self.object_store);
240+
let config = self.config.clone();
241+
240242
Ok(Box::pin(async move {
243+
// check if schema should be inferred
244+
let r = object_store
245+
.get(&partitioned_file.object_meta.location)
246+
.await?;
247+
let config = Arc::new(match r.payload {
248+
GetResultPayload::File(mut file, _) => {
249+
let schema = match config.table_schema.file_schema().metadata.get(SCHEMA_METADATA_KEY) {
250+
Some(_) => config.table_schema.file_schema().clone(),
251+
None => Arc::new(read_avro_schema_from_reader(&mut file).unwrap()), // if not inferred, read schema from file
252+
};
253+
AvroSource {
254+
table_schema: TableSchema::new(schema, config.table_schema.table_partition_cols().clone()),
255+
batch_size: config.batch_size,
256+
projection: config.projection.clone(),
257+
metrics: config.metrics.clone(),
258+
schema_adapter_factory: config.schema_adapter_factory.clone(),
259+
}
260+
}
261+
GetResultPayload::Stream(_) => {
262+
let bytes = r.bytes().await?;
263+
let schema = match config.table_schema.file_schema().metadata.get(SCHEMA_METADATA_KEY) {
264+
Some(_) => config.table_schema.file_schema().clone(),
265+
None => Arc::new(read_avro_schema_from_reader(&mut bytes.reader()).unwrap()), // if not inferred, read schema from file
266+
};
267+
AvroSource {
268+
table_schema: TableSchema::new(schema, config.table_schema.table_partition_cols().clone()),
269+
batch_size: config.batch_size,
270+
projection: config.projection.clone(),
271+
metrics: config.metrics.clone(),
272+
schema_adapter_factory: config.schema_adapter_factory.clone(),
273+
}
274+
}
275+
});
276+
241277
let r = object_store
242278
.get(&partitioned_file.object_meta.location)
243279
.await?;

0 commit comments

Comments
 (0)