
Commit 9fea7c9

Merge branch 'main' into check-duplicate
2 parents f505488 + d22cf4d

File tree: 6 files changed, +258 −22 lines changed


.github/workflows/audit.yml

Lines changed: 5 additions & 0 deletions
@@ -31,6 +31,7 @@ on:
     paths:
       - "**/Cargo.toml"
       - "**/Cargo.lock"
+      - ".github/workflows/audit.yml"

   schedule:
     - cron: '0 0 * * *'
@@ -40,6 +41,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v4
+      - name: Setup Rust toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: stable
       - uses: rustsec/audit-check@v2.0.0
         with:
          token: ${{ secrets.GITHUB_TOKEN }}
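Note on this change: listing the workflow file itself under `paths` means edits to the audit workflow also re-trigger the audit run, and the new setup step installs a Rust toolchain via the repository's local `setup-builder` action (presumably so `rustsec/audit-check` has the expected cargo available) before the audit executes.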

crates/catalog/glue/src/error.rs

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_glue::error::SdkError<T>) ->
 where T: Debug {
     Error::new(
         ErrorKind::Unexpected,
-        "Operation failed for hitting aws skd error".to_string(),
+        "Operation failed for hitting aws sdk error".to_string(),
     )
     .with_source(anyhow!("aws sdk error: {:?}", error))
 }

crates/catalog/s3tables/src/catalog.rs

Lines changed: 1 addition & 1 deletion
@@ -501,7 +501,7 @@ pub(crate) fn from_aws_sdk_error<T>(error: aws_sdk_s3tables::error::SdkError<T>)
 where T: std::fmt::Debug {
     Error::new(
         ErrorKind::Unexpected,
-        "Operation failed for hitting aws skd error".to_string(),
+        "Operation failed for hitting aws sdk error".to_string(),
     )
     .with_source(anyhow!("aws sdk error: {:?}", error))
 }

crates/iceberg/src/arrow/reader.rs

Lines changed: 152 additions & 7 deletions
@@ -22,7 +22,7 @@ use std::ops::Range;
 use std::str::FromStr;
 use std::sync::Arc;

-use arrow_arith::boolean::{and, is_not_null, is_null, not, or};
+use arrow_arith::boolean::{and, and_kleene, is_not_null, is_null, not, or, or_kleene};
 use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch};
 use arrow_ord::cmp::{eq, gt, gt_eq, lt, lt_eq, neq};
 use arrow_schema::{
@@ -827,7 +827,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
         Ok(Box::new(move |batch| {
             let left = lhs(batch.clone())?;
             let right = rhs(batch)?;
-            and(&left, &right)
+            and_kleene(&left, &right)
         }))
     }

@@ -839,7 +839,7 @@ impl<'a> BoundPredicateVisitor for PredicateConverter<'a> {
         Ok(Box::new(move |batch| {
             let left = lhs(batch.clone())?;
             let right = rhs(batch)?;
-            or(&left, &right)
+            or_kleene(&left, &right)
         }))
     }

@@ -1165,18 +1165,29 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
 #[cfg(test)]
 mod tests {
     use std::collections::{HashMap, HashSet};
+    use std::fs::File;
     use std::sync::Arc;

+    use arrow_array::cast::AsArray;
+    use arrow_array::{ArrayRef, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema as ArrowSchema, TimeUnit};
-    use parquet::arrow::ProjectionMask;
+    use futures::TryStreamExt;
+    use parquet::arrow::{ArrowWriter, ProjectionMask};
+    use parquet::basic::Compression;
+    use parquet::file::properties::WriterProperties;
     use parquet::schema::parser::parse_message_type;
     use parquet::schema::types::SchemaDescriptor;
+    use tempfile::TempDir;

     use crate::arrow::reader::{CollectFieldIdVisitor, PARQUET_FIELD_ID_META_KEY};
-    use crate::arrow::ArrowReader;
+    use crate::arrow::{ArrowReader, ArrowReaderBuilder};
     use crate::expr::visitors::bound_predicate_visitor::visit;
-    use crate::expr::{Bind, Reference};
-    use crate::spec::{NestedField, PrimitiveType, Schema, SchemaRef, Type};
+    use crate::expr::{Bind, Predicate, Reference};
+    use crate::io::FileIO;
+    use crate::scan::{FileScanTask, FileScanTaskStream};
+    use crate::spec::{
+        DataContentType, DataFileFormat, Datum, NestedField, PrimitiveType, Schema, SchemaRef, Type,
+    };
     use crate::ErrorKind;

     fn table_schema_simple() -> SchemaRef {
@@ -1336,4 +1347,138 @@ message schema {
             .expect("Some ProjectionMask");
         assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0]));
     }
+
+    #[tokio::test]
+    async fn test_kleene_logic_or_behaviour() {
+        // a IS NULL OR a = 'foo'
+        let predicate = Reference::new("a")
+            .is_null()
+            .or(Reference::new("a").equal_to(Datum::string("foo")));
+
+        // Table data: [NULL, "foo", "bar"]
+        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
+
+        // Expected: [NULL, "foo"].
+        let expected = vec![None, Some("foo".to_string())];
+
+        let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a);
+        let reader = ArrowReaderBuilder::new(file_io).build();
+
+        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
+
+        assert_eq!(result_data, expected);
+    }
+
+    #[tokio::test]
+    async fn test_kleene_logic_and_behaviour() {
+        // a IS NOT NULL AND a != 'foo'
+        let predicate = Reference::new("a")
+            .is_not_null()
+            .and(Reference::new("a").not_equal_to(Datum::string("foo")));
+
+        // Table data: [NULL, "foo", "bar"]
+        let data_for_col_a = vec![None, Some("foo".to_string()), Some("bar".to_string())];
+
+        // Expected: ["bar"].
+        let expected = vec![Some("bar".to_string())];
+
+        let (file_io, schema, table_location, _temp_dir) = setup_kleene_logic(data_for_col_a);
+        let reader = ArrowReaderBuilder::new(file_io).build();

+        let result_data = test_perform_read(predicate, schema, table_location, reader).await;
+
+        assert_eq!(result_data, expected);
+    }
+
+    async fn test_perform_read(
+        predicate: Predicate,
+        schema: SchemaRef,
+        table_location: String,
+        reader: ArrowReader,
+    ) -> Vec<Option<String>> {
+        let tasks = Box::pin(futures::stream::iter(
+            vec![Ok(FileScanTask {
+                start: 0,
+                length: 0,
+                record_count: None,
+                data_file_path: format!("{}/1.parquet", table_location),
+                data_file_content: DataContentType::Data,
+                data_file_format: DataFileFormat::Parquet,
+                schema: schema.clone(),
+                project_field_ids: vec![1],
+                predicate: Some(predicate.bind(schema, true).unwrap()),
+                deletes: vec![],
+            })]
+            .into_iter(),
+        )) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .await
+            .unwrap()
+            .try_collect::<Vec<RecordBatch>>()
+            .await
+            .unwrap();
+
+        let result_data = result[0].columns()[0]
+            .as_string_opt::<i32>()
+            .unwrap()
+            .iter()
+            .map(|v| v.map(ToOwned::to_owned))
+            .collect::<Vec<_>>();
+
+        result_data
+    }
+
+    fn setup_kleene_logic(
+        data_for_col_a: Vec<Option<String>>,
+    ) -> (FileIO, SchemaRef, String, TempDir) {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![NestedField::optional(
+                    1,
+                    "a",
+                    Type::Primitive(PrimitiveType::String),
+                )
+                .into()])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
+            "a",
+            DataType::Utf8,
+            true,
+        )
+        .with_metadata(HashMap::from([(
+            PARQUET_FIELD_ID_META_KEY.to_string(),
+            "1".to_string(),
+        )]))]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().to_str().unwrap().to_string();
+
+        let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap();
+
+        let col = Arc::new(StringArray::from(data_for_col_a)) as ArrayRef;
+
+        let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap();
+
+        // Write the Parquet files
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        let file = File::create(format!("{}/1.parquet", &table_location)).unwrap();
+        let mut writer =
+            ArrowWriter::try_new(file, to_write.schema(), Some(props.clone())).unwrap();
+
+        writer.write(&to_write).expect("Writing batch");
+
+        // writer must be closed to write footer
+        writer.close().unwrap();
+
+        (file_io, schema, table_location, tmp_dir)
+    }
 }
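For context on the `and` → `and_kleene` and `or` → `or_kleene` switch above: Arrow's plain boolean kernels mark a result slot null whenever either input slot is null, while the Kleene variants implement SQL-style three-valued logic (`true OR NULL = true`, `false AND NULL = false`). A minimal standalone sketch, not part of the commit, with array values chosen to mirror the new test's column [NULL, "foo", "bar"]:

use arrow_arith::boolean::{or, or_kleene};
use arrow_array::{Array, BooleanArray};

fn main() {
    // The two sides of `a IS NULL OR a = 'foo'` evaluated over [NULL, "foo", "bar"]:
    let is_null = BooleanArray::from(vec![Some(true), Some(false), Some(false)]);
    // `NULL = 'foo'` evaluates to NULL, hence the None in slot 0.
    let eq_foo = BooleanArray::from(vec![None, Some(true), Some(false)]);

    // The plain kernel nulls out any slot where either input is null, so the
    // NULL row is lost even though `a IS NULL` holds there.
    let plain = or(&is_null, &eq_foo).unwrap();
    assert!(plain.is_null(0));

    // The Kleene kernel applies three-valued logic: true OR NULL = true,
    // so the NULL row is correctly kept.
    let kleene = or_kleene(&is_null, &eq_foo).unwrap();
    assert!(kleene.is_valid(0) && kleene.value(0));
}

Without the Kleene kernels, the row where `a` is NULL would be filtered out of `a IS NULL OR a = 'foo'`, which is exactly the behaviour the two new tests pin down.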

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 10 additions & 0 deletions
@@ -144,6 +144,16 @@ mod test {

         let mut data_file_writer = DataFileWriterBuilder::new(pw, None).build().await.unwrap();

+        let arrow_schema = arrow_schema::Schema::new(vec![
+            Field::new("foo", DataType::Int32, false),
+            Field::new("bar", DataType::Utf8, false),
+        ]);
+        let batch = RecordBatch::try_new(Arc::new(arrow_schema.clone()), vec![
+            Arc::new(Int32Array::from(vec![1, 2, 3])),
+            Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])),
+        ])?;
+        data_file_writer.write(batch).await?;
+
         let data_files = data_file_writer.close().await.unwrap();
         assert_eq!(data_files.len(), 1);
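This batch write is needed because the Parquet writer (see parquet_writer.rs below) now creates its output file lazily: a data-file writer closed without ever receiving rows produces no data files, which would break the `data_files.len() == 1` assertion.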

crates/iceberg/src/writer/file_writer/parquet_writer.rs

Lines changed: 89 additions & 13 deletions
@@ -84,24 +84,16 @@ impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWr
     type R = ParquetWriter;

     async fn build(self) -> crate::Result<Self::R> {
-        let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
         let written_size = Arc::new(AtomicI64::new(0));
         let out_file = self.file_io.new_output(
             self.location_generator
                 .generate_location(&self.file_name_generator.generate_file_name()),
         )?;
-        let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
-        let async_writer = AsyncFileWriter::new(inner_writer);
-        let writer =
-            AsyncArrowWriter::try_new(async_writer, arrow_schema.clone(), Some(self.props))
-                .map_err(|err| {
-                    Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
-                        .with_source(err)
-                })?;

         Ok(ParquetWriter {
             schema: self.schema.clone(),
-            writer,
+            inner_writer: None,
+            writer_properties: self.props,
             written_size,
             current_row_num: 0,
             out_file,
@@ -226,7 +218,8 @@ impl SchemaVisitor for IndexByParquetPathName {
 pub struct ParquetWriter {
     schema: SchemaRef,
     out_file: OutputFile,
-    writer: AsyncArrowWriter<AsyncFileWriter<TrackWriter>>,
+    inner_writer: Option<AsyncArrowWriter<AsyncFileWriter<TrackWriter>>>,
+    writer_properties: WriterProperties,
     written_size: Arc<AtomicI64>,
     current_row_num: usize,
 }
@@ -520,8 +513,35 @@

 impl FileWriter for ParquetWriter {
     async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
+        // Skip empty batch
+        if batch.num_rows() == 0 {
+            return Ok(());
+        }
+
         self.current_row_num += batch.num_rows();
-        self.writer.write(batch).await.map_err(|err| {
+
+        // Lazy initialize the writer
+        let writer = if let Some(writer) = &mut self.inner_writer {
+            writer
+        } else {
+            let arrow_schema: ArrowSchemaRef = Arc::new(self.schema.as_ref().try_into()?);
+            let inner_writer =
+                TrackWriter::new(self.out_file.writer().await?, self.written_size.clone());
+            let async_writer = AsyncFileWriter::new(inner_writer);
+            let writer = AsyncArrowWriter::try_new(
+                async_writer,
+                arrow_schema.clone(),
+                Some(self.writer_properties.clone()),
+            )
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "Failed to build parquet writer.")
+                    .with_source(err)
+            })?;
+            self.inner_writer = Some(writer);
+            self.inner_writer.as_mut().unwrap()
+        };
+
+        writer.write(batch).await.map_err(|err| {
             Error::new(
                 ErrorKind::Unexpected,
                 "Failed to write using parquet writer.",
@@ -532,7 +552,10 @@
     }

     async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
-        let metadata = self.writer.close().await.map_err(|err| {
+        let Some(writer) = self.inner_writer else {
+            return Ok(vec![]);
+        };
+        let metadata = writer.close().await.map_err(|err| {
             Error::new(ErrorKind::Unexpected, "Failed to close parquet writer.").with_source(err)
         })?;

@@ -1538,4 +1561,57 @@ mod tests {

         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_empty_write() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // Test that file will create if data to write
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+        let col = Arc::new(Int64Array::from_iter_values(0..1024)) as ArrayRef;
+        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+        let mut pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
+            file_io.clone(),
+            location_gen.clone(),
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        pw.write(&to_write).await?;
+        let file_path = pw.out_file.location().to_string();
+        pw.close().await.unwrap();
+        assert!(file_io.exists(file_path).await.unwrap());
+
+        // Test that file will not create if no data to write
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test_empty".to_string(), None, DataFileFormat::Parquet);
+        let pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(to_write.schema().as_ref().try_into().unwrap()),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        let file_path = pw.out_file.location().to_string();
+        pw.close().await.unwrap();
+        assert!(!file_io.exists(file_path).await.unwrap());
+
+        Ok(())
+    }
 }
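Taken together, the `write`/`close` changes implement lazy file creation: the Arrow writer, and with it the output file, only comes into existence on the first non-empty batch, and closing an untouched writer returns no data files. A minimal sketch of the pattern with a hypothetical `FileSink` type (not the commit's API; the real code inlines the initialization in `write` because constructing `AsyncArrowWriter` is async and fallible, so a plain `get_or_insert_with` is not usable there):

// Hypothetical stand-in for AsyncArrowWriter<...>: owning one implies a file exists.
struct FileSink {
    rows: usize,
}

struct LazyFileWriter {
    sink: Option<FileSink>,
}

impl LazyFileWriter {
    fn write(&mut self, num_rows: usize) {
        // Mirror the "skip empty batch" guard: empty writes must not create a file.
        if num_rows == 0 {
            return;
        }
        // Create the sink only on the first real write.
        let sink = self.sink.get_or_insert_with(|| FileSink { rows: 0 });
        sink.rows += num_rows;
    }

    fn close(self) -> Option<FileSink> {
        // `None` mirrors the `return Ok(vec![])` early-out in ParquetWriter::close:
        // nothing was written, so no file (and no metadata) exists.
        self.sink
    }
}

fn main() {
    let mut empty = LazyFileWriter { sink: None };
    empty.write(0);
    assert!(empty.close().is_none()); // no file created

    let mut used = LazyFileWriter { sink: None };
    used.write(1024);
    assert_eq!(used.close().unwrap().rows, 1024); // file created on first write
}

The `test_empty_write` test above exercises exactly these two paths: a writer that received rows leaves a file behind, and one that never did leaves nothing.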
