Skip to content

Commit a1e6a39

Browse files
authored
perf: improve performance of update metrics (#1329)
1 parent 26b8d57 commit a1e6a39

File tree

12 files changed

+142
-81
lines changed

12 files changed

+142
-81
lines changed

common/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,6 +632,14 @@ object CometConf extends ShimCometConf {
632632
.booleanConf
633633
.createWithDefault(false)
634634

635+
val COMET_METRICS_UPDATE_INTERVAL: ConfigEntry[Long] =
636+
conf("spark.comet.metrics.updateInterval")
637+
.doc(
638+
"The interval in milliseconds to update metrics. If interval is negative," +
639+
" metrics will be updated upon task completion.")
640+
.longConf
641+
.createWithDefault(3000L)
642+
635643
/** Create a config to enable a specific operator */
636644
private def createExecEnabledConfig(
637645
exec: String,

docs/source/user-guide/configs.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ Comet provides the following configuration settings.
6767
| spark.comet.expression.allowIncompatible | Comet is not currently fully compatible with Spark for all expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
6868
| spark.comet.memory.overhead.factor | Fraction of executor memory to be allocated as additional non-heap memory per executor process for Comet. | 0.2 |
6969
| spark.comet.memory.overhead.min | Minimum amount of additional memory to be allocated per executor process for Comet, in MiB. | 402653184b |
70+
| spark.comet.metrics.updateInterval | The interval in milliseconds to update metrics. If interval is negative, metrics will be updated upon task completion. | 3000 |
7071
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
7172
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. | false |
7273
| spark.comet.parquet.read.io.adjust.readRange.skew | In the parallel reader, if the read ranges submitted are skewed in sizes, this option will cause the reader to break up larger read ranges into smaller ranges to reduce the skew. This will result in a slightly larger number of connections opened to the file system but may give improved performance. | false |

native/core/src/execution/jni_api.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ struct ExecutionContext {
9191
pub runtime: Runtime,
9292
/// Native metrics
9393
pub metrics: Arc<GlobalRef>,
94+
// The interval in milliseconds to update metrics
95+
pub metrics_update_interval: Option<Duration>,
96+
// The last update time of metrics
97+
pub metrics_last_update_time: Instant,
9498
/// The time it took to create the native plan and configure the context
9599
pub plan_creation_time: Duration,
96100
/// DataFusion SessionContext
@@ -99,8 +103,6 @@ struct ExecutionContext {
99103
pub debug_native: bool,
100104
/// Whether to write native plans with metrics to stdout
101105
pub explain_native: bool,
102-
/// Map of metrics name -> jstring object to cache jni_NewStringUTF calls.
103-
pub metrics_jstrings: HashMap<String, Arc<GlobalRef>>,
104106
/// Memory pool config
105107
pub memory_pool_config: MemoryPoolConfig,
106108
}
@@ -160,6 +162,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
160162
serialized_query: jbyteArray,
161163
partition_count: jint,
162164
metrics_node: JObject,
165+
metrics_update_interval: jlong,
163166
comet_task_memory_manager_obj: JObject,
164167
batch_size: jint,
165168
use_unified_memory_manager: jboolean,
@@ -222,6 +225,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
222225

223226
let plan_creation_time = start.elapsed();
224227

228+
let metrics_update_interval = if metrics_update_interval > 0 {
229+
Some(Duration::from_millis(metrics_update_interval as u64))
230+
} else {
231+
None
232+
};
233+
225234
let exec_context = Box::new(ExecutionContext {
226235
id,
227236
task_attempt_id,
@@ -233,11 +242,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
233242
stream: None,
234243
runtime,
235244
metrics,
245+
metrics_update_interval,
246+
metrics_last_update_time: Instant::now(),
236247
plan_creation_time,
237248
session_ctx: Arc::new(session),
238249
debug_native: debug_native == 1,
239250
explain_native: explain_native == 1,
240-
metrics_jstrings: HashMap::new(),
241251
memory_pool_config,
242252
});
243253

@@ -508,8 +518,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
508518
let next_item = exec_context.stream.as_mut().unwrap().next();
509519
let poll_output = exec_context.runtime.block_on(async { poll!(next_item) });
510520

511-
// Update metrics
512-
update_metrics(&mut env, exec_context)?;
521+
// update metrics at interval
522+
if let Some(interval) = exec_context.metrics_update_interval {
523+
let now = Instant::now();
524+
if now - exec_context.metrics_last_update_time >= interval {
525+
update_metrics(&mut env, exec_context)?;
526+
exec_context.metrics_last_update_time = now;
527+
}
528+
}
513529

514530
match poll_output {
515531
Poll::Ready(Some(output)) => {
@@ -561,8 +577,12 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan(
561577
_class: JClass,
562578
exec_context: jlong,
563579
) {
564-
try_unwrap_or_throw(&e, |_| unsafe {
580+
try_unwrap_or_throw(&e, |mut env| unsafe {
565581
let execution_context = get_execution_context(exec_context);
582+
583+
// Update metrics
584+
update_metrics(&mut env, execution_context)?;
585+
566586
if execution_context.memory_pool_config.pool_type == MemoryPoolType::FairSpillTaskShared
567587
|| execution_context.memory_pool_config.pool_type == MemoryPoolType::GreedyTaskShared
568588
{
@@ -586,10 +606,13 @@ pub extern "system" fn Java_org_apache_comet_Native_releasePlan(
586606

587607
/// Updates the metrics of the query plan.
588608
fn update_metrics(env: &mut JNIEnv, exec_context: &mut ExecutionContext) -> CometResult<()> {
589-
let native_query = exec_context.root_op.as_ref().unwrap();
590-
let metrics = exec_context.metrics.as_obj();
591-
let metrics_jstrings = &mut exec_context.metrics_jstrings;
592-
update_comet_metric(env, metrics, native_query, metrics_jstrings)
609+
if exec_context.root_op.is_some() {
610+
let native_query = exec_context.root_op.as_ref().unwrap();
611+
let metrics = exec_context.metrics.as_obj();
612+
update_comet_metric(env, metrics, native_query)
613+
} else {
614+
Ok(())
615+
}
593616
}
594617

595618
fn convert_datatype_arrays(

native/core/src/execution/metrics/utils.rs

Lines changed: 32 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,11 @@
1616
// under the License.
1717

1818
use crate::execution::spark_plan::SparkPlan;
19-
use crate::jvm_bridge::jni_new_global_ref;
20-
use crate::{
21-
errors::CometError,
22-
jvm_bridge::{jni_call, jni_new_string},
23-
};
19+
use crate::{errors::CometError, jvm_bridge::jni_call};
2420
use datafusion::physical_plan::metrics::MetricValue;
25-
use jni::objects::{GlobalRef, JString};
21+
use datafusion_comet_proto::spark_metric::NativeMetricNode;
2622
use jni::{objects::JObject, JNIEnv};
23+
use prost::Message;
2724
use std::collections::HashMap;
2825
use std::sync::Arc;
2926

@@ -34,10 +31,22 @@ pub fn update_comet_metric(
3431
env: &mut JNIEnv,
3532
metric_node: &JObject,
3633
spark_plan: &Arc<SparkPlan>,
37-
metrics_jstrings: &mut HashMap<String, Arc<GlobalRef>>,
3834
) -> Result<(), CometError> {
39-
// combine all metrics from all native plans for this SparkPlan
40-
let metrics = if spark_plan.additional_native_plans.is_empty() {
35+
unsafe {
36+
let native_metric = to_native_metric_node(spark_plan);
37+
let jbytes = env.byte_array_from_slice(&native_metric?.encode_to_vec())?;
38+
jni_call!(env, comet_metric_node(metric_node).set_all_from_bytes(&jbytes) -> ())?;
39+
}
40+
Ok(())
41+
}
42+
43+
pub fn to_native_metric_node(spark_plan: &Arc<SparkPlan>) -> Result<NativeMetricNode, CometError> {
44+
let mut native_metric_node = NativeMetricNode {
45+
metrics: HashMap::new(),
46+
children: Vec::new(),
47+
};
48+
49+
let node_metrics = if spark_plan.additional_native_plans.is_empty() {
4150
spark_plan.native_plan.metrics()
4251
} else {
4352
let mut metrics = spark_plan.native_plan.metrics().unwrap_or_default();
@@ -55,60 +64,21 @@ pub fn update_comet_metric(
5564
Some(metrics.aggregate_by_name())
5665
};
5766

58-
update_metrics(
59-
env,
60-
metric_node,
61-
&metrics
62-
.unwrap_or_default()
63-
.iter()
64-
.map(|m| m.value())
65-
.map(|m| (m.name(), m.as_usize() as i64))
66-
.collect::<Vec<_>>(),
67-
metrics_jstrings,
68-
)?;
67+
// add metrics
68+
node_metrics
69+
.unwrap_or_default()
70+
.iter()
71+
.map(|m| m.value())
72+
.map(|m| (m.name(), m.as_usize() as i64))
73+
.for_each(|(name, value)| {
74+
native_metric_node.metrics.insert(name.to_string(), value);
75+
});
6976

70-
unsafe {
71-
for (i, child_plan) in spark_plan.children().iter().enumerate() {
72-
let child_metric_node: JObject = jni_call!(env,
73-
comet_metric_node(metric_node).get_child_node(i as i32) -> JObject
74-
)?;
75-
if child_metric_node.is_null() {
76-
continue;
77-
}
78-
update_comet_metric(env, &child_metric_node, child_plan, metrics_jstrings)?;
79-
}
77+
// add children
78+
for child_plan in spark_plan.children() {
79+
let child_node = to_native_metric_node(child_plan)?;
80+
native_metric_node.children.push(child_node);
8081
}
81-
Ok(())
82-
}
8382

84-
#[inline]
85-
fn update_metrics(
86-
env: &mut JNIEnv,
87-
metric_node: &JObject,
88-
metric_values: &[(&str, i64)],
89-
metrics_jstrings: &mut HashMap<String, Arc<GlobalRef>>,
90-
) -> Result<(), CometError> {
91-
unsafe {
92-
for &(name, value) in metric_values {
93-
// Perform a lookup in the jstrings cache.
94-
if let Some(map_global_ref) = metrics_jstrings.get(name) {
95-
// Cache hit. Extract the jstring from the global ref.
96-
let jobject = map_global_ref.as_obj();
97-
let jstring = JString::from_raw(**jobject);
98-
// Update the metrics using the jstring as a key.
99-
jni_call!(env, comet_metric_node(metric_node).set(&jstring, value) -> ())?;
100-
} else {
101-
// Cache miss. Allocate a new string, promote to global ref, and insert into cache.
102-
let local_jstring = jni_new_string!(env, &name)?;
103-
let global_ref = jni_new_global_ref!(env, local_jstring)?;
104-
let arc_global_ref = Arc::new(global_ref);
105-
metrics_jstrings.insert(name.to_string(), Arc::clone(&arc_global_ref));
106-
let jobject = arc_global_ref.as_obj();
107-
let jstring = JString::from_raw(**jobject);
108-
// Update the metrics using the jstring as a key.
109-
jni_call!(env, comet_metric_node(metric_node).set(&jstring, value) -> ())?;
110-
}
111-
}
112-
}
113-
Ok(())
83+
Ok(native_metric_node)
11484
}

native/core/src/jvm_bridge/comet_metric_node.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub struct CometMetricNode<'a> {
3030
pub method_get_child_node_ret: ReturnType,
3131
pub method_set: JMethodID,
3232
pub method_set_ret: ReturnType,
33+
pub method_set_all_from_bytes: JMethodID,
34+
pub method_set_all_from_bytes_ret: ReturnType,
3335
}
3436

3537
impl<'a> CometMetricNode<'a> {
@@ -47,6 +49,12 @@ impl<'a> CometMetricNode<'a> {
4749
method_get_child_node_ret: ReturnType::Object,
4850
method_set: env.get_method_id(Self::JVM_CLASS, "set", "(Ljava/lang/String;J)V")?,
4951
method_set_ret: ReturnType::Primitive(Primitive::Void),
52+
method_set_all_from_bytes: env.get_method_id(
53+
Self::JVM_CLASS,
54+
"set_all_from_bytes",
55+
"([B)V",
56+
)?,
57+
method_set_all_from_bytes_ret: ReturnType::Primitive(Primitive::Void),
5058
class,
5159
})
5260
}

native/core/src/jvm_bridge/mod.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,6 @@ macro_rules! jvalues {
4646
}}
4747
}
4848

49-
/// Macro for create a new JNI string.
50-
macro_rules! jni_new_string {
51-
($env:expr, $value:expr) => {{
52-
$crate::jvm_bridge::jni_map_error!($env, $env.new_string($value))
53-
}};
54-
}
55-
5649
/// Macro for calling a JNI method.
5750
/// The syntax is:
5851
/// jni_call!(env, comet_metric_node(metric_node).add(jname, value) -> ())?;
@@ -173,7 +166,6 @@ macro_rules! jni_new_global_ref {
173166
pub(crate) use jni_call;
174167
pub(crate) use jni_map_error;
175168
pub(crate) use jni_new_global_ref;
176-
pub(crate) use jni_new_string;
177169
pub(crate) use jni_static_call;
178170
pub(crate) use jvalues;
179171

native/proto/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ fn main() -> Result<()> {
3030
prost_build::Config::new().out_dir(out_dir).compile_protos(
3131
&[
3232
"src/proto/expr.proto",
33+
"src/proto/metric.proto",
3334
"src/proto/partitioning.proto",
3435
"src/proto/operator.proto",
3536
],

native/proto/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,9 @@ pub mod spark_partitioning {
3636
pub mod spark_operator {
3737
include!(concat!("generated", "/spark.spark_operator.rs"));
3838
}
39+
40+
// Include generated modules from .proto files.
41+
#[allow(missing_docs)]
42+
pub mod spark_metric {
43+
include!(concat!("generated", "/spark.spark_metric.rs"));
44+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
19+
20+
syntax = "proto3";
21+
22+
package spark.spark_metric;
23+
24+
option java_package = "org.apache.comet.serde";
25+
26+
message NativeMetricNode {
27+
map<string, int64> metrics = 1;
28+
repeated NativeMetricNode children = 2;
29+
}

spark/src/main/scala/org/apache/comet/CometExecIterator.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.spark.internal.Logging
2424
import org.apache.spark.sql.comet.CometMetricNode
2525
import org.apache.spark.sql.vectorized._
2626

27-
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_WORKER_THREADS}
27+
import org.apache.comet.CometConf.{COMET_BATCH_SIZE, COMET_BLOCKING_THREADS, COMET_DEBUG_ENABLED, COMET_EXEC_MEMORY_POOL_TYPE, COMET_EXPLAIN_NATIVE_ENABLED, COMET_METRICS_UPDATE_INTERVAL, COMET_WORKER_THREADS}
2828
import org.apache.comet.vector.NativeUtil
2929

3030
/**
@@ -72,6 +72,7 @@ class CometExecIterator(
7272
protobufQueryPlan,
7373
numParts,
7474
nativeMetrics,
75+
metricsUpdateInterval = COMET_METRICS_UPDATE_INTERVAL.get(),
7576
new CometTaskMemoryManager(id),
7677
batchSize = COMET_BATCH_SIZE.get(),
7778
use_unified_memory_manager = conf.getBoolean("spark.memory.offHeap.enabled", false),

0 commit comments

Comments
 (0)