Skip to content

Commit 01314e1

Browse files
alambLiam Brannigan
authored andcommitted
fix: make it compile
Signed-off-by: Andrew Lamb <[email protected]> Signed-off-by: Liam Brannigan <[email protected]>
1 parent eaecc7c commit 01314e1

File tree

7 files changed

+25
-11
lines changed

7 files changed

+25
-11
lines changed

crates/core/src/data_catalog/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ mod tests {
149149
use super::*;
150150
use datafusion::assert_batches_sorted_eq;
151151
use datafusion::catalog::CatalogProvider;
152-
use datafusion::catalog_common::MemoryCatalogProvider;
152+
use datafusion::catalog::MemoryCatalogProvider;
153153
use datafusion::execution::context::SessionContext;
154154

155155
#[test]

crates/core/src/delta_datafusion/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ use datafusion::physical_optimizer::pruning::PruningPredicate;
5151
use datafusion_common::scalar::ScalarValue;
5252
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
5353
use datafusion_common::{
54-
config::ConfigOptions, Column, DFSchema, DataFusionError, Result as DataFusionResult,
55-
TableReference, ToDFSchema,
54+
config::ConfigOptions, Column, Constraints, DFSchema, DataFusionError,
55+
Result as DataFusionResult, TableReference, ToDFSchema,
5656
};
5757
use datafusion_expr::execution_props::ExecutionProps;
5858
use datafusion_expr::logical_plan::CreateExternalTable;
@@ -659,6 +659,7 @@ impl<'a> DeltaScanBuilder<'a> {
659659
} else {
660660
file_groups.into_values().collect()
661661
},
662+
constraints: Constraints::default(),
662663
statistics: stats,
663664
projection: self.projection.cloned(),
664665
limit: self.limit,

crates/core/src/kernel/snapshot/log_data.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,7 @@ mod datafusion {
638638
null_count,
639639
max_value,
640640
min_value,
641+
sum_value: Precision::Absent,
641642
distinct_count: Precision::Absent,
642643
})
643644
}
@@ -653,6 +654,7 @@ mod datafusion {
653654
null_count: self.null_count.add(&other.null_count),
654655
max_value: self.max_value.max(&other.max_value),
655656
min_value: self.min_value.min(&other.min_value),
657+
sum_value: Precision::Absent,
656658
distinct_count: self.distinct_count.add(&other.distinct_count),
657659
}
658660
}

