Skip to content

Commit 51144fd

Browse files
authored
feat: support querying ORC files as rows of variants. (#18415)
1 parent 1ace76d commit 51144fd

File tree

17 files changed

+362
-106
lines changed

17 files changed

+362
-106
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/catalog/src/plan/datasource/datasource_info/orc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@ use crate::plan::StageTableInfo;
2121
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
2222
pub struct OrcTableInfo {
2323
pub stage_table_info: StageTableInfo,
24-
pub arrow_schema: arrow_schema::SchemaRef,
25-
pub schema_from: String,
24+
pub schema: Option<(arrow_schema::SchemaRef, String)>,
2625
}
2726

2827
impl OrcTableInfo {

src/query/service/src/sessions/query_ctx.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1774,6 +1774,14 @@ impl TableContext for QueryContext {
17741774
}
17751775
}
17761776
FileFormatParams::Orc(..) => {
1777+
let is_variant =
1778+
match max_column_position {
1779+
0 => false,
1780+
1 => true,
1781+
_ => return Err(ErrorCode::SemanticError(
1782+
"[QUERY-CTX] Query from ORC file only support $1 as column position",
1783+
)),
1784+
};
17771785
let schema = Arc::new(TableSchema::empty());
17781786
let info = StageTableInfo {
17791787
schema,
@@ -1785,7 +1793,7 @@ impl TableContext for QueryContext {
17851793
default_exprs: None,
17861794
copy_into_table_options: Default::default(),
17871795
stage_root,
1788-
is_variant: false,
1796+
is_variant,
17891797
};
17901798
OrcTable::try_create(info).await
17911799
}

src/query/storages/common/stage/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ publish = { workspace = true }
77
edition = { workspace = true }
88

99
[dependencies]
10+
arrow-array = { workspace = true }
1011
databend-common-ast = { workspace = true }
1112
databend-common-catalog = { workspace = true }
1213
databend-common-exception = { workspace = true }
1314
databend-common-expression = { workspace = true }
1415
databend-common-functions = { workspace = true }
1516
databend-common-meta-app = { workspace = true }
17+
jiff = { workspace = true }
1618
serde = { workspace = true }
1719
typetag = { workspace = true }
1820

src/query/storages/parquet/src/parquet_variant_table/recordbatch_to_variant.rs renamed to src/query/storages/common/stage/src/read/columnar/arrow_to_variant.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use databend_common_expression::DataSchema;
2121
use databend_common_expression::TableDataType;
2222
use jiff::tz::TimeZone;
2323

24-
pub fn read_record_batch(
24+
pub fn read_record_batch_to_variant_column(
2525
record_batch: RecordBatch,
2626
builder: &mut BinaryColumnBuilder,
2727
tz: &TimeZone,
@@ -40,7 +40,7 @@ pub fn read_record_batch(
4040
Ok(())
4141
}
4242

43-
pub fn record_batch_to_block(
43+
pub fn record_batch_to_variant_block(
4444
record_batch: RecordBatch,
4545
tz: &TimeZone,
4646
typ: &TableDataType,
@@ -50,7 +50,7 @@ pub fn record_batch_to_block(
5050
record_batch.num_rows(),
5151
record_batch.get_array_memory_size(),
5252
);
53-
read_record_batch(record_batch, &mut builder, tz, typ, schema)?;
53+
read_record_batch_to_variant_column(record_batch, &mut builder, tz, typ, schema)?;
5454
let column = builder.build();
5555
let num_rows = column.len();
5656
Ok(DataBlock::new(

src/query/storages/common/stage/src/read/columnar/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod arrow_to_variant;
1516
mod projection;
1617

18+
pub use arrow_to_variant::read_record_batch_to_variant_column;
19+
pub use arrow_to_variant::record_batch_to_variant_block;
1720
pub use projection::project_columnar;

src/query/storages/iceberg/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ impl IcebergTable {
320320
let data_schema = Arc::new(data_schema);
321321
pipeline.add_source(
322322
|output| {
323-
ORCSource::try_create(
323+
ORCSource::try_create_with_schema(
324324
output,
325325
ctx.clone(),
326326
Arc::new(op.clone()),

src/query/storages/orc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ bytes = { workspace = true }
3131
chrono = { workspace = true }
3232
dashmap = { workspace = true }
3333
futures-util = { workspace = true }
34+
jiff = { workspace = true }
3435
log = { workspace = true }
3536
opendal = { workspace = true }
3637
orc-rust = { workspace = true }

src/query/storages/orc/src/processors/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414

1515
pub(crate) mod decoder;
1616
pub(crate) mod source;
17+
pub(crate) mod variant_decoder;

src/query/storages/orc/src/processors/source.rs

Lines changed: 93 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -35,22 +35,52 @@ use orc_rust::projection::ProjectionMask;
3535
use orc_rust::ArrowReaderBuilder;
3636

3737
use crate::chunk_reader_impl::OrcChunkReader;
38+
use crate::hashable_schema::HashableSchema;
3839
use crate::strip::StripeInMemory;
3940
use crate::utils::map_orc_error;
4041

42+
pub struct InferredSchema {
43+
arrow_schema: arrow_schema::SchemaRef,
44+
schema_from: Option<String>,
45+
projection: Projection,
46+
}
47+
48+
impl InferredSchema {
49+
fn check_file_schema(&self, arrow_schema: arrow_schema::SchemaRef, path: &str) -> Result<()> {
50+
if self.arrow_schema.fields != arrow_schema.fields {
51+
return Err(ErrorCode::TableSchemaMismatch(format!(
52+
"{} get diff schema in file '{}'. Expected schema: {:?}, actual: {:?}",
53+
self.schema_from
54+
.as_ref()
55+
.map_or(String::new(), |schema_from| {
56+
format!("infer schema from '{}', but ", schema_from)
57+
}),
58+
path,
59+
self.arrow_schema,
60+
arrow_schema
61+
)));
62+
}
63+
Ok(())
64+
}
65+
}
66+
67+
pub struct ReadingFile {
68+
path: String,
69+
stripe_factory: Box<StripeFactory<OrcChunkReader>>,
70+
size: usize,
71+
schema: Option<HashableSchema>,
72+
}
73+
4174
pub struct ORCSource {
4275
table_ctx: Arc<dyn TableContext>,
4376
op_registry: Arc<dyn OperatorRegistry>,
44-
pub(crate) reader: Option<(String, Box<StripeFactory<OrcChunkReader>>, usize)>,
77+
pub reader: Option<ReadingFile>,
4578
scan_progress: Arc<Progress>,
46-
47-
arrow_schema: arrow_schema::SchemaRef,
48-
schema_from: Option<String>,
49-
projection: Projection,
79+
inferred_schema: Option<InferredSchema>,
5080
}
5181

5282
impl ORCSource {
53-
pub fn try_create(
83+
pub fn try_create_with_schema(
5484
output: Arc<OutputPort>,
5585
table_ctx: Arc<dyn TableContext>,
5686
op_registry: Arc<dyn OperatorRegistry>,
@@ -65,27 +95,29 @@ impl ORCSource {
6595
op_registry,
6696
scan_progress,
6797
reader: None,
68-
arrow_schema,
69-
schema_from,
70-
projection,
98+
inferred_schema: Some(InferredSchema {
99+
arrow_schema,
100+
schema_from,
101+
projection,
102+
}),
71103
})
72104
}
73105

74-
fn check_file_schema(&self, arrow_schema: arrow_schema::SchemaRef, path: &str) -> Result<()> {
75-
if self.arrow_schema.fields != arrow_schema.fields {
76-
return Err(ErrorCode::TableSchemaMismatch(format!(
77-
"{}get diff schema in file '{}'. Expected schema: {:?}, actual: {:?}",
78-
self.schema_from
79-
.as_ref()
80-
.map_or(String::new(), |schema_from| {
81-
format!("infer schema from '{}', but ", schema_from)
82-
}),
83-
path,
84-
self.arrow_schema,
85-
arrow_schema
86-
)));
87-
}
88-
Ok(())
106+
pub fn try_create(
107+
output: Arc<OutputPort>,
108+
table_ctx: Arc<dyn TableContext>,
109+
op_registry: Arc<dyn OperatorRegistry>,
110+
inferred_schema: Option<InferredSchema>,
111+
) -> Result<ProcessorPtr> {
112+
let scan_progress = table_ctx.get_scan_progress();
113+
114+
AsyncSourcer::create(table_ctx.clone(), output, ORCSource {
115+
table_ctx,
116+
op_registry,
117+
scan_progress,
118+
reader: None,
119+
inferred_schema,
120+
})
89121
}
90122

91123
async fn next_part(&mut self) -> Result<bool> {
@@ -105,20 +137,32 @@ impl ORCSource {
105137
let builder = ArrowReaderBuilder::try_new_async(file)
106138
.await
107139
.map_err(|e| map_orc_error(e, path))?;
108-
let projection = if let Projection::Columns(projection) = &self.projection {
109-
ProjectionMask::roots(
110-
builder.file_metadata().root_data_type(),
111-
projection.iter().map(|index| index + 1),
112-
)
113-
} else {
114-
ProjectionMask::all()
115-
};
140+
let mut projection = ProjectionMask::all();
141+
if let Some(schema) = &self.inferred_schema {
142+
if let Projection::Columns(p) = &schema.projection {
143+
projection = ProjectionMask::roots(
144+
builder.file_metadata().root_data_type(),
145+
p.iter().map(|index| index + 1),
146+
);
147+
}
148+
}
149+
116150
let reader = builder.with_projection(projection).build_async();
117151
let (factory, schema) = reader.into_parts();
118-
let factory = factory.unwrap();
119-
self.check_file_schema(schema, path)?;
152+
let stripe_factory = factory.unwrap();
153+
let schema = if let Some(inferred_schema) = &self.inferred_schema {
154+
inferred_schema.check_file_schema(schema, path)?;
155+
None
156+
} else {
157+
Some(HashableSchema::try_create(schema)?)
158+
};
120159

121-
self.reader = Some((path.to_string(), factory, size));
160+
self.reader = Some(ReadingFile {
161+
path: path.to_string(),
162+
stripe_factory,
163+
size,
164+
schema,
165+
});
122166
Ok(true)
123167
}
124168
}
@@ -134,8 +178,9 @@ impl AsyncSource for ORCSource {
134178
if self.reader.is_none() && !self.next_part().await? {
135179
return Ok(None);
136180
}
137-
if let Some((path, factory, size)) = mem::take(&mut self.reader) {
138-
let (factory, stripe) = factory
181+
if let Some(file) = mem::take(&mut self.reader) {
182+
let (factory, stripe) = file
183+
.stripe_factory
139184
.read_next_stripe()
140185
.await
141186
.map_err(|e| ErrorCode::StorageOther(e.to_string()))?;
@@ -144,10 +189,10 @@ impl AsyncSource for ORCSource {
144189
self.reader = None;
145190
let progress_values = ProgressValues {
146191
rows: 0,
147-
bytes: size,
192+
bytes: file.size,
148193
};
149194
self.scan_progress.incr(&progress_values);
150-
Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, size);
195+
Profile::record_usize_profile(ProfileStatisticsName::ScanBytes, file.size);
151196
Profile::record_usize_profile(ProfileStatisticsName::ScanPartitions, 1);
152197
continue;
153198
}
@@ -157,11 +202,17 @@ impl AsyncSource for ORCSource {
157202
bytes: 0,
158203
};
159204
self.scan_progress.incr(&progress_values);
160-
self.reader = Some((path.clone(), Box::new(factory), size));
205+
161206
let meta = Box::new(StripeInMemory {
162-
path,
207+
path: file.path.clone(),
163208
stripe,
164-
schema: None,
209+
schema: file.schema.clone(),
210+
});
211+
self.reader = Some(ReadingFile {
212+
path: file.path.clone(),
213+
stripe_factory: Box::new(factory),
214+
size: file.size,
215+
schema: file.schema.clone(),
165216
});
166217
return Ok(Some(DataBlock::empty_with_meta(meta)));
167218
}

0 commit comments

Comments (0)