@@ -29,16 +29,7 @@ def run(trace_query_lazy: nil)
29
29
first_pass = false
30
30
fiber_vars = get_fiber_variables
31
31
32
- while ( f = ( job_fibers . shift || ( ( ( job_fibers . size + next_job_fibers . size + source_tasks . size ) < jobs_fiber_limit ) && spawn_job_fiber ( trace ) ) ) )
33
- if f . alive?
34
- finished = run_fiber ( f )
35
- if !finished
36
- next_job_fibers << f
37
- end
38
- end
39
- end
40
- job_fibers . concat ( next_job_fibers )
41
- next_job_fibers . clear
32
+ run_pending_steps ( job_fibers , next_job_fibers , source_tasks , jobs_fiber_limit , trace )
42
33
43
34
Sync do |root_task |
44
35
set_fiber_variables ( fiber_vars )
@@ -54,6 +45,18 @@ def run(trace_query_lazy: nil)
54
45
next_source_tasks . clear
55
46
end
56
47
end
48
+
49
+ if !@lazies_at_depth . empty?
50
+ with_trace_query_lazy ( trace_query_lazy ) do
51
+ run_next_pending_lazies ( job_fibers , trace )
52
+ run_pending_steps ( job_fibers , next_job_fibers , source_tasks , jobs_fiber_limit , trace )
53
+ end
54
+ elsif !@steps_to_rerun_after_lazy . empty?
55
+ @pending_jobs . concat ( @steps_to_rerun_after_lazy )
56
+ f = spawn_job_fiber ( trace )
57
+ job_fibers << f
58
+ @steps_to_rerun_after_lazy . clear
59
+ end
57
60
end
58
61
trace &.end_dataloader ( self )
59
62
end
@@ -69,6 +72,19 @@ def run(trace_query_lazy: nil)
69
72
70
73
private
71
74
75
+ def run_pending_steps ( job_fibers , next_job_fibers , source_tasks , jobs_fiber_limit , trace )
76
+ while ( f = ( job_fibers . shift || ( ( ( job_fibers . size + next_job_fibers . size + source_tasks . size ) < jobs_fiber_limit ) && spawn_job_fiber ( trace ) ) ) )
77
+ if f . alive?
78
+ finished = run_fiber ( f )
79
+ if !finished
80
+ next_job_fibers << f
81
+ end
82
+ end
83
+ end
84
+ job_fibers . concat ( next_job_fibers )
85
+ next_job_fibers . clear
86
+ end
87
+
72
88
def spawn_source_task ( parent_task , condition , trace )
73
89
pending_sources = nil
74
90
@source_cache . each_value do |source_by_batch_params |
0 commit comments