crates/core/src/operations/load_cdf.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use datafusion::datasource::file_format::parquet::ParquetFormat;
1010
use datafusion::datasource::file_format::FileFormat;
1111
use datafusion::datasource::physical_plan::FileScanConfig;
1212
use datafusion::prelude::SessionContext;
13-
use datafusion_common::{ScalarValue, Statistics};
13+
use datafusion_common::{Constraints, ScalarValue, Statistics};
1414
use datafusion_physical_expr::expressions;
1515
use datafusion_physical_expr::PhysicalExpr;
1616
use datafusion_physical_plan::projection::ProjectionExec;
@@ -389,6 +389,7 @@ impl CdfLoadBuilder {
389389
object_store_url: self.log_store.object_store_url(),
390390
file_schema: cdc_file_schema.clone(),
391391
file_groups: cdc_file_groups.into_values().collect(),
392+
constraints: Constraints::default(),
392393
statistics: Statistics::new_unknown(&cdc_file_schema),
393394
projection: None,
394395
limit: None,
@@ -406,6 +407,7 @@ impl CdfLoadBuilder {
406407
object_store_url: self.log_store.object_store_url(),
407408
file_schema: add_remove_file_schema.clone(),
408409
file_groups: add_file_groups.into_values().collect(),
410+
constraints: Constraints::default(),
409411
statistics: Statistics::new_unknown(&add_remove_file_schema.clone()),
410412
projection: None,
411413
limit: None,
@@ -423,6 +425,7 @@ impl CdfLoadBuilder {
423425
object_store_url: self.log_store.object_store_url(),
424426
file_schema: add_remove_file_schema.clone(),
425427
file_groups: remove_file_groups.into_values().collect(),
428+
constraints: Constraints::default(),
426429
statistics: Statistics::new_unknown(&add_remove_file_schema),
427430
projection: None,
428431
limit: None,

crates/core/src/writer/json.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,8 @@ impl DeltaWriter<Vec<Value>> for JsonWriter {
349349
ParquetError::IndexOutOfBound(u.to_owned(), v.to_owned())
350350
}
351351
ParquetError::NYI(msg) => ParquetError::NYI(msg.to_owned()),
352+
// ParquetError is non exhaustive, so have a fallback
353+
e => ParquetError::General(e.to_string()),
352354
},
353355
skipped_values: partial_writes,
354356
}

python/src/filesystem.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::RawDeltaTable;
44
use deltalake::storage::object_store::{MultipartUpload, PutPayloadMut};
55
use deltalake::storage::{DynObjectStore, ListResult, ObjectStoreError, Path};
66
use deltalake::DeltaTableBuilder;
7+
use parking_lot::Mutex;
78
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
89
use pyo3::prelude::*;
910
use pyo3::types::{IntoPyDict, PyBytes, PyType};
@@ -519,7 +520,9 @@ impl ObjectInputFile {
519520
// TODO the C++ implementation track an internal lock on all random access files, DO we need this here?
520521
#[pyclass(weakref, module = "deltalake._internal")]
521522
pub struct ObjectOutputStream {
522-
upload: Box<dyn MultipartUpload>,
523+
// wrap in mutex as rustc says `MultipartUpload` can't be
524+
// shared across threads (it isn't sync)
525+
upload: Mutex<Box<dyn MultipartUpload>>,
523526
pos: i64,
524527
#[pyo3(get)]
525528
closed: bool,
@@ -537,7 +540,7 @@ impl ObjectOutputStream {
537540
) -> Result<Self, ObjectStoreError> {
538541
let upload = store.put_multipart(&path).await?;
539542
Ok(Self {
540-
upload,
543+
upload: Mutex::new(upload),
541544
pos: 0,
542545
closed: false,
543546
mode: "wb".into(),
@@ -555,14 +558,15 @@ impl ObjectOutputStream {
555558
}
556559

557560
fn abort(&mut self) -> PyResult<()> {
558-
rt().block_on(self.upload.abort())
561+
rt().block_on(self.upload.lock().abort())
559562
.map_err(PythonError::from)?;
560563
Ok(())
561564
}
562565

563566
fn upload_buffer(&mut self) -> PyResult<()> {
564567
let payload = std::mem::take(&mut self.buffer).freeze();
565-
match rt().block_on(self.upload.put_part(payload)) {
568+
let res = rt().block_on(self.upload.lock().put_part(payload));
569+
match res {
566570
Ok(_) => Ok(()),
567571
Err(err) => {
568572
self.abort()?;
@@ -580,7 +584,7 @@ impl ObjectOutputStream {
580584
if !self.buffer.is_empty() {
581585
self.upload_buffer()?;
582586
}
583-
match rt().block_on(self.upload.complete()) {
587+
match rt().block_on(self.upload.lock().complete()) {
584588
Ok(_) => Ok(()),
585589
Err(err) => Err(PyIOError::new_err(err.to_string())),
586590
}

python/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,10 +1563,12 @@ impl RawDeltaTable {
15631563
&self,
15641564
py: Python<'py>,
15651565
) -> PyResult<Bound<'py, PyCapsule>> {
1566+
// tokio runtime handle?
1567+
let handle = None;
15661568
let name = CString::new("datafusion_table_provider").unwrap();
15671569

15681570
let table = self.with_table(|t| Ok(Arc::new(t.clone())))?;
1569-
let provider = FFI_TableProvider::new(table, false);
1571+
let provider = FFI_TableProvider::new(table, false, handle);
15701572

15711573
PyCapsule::new_bound(py, provider, Some(name.clone()))
15721574
}
@@ -1820,7 +1822,7 @@ fn filestats_to_expression_next<'py>(
18201822
})?
18211823
.data_type()
18221824
.clone();
1823-
let column_type = PyArrowType(column_type).into_py(py);
1825+
let column_type = PyArrowType(column_type).into_pyobject(py)?;
18241826
pa.call_method1("scalar", (value,))?
18251827
.call_method1("cast", (column_type,))
18261828
};

0 commit comments

Comments
 (0)