Skip to content

Commit d930df9

Browse files
authored
Merge branch 'main' into feature/implement-project-node-for-insert-into-datafusion
2 parents 4d59f87 + 7cde26a commit d930df9

File tree

7 files changed

+322
-19
lines changed

7 files changed

+322
-19
lines changed

crates/catalog/glue/tests/glue_catalog_test.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ async fn get_catalog() -> GlueCatalog {
7373
sleep(std::time::Duration::from_millis(1000)).await;
7474
}
7575

76+
while !scan_port_addr(minio_socket_addr) {
77+
info!("Waiting for 1s minio to ready...");
78+
sleep(std::time::Duration::from_millis(1000)).await;
79+
}
80+
7681
let props = HashMap::from([
7782
(AWS_ACCESS_KEY_ID.to_string(), "my_access_id".to_string()),
7883
(
@@ -89,6 +94,24 @@ async fn get_catalog() -> GlueCatalog {
8994
(S3_REGION.to_string(), "us-east-1".to_string()),
9095
]);
9196

97+
// Wait for bucket to actually exist
98+
let file_io = iceberg::io::FileIO::from_path("s3a://")
99+
.unwrap()
100+
.with_props(props.clone())
101+
.build()
102+
.unwrap();
103+
104+
let mut retries = 0;
105+
while retries < 30 {
106+
if file_io.exists("s3a://warehouse/").await.unwrap_or(false) {
107+
info!("S3 bucket 'warehouse' is ready");
108+
break;
109+
}
110+
info!("Waiting for bucket creation... (attempt {})", retries + 1);
111+
sleep(std::time::Duration::from_millis(1000)).await;
112+
retries += 1;
113+
}
114+
92115
let config = GlueCatalogConfig::builder()
93116
.uri(format!("http://{}", glue_socket_addr))
94117
.warehouse("s3a://warehouse/hive".to_string())

crates/catalog/hms/tests/hms_catalog_test.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ async fn get_catalog() -> HmsCatalog {
7373
sleep(std::time::Duration::from_millis(1000)).await;
7474
}
7575

76+
while !scan_port_addr(minio_socket_addr) {
77+
info!("Waiting for 1s minio to ready...");
78+
sleep(std::time::Duration::from_millis(1000)).await;
79+
}
80+
7681
let props = HashMap::from([
7782
(
7883
S3_ENDPOINT.to_string(),
@@ -83,6 +88,24 @@ async fn get_catalog() -> HmsCatalog {
8388
(S3_REGION.to_string(), "us-east-1".to_string()),
8489
]);
8590

91+
// Wait for bucket to actually exist
92+
let file_io = iceberg::io::FileIO::from_path("s3a://")
93+
.unwrap()
94+
.with_props(props.clone())
95+
.build()
96+
.unwrap();
97+
98+
let mut retries = 0;
99+
while retries < 30 {
100+
if file_io.exists("s3a://warehouse/").await.unwrap_or(false) {
101+
info!("S3 bucket 'warehouse' is ready");
102+
break;
103+
}
104+
info!("Waiting for bucket creation... (attempt {})", retries + 1);
105+
sleep(std::time::Duration::from_millis(1000)).await;
106+
retries += 1;
107+
}
108+
86109
let config = HmsCatalogConfig::builder()
87110
.address(hms_socket_addr.to_string())
88111
.thrift_transport(HmsThriftTransport::Buffered)

crates/iceberg/src/arrow/nan_val_cnt_visitor.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow_array::{ArrayRef, Float32Array, Float64Array, RecordBatch, StructArray
2525
use arrow_schema::DataType;
2626

2727
use crate::Result;
28-
use crate::arrow::ArrowArrayAccessor;
28+
use crate::arrow::{ArrowArrayAccessor, FieldMatchMode};
2929
use crate::spec::{
3030
ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor,
3131
StructType, visit_struct_with_partner,
@@ -71,6 +71,7 @@ macro_rules! count_float_nans {
7171
pub struct NanValueCountVisitor {
7272
/// Stores field ID to NaN value count mapping
7373
pub nan_value_counts: HashMap<i32, u64>,
74+
match_mode: FieldMatchMode,
7475
}
7576

7677
impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
@@ -149,14 +150,20 @@ impl SchemaWithPartnerVisitor<ArrayRef> for NanValueCountVisitor {
149150
impl NanValueCountVisitor {
150151
/// Creates new instance of NanValueCountVisitor
151152
pub fn new() -> Self {
153+
Self::new_with_match_mode(FieldMatchMode::Id)
154+
}
155+
156+
/// Creates new instance of NanValueCountVisitor with explicit match mode
157+
pub fn new_with_match_mode(match_mode: FieldMatchMode) -> Self {
152158
Self {
153159
nan_value_counts: HashMap::new(),
160+
match_mode,
154161
}
155162
}
156163

157164
/// Compute nan value counts in given schema and record batch
158165
pub fn compute(&mut self, schema: SchemaRef, batch: RecordBatch) -> Result<()> {
159-
let arrow_arr_partner_accessor = ArrowArrayAccessor {};
166+
let arrow_arr_partner_accessor = ArrowArrayAccessor::new_with_match_mode(self.match_mode);
160167

161168
let struct_arr = Arc::new(StructArray::from(batch)) as ArrayRef;
162169
visit_struct_with_partner(

0 commit comments

Comments
 (0)