Commit e5c8d28: upmerge

2 parents: c183ead + fd53edb

24 files changed, +1375 / -126 lines

.github/workflows/pr_build_linux.yml (1 addition, 0 deletions)

@@ -161,6 +161,7 @@ jobs:
 org.apache.comet.CometStringExpressionSuite
 org.apache.comet.CometBitwiseExpressionSuite
 org.apache.comet.CometMapExpressionSuite
+org.apache.comet.CometJsonExpressionSuite
 org.apache.comet.expressions.conditional.CometIfSuite
 org.apache.comet.expressions.conditional.CometCoalesceSuite
 org.apache.comet.expressions.conditional.CometCaseWhenSuite

.github/workflows/pr_build_macos.yml (1 addition, 0 deletions)

@@ -126,6 +126,7 @@ jobs:
 org.apache.comet.CometStringExpressionSuite
 org.apache.comet.CometBitwiseExpressionSuite
 org.apache.comet.CometMapExpressionSuite
+org.apache.comet.CometJsonExpressionSuite
 org.apache.comet.expressions.conditional.CometIfSuite
 org.apache.comet.expressions.conditional.CometCoalesceSuite
 org.apache.comet.expressions.conditional.CometCaseWhenSuite

dev/benchmarks/comet-tpch.sh (0 additions, 1 deletion)

@@ -50,5 +50,4 @@ $SPARK_HOME/bin/spark-submit \
   --data $TPCH_DATA \
   --queries $TPCH_QUERIES \
   --output . \
-  --write /tmp \
   --iterations 1

docs/source/user-guide/latest/configs.md (1 addition, 0 deletions)

@@ -264,6 +264,7 @@ These settings can be used to determine which parts of the plan are accelerated
 | `spark.comet.expression.IsNaN.enabled` | Enable Comet acceleration for `IsNaN` | true |
 | `spark.comet.expression.IsNotNull.enabled` | Enable Comet acceleration for `IsNotNull` | true |
 | `spark.comet.expression.IsNull.enabled` | Enable Comet acceleration for `IsNull` | true |
+| `spark.comet.expression.JsonToStructs.enabled` | Enable Comet acceleration for `JsonToStructs` | true |
 | `spark.comet.expression.KnownFloatingPointNormalized.enabled` | Enable Comet acceleration for `KnownFloatingPointNormalized` | true |
 | `spark.comet.expression.Length.enabled` | Enable Comet acceleration for `Length` | true |
 | `spark.comet.expression.LessThan.enabled` | Enable Comet acceleration for `LessThan` | true |
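As with the other expression flags in this table, setting `spark.comet.expression.JsonToStructs.enabled` to `false` should cause Comet to fall back to Spark's built-in `JsonToStructs` (`from_json`) rather than the new native implementation, which is the usual escape hatch if a behavior difference turns up.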

native/Cargo.lock (1 addition, 0 deletions)

(Generated file; diff not rendered.)

native/core/src/execution/expressions/strings.rs (26 additions, 1 deletion)

@@ -25,12 +25,13 @@ use datafusion::common::ScalarValue;
 use datafusion::physical_expr::expressions::{LikeExpr, Literal};
 use datafusion::physical_expr::PhysicalExpr;
 use datafusion_comet_proto::spark_expression::Expr;
-use datafusion_comet_spark_expr::{RLike, SubstringExpr};
+use datafusion_comet_spark_expr::{FromJson, RLike, SubstringExpr};
 
 use crate::execution::{
     expressions::extract_expr,
     operators::ExecutionError,
     planner::{expression_registry::ExpressionBuilder, PhysicalPlanner},
+    serde::to_arrow_datatype,
 };
 
 /// Builder for Substring expressions
@@ -98,3 +99,27 @@ impl ExpressionBuilder for RlikeBuilder {
         }
     }
 }
+
+pub struct FromJsonBuilder;
+
+impl ExpressionBuilder for FromJsonBuilder {
+    fn build(
+        &self,
+        spark_expr: &Expr,
+        input_schema: SchemaRef,
+        planner: &PhysicalPlanner,
+    ) -> Result<Arc<dyn PhysicalExpr>, ExecutionError> {
+        let expr = extract_expr!(spark_expr, FromJson);
+        let child = planner.create_expr(
+            expr.child.as_ref().ok_or_else(|| {
+                ExecutionError::GeneralError("FromJson missing child".to_string())
+            })?,
+            input_schema,
+        )?;
+        let schema =
+            to_arrow_datatype(expr.schema.as_ref().ok_or_else(|| {
+                ExecutionError::GeneralError("FromJson missing schema".to_string())
+            })?);
+        Ok(Arc::new(FromJson::new(child, schema, &expr.timezone)))
+    }
+}
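The builder body is mostly defensive unwrapping: protobuf message fields arrive as `Option`s, and each required field is turned into a typed error before use. A minimal, self-contained sketch of that pattern, using hypothetical stand-in types rather than Comet's actual protobuf bindings:

```rust
// Hypothetical stand-ins for the generated protobuf types; only the
// Option-field validation pattern is the point here.
#[derive(Debug)]
enum ExecutionError {
    GeneralError(String),
}

struct FromJsonProto {
    child: Option<String>,  // stand-in for the nested child expression
    schema: Option<String>, // stand-in for the serialized data type
}

fn build(expr: &FromJsonProto) -> Result<(String, String), ExecutionError> {
    // ok_or_else converts a missing required field into a typed error,
    // the same shape used in FromJsonBuilder::build above.
    let child = expr
        .child
        .as_ref()
        .ok_or_else(|| ExecutionError::GeneralError("FromJson missing child".to_string()))?;
    let schema = expr
        .schema
        .as_ref()
        .ok_or_else(|| ExecutionError::GeneralError("FromJson missing schema".to_string()))?;
    Ok((child.clone(), schema.clone()))
}

fn main() {
    let ok = FromJsonProto {
        child: Some("col".into()),
        schema: Some("struct<a:int>".into()),
    };
    println!("{:?}", build(&ok)); // Ok(("col", "struct<a:int>"))

    let missing = FromJsonProto { child: None, schema: None };
    println!("{:?}", build(&missing)); // Err(GeneralError("FromJson missing child"))
}
```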

native/core/src/execution/jni_api.rs (65 additions, 48 deletions)

@@ -136,6 +136,8 @@ struct ExecutionContext {
     pub metrics_update_interval: Option<Duration>,
     // The last update time of metrics
     pub metrics_last_update_time: Instant,
+    /// Counter to avoid checking time on every poll iteration (reduces syscalls)
+    pub poll_count_since_metrics_check: u32,
     /// The time it took to create the native plan and configure the context
     pub plan_creation_time: Duration,
     /// DataFusion SessionContext
@@ -272,6 +274,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         metrics,
         metrics_update_interval,
         metrics_last_update_time: Instant::now(),
+        poll_count_since_metrics_check: 0,
         plan_creation_time,
         session_ctx: Arc::new(session),
         debug_native,
@@ -503,60 +506,74 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
             pull_input_batches(exec_context)?;
         }
 
-        loop {
-            // Polling the stream.
-            let next_item = exec_context.stream.as_mut().unwrap().next();
-            let poll_output = get_runtime().block_on(async { poll!(next_item) });
-
-            // update metrics at interval
-            if let Some(interval) = exec_context.metrics_update_interval {
-                let now = Instant::now();
-                if now - exec_context.metrics_last_update_time >= interval {
-                    update_metrics(&mut env, exec_context)?;
-                    exec_context.metrics_last_update_time = now;
-                }
-            }
-
-            match poll_output {
-                Poll::Ready(Some(output)) => {
-                    // prepare output for FFI transfer
-                    return prepare_output(
-                        &mut env,
-                        array_addrs,
-                        schema_addrs,
-                        output?,
-                        exec_context.debug_native,
-                    );
-                }
-                Poll::Ready(None) => {
-                    // Reaches EOF of output.
-                    if exec_context.explain_native {
-                        if let Some(plan) = &exec_context.root_op {
-                            let formatted_plan_str = DisplayableExecutionPlan::with_metrics(
-                                plan.native_plan.as_ref(),
-                            )
-                            .indent(true);
-                            info!(
-                                "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
-                                \n plan creation took {:?}:\
-                                \n{formatted_plan_str:}",
-                                plan.plan_id, stage_id, partition, exec_context.plan_creation_time
-                            );
-                        }
-                    }
-                    return Ok(-1);
-                }
-                // A poll pending means there are more than one blocking operators,
-                // we don't need go back-forth between JVM/Native. Just keeping polling.
-                Poll::Pending => {
-                    // Pull input batches
-                    pull_input_batches(exec_context)?;
-
-                    // Output not ready yet
-                    continue;
-                }
-            }
-        }
+        // Enter the runtime once for the entire polling loop to avoid repeated
+        // Runtime::enter() overhead
+        get_runtime().block_on(async {
+            loop {
+                // Polling the stream.
+                let next_item = exec_context.stream.as_mut().unwrap().next();
+                let poll_output = poll!(next_item);
+
+                // update metrics at interval
+                // Only check time every 100 polls to reduce syscall overhead
+                if let Some(interval) = exec_context.metrics_update_interval {
+                    exec_context.poll_count_since_metrics_check += 1;
+                    if exec_context.poll_count_since_metrics_check >= 100 {
+                        let now = Instant::now();
+                        if now - exec_context.metrics_last_update_time >= interval {
+                            update_metrics(&mut env, exec_context)?;
+                            exec_context.metrics_last_update_time = now;
+                        }
+                        exec_context.poll_count_since_metrics_check = 0;
+                    }
+                }
+
+                match poll_output {
+                    Poll::Ready(Some(output)) => {
+                        // prepare output for FFI transfer
+                        return prepare_output(
+                            &mut env,
+                            array_addrs,
+                            schema_addrs,
+                            output?,
+                            exec_context.debug_native,
+                        );
+                    }
+                    Poll::Ready(None) => {
+                        // Reaches EOF of output.
+                        if exec_context.explain_native {
+                            if let Some(plan) = &exec_context.root_op {
+                                let formatted_plan_str = DisplayableExecutionPlan::with_metrics(
+                                    plan.native_plan.as_ref(),
+                                )
+                                .indent(true);
+                                info!(
+                                    "Comet native query plan with metrics (Plan #{} Stage {} Partition {}):\
+                                    \n plan creation took {:?}:\
+                                    \n{formatted_plan_str:}",
+                                    plan.plan_id, stage_id, partition, exec_context.plan_creation_time
+                                );
+                            }
+                        }
+                        return Ok(-1);
+                    }
+                    // A poll pending means there are more than one blocking operators,
+                    // we don't need go back-forth between JVM/Native. Just keeping polling.
+                    Poll::Pending => {
+                        // TODO: Investigate if JNI calls are safe without block_in_place.
+                        // block_in_place prevents Tokio from migrating this task to another thread,
+                        // which is necessary because JNI env is thread-local. If we can guarantee
+                        // thread safety another way, we could remove this wrapper for better perf.
+                        tokio::task::block_in_place(|| {
+                            pull_input_batches(exec_context)
+                        })?;
+
+                        // Output not ready yet
+                        continue;
+                    }
+                }
+            }
+        })
     })
     })
 }
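Two separate optimizations are bundled here: the Tokio runtime is entered once for the whole polling loop instead of once per poll, and `Instant::now()` is only sampled every 100th poll, since reading the clock can cost a syscall on some platforms. A self-contained sketch of the time-check throttling alone (illustrative names, not Comet's types):

```rust
use std::time::{Duration, Instant};

// Only sample the clock every N iterations of a hot loop; between samples,
// a plain integer increment stands in for the time check.
struct MetricsThrottle {
    interval: Duration,
    last_update: Instant,
    polls_since_check: u32,
}

impl MetricsThrottle {
    const CHECK_EVERY: u32 = 100;

    fn should_update(&mut self) -> bool {
        self.polls_since_check += 1;
        if self.polls_since_check < Self::CHECK_EVERY {
            return false;
        }
        self.polls_since_check = 0;
        let now = Instant::now();
        if now - self.last_update >= self.interval {
            self.last_update = now;
            return true;
        }
        false
    }
}

fn main() {
    let mut throttle = MetricsThrottle {
        interval: Duration::from_millis(100),
        last_update: Instant::now(),
        polls_since_check: 0,
    };
    let mut updates = 0;
    for _ in 0..1_000_000 {
        if throttle.should_update() {
            updates += 1; // stand-in for update_metrics()
        }
    }
    println!("metrics updated {updates} times");
}
```

The `block_in_place` wrapper in the `Poll::Pending` arm is orthogonal: it keeps the task pinned to the current OS thread while it makes blocking JNI calls, which matters because the JNI env is thread-local, as the inline TODO notes.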

native/core/src/execution/operators/scan.rs (1 addition, 33 deletions)

@@ -77,10 +77,6 @@ pub struct ScanExec {
     metrics: ExecutionPlanMetricsSet,
     /// Baseline metrics
     baseline_metrics: BaselineMetrics,
-    /// Time waiting for JVM input plan to execute and return batches
-    jvm_fetch_time: Time,
-    /// Time spent in FFI
-    arrow_ffi_time: Time,
     /// Whether native code can assume ownership of batches that it receives
     arrow_ffi_safe: bool,
 }
@@ -95,8 +91,6 @@ impl ScanExec {
     ) -> Result<Self, CometError> {
         let metrics_set = ExecutionPlanMetricsSet::default();
         let baseline_metrics = BaselineMetrics::new(&metrics_set, 0);
-        let arrow_ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);
-        let jvm_fetch_time = MetricBuilder::new(&metrics_set).subset_time("jvm_fetch_time", 0);
 
         // Build schema directly from data types since get_next now always unpacks dictionaries
         let schema = schema_from_data_types(&data_types);
@@ -119,8 +113,6 @@
             cache,
             metrics: metrics_set,
             baseline_metrics,
-            jvm_fetch_time,
-            arrow_ffi_time,
             schema,
             arrow_ffi_safe,
         })
@@ -155,8 +147,6 @@
                 self.exec_context_id,
                 self.input_source.as_ref().unwrap().as_obj(),
                 self.data_types.len(),
-                &self.jvm_fetch_time,
-                &self.arrow_ffi_time,
                 self.arrow_ffi_safe,
             )?;
             *current_batch = Some(next_batch);
@@ -172,8 +162,6 @@
         exec_context_id: i64,
         iter: &JObject,
         num_cols: usize,
-        jvm_fetch_time: &Time,
-        arrow_ffi_time: &Time,
         arrow_ffi_safe: bool,
     ) -> Result<InputBatch, CometError> {
         if exec_context_id == TEST_EXEC_CONTEXT_ID {
@@ -189,15 +177,11 @@
 
         let mut env = JVMClasses::get_env()?;
 
-        let mut timer = jvm_fetch_time.timer();
-
         let num_rows: i32 = unsafe {
             jni_call!(&mut env,
                 comet_batch_iterator(iter).has_next() -> i32)?
         };
 
-        timer.stop();
-
         if num_rows == -1 {
             return Ok(InputBatch::EOF);
         }
@@ -206,11 +190,9 @@
         // JVM via FFI
         // Selection vectors can be provided by, for instance, Iceberg to
        // remove rows that have been deleted.
-        let selection_indices_arrays =
-            Self::get_selection_indices(&mut env, iter, num_cols, jvm_fetch_time, arrow_ffi_time)?;
+        let selection_indices_arrays = Self::get_selection_indices(&mut env, iter, num_cols)?;
 
         // fetch batch data from JVM via FFI
-        let mut timer = arrow_ffi_time.timer();
         let (num_rows, array_addrs, schema_addrs) =
             Self::allocate_and_fetch_batch(&mut env, iter, num_cols)?;
 
@@ -262,8 +244,6 @@
             }
         }
 
-        timer.stop();
-
         // If selection was applied, determine the actual row count from the selected arrays
         let actual_num_rows = if let Some(ref selection_arrays) = selection_indices_arrays {
             if !selection_arrays.is_empty() {
@@ -332,21 +312,15 @@
         env: &mut jni::JNIEnv,
         iter: &JObject,
         num_cols: usize,
-        jvm_fetch_time: &Time,
-        arrow_ffi_time: &Time,
     ) -> Result<Option<Vec<ArrayRef>>, CometError> {
         // Check if all columns have selection vectors
-        let mut timer = jvm_fetch_time.timer();
         let has_selection_vectors_result: jni::sys::jboolean = unsafe {
             jni_call!(env,
                 comet_batch_iterator(iter).has_selection_vectors() -> jni::sys::jboolean)?
         };
-        timer.stop();
         let has_selection_vectors = has_selection_vectors_result != 0;
 
         let selection_indices_arrays = if has_selection_vectors {
-            let mut timer = arrow_ffi_time.timer();
-
             // Allocate arrays for selection indices export (one per column)
             let mut indices_array_addrs = Vec::with_capacity(num_cols);
             let mut indices_schema_addrs = Vec::with_capacity(num_cols);
@@ -364,21 +338,16 @@
             env.set_long_array_region(&indices_array_obj, 0, &indices_array_addrs)?;
             env.set_long_array_region(&indices_schema_obj, 0, &indices_schema_addrs)?;
 
-            timer.stop();
-
             // Export selection indices from JVM
-            let mut timer = jvm_fetch_time.timer();
             let _exported_count: i32 = unsafe {
                 jni_call!(env,
                     comet_batch_iterator(iter).export_selection_indices(
                         JValueGen::Object(JObject::from(indices_array_obj).as_ref()),
                         JValueGen::Object(JObject::from(indices_schema_obj).as_ref())
                     ) -> i32)?
             };
-            timer.stop();
 
             // Convert to ArrayRef for easier handling
-            let mut timer = arrow_ffi_time.timer();
             let mut selection_arrays = Vec::with_capacity(num_cols);
             for i in 0..num_cols {
                 let array_data =
@@ -391,7 +360,6 @@
                     Rc::from_raw(indices_schema_addrs[i] as *const FFI_ArrowSchema);
                 }
             }
-            timer.stop();
 
             Some(selection_arrays)
         } else {
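For context on what the removed timers did: they used DataFusion's metrics API, in which a `Time` metric accumulates nanoseconds between `timer()` and `stop()` and is reported per partition through the plan's metrics set. A minimal sketch of that API in isolation (not Comet code; assumes only the `datafusion` crate):

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics_set = ExecutionPlanMetricsSet::default();
    // subset_time registers a named Time metric for partition 0, as the
    // removed arrow_ffi_time / jvm_fetch_time metrics did.
    let ffi_time = MetricBuilder::new(&metrics_set).subset_time("arrow_ffi_time", 0);

    let mut timer = ffi_time.timer();
    // ... the timed work (FFI transfer, JNI call, etc.) would run here ...
    timer.stop();

    // Time::value() reports the accumulated nanoseconds.
    println!("arrow_ffi_time: {} ns", ffi_time.value());
}
```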

native/core/src/execution/planner/expression_registry.rs (4 additions, 0 deletions)

@@ -94,6 +94,7 @@ pub enum ExpressionType {
     CreateNamedStruct,
     GetStructField,
     ToJson,
+    FromJson,
     ToPrettyString,
     ListExtract,
     GetArrayStructFields,
@@ -282,6 +283,8 @@
             .insert(ExpressionType::Like, Box::new(LikeBuilder));
         self.builders
             .insert(ExpressionType::Rlike, Box::new(RlikeBuilder));
+        self.builders
+            .insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));
     }
 
     /// Extract expression type from Spark protobuf expression
@@ -337,6 +340,7 @@
             Some(ExprStruct::CreateNamedStruct(_)) => Ok(ExpressionType::CreateNamedStruct),
             Some(ExprStruct::GetStructField(_)) => Ok(ExpressionType::GetStructField),
             Some(ExprStruct::ToJson(_)) => Ok(ExpressionType::ToJson),
+            Some(ExprStruct::FromJson(_)) => Ok(ExpressionType::FromJson),
             Some(ExprStruct::ToPrettyString(_)) => Ok(ExpressionType::ToPrettyString),
             Some(ExprStruct::ListExtract(_)) => Ok(ExpressionType::ListExtract),
             Some(ExprStruct::GetArrayStructFields(_)) => Ok(ExpressionType::GetArrayStructFields),
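Wiring a new expression in is deliberately mechanical: one enum variant, one `match` arm to decode the protobuf case, and one `insert` of a boxed builder. A self-contained miniature of that dispatch shape (hypothetical types, much simpler than Comet's real `ExpressionBuilder`):

```rust
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Debug, PartialEq, Eq, Hash)]
enum ExpressionType {
    ToJson,
    FromJson,
}

// Stand-in for Comet's ExpressionBuilder trait, which really builds a
// PhysicalExpr from a protobuf message, an input schema, and the planner.
trait ExpressionBuilder {
    fn build(&self) -> String;
}

struct FromJsonBuilder;

impl ExpressionBuilder for FromJsonBuilder {
    fn build(&self) -> String {
        "from_json(child, schema, timezone)".to_string()
    }
}

fn main() {
    // The registry maps each expression type to a boxed builder trait object.
    let mut builders: HashMap<ExpressionType, Box<dyn ExpressionBuilder>> = HashMap::new();
    builders.insert(ExpressionType::FromJson, Box::new(FromJsonBuilder));

    // Dispatch: decode the expression type, then look up its builder.
    let expr_type = ExpressionType::FromJson;
    if let Some(builder) = builders.get(&expr_type) {
        println!("{}", builder.build());
    }
}
```

In the real registry, `build` also receives the protobuf expression, the input schema, and the planner, as the strings.rs hunk above shows.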
