Skip to content

Commit 5ffd2de

Browse files
abey79 and emilk authored
Fix wrong datatype displayed for non-list column in the table view (#11140)
Co-authored-by: Emil Ernerfeldt <[email protected]>
1 parent 8a8ab86 commit 5ffd2de

19 files changed

+200
-153
lines changed

crates/store/re_sorbet/src/migrations/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::cmp::Ordering;
1010

1111
use arrow::{array::RecordBatch, datatypes::SchemaRef};
1212

13-
use crate::SorbetSchema;
13+
use crate::{BatchType, SorbetSchema};
1414

1515
mod make_list_arrays;
1616

@@ -105,11 +105,13 @@ fn maybe_apply<M: Migration>(
105105

106106
/// Migrate a sorbet record batch of unknown version to the latest version.
107107
#[tracing::instrument(level = "debug", skip_all)]
108-
pub fn migrate_record_batch(mut batch: RecordBatch) -> RecordBatch {
108+
pub fn migrate_record_batch(mut batch: RecordBatch, batch_type: BatchType) -> RecordBatch {
109109
batch = migrate_record_batch_impl(batch);
110110

111-
// TODO(emilk): only call make_all_data_columns_list_arrays for chunk batches?
112-
make_list_arrays::make_all_data_columns_list_arrays(&batch)
111+
match batch_type {
112+
BatchType::Chunk => make_list_arrays::make_all_data_columns_list_arrays(&batch),
113+
BatchType::Dataframe => batch,
114+
}
113115
}
114116

115117
fn migrate_record_batch_impl(mut batch: RecordBatch) -> RecordBatch {

crates/store/re_sorbet/src/sorbet_batch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ impl SorbetBatch {
178178
re_tracing::profile_function!();
179179

180180
// First migrate the incoming batch to the latest format:
181-
let batch = crate::migrations::migrate_record_batch(batch.clone());
181+
let batch = crate::migrations::migrate_record_batch(batch.clone(), batch_type);
182182

183183
let sorbet_schema =
184184
SorbetSchema::try_from_migrated_arrow_schema(batch.schema_ref().as_ref())?;

crates/store/re_sorbet/src/sorbet_schema.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
use arrow::datatypes::Schema as ArrowSchema;
1+
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
22

33
use re_log_types::EntityPath;
44
use re_types_core::ChunkId;
55

6-
use crate::{ArrowBatchMetadata, SorbetColumnDescriptors, SorbetError, TimestampMetadata};
6+
use crate::{
7+
ArrowBatchMetadata, SorbetColumnDescriptors, SorbetError, TimestampMetadata, migrate_schema_ref,
8+
};
79

810
// ----------------------------------------------------------------------------
911

@@ -116,6 +118,11 @@ impl SorbetSchema {
116118
}
117119

118120
impl SorbetSchema {
121+
/// Parse an arbitrary arrow schema by first migrating it to the Rerun schema.
122+
pub fn try_from_raw_arrow_schema(arrow_schema: ArrowSchemaRef) -> Result<Self, SorbetError> {
123+
Self::try_from_migrated_arrow_schema(&migrate_schema_ref(arrow_schema))
124+
}
125+
119126
/// Parse an already migrated Arrow schema.
120127
#[tracing::instrument(level = "trace", skip_all)]
121128
pub(crate) fn try_from_migrated_arrow_schema(

crates/viewer/re_dataframe_ui/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ datafusion.workspace = true
4141
egui.workspace = true
4242
egui_dnd.workspace = true
4343
egui_table.workspace = true
44+
itertools.workspace = true
4445
nohash-hasher.workspace = true
4546
parking_lot.workspace = true
4647
serde.workspace = true

crates/viewer/re_dataframe_ui/src/datafusion_adapter.rs

Lines changed: 47 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::sync::Arc;
22

3-
use arrow::datatypes::{DataType, Fields};
3+
use arrow::datatypes::{DataType, SchemaRef};
44
use datafusion::common::{DataFusionError, TableReference};
55
use datafusion::functions::expr_fn::concat;
66
use datafusion::logical_expr::{binary_expr, col as datafusion_col, lit};
@@ -61,6 +61,19 @@ impl From<&TableBlueprint> for DataFusionQueryData {
6161
}
6262
}
6363

64+
/// Result of the async datafusion query process.
65+
#[derive(Debug, Clone)]
66+
pub struct DataFusionQueryResult {
67+
/// The record batches to display.
68+
pub sorbet_batches: Vec<SorbetBatch>,
69+
70+
/// The schema of the record batches.
71+
pub original_schema: SchemaRef,
72+
73+
/// The migrated schema of the record batches (useful when the list of batches is empty).
74+
pub sorbet_schema: re_sorbet::SorbetSchema,
75+
}
76+
6477
/// A table blueprint along with the context required to execute the corresponding datafusion query.
6578
#[derive(Clone)]
6679
struct DataFusionQuery {
@@ -83,12 +96,11 @@ impl DataFusionQuery {
8396
}
8497
}
8598

86-
/// Execute the query and produce a vector of [`SorbetBatch`]s along with physical columns
87-
/// names.
99+
/// Execute the query to produce the data to display.
88100
///
89101
/// Note: the future returned by this function must be `'static`, so it takes `self`. Use
90102
/// `clone()` as required.
91-
async fn execute(self) -> Result<(Vec<SorbetBatch>, Fields), DataFusionError> {
103+
async fn execute(self) -> Result<DataFusionQueryResult, DataFusionError> {
92104
let mut dataframe = self.session_ctx.table(self.table_ref).await?;
93105

94106
let DataFusionQueryData {
@@ -182,20 +194,16 @@ impl DataFusionQuery {
182194
}
183195

184196
//
185-
// Collect
197+
// Collect record batches
186198
//
187199

200+
let original_schema = Arc::clone(dataframe.schema().inner());
188201
let record_batches = dataframe.collect().await?;
189202

190-
// TODO(#10421) IMPORTANT: fields must be copied here *before* converting to `SorbetBatch`,
191-
// because that conversion modifies the field names. As a result, the schema contained in a
192-
// `SorbetBatch` cannot be used to derive the physical column names as seen by DataFusion.
193-
let fields = record_batches
194-
.first()
195-
.map(|record_batch| record_batch.schema().fields.clone())
196-
.unwrap_or_default();
203+
//
204+
// Convert to `SorbetBatch`
205+
//
197206

198-
// convert to SorbetBatch
199207
let sorbet_batches = record_batches
200208
.iter()
201209
.map(|record_batch| {
@@ -204,7 +212,23 @@ impl DataFusionQuery {
204212
.collect::<Result<Vec<_>, _>>()
205213
.map_err(|err| DataFusionError::External(err.into()))?;
206214

207-
Ok((sorbet_batches, fields))
215+
//
216+
// Get (or create) `SorbetSchema`
217+
//
218+
219+
let sorbet_schema = sorbet_batches
220+
.first()
221+
.map(|batch| Ok(batch.sorbet_schema().clone()))
222+
.unwrap_or_else(|| {
223+
re_sorbet::SorbetSchema::try_from_raw_arrow_schema(Arc::clone(&original_schema))
224+
.map_err(|err| DataFusionError::External(err.into()))
225+
})?;
226+
227+
Ok(DataFusionQueryResult {
228+
sorbet_batches,
229+
original_schema,
230+
sorbet_schema,
231+
})
208232
}
209233
}
210234

@@ -222,8 +246,8 @@ impl PartialEq for DataFusionQuery {
222246
}
223247
}
224248

225-
type RequestedSorbetBatches =
226-
RequestedObject<Result<(Vec<SorbetBatch>, arrow::datatypes::Fields), DataFusionError>>;
249+
type RequestedDataFusionQueryResult =
250+
RequestedObject<Result<DataFusionQueryResult, DataFusionError>>;
227251

228252
/// Helper struct to manage the datafusion async query and the resulting `SorbetBatch`.
229253
#[derive(Clone)]
@@ -237,11 +261,11 @@ pub struct DataFusionAdapter {
237261
query: DataFusionQuery,
238262

239263
// Used to have something to display while the new dataframe is being queried.
240-
pub last_sorbet_batches: Option<(Vec<SorbetBatch>, Fields)>,
264+
pub last_query_results: Option<DataFusionQueryResult>,
241265

242266
// TODO(ab, lucasmerlin): this `Mutex` is only needed because of the `Clone` bound in egui
243267
// so we should clean that up if the bound is lifted.
244-
pub requested_sorbet_batches: Arc<Mutex<RequestedSorbetBatches>>,
268+
pub requested_query_result: Arc<Mutex<RequestedDataFusionQueryResult>>,
245269

246270
pub queried_at: Timestamp,
247271
}
@@ -271,13 +295,13 @@ impl DataFusionAdapter {
271295
let table_state = Self {
272296
id,
273297
blueprint: initial_blueprint,
274-
requested_sorbet_batches: Arc::new(Mutex::new(RequestedObject::new_with_repaint(
298+
requested_query_result: Arc::new(Mutex::new(RequestedObject::new_with_repaint(
275299
runtime,
276300
ui.ctx().clone(),
277301
query.clone().execute(),
278302
))),
279303
query,
280-
last_sorbet_batches: None,
304+
last_query_results: None,
281305
queried_at: Timestamp::now(),
282306
};
283307

@@ -288,7 +312,7 @@ impl DataFusionAdapter {
288312
table_state
289313
});
290314

291-
adapter.requested_sorbet_batches.lock().on_frame_start();
315+
adapter.requested_query_result.lock().on_frame_start();
292316

293317
adapter
294318
}
@@ -314,10 +338,10 @@ impl DataFusionAdapter {
314338
if self.query.query_data != new_query_data {
315339
self.query.query_data = new_query_data;
316340

317-
let mut dataframe = self.requested_sorbet_batches.lock();
341+
let mut dataframe = self.requested_query_result.lock();
318342

319343
if let Some(Ok(sorbet_batches)) = dataframe.try_as_ref() {
320-
self.last_sorbet_batches = Some(sorbet_batches.clone());
344+
self.last_query_results = Some(sorbet_batches.clone());
321345
}
322346

323347
*dataframe = RequestedObject::new_with_repaint(

0 commit comments

Comments (0)