@@ -66,10 +66,12 @@ use crate::execution::memory_pools::{
6666use crate :: execution:: operators:: ScanExec ;
6767use crate :: execution:: shuffle:: { read_ipc_compressed, CompressionCodec } ;
6868use crate :: execution:: spark_plan:: SparkPlan ;
69+
70+ use crate :: execution:: tracing:: TraceGuard ;
71+ use crate :: execution:: tracing:: { log_counter, trace_begin, trace_end} ;
72+
6973use log:: info;
7074use once_cell:: sync:: Lazy ;
71- #[ cfg( target_os = "linux" ) ]
72- use procfs:: process:: Process ;
7375#[ cfg( feature = "jemalloc" ) ]
7476use tikv_jemalloc_ctl:: { epoch, stats} ;
7577
@@ -131,7 +133,7 @@ struct ExecutionContext {
131133 /// Memory pool config
132134 pub memory_pool_config : MemoryPoolConfig ,
133135 /// Whether to log memory usage on each call to execute_plan
134- pub memory_profiling_enabled : bool ,
136+ pub tracing_enabled : bool ,
135137}
136138
137139/// Accept serialized query plan and return the address of the native query plan.
@@ -157,9 +159,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
157159 task_attempt_id : jlong ,
158160 debug_native : jboolean ,
159161 explain_native : jboolean ,
160- memory_profiling_enabled : jboolean ,
162+ tracing_enabled : jboolean ,
161163) -> jlong {
162164 try_unwrap_or_throw ( & e, |mut env| {
165+ let _ = TraceGuard :: new ( "createPlan" , tracing_enabled != JNI_FALSE ) ;
166+
163167 // Init JVM classes
164168 JVMClasses :: init ( & mut env) ;
165169
@@ -238,7 +242,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
238242 debug_native : debug_native == 1 ,
239243 explain_native : explain_native == 1 ,
240244 memory_pool_config,
241- memory_profiling_enabled : memory_profiling_enabled != JNI_FALSE ,
245+ tracing_enabled : tracing_enabled != JNI_FALSE ,
242246 } ) ;
243247
244248 Ok ( Box :: into_raw ( exec_context) as i64 )
@@ -362,43 +366,22 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
362366 exec_context : jlong ,
363367 array_addrs : jlongArray ,
364368 schema_addrs : jlongArray ,
369+ tracing_enabled : jboolean ,
365370) -> jlong {
366371 try_unwrap_or_throw ( & e, |mut env| {
372+ let _ = TraceGuard :: new ( "executePlan" , tracing_enabled != JNI_FALSE ) ;
373+
367374 // Retrieve the query
368375 let exec_context = get_execution_context ( exec_context) ;
369376
370- // memory profiling is only available on linux
371- if exec_context. memory_profiling_enabled {
372- #[ cfg( target_os = "linux" ) ]
377+ if exec_context. tracing_enabled {
378+ #[ cfg( feature = "jemalloc" ) ]
373379 {
374- let pid = std:: process:: id ( ) ;
375- let process = Process :: new ( pid as i32 ) . unwrap ( ) ;
376- let statm = process. statm ( ) . unwrap ( ) ;
377- let page_size = procfs:: page_size ( ) ;
378- println ! (
379- "NATIVE_MEMORY: {{ resident: {:.0} }}" ,
380- ( statm. resident * page_size) as f64 / ( 1024.0 * 1024.0 )
381- ) ;
382-
383- #[ cfg( feature = "jemalloc" ) ]
384- {
385- // Obtain a MIB for the `epoch`, `stats.allocated`, and
386- // `atats.resident` keys:
387- let e = epoch:: mib ( ) . unwrap ( ) ;
388- let allocated = stats:: allocated:: mib ( ) . unwrap ( ) ;
389- let resident = stats:: resident:: mib ( ) . unwrap ( ) ;
390-
391- // Many statistics are cached and only updated
392- // when the epoch is advanced:
393- e. advance ( ) . unwrap ( ) ;
394-
395- // Read statistics using MIB key:
396- let allocated = allocated. read ( ) . unwrap ( ) as f64 / ( 1024.0 * 1024.0 ) ;
397- let resident = resident. read ( ) . unwrap ( ) as f64 / ( 1024.0 * 1024.0 ) ;
398- println ! (
399- "NATIVE_MEMORY_JEMALLOC: {{ allocated: {allocated:.0}, resident: {resident:.0} }}"
400- ) ;
401- }
380+ let e = epoch:: mib ( ) . unwrap ( ) ;
381+ let allocated = stats:: allocated:: mib ( ) . unwrap ( ) ;
382+ e. advance ( ) . unwrap ( ) ;
383+ use crate :: execution:: tracing:: log_counter;
384+ log_counter ( "jemalloc_allocated" , allocated. read ( ) . unwrap ( ) as u64 ) ;
402385 }
403386 }
404387
@@ -481,7 +464,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_executePlan(
481464 ) ;
482465 }
483466 }
484-
485467 return Ok ( -1 ) ;
486468 }
487469 // A poll pending means there are more than one blocking operators,
@@ -580,8 +562,11 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_writeSortedFileNative
580562 current_checksum : jlong ,
581563 compression_codec : jstring ,
582564 compression_level : jint ,
565+ tracing_enabled : jboolean ,
583566) -> jlongArray {
584567 try_unwrap_or_throw ( & e, |mut env| unsafe {
568+ let _ = TraceGuard :: new ( "writeSortedFileNative" , tracing_enabled != JNI_FALSE ) ;
569+
585570 let data_types = convert_datatype_arrays ( & mut env, serialized_datatypes) ?;
586571
587572 let row_address_array = JLongArray :: from_raw ( row_addresses) ;
@@ -655,12 +640,13 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
655640 _class : JClass ,
656641 address : jlong ,
657642 size : jlong ,
643+ tracing_enabled : jboolean ,
658644) {
659645 try_unwrap_or_throw ( & e, |_| {
646+ let _ = TraceGuard :: new ( "sortRowPartitionsNative" , tracing_enabled != JNI_FALSE ) ;
660647 // SAFETY: JVM unsafe memory allocation is aligned with long.
661648 let array = unsafe { std:: slice:: from_raw_parts_mut ( address as * mut i64 , size as usize ) } ;
662649 array. rdxsort ( ) ;
663-
664650 Ok ( ( ) )
665651 } )
666652}
@@ -676,12 +662,60 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_decodeShuffleBlock(
676662 length : jint ,
677663 array_addrs : jlongArray ,
678664 schema_addrs : jlongArray ,
665+ tracing_enabled : jboolean ,
679666) -> jlong {
680667 try_unwrap_or_throw ( & e, |mut env| {
668+ let _ = TraceGuard :: new ( "decodeShuffleBlock" , tracing_enabled != JNI_FALSE ) ;
681669 let raw_pointer = env. get_direct_buffer_address ( & byte_buffer) ?;
682670 let length = length as usize ;
683671 let slice: & [ u8 ] = unsafe { std:: slice:: from_raw_parts ( raw_pointer, length) } ;
684672 let batch = read_ipc_compressed ( slice) ?;
685673 prepare_output ( & mut env, array_addrs, schema_addrs, batch, false )
686674 } )
687675}
676+
677+ #[ no_mangle]
678+ /// # Safety
679+ /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
680+ pub unsafe extern "system" fn Java_org_apache_comet_Native_traceBegin (
681+ e : JNIEnv ,
682+ _class : JClass ,
683+ event : jstring ,
684+ ) {
685+ try_unwrap_or_throw ( & e, |mut env| {
686+ let name: String = env. get_string ( & JString :: from_raw ( event) ) . unwrap ( ) . into ( ) ;
687+ trace_begin ( & name) ;
688+ Ok ( ( ) )
689+ } )
690+ }
691+
692+ #[ no_mangle]
693+ /// # Safety
694+ /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
695+ pub unsafe extern "system" fn Java_org_apache_comet_Native_traceEnd (
696+ e : JNIEnv ,
697+ _class : JClass ,
698+ event : jstring ,
699+ ) {
700+ try_unwrap_or_throw ( & e, |mut env| {
701+ let name: String = env. get_string ( & JString :: from_raw ( event) ) . unwrap ( ) . into ( ) ;
702+ trace_end ( & name) ;
703+ Ok ( ( ) )
704+ } )
705+ }
706+
707+ #[ no_mangle]
708+ /// # Safety
709+ /// This function is inherently unsafe since it deals with raw pointers passed from JNI.
710+ pub unsafe extern "system" fn Java_org_apache_comet_Native_logCounter (
711+ e : JNIEnv ,
712+ _class : JClass ,
713+ name : jstring ,
714+ value : jlong ,
715+ ) {
716+ try_unwrap_or_throw ( & e, |mut env| {
717+ let name: String = env. get_string ( & JString :: from_raw ( name) ) . unwrap ( ) . into ( ) ;
718+ log_counter ( & name, value as u64 ) ;
719+ Ok ( ( ) )
720+ } )
721+ }
0 commit comments