Skip to content

Commit c5e78b6

Browse files
authored
feat: respect batchSize/workerThreads/blockingThreads configurations for native_iceberg_compat scan (apache#1587)
1 parent dbfbff4 commit c5e78b6

File tree

5 files changed

+45
-8
lines changed

5 files changed

+45
-8
lines changed

common/src/main/java/org/apache/comet/parquet/Native.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,10 @@ public static native long initRecordBatchReader(
256256
byte[] filter,
257257
byte[] requiredSchema,
258258
byte[] dataSchema,
259-
String sessionTimezone);
259+
String sessionTimezone,
260+
int batchSize,
261+
int workerThreads,
262+
int blockingThreads);
260263

261264
// arrow native version of read batch
262265
/**

common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,20 @@ public void init() throws URISyntaxException, IOException {
353353
}
354354
}
355355

356+
int batchSize =
357+
conf.getInt(
358+
CometConf.COMET_BATCH_SIZE().key(),
359+
(Integer) CometConf.COMET_BATCH_SIZE().defaultValue().get());
360+
int workerThreads =
361+
conf.getInt(
362+
CometConf.COMET_WORKER_THREADS().key(),
363+
(Integer) CometConf.COMET_WORKER_THREADS().defaultValue().get());
364+
;
365+
int blockingThreads =
366+
conf.getInt(
367+
CometConf.COMET_BLOCKING_THREADS().key(),
368+
(Integer) CometConf.COMET_BLOCKING_THREADS().defaultValue().get());
369+
;
356370
this.handle =
357371
Native.initRecordBatchReader(
358372
filePath,
@@ -362,7 +376,10 @@ public void init() throws URISyntaxException, IOException {
362376
nativeFilter,
363377
serializedRequestedArrowSchema,
364378
serializedDataArrowSchema,
365-
timeZoneId);
379+
timeZoneId,
380+
batchSize,
381+
workerThreads,
382+
blockingThreads);
366383
isInitialized = true;
367384
}
368385

native/core/src/execution/planner.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,11 @@ impl PhysicalPlanner {
175175
}
176176
}
177177

178+
/// Returns a shared reference to this planner's `Arc<SessionContext>`.
179+
pub fn session_ctx(&self) -> &Arc<SessionContext> {
180+
&self.session_ctx
181+
}
182+
178183
/// get DataFusion PartitionedFiles from a Spark FilePartition
179184
fn get_partitioned_files(
180185
&self,

native/core/src/parquet/mod.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ use crate::parquet::parquet_support::prepare_object_store;
5454
use arrow::array::{Array, RecordBatch};
5555
use arrow::buffer::{Buffer, MutableBuffer};
5656
use datafusion::datasource::listing::PartitionedFile;
57-
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
57+
use datafusion::execution::SendableRecordBatchStream;
5858
use datafusion::physical_plan::ExecutionPlan;
59+
use datafusion::prelude::{SessionConfig, SessionContext};
5960
use futures::{poll, StreamExt};
6061
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
6162
use jni::sys::jstring;
@@ -650,21 +651,29 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
650651
required_schema: jbyteArray,
651652
data_schema: jbyteArray,
652653
session_timezone: jstring,
654+
batch_size: jint,
655+
worker_threads: jint,
656+
blocking_threads: jint,
653657
) -> jlong {
654658
try_unwrap_or_throw(&e, |mut env| unsafe {
655-
let task_ctx = TaskContext::default();
659+
let session_config = SessionConfig::new().with_batch_size(batch_size as usize);
660+
let planer =
661+
PhysicalPlanner::new(Arc::new(SessionContext::new_with_config(session_config)));
662+
let session_ctx = planer.session_ctx();
656663

657664
let path: String = env
658665
.get_string(&JString::from_raw(file_path))
659666
.unwrap()
660667
.into();
661668

662669
let runtime = tokio::runtime::Builder::new_multi_thread()
670+
.worker_threads(worker_threads as usize)
671+
.max_blocking_threads(blocking_threads as usize)
663672
.enable_all()
664673
.build()?;
665674

666675
let (object_store_url, object_store_path) =
667-
prepare_object_store(task_ctx.runtime_env(), path.clone())?;
676+
prepare_object_store(session_ctx.runtime_env(), path.clone())?;
668677

669678
let required_schema_array = JByteArray::from_raw(required_schema);
670679
let required_schema_buffer = env.convert_byte_array(&required_schema_array)?;
@@ -674,8 +683,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
674683
let data_schema_buffer = env.convert_byte_array(&data_schema_array)?;
675684
let data_schema = Arc::new(deserialize_schema(data_schema_buffer.as_bytes())?);
676685

677-
let planer = PhysicalPlanner::default();
678-
679686
let data_filters = if !filter.is_null() {
680687
let filter_array = JByteArray::from_raw(filter);
681688
let filter_buffer = env.convert_byte_array(&filter_array)?;
@@ -708,7 +715,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBat
708715
)?;
709716

710717
let partition_index: usize = 0;
711-
let batch_stream = Some(scan.execute(partition_index, Arc::new(task_ctx))?);
718+
let batch_stream = Some(scan.execute(partition_index, session_ctx.task_ctx())?);
712719

713720
let ctx = BatchContext {
714721
runtime,

spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ object CometParquetFileFormat extends Logging with ShimSQLConf {
232232
hadoopConf.setBoolean(
233233
CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.key,
234234
CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP.get())
235+
hadoopConf.setInt(CometConf.COMET_BATCH_SIZE.key, CometConf.COMET_BATCH_SIZE.get())
236+
hadoopConf.setInt(CometConf.COMET_WORKER_THREADS.key, CometConf.COMET_WORKER_THREADS.get())
237+
hadoopConf.setInt(
238+
CometConf.COMET_BLOCKING_THREADS.key,
239+
CometConf.COMET_BLOCKING_THREADS.get())
235240
}
236241

237242
def getDatetimeRebaseSpec(

0 commit comments

Comments
 (0)