"""
Parallel streaming implementation for high-throughput data loading.

This module implements parallel query execution using ThreadPoolExecutor.
It partitions streaming queries by block_num ranges.

Key design decisions:
- Only supports streaming queries (not regular load operations)
- Block range partitioning only (block_num or _block_num columns)
"""
@@ -175,8 +173,7 @@ def wrap_query_with_partition(self, user_query: str, partition: QueryPartition)
175173
176174 # Create partition filter
177175 partition_filter = (
178- f"{ partition .block_column } >= { partition .start_block } "
179- f"AND { partition .block_column } < { partition .end_block } "
176+ f'{ partition .block_column } >= { partition .start_block } AND { partition .block_column } < { partition .end_block } '
180177 )
181178
182179 # Check if query already has a WHERE clause (case-insensitive)
@@ -201,11 +198,7 @@ def wrap_query_with_partition(self, user_query: str, partition: QueryPartition)
201198 end_pos = keyword_pos
202199
203200 # Insert partition filter with AND
204- partitioned_query = (
205- user_query [:end_pos ] +
206- f" AND ({ partition_filter } )" +
207- user_query [end_pos :]
208- )
201+ partitioned_query = user_query [:end_pos ] + f' AND ({ partition_filter } )' + user_query [end_pos :]
209202 else :
210203 # No WHERE clause - add one before ORDER BY, LIMIT, GROUP BY, or SETTINGS
211204 end_keywords = [' ORDER BY ' , ' LIMIT ' , ' GROUP BY ' , ' SETTINGS ' ]
@@ -217,11 +210,7 @@ def wrap_query_with_partition(self, user_query: str, partition: QueryPartition)
217210 insert_pos = keyword_pos
218211
219212 # Insert WHERE clause with partition filter
220- partitioned_query = (
221- user_query [:insert_pos ] +
222- f" WHERE { partition_filter } " +
223- user_query [insert_pos :]
224- )
213+ partitioned_query = user_query [:insert_pos ] + f' WHERE { partition_filter } ' + user_query [insert_pos :]
225214
226215 return partitioned_query
227216
@@ -270,7 +259,7 @@ def _detect_current_max_block(self) -> int:
270259 Raises:
271260 RuntimeError: If query fails or returns no results
272261 """
273- query = f" SELECT MAX({ self .config .block_column } ) as max_block FROM { self .config .table_name } "
262+ query = f' SELECT MAX({ self .config .block_column } ) as max_block FROM { self .config .table_name } '
274263 self .logger .info (f'Detecting current max block with query: { query } ' )
275264
276265 try :
@@ -290,7 +279,7 @@ def _detect_current_max_block(self) -> int:
290279
291280 except Exception as e :
292281 self .logger .error (f'Failed to detect max block: { e } ' )
293- raise RuntimeError (f'Failed to detect current max block from { self .config .table_name } : { e } ' )
282+ raise RuntimeError (f'Failed to detect current max block from { self .config .table_name } : { e } ' ) from e
294283
295284 def execute_parallel_stream (
296285 self , user_query : str , destination : str , connection_name : str , load_config : Optional [Dict [str , Any ]] = None
@@ -443,20 +432,16 @@ def execute_parallel_stream(
443432 # Add block filter to start from (detected_max - buffer) to catch potential reorgs
444433 # Check if query already has WHERE clause
445434 where_pos = streaming_query_upper .find (' WHERE ' )
446- block_filter = f" { self .config .block_column } >= { continuous_start_block } "
435+ block_filter = f' { self .config .block_column } >= { continuous_start_block } '
447436
448437 if where_pos != - 1 :
449438 # Has WHERE clause - append with AND
450439 # Find position after WHERE keyword
451440 insert_pos = where_pos + len (' WHERE ' )
452- streaming_query = (
453- streaming_query [:insert_pos ] +
454- f"({ block_filter } ) AND " +
455- streaming_query [insert_pos :]
456- )
441+ streaming_query = streaming_query [:insert_pos ] + f'({ block_filter } ) AND ' + streaming_query [insert_pos :]
457442 else :
458443 # No WHERE clause - add one before SETTINGS if present
459- streaming_query += f" WHERE { block_filter } "
444+ streaming_query += f' WHERE { block_filter } '
460445
461446 # Now add streaming settings for continuous mode
462447 streaming_query += ' SETTINGS stream = true'
@@ -521,7 +506,7 @@ def _execute_partition(
521506 destination = destination ,
522507 connection_name = connection_name ,
523508 read_all = False , # Stream batches for memory efficiency
524- ** load_config
509+ ** load_config ,
525510 )
526511
527512 # Aggregate results from streaming iterator
@@ -543,7 +528,7 @@ def _execute_partition(
543528 self .logger .info (
544529 f'Worker { partition .partition_id } completed: '
545530 f'{ total_rows :,} rows in { duration :.2f} s '
546- f'({ batch_count } batches, { total_rows / duration :.0f} rows/sec)'
531+ f'({ batch_count } batches, { total_rows / duration :.0f} rows/sec)'
547532 )
548533
549534 # Return aggregated result
@@ -603,4 +588,4 @@ def _log_final_stats(self):
603588 f'avg throughput: { avg_throughput :,.0f} rows/sec per worker'
604589 )
605590 else :
606- self .logger .error (f'Parallel execution failed: all { self ._stats .workers_failed } workers failed' )
591+ self .logger .error (f'Parallel execution failed: all { self ._stats .workers_failed } workers failed' )
0 commit comments