Skip to content

Commit d8e0828

Browse files
committed
use default repartition
1 parent b91a351 commit d8e0828

File tree

2 files changed

+40
-144
lines changed

2 files changed

+40
-144
lines changed

Cargo.lock

Lines changed: 38 additions & 76 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/datasource-avro/src/source.rs

Lines changed: 2 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use datafusion_datasource::TableSchema;
3434
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3535
use datafusion_physical_plan::projection::ProjectionExprs;
3636

37-
use crate::read_avro_schema_from_reader;
3837
use object_store::ObjectStore;
3938
use serde_json::Value;
4039

@@ -76,19 +75,9 @@ impl AvroSource {
7675

7776
fn build_projected_reader_schema(&self) -> Result<AvroSchema> {
7877
let file_schema = self.table_schema.file_schema().as_ref();
79-
// Fast path: no projection. If we have the original writer schema JSON
80-
// in metadata, just reuse it as-is without parsing.
78+
// Fast path: no projection.
8179
if self.projection.file_indices.is_empty() {
82-
return if let Some(avro_json) =
83-
file_schema.metadata().get(SCHEMA_METADATA_KEY)
84-
{
85-
Ok(AvroSchema::new(avro_json.clone()))
86-
} else {
87-
// Fall back to deriving Avro from the full Arrow file schema, should be ok
88-
// if not using projection.
89-
Ok(AvroSchema::try_from(file_schema)
90-
.map_err(Into::<DataFusionError>::into)?)
91-
};
80+
return Ok(AvroSchema::try_from(file_schema).map_err(Into::<DataFusionError>::into)?)
9281
}
9382
// Use the writer Avro schema JSON tagged upstream to build a projected reader schema
9483
match file_schema.metadata().get(SCHEMA_METADATA_KEY) {
@@ -246,61 +235,6 @@ mod private {
246235
let config = Arc::clone(&self.config);
247236

248237
Ok(Box::pin(async move {
249-
// check if schema should be inferred
250-
let r = object_store
251-
.get(&partitioned_file.object_meta.location)
252-
.await?;
253-
let config = Arc::new(match r.payload {
254-
GetResultPayload::File(mut file, _) => {
255-
let schema = match config
256-
.table_schema
257-
.file_schema()
258-
.metadata
259-
.get(SCHEMA_METADATA_KEY)
260-
{
261-
Some(_) => Arc::clone(config.table_schema.file_schema()),
262-
None => {
263-
Arc::new(read_avro_schema_from_reader(&mut file).unwrap())
264-
} // if not inferred, read schema from file
265-
};
266-
AvroSource {
267-
table_schema: TableSchema::new(
268-
schema,
269-
config.table_schema.table_partition_cols().clone(),
270-
),
271-
batch_size: config.batch_size,
272-
projection: config.projection.clone(),
273-
metrics: config.metrics.clone(),
274-
schema_adapter_factory: config.schema_adapter_factory.clone(),
275-
}
276-
}
277-
GetResultPayload::Stream(_) => {
278-
let bytes = r.bytes().await?;
279-
let schema = match config
280-
.table_schema
281-
.file_schema()
282-
.metadata
283-
.get(SCHEMA_METADATA_KEY)
284-
{
285-
Some(_) => Arc::clone(config.table_schema.file_schema()),
286-
None => Arc::new(
287-
read_avro_schema_from_reader(&mut bytes.reader())
288-
.unwrap(),
289-
), // if not inferred, read schema from file
290-
};
291-
AvroSource {
292-
table_schema: TableSchema::new(
293-
schema,
294-
config.table_schema.table_partition_cols().clone(),
295-
),
296-
batch_size: config.batch_size,
297-
projection: config.projection.clone(),
298-
metrics: config.metrics.clone(),
299-
schema_adapter_factory: config.schema_adapter_factory.clone(),
300-
}
301-
}
302-
});
303-
304238
let r = object_store
305239
.get(&partitioned_file.object_meta.location)
306240
.await?;

0 commit comments

Comments
 (0)