
Commit 19dd58d

chore: Improve CometScan metrics (#1100)

* Add native metrics for plan creation
* Make messages consistent
* Include get_next_batch cost in metrics
* Formatting
* Fix double count of rows

Parent: b64c13d

4 files changed: +44, -15 lines


native/core/src/execution/jni_api.rs (20 additions, 2 deletions)

```diff
@@ -37,6 +37,7 @@ use jni::{
     sys::{jbyteArray, jint, jlong, jlongArray},
     JNIEnv,
 };
+use std::time::{Duration, Instant};
 use std::{collections::HashMap, sync::Arc, task::Poll};
 
 use super::{serde, utils::SparkArrowConvert, CometMemoryPool};
@@ -81,6 +82,8 @@ struct ExecutionContext {
     pub runtime: Runtime,
     /// Native metrics
     pub metrics: Arc<GlobalRef>,
+    /// The time it took to create the native plan and configure the context
+    pub plan_creation_time: Duration,
     /// DataFusion SessionContext
     pub session_ctx: Arc<SessionContext>,
     /// Whether to enable additional debugging checks & messages
@@ -109,6 +112,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     // Init JVM classes
     JVMClasses::init(&mut env);
 
+    let start = Instant::now();
+
     let array = unsafe { JPrimitiveArray::from_raw(serialized_query) };
     let bytes = env.convert_byte_array(array)?;
 
@@ -167,6 +172,8 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
     // dictionaries will be dropped as well.
     let session = prepare_datafusion_session_context(&configs, task_memory_manager)?;
 
+    let plan_creation_time = start.elapsed();
+
     let exec_context = Box::new(ExecutionContext {
         id,
         spark_plan,
@@ -177,6 +184,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
         conf: configs,
         runtime,
         metrics,
+        plan_creation_time,
         session_ctx: Arc::new(session),
         debug_native,
         explain_native,
@@ -321,6 +329,8 @@ fn pull_input_batches(exec_context: &mut ExecutionContext) -> Result<(), CometError> {
 pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     e: JNIEnv,
     _class: JClass,
+    stage_id: jint,
+    partition: jint,
     exec_context: jlong,
     array_addrs: jlongArray,
     schema_addrs: jlongArray,
@@ -335,20 +345,23 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     // Because we don't know if input arrays are dictionary-encoded when we create
     // query plan, we need to defer stream initialization to first time execution.
     if exec_context.root_op.is_none() {
+        let start = Instant::now();
         let planner = PhysicalPlanner::new(Arc::clone(&exec_context.session_ctx))
             .with_exec_id(exec_context_id);
        let (scans, root_op) = planner.create_plan(
             &exec_context.spark_plan,
             &mut exec_context.input_sources.clone(),
         )?;
+        let physical_plan_time = start.elapsed();
 
+        exec_context.plan_creation_time += physical_plan_time;
         exec_context.root_op = Some(Arc::clone(&root_op));
         exec_context.scans = scans;
 
         if exec_context.explain_native {
             let formatted_plan_str =
                 DisplayableExecutionPlan::new(root_op.as_ref()).indent(true);
-            info!("Comet native query plan:\n {formatted_plan_str:}");
+            info!("Comet native query plan:\n{formatted_plan_str:}");
         }
 
         let task_ctx = exec_context.session_ctx.task_ctx();
@@ -388,7 +401,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
     if let Some(plan) = &exec_context.root_op {
         let formatted_plan_str =
             DisplayableExecutionPlan::with_metrics(plan.as_ref()).indent(true);
-        info!("Comet native query plan with metrics:\n{formatted_plan_str:}");
+        info!(
+            "Comet native query plan with metrics:\
+            \n[Stage {} Partition {}] plan creation (including CometScans fetching first batches) took {:?}:\
+            \n{formatted_plan_str:}",
+            stage_id, partition, exec_context.plan_creation_time
+        );
     }
 }
```
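The measurement itself is a plain `Instant`/`Duration` accumulation: `createPlan` records how long deserializing the plan and preparing the session took, and the first `executePlan` call adds the physical-plan build time on top before the total is logged. A minimal standalone sketch of that pattern (the `ExecutionContext` struct here is a stand-in for illustration, not the real Comet type):

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

// Stand-in for Comet's ExecutionContext; only the timing field is shown.
struct ExecutionContext {
    plan_creation_time: Duration,
}

fn main() {
    // createPlan: time everything from plan deserialization to session setup.
    let start = Instant::now();
    sleep(Duration::from_millis(5)); // placeholder for the real work
    let mut ctx = ExecutionContext {
        plan_creation_time: start.elapsed(),
    };

    // First executePlan call: add the physical-plan build time on top.
    let start = Instant::now();
    sleep(Duration::from_millis(5)); // placeholder for create_plan()
    ctx.plan_creation_time += start.elapsed();

    println!("plan creation took {:?}", ctx.plan_creation_time);
}
```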

native/core/src/execution/operators/scan.rs (12 additions, 11 deletions)

```diff
@@ -15,16 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use futures::Stream;
-use itertools::Itertools;
-use std::rc::Rc;
-use std::{
-    any::Any,
-    pin::Pin,
-    sync::{Arc, Mutex},
-    task::{Context, Poll},
-};
-
 use crate::{
     errors::CometError,
     execution::{
@@ -48,9 +38,18 @@ use datafusion::{
     physical_plan::{ExecutionPlan, *},
 };
 use datafusion_common::{arrow_datafusion_err, DataFusionError, Result as DataFusionResult};
+use futures::Stream;
+use itertools::Itertools;
 use jni::objects::JValueGen;
 use jni::objects::{GlobalRef, JObject};
 use jni::sys::jsize;
+use std::rc::Rc;
+use std::{
+    any::Any,
+    pin::Pin,
+    sync::{Arc, Mutex},
+    task::{Context, Poll},
+};
 
 /// ScanExec reads batches of data from Spark via JNI. The source of the scan could be a file
 /// scan or the result of reading a broadcast or shuffle exchange.
@@ -98,7 +97,6 @@ impl ScanExec {
             let batch =
                 ScanExec::get_next(exec_context_id, input_source.as_obj(), data_types.len())?;
             timer.stop();
-            baseline_metrics.record_output(batch.num_rows());
             batch
         } else {
             InputBatch::EOF
@@ -162,6 +160,7 @@ impl ScanExec {
             // This is a unit test. We don't need to call JNI.
             return Ok(());
         }
+        let mut timer = self.baseline_metrics.elapsed_compute().timer();
 
         let mut current_batch = self.batch.try_lock().unwrap();
         if current_batch.is_none() {
@@ -173,6 +172,8 @@ impl ScanExec {
             *current_batch = Some(next_batch);
         }
 
+        timer.stop();
+
         Ok(())
     }
```
spark/src/main/scala/org/apache/comet/CometExecIterator.scala (2 additions, 1 deletion)

```diff
@@ -95,7 +95,8 @@ class CometExecIterator(
     nativeUtil.getNextBatch(
       numOutputCols,
       (arrayAddrs, schemaAddrs) => {
-        nativeLib.executePlan(plan, arrayAddrs, schemaAddrs)
+        val ctx = TaskContext.get()
+        nativeLib.executePlan(ctx.stageId(), ctx.partitionId(), plan, arrayAddrs, schemaAddrs)
       })
   }
```

spark/src/main/scala/org/apache/comet/Native.scala (10 additions, 1 deletion)

```diff
@@ -56,6 +56,10 @@ class Native extends NativeBase {
   /**
    * Execute a native query plan based on given input Arrow arrays.
    *
+   * @param stage
+   *   the stage ID, for informational purposes
+   * @param partition
+   *   the partition ID, for informational purposes
    * @param plan
    *   the address to native query plan.
    * @param arrayAddrs
@@ -65,7 +69,12 @@ class Native extends NativeBase {
    * @return
    *   the number of rows, if -1, it means end of the output.
    */
-  @native def executePlan(plan: Long, arrayAddrs: Array[Long], schemaAddrs: Array[Long]): Long
+  @native def executePlan(
+      stage: Int,
+      partition: Int,
+      plan: Long,
+      arrayAddrs: Array[Long],
+      schemaAddrs: Array[Long]): Long
 
   /**
    * Release and drop the native query plan object and context object.
```
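The widened Scala signature must be mirrored exactly on the native side, which is what the jni_api.rs hunk above does: the exported symbol name encodes the package and class (`Java_org_apache_comet_Native_executePlan`), and each Scala parameter maps in order to a JNI type (`Int` to `jint`, `Long` to `jlong`, `Array[Long]` to `jlongArray`) after the implicit `JNIEnv` and class arguments. A minimal sketch of a matching stub using the `jni` crate (the body is a placeholder, not Comet's implementation):

```rust
use jni::objects::JClass;
use jni::sys::{jint, jlong, jlongArray};
use jni::JNIEnv;

/// Stub matching the new Scala declaration:
/// executePlan(stage: Int, partition: Int, plan: Long,
///             arrayAddrs: Array[Long], schemaAddrs: Array[Long]): Long
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_executePlan(
    _env: JNIEnv,
    _class: JClass,
    stage_id: jint,
    partition: jint,
    _exec_context: jlong,
    _array_addrs: jlongArray,
    _schema_addrs: jlongArray,
) -> jlong {
    // Placeholder body: a real implementation would execute the plan and
    // return the row count; -1 signals end of output per the Scaladoc above.
    let _ = (stage_id, partition);
    -1
}
```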
