Invalid plans with metrics collection #260

@gabotechs

#259 uncovers a bug in the metrics collection that is already visible on main: adding a few debug statements to the metrics collection tests is enough to see it.

The plans below were obtained by placing a dbg! statement in each test:

        let display = display_plan_ascii(plan.as_ref(), true);
        dbg!(&display);
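A small Rust detail matters when dumping plans this way: dbg! formats its argument with {:#?}, so a multi-line String is printed quoted, on one line, with its line breaks escaped as \n. The sketch below contrasts it with eprintln!("{display}"), which prints the tree as drawn. fake_display is a hypothetical stand-in for display_plan_ascii(plan.as_ref(), true) and returns a fixed excerpt.

```rust
/// Hypothetical stand-in for `display_plan_ascii(plan.as_ref(), true)`,
/// returning a fixed two-line excerpt of a rendered plan.
fn fake_display() -> String {
    "┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3]\n│ CoalesceBatchesExec: target_batch_size=8192"
        .to_string()
}

/// Prints the rendered plan twice to stderr and returns it.
fn print_plan_for_debugging() -> String {
    let display = fake_display();
    // `dbg!` formats its argument with `{:#?}`: a `String` is printed quoted,
    // with its line breaks escaped as `\n`, all on a single line.
    dbg!(&display);
    // Plain `{}` formatting prints the tree with its real line breaks.
    eprintln!("{display}");
    display
}
```

Both macros write to stderr, which cargo test captures by default; cargo test -- --nocapture shows the output for passing tests too.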

https://github.com/datafusion-contrib/datafusion-distributed/blob/gabrielmusat%2Frework-distributed-planning-logic/src/metrics/task_metrics_rewriter.rs#L470-L470

The plan printed by that test is:

┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=10, metrics=[output_rows=2, elapsed_compute=12.667µs, output_bytes=24.0 B]
│   SortExec: TopK(fetch=10), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=326.918µs, output_bytes=0.0 B, row_replacements=2]
│     ProjectionExec: expr=[id@0 as id, count(Int64(1))@1 as count], metrics=[output_rows=2, elapsed_compute=4.169µs, output_bytes=1088.0 B]
│       AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=233.002µs, output_bytes=1088.0 B, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=8576, aggregate_arguments_time=4.377µs, aggregation_time=48.002µs, emitting_time=4.043µs, time_calculating_group_ids=5.294µs]
│         [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
  │ CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=34µs]
  │   RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=4, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=24.804248ms, repartition_time=226.338µs, send_time=8.861µs]
  │     AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=234.542µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=8832, aggregate_arguments_time=37.172µs, aggregation_time=15.132µs, emitting_time=25.13µs, time_calculating_group_ids=23.964µs]
  │       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=42.043µs]
  │         FilterExec: id@0 > 1, metrics=[output_rows=4, elapsed_compute=98.047µs]
  │           RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=4.377626ms, repartition_time=2ns, send_time=9.798µs]
  │             DataSourceExec: file_groups={1 group: [[var/folders/jg/9g6cr2vn4n17rd3n7_2kl_sr0000gp/T/table1_978d275d-fea5-467d-8278-bd8ecb324cfc.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[], metrics=[output_rows=6, elapsed_compute=2ns, batches_split=0, bytes_scanned=222, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=132.71µs, metadata_load_time=1.622711ms, page_index_eval_time=102.086µs, row_pushdown_eval_time=4ns, statistics_eval_time=90.043µs, time_elapsed_opening=2.528959ms, time_elapsed_processing=1.882456ms, time_elapsed_scanning_total=1.788499ms, time_elapsed_scanning_until_data=1.735626ms]
  └──────────────────────────────────────────────────

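This plan is incorrect: the leaf stage (Stage 1) runs in two tasks but has no PartitionIsolatorExec, so both tasks scan the same single file group. The duplicated scan is visible in the metrics: DataSourceExec reports output_rows=6 here, versus output_rows=3 in the corrected plan further down. The same thing happens in these other tests: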
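A regression check could catch this directly from the display_plan_ascii output. The sketch below is a hypothetical helper, find_unisolated_stages, that is not part of datafusion-distributed. It assumes the ASCII layout shown in these dumps: a stage header starts with ┌, a block ends with └, and the task count equals the number of tN:[...] entries after "Tasks:". It flags any stage that runs in more than one task and contains a DataSourceExec but no PartitionIsolatorExec. Matching operator names in text is a heuristic, not a plan-level check.

```rust
/// Summary of one stage block in the ASCII plan display.
struct StageInfo {
    name: String,
    tasks: usize,
    has_data_source: bool,
    has_isolator: bool,
}

/// Returns the names of stages that run in more than one task and read a
/// DataSourceExec without a PartitionIsolatorExec (hypothetical heuristic).
fn find_unisolated_stages(display: &str) -> Vec<String> {
    let mut offenders = Vec::new();
    let mut current: Option<StageInfo> = None;
    for line in display.lines() {
        let trimmed = line.trim_start();
        if let Some(rest) = trimmed.strip_prefix('┌') {
            // Header, e.g. "───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3]".
            let header = rest.trim_start_matches('─').trim();
            let (name, tasks_part) = header.split_once(" ── ").unwrap_or((header, ""));
            current = Some(StageInfo {
                name: name.to_string(),
                // One "tN:[...]" entry per task.
                tasks: tasks_part.matches(":[").count(),
                has_data_source: false,
                has_isolator: false,
            });
        } else if trimmed.starts_with('└') {
            // End of the current stage block: evaluate it.
            if let Some(stage) = current.take() {
                if stage.tasks > 1 && stage.has_data_source && !stage.has_isolator {
                    offenders.push(stage.name);
                }
            }
        } else if let Some(stage) = current.as_mut() {
            // Body line of the current stage.
            stage.has_data_source |= trimmed.contains("DataSourceExec");
            stage.has_isolator |= trimmed.contains("PartitionIsolatorExec");
        }
    }
    offenders
}
```

In a metrics test this would be used as assert!(find_unisolated_stages(&display).is_empty()); applied to the plans in this issue, it would flag Stage 1 in the first, second and fourth plans, and Stages 1 and 2 in the third.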

https://github.com/datafusion-contrib/datafusion-distributed/blob/gabrielmusat%2Frework-distributed-planning-logic/src/metrics/task_metrics_collector.rs#L294-L294
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=10, metrics=[output_rows=2, elapsed_compute=29.5µs, output_bytes=24.0 B]
│   SortExec: TopK(fetch=10), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=308.375µs, output_bytes=0.0 B, row_replacements=2]
│     ProjectionExec: expr=[id@0 as id, count(Int64(1))@1 as count], metrics=[output_rows=2, elapsed_compute=3.374µs, output_bytes=1088.0 B]
│       AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=201.711µs, output_bytes=1088.0 B, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=8448, aggregate_arguments_time=4.542µs, aggregation_time=43.959µs, emitting_time=3.875µs, time_calculating_group_ids=5.209µs]
│         [Stage 1] => NetworkShuffleExec: output_partitions=2, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p0..p1] 
  │ CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=36.166µs]
  │   RepartitionExec: partitioning=Hash([id@0], 2), input_partitions=2, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10.492249ms, repartition_time=218.795µs, send_time=5.504µs]
  │     AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=4, elapsed_compute=243.875µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=8576, aggregate_arguments_time=26.335µs, aggregation_time=14.127µs, emitting_time=33.919µs, time_calculating_group_ids=62.502µs]
  │       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=4, elapsed_compute=57.834µs]
  │         FilterExec: id@0 > 1, metrics=[output_rows=4, elapsed_compute=94.585µs]
  │           RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=3.3515ms, repartition_time=2ns, send_time=9.335µs]
  │             DataSourceExec: file_groups={1 group: [[var/folders/jg/9g6cr2vn4n17rd3n7_2kl_sr0000gp/T/table1_b6d072d8-10a2-4c6e-bf2b-04be20f94506.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[], metrics=[output_rows=6, elapsed_compute=2ns, batches_split=0, bytes_scanned=222, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=121.752µs, metadata_load_time=1.633961ms, page_index_eval_time=105.502µs, row_pushdown_eval_time=4ns, statistics_eval_time=91.044µs, time_elapsed_opening=2.456542ms, time_elapsed_processing=1.768374ms, time_elapsed_scanning_total=841.834µs, time_elapsed_scanning_until_data=795.083µs]
  └──────────────────────────────────────────────────

https://github.com/datafusion-contrib/datafusion-distributed/blob/gabrielmusat%2Frework-distributed-planning-logic/src/metrics/task_metrics_collector.rs#L318-L318
┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [country_code@0 ASC NULLS LAST], metrics=[output_rows=1, elapsed_compute=156.542µs, output_bytes=32.0 B]
│   [Stage 4] => NetworkCoalesceExec: output_partitions=4, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 4 ── Tasks: t0:[p0..p1] t1:[p0..p1] 
  │ SortExec: expr=[country_code@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=1, elapsed_compute=240.586µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, batches_split=0]
  │   ProjectionExec: expr=[substr(table2.phone,Int64(1),Int64(2))@0 as country_code, count(Int64(1))@1 as num_customers, sum(table2.balance)@2 as total_balance], metrics=[output_rows=1, elapsed_compute=2.878µs]
  │     AggregateExec: mode=FinalPartitioned, gby=[substr(table2.phone,Int64(1),Int64(2))@0 as substr(table2.phone,Int64(1),Int64(2))], aggr=[count(Int64(1)), sum(table2.balance)], metrics=[output_rows=1, elapsed_compute=170.084µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=67328, aggregate_arguments_time=1.67µs, aggregation_time=42.462µs, emitting_time=3.211µs, time_calculating_group_ids=3.461µs]
  │       [Stage 3] => NetworkShuffleExec: output_partitions=2, input_tasks=1, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 3 ── Tasks: t0:[p0..p3] 
    │ CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=1, elapsed_compute=18.042µs]
    │   RepartitionExec: partitioning=Hash([substr(table2.phone,Int64(1),Int64(2))@0], 4), input_partitions=2, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=22.033875ms, repartition_time=142.793µs, send_time=1.798µs]
    │     AggregateExec: mode=Partial, gby=[substr(phone@0, 1, 2) as substr(table2.phone,Int64(1),Int64(2))], aggr=[count(Int64(1)), sum(table2.balance)], metrics=[output_rows=1, elapsed_compute=341.333µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=33760, aggregate_arguments_time=4.335µs, aggregation_time=135.001µs, emitting_time=44.751µs, time_calculating_group_ids=75.793µs]
    │       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=10.627707ms, repartition_time=1ns, send_time=1.626µs]
    │         NestedLoopJoinExec: join_type=Inner, filter=balance@0 > avg(table2.balance)@1, projection=[phone@0, balance@1], metrics=[output_rows=2, elapsed_compute=312.754µs, build_input_batches=2, build_input_rows=6, input_batches=1, input_rows=1, output_batches=1, build_mem_used=1456, build_time=135µs, join_time=146.211µs]
    │           CoalescePartitionsExec, metrics=[output_rows=6, elapsed_compute=55.209µs]
    │             [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2, metrics=[]
    │           AggregateExec: mode=Final, gby=[], aggr=[avg(table2.balance)], metrics=[output_rows=1, elapsed_compute=35.126µs]
    │             CoalescePartitionsExec, metrics=[output_rows=4, elapsed_compute=4.75µs]
    │               [Stage 2] => NetworkCoalesceExec: output_partitions=4, input_tasks=2, metrics=[]
    └──────────────────────────────────────────────────
      ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3] 
      │ CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6, elapsed_compute=107.832µs]
      │   FilterExec: substr(phone@0, 1, 2) IN ([13, 31, 23, 29, 30, 18]), metrics=[output_rows=6, elapsed_compute=376.253µs]
      │     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=5.078459ms, repartition_time=2ns, send_time=10.502µs]
      │       DataSourceExec: file_groups={1 group: [[var/folders/jg/9g6cr2vn4n17rd3n7_2kl_sr0000gp/T/table2_4845a43c-c203-417e-baf8-5b195767b2a8.parquet]]}, projection=[phone, balance], file_type=parquet, predicate=substr(phone@0, 1, 2) IN ([13, 31, 23, 29, 30, 18]), metrics=[output_rows=6, elapsed_compute=2ns, batches_split=0, bytes_scanned=278, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=4ns, metadata_load_time=1.913544ms, page_index_eval_time=86ns, row_pushdown_eval_time=4ns, statistics_eval_time=4ns, time_elapsed_opening=2.169083ms, time_elapsed_processing=1.247792ms, time_elapsed_scanning_total=2.845041ms, time_elapsed_scanning_until_data=2.776625ms]
      └──────────────────────────────────────────────────
      ┌───── Stage 2 ── Tasks: t0:[p0..p1] t1:[p2..p3] 
      │ AggregateExec: mode=Partial, gby=[], aggr=[avg(table2.balance)], metrics=[output_rows=4, elapsed_compute=171.5µs]
      │   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=6, elapsed_compute=6.625µs]
      │     FilterExec: balance@0 > 0, metrics=[output_rows=6, elapsed_compute=9.127µs]
      │       RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=8.254584ms, repartition_time=2ns, send_time=3.46µs]
      │         DataSourceExec: file_groups={1 group: [[var/folders/jg/9g6cr2vn4n17rd3n7_2kl_sr0000gp/T/table2_4845a43c-c203-417e-baf8-5b195767b2a8.parquet]]}, projection=[balance], file_type=parquet, predicate=balance@0 > 0, pruning_predicate=balance_null_count@1 != row_count@2 AND balance_max@0 > 0, required_guarantees=[], metrics=[output_rows=6, elapsed_compute=2ns, batches_split=0, bytes_scanned=554, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=107.544µs, metadata_load_time=6.286461ms, page_index_eval_time=109.503µs, row_pushdown_eval_time=4ns, statistics_eval_time=57.461µs, time_elapsed_opening=6.909458ms, time_elapsed_processing=1.160207ms, time_elapsed_scanning_total=1.313418ms, time_elapsed_scanning_until_data=1.291292ms]
      └──────────────────────────────────────────────────

https://github.com/datafusion-contrib/datafusion-distributed/blob/gabrielmusat%2Frework-distributed-planning-logic/src/metrics/task_metrics_collector.rs#L339-L339
┌───── DistributedExec ── Tasks: t0:[p0] 
│ CoalescePartitionsExec, metrics=[output_rows=1, elapsed_compute=27.041µs, output_bytes=320.0 B]
│   [Stage 2] => NetworkCoalesceExec: output_partitions=4, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 2 ── Tasks: t0:[p0..p1] t1:[p0..p1] 
  │ AggregateExec: mode=FinalPartitioned, gby=[company@0 as company], aggr=[], metrics=[output_rows=1, elapsed_compute=368.962µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=4600, aggregate_arguments_time=4ns, aggregation_time=4ns, emitting_time=15.212µs, time_calculating_group_ids=29.586µs]
  │   [Stage 1] => NetworkShuffleExec: output_partitions=2, input_tasks=2, metrics=[]
  └──────────────────────────────────────────────────
    ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
    │ CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=39.083µs]
    │   RepartitionExec: partitioning=Hash([company@0], 4), input_partitions=2, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=8.02921ms, repartition_time=80.169µs, send_time=4.472µs]
    │     RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=3.594667ms, repartition_time=2ns, send_time=9.002µs]
    │       AggregateExec: mode=Partial, gby=[company@0 as company], aggr=[], metrics=[output_rows=2, elapsed_compute=325.042µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=3026, aggregate_arguments_time=2ns, aggregation_time=2ns, emitting_time=107.75µs, time_calculating_group_ids=84.792µs]
    │         DataSourceExec: file_groups={1 group: [[var/folders/jg/9g6cr2vn4n17rd3n7_2kl_sr0000gp/T/table2_ad2f76d1-d75b-4872-9813-dcb43983dc50.parquet]]}, projection=[company], file_type=parquet, metrics=[output_rows=6, elapsed_compute=2ns, batches_split=0, bytes_scanned=90, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=4ns, metadata_load_time=910.376µs, page_index_eval_time=4ns, row_pushdown_eval_time=4ns, statistics_eval_time=4ns, time_elapsed_opening=1.002958ms, time_elapsed_processing=906.749µs, time_elapsed_scanning_total=2.282916ms, time_elapsed_scanning_until_data=2.1175ms]
    └──────────────────────────────────────────────────

With #259 applied, the first plan becomes:

┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [id@0 ASC NULLS LAST], fetch=10, metrics=[output_rows=2, elapsed_compute=11.375µs, output_bytes=24.0 B]
│   SortExec: TopK(fetch=10), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[true], metrics=[output_rows=2, elapsed_compute=364.999µs, output_bytes=0.0 B, row_replacements=2]
│     ProjectionExec: expr=[id@0 as id, count(Int64(1))@1 as count], metrics=[output_rows=2, elapsed_compute=4.335µs, output_bytes=1088.0 B]
│       AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=227.834µs, output_bytes=1088.0 B, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=8576, aggregate_arguments_time=2.502µs, aggregation_time=63.418µs, emitting_time=3.627µs, time_calculating_group_ids=2.961µs]
│         [Stage 1] => NetworkShuffleExec: output_partitions=4, input_tasks=2, metrics=[]
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p3] t1:[p0..p3] 
  │ CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=24.333µs]
  │   RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=4, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=15.328083ms, repartition_time=175.507µs, send_time=4.071µs]
  │     AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))], metrics=[output_rows=2, elapsed_compute=231.209µs, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=4672, aggregate_arguments_time=47.215µs, aggregation_time=16.215µs, emitting_time=27.007µs, time_calculating_group_ids=28.299µs]
  │       CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=70.376µs]
  │         FilterExec: id@0 > 1, metrics=[output_rows=2, elapsed_compute=157.132µs]
  │           RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=2.5985ms, repartition_time=2ns, send_time=4.215µs]
  │             PartitionIsolatorExec: t0:[p0] t1:[__] , metrics=[]
  │               DataSourceExec: file_groups={1 group: [[var/folders/jg/9g6cr2vn4n17rd3n7_2kl_sr0000gp/T/table1_0baf6913-0d06-4033-a127-3c8623bbe98a.parquet]]}, projection=[id], file_type=parquet, predicate=id@0 > 1 AND DynamicFilter [ empty ], pruning_predicate=id_null_count@1 != row_count@2 AND id_max@0 > 1, required_guarantees=[], metrics=[output_rows=3, elapsed_compute=1ns, batches_split=0, bytes_scanned=111, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, predicate_cache_inner_records=0, predicate_cache_records=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, bloom_filter_eval_time=143.584µs, metadata_load_time=1.230459ms, page_index_eval_time=101.751µs, row_pushdown_eval_time=2ns, statistics_eval_time=107.751µs, time_elapsed_opening=2.079792ms, time_elapsed_processing=1.548458ms, time_elapsed_scanning_total=477.834µs, time_elapsed_scanning_until_data=428.25µs]
  └──────────────────────────────────────────────────

This plan is correct because it includes the PartitionIsolatorExec, but unfortunately it makes all of these tests fail.
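The line PartitionIsolatorExec: t0:[p0] t1:[__] reads as: the single file-group partition goes to task 0 and task 1 gets nothing, which matches DataSourceExec reporting output_rows=3 instead of 6. The sketch below reproduces that display under an assumed contiguous-chunk assignment rule; isolated_partitions and render_assignment are hypothetical names, and the real assignment logic in datafusion-distributed may differ.

```rust
use std::ops::Range;

/// Hypothetical: split `n_partitions` input partitions into contiguous chunks,
/// one per task; tasks past the end get an empty range.
fn isolated_partitions(task_index: usize, n_tasks: usize, n_partitions: usize) -> Range<usize> {
    assert!(n_tasks > 0 && task_index < n_tasks);
    // Ceiling division so every partition is assigned to some task.
    let chunk = (n_partitions + n_tasks - 1) / n_tasks;
    let start = (task_index * chunk).min(n_partitions);
    let end = (start + chunk).min(n_partitions);
    start..end
}

/// Renders the assignment in a style similar to the plan display, using
/// "__" for a task that reads no partitions.
fn render_assignment(n_tasks: usize, n_partitions: usize) -> String {
    (0..n_tasks)
        .map(|t| {
            let range = isolated_partitions(t, n_tasks, n_partitions);
            let parts = if range.is_empty() {
                "__".to_string()
            } else {
                range.map(|p| format!("p{p}")).collect::<Vec<_>>().join(",")
            };
            format!("t{t}:[{parts}]")
        })
        .collect::<Vec<_>>()
        .join(" ")
}
```

With one file group and two tasks, render_assignment(2, 1) yields t0:[p0] t1:[__], so only one task scans the file and the duplicated rows disappear.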
