Commit 4ee6edc

Merge pull request #5422 from rmosolgo/merge-dataloader-and-lazy
Merge lazy resolution into Dataloader
2 parents: e634d8d + add47cc
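
In practice, this merge means one `Dataloader#run` call now drains pending jobs, Dataloader sources, and lazy objects (eg. promise-like values registered with `lazy_resolve`); previously the interpreter queued a separate lazy-resolution pass after `run`. A minimal sketch of the user-facing behavior, assuming a hypothetical `LazyValue` wrapper and `MySchema` (neither is part of this commit):

require "graphql"

# A promise-like wrapper; `lazy_resolve` below tells GraphQL-Ruby to call
# `#value` on instances of this class during lazy resolution.
class LazyValue
  def initialize(&block)
    @block = block
  end

  def value
    @block.call
  end
end

class QueryType < GraphQL::Schema::Object
  field :greeting, String, null: false

  def greeting
    LazyValue.new { "Hello, world" }
  end
end

class MySchema < GraphQL::Schema
  query(QueryType)
  lazy_resolve(LazyValue, :value)
  use GraphQL::Dataloader
end

# Lazy values and Dataloader sources now resolve in a single `run` pass:
puts MySchema.execute("{ greeting }")["data"]["greeting"] # => "Hello, world"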

File tree: 16 files changed (+208, -440 lines)


lib/graphql/dataloader.rb

Lines changed: 75 additions & 23 deletions
@@ -64,6 +64,7 @@ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.cl
         @nonblocking = nonblocking
       end
       @fiber_limit = fiber_limit
+      @lazies_at_depth = Hash.new { |h, k| h[k] = [] }
     end
 
     # @return [Integer, nil]

@@ -140,10 +141,10 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
     end
 
     # @api private Nothing to see here
-    def append_job(&job)
+    def append_job(callable = nil, &job)
       # Given a block, queue it up to be worked through when `#run` is called.
-      # (If the dataloader is already running, than a Fiber will pick this up later.)
-      @pending_jobs.push(job)
+      # (If the dataloader is already running, then a Fiber will pick this up later.)
+      @pending_jobs.push(callable || job)
       nil
     end
 
@@ -160,6 +161,10 @@ def clear_cache
     def run_isolated
       prev_queue = @pending_jobs
       prev_pending_keys = {}
+      prev_lazies_at_depth = @lazies_at_depth
+      @lazies_at_depth = @lazies_at_depth.dup.clear
+      # Clear pending loads but keep already-cached records
+      # in case they are useful to the given block.
       @source_cache.each do |source_class, batched_sources|
         batched_sources.each do |batch_args, batched_source_instance|
           if batched_source_instance.pending?

@@ -179,6 +184,7 @@ def run_isolated
       res
     ensure
       @pending_jobs = prev_queue
+      @lazies_at_depth = prev_lazies_at_depth
       prev_pending_keys.each do |source_instance, pending|
         pending.each do |key, value|
           if !source_instance.results.key?(key)

@@ -188,7 +194,8 @@ def run_isolated
       end
     end
 
-    def run
+    # @param trace_query_lazy [nil, Execution::Multiplex]
+    def run(trace_query_lazy: nil)
      trace = Fiber[:__graphql_current_multiplex]&.current_trace
      jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
      job_fibers = []

@@ -201,26 +208,13 @@ def run
      while first_pass || !job_fibers.empty?
        first_pass = false
 
-        while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
-          if f.alive?
-            finished = run_fiber(f)
-            if !finished
-              next_job_fibers << f
-            end
-          end
-        end
-        join_queues(job_fibers, next_job_fibers)
-
-        while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
-          while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
-            if f.alive?
-              finished = run_fiber(f)
-              if !finished
-                next_source_fibers << f
-              end
-            end
+        run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
+
+        if !@lazies_at_depth.empty?
+          with_trace_query_lazy(trace_query_lazy) do
+            run_next_pending_lazies(job_fibers, trace)
+            run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
          end
-          join_queues(source_fibers, next_source_fibers)
        end
      end

@@ -248,6 +242,11 @@ def run_fiber(f)
      f.resume
    end
 
+    # @api private
+    def lazy_at_depth(depth, lazy)
+      @lazies_at_depth[depth] << lazy
+    end
+
    def spawn_fiber
      fiber_vars = get_fiber_variables
      Fiber.new(blocking: !@nonblocking) {

@@ -275,6 +274,59 @@ def merge_records(records, index_by: :id)
 
    private
 
+    def run_next_pending_lazies(job_fibers, trace)
+      smallest_depth = nil
+      @lazies_at_depth.each_key do |depth_key|
+        smallest_depth ||= depth_key
+        if depth_key < smallest_depth
+          smallest_depth = depth_key
+        end
+      end
+
+      if smallest_depth
+        lazies = @lazies_at_depth.delete(smallest_depth)
+        if !lazies.empty?
+          lazies.each_with_index do |l, idx|
+            append_job { l.value }
+          end
+          job_fibers.unshift(spawn_job_fiber(trace))
+        end
+      end
+    end
+
+    def run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
+      while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
+        if f.alive?
+          finished = run_fiber(f)
+          if !finished
+            next_job_fibers << f
+          end
+        end
+      end
+      join_queues(job_fibers, next_job_fibers)
+
+      while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
+        while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
+          if f.alive?
+            finished = run_fiber(f)
+            if !finished
+              next_source_fibers << f
+            end
+          end
+        end
+        join_queues(source_fibers, next_source_fibers)
+      end
+    end
+
+    def with_trace_query_lazy(multiplex_or_nil, &block)
+      if (multiplex = multiplex_or_nil)
+        query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
+        multiplex.current_trace.execute_query_lazy(query: query, multiplex: multiplex, &block)
+      else
+        yield
+      end
+    end
+
    def calculate_fiber_limit
      total_fiber_limit = @fiber_limit || Float::INFINITY
      if total_fiber_limit < 4
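
Two small API changes in the diff above are worth noting: `append_job` now accepts a callable positional argument (pushed in place of the block), and `lazy_at_depth` is the `@api private` entry point for enqueueing a lazy at a given query depth. A standalone sketch of the former; it assumes a bare `GraphQL::Dataloader` works outside of query execution:

require "graphql"

dataloader = GraphQL::Dataloader.new
dataloader.append_job { puts "queued via block" }
dataloader.append_job(-> { puts "queued via callable" })
dataloader.run # prints both lines; `@pending_jobs` holds `callable || job` either way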

lib/graphql/dataloader/async_dataloader.rb

Lines changed: 22 additions & 11 deletions
@@ -14,7 +14,7 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
        nil
      end
 
-      def run
+      def run(trace_query_lazy: nil)
        trace = Fiber[:__graphql_current_multiplex]&.current_trace
        jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
        job_fibers = []

@@ -29,16 +29,7 @@ def run
          first_pass = false
          fiber_vars = get_fiber_variables
 
-          while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
-            if f.alive?
-              finished = run_fiber(f)
-              if !finished
-                next_job_fibers << f
-              end
-            end
-          end
-          job_fibers.concat(next_job_fibers)
-          next_job_fibers.clear
+          run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
 
          Sync do |root_task|
            set_fiber_variables(fiber_vars)

@@ -54,6 +45,13 @@ def run
              next_source_tasks.clear
            end
          end
+
+          if !@lazies_at_depth.empty?
+            with_trace_query_lazy(trace_query_lazy) do
+              run_next_pending_lazies(job_fibers, trace)
+              run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
+            end
+          end
        end
        trace&.end_dataloader(self)
      end

@@ -69,6 +67,19 @@ def run
 
      private
 
+      def run_pending_steps(job_fibers, next_job_fibers, source_tasks, jobs_fiber_limit, trace)
+        while (f = (job_fibers.shift || (((job_fibers.size + next_job_fibers.size + source_tasks.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
+          if f.alive?
+            finished = run_fiber(f)
+            if !finished
+              next_job_fibers << f
+            end
+          end
+        end
+        job_fibers.concat(next_job_fibers)
+        next_job_fibers.clear
+      end
+
      def spawn_source_task(parent_task, condition, trace)
        pending_sources = nil
        @source_cache.each_value do |source_by_batch_params|
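
`AsyncDataloader` inherits `run_next_pending_lazies` and `with_trace_query_lazy` from `Dataloader` above; only the job-draining loop differs, since its sources run as tasks from the `async` gem. A minimal configuration sketch (`MySchema` is a placeholder), unchanged by this commit:

require "graphql"

class MySchema < GraphQL::Schema
  # Requires the `async` gem at runtime; lazies now drain through the same
  # smallest-depth-first pass as the plain Dataloader.
  use GraphQL::Dataloader::AsyncDataloader
end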

lib/graphql/dataloader/null_dataloader.rb

Lines changed: 44 additions & 10 deletions
@@ -2,24 +2,58 @@
 
 module GraphQL
   class Dataloader
-    # The default implementation of dataloading -- all no-ops.
+    # GraphQL-Ruby uses this when Dataloader isn't enabled.
     #
-    # The Dataloader interface isn't public, but it enables
-    # simple internal code while adding the option to add Dataloader.
+    # It runs execution code inline and gathers lazy objects (eg. Promises)
+    # and resolves them during {#run}.
     class NullDataloader < Dataloader
-      # These are all no-ops because code was
-      # executed synchronously.
+      def initialize(*)
+        @lazies_at_depth = Hash.new { |h,k| h[k] = [] }
+      end
+
+      def freeze
+        @lazies_at_depth.default_proc = nil
+        @lazies_at_depth.freeze
+        super
+      end
+
+      def run(trace_query_lazy: nil)
+        with_trace_query_lazy(trace_query_lazy) do
+          while !@lazies_at_depth.empty?
+            smallest_depth = nil
+            @lazies_at_depth.each_key do |depth_key|
+              smallest_depth ||= depth_key
+              if depth_key < smallest_depth
+                smallest_depth = depth_key
+              end
+            end
+
+            if smallest_depth
+              lazies = @lazies_at_depth.delete(smallest_depth)
+              lazies.each(&:value) # resolve these Lazy instances
+            end
+          end
+        end
+      end
+
+      def run_isolated
+        new_dl = self.class.new
+        res = nil
+        new_dl.append_job {
+          res = yield
+        }
+        new_dl.run
+        res
+      end
 
-      def initialize(*); end
-      def run; end
-      def run_isolated; yield; end
      def clear_cache; end
+
      def yield(_source)
        raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
      end
 
-      def append_job
-        yield
+      def append_job(callable = nil)
+        callable ? callable.call : yield
        nil
      end
 
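
The `run` loop above always drains the shallowest depth first, so resolving one lazy may enqueue new lazies at greater depths before the loop continues. A plain-Ruby sketch of that ordering, with lambdas standing in for Lazy instances:

lazies_at_depth = Hash.new { |h, k| h[k] = [] }
lazies_at_depth[2] << -> { puts "resolved at depth 2" }
lazies_at_depth[1] << -> { puts "resolved at depth 1" }

until lazies_at_depth.empty?
  smallest_depth = lazies_at_depth.keys.min
  lazies_at_depth.delete(smallest_depth).each(&:call)
end
# Prints depth 1 first: shallower fields always resolve before deeper ones.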

lib/graphql/execution/interpreter.rb

Lines changed: 2 additions & 12 deletions
@@ -42,7 +42,6 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
        trace.execute_multiplex(multiplex: multiplex) do
          schema = multiplex.schema
          queries = multiplex.queries
-          lazies_at_depth = Hash.new { |h, k| h[k] = [] }
          multiplex_analyzers = schema.multiplex_analyzers
          if multiplex.max_complexity
            multiplex_analyzers += [GraphQL::Analysis::MaxQueryComplexity]

@@ -73,7 +72,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
              # Although queries in a multiplex _share_ an Interpreter instance,
              # they also have another item of state, which is private to that query
              # in particular, assign it here:
-              runtime = Runtime.new(query: query, lazies_at_depth: lazies_at_depth)
+              runtime = Runtime.new(query: query)
              query.context.namespace(:interpreter_runtime)[:runtime] = runtime
 
              query.current_trace.execute_query(query: query) do

@@ -88,16 +87,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
            }
          end
 
-          multiplex.dataloader.run
-
-          # Then, work through lazy results in a breadth-first way
-          multiplex.dataloader.append_job {
-            query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
-            multiplex.current_trace.execute_query_lazy(multiplex: multiplex, query: query) do
-              Interpreter::Resolve.resolve_each_depth(lazies_at_depth, multiplex.dataloader)
-            end
-          }
-          multiplex.dataloader.run
+          multiplex.dataloader.run(trace_query_lazy: multiplex)
 
          # Then, find all errors and assign the result to the query object
          results.each_with_index do |data_result, idx|
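
Passing the multiplex as `trace_query_lazy` lets the dataloader wrap its lazy phase in the `execute_query_lazy` trace hook itself (see `with_trace_query_lazy` in dataloader.rb above), so custom traces still observe that phase. A hedged sketch of such a trace; `LazyPhaseTiming` is hypothetical:

module LazyPhaseTiming
  def execute_query_lazy(query:, multiplex:)
    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    super
  ensure
    elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
    warn "lazy phase took #{(elapsed * 1000).round(2)}ms"
  end
end

# MySchema.trace_with(LazyPhaseTiming)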

lib/graphql/execution/interpreter/resolve.rb

Lines changed: 7 additions & 13 deletions
@@ -6,12 +6,17 @@ class Interpreter
    module Resolve
      # Continue field results in `results` until there's nothing else to continue.
      # @return [void]
+      # @deprecated Call `dataloader.run` instead
      def self.resolve_all(results, dataloader)
+        warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
        dataloader.append_job { resolve(results, dataloader) }
        nil
      end
 
+      # @deprecated Call `dataloader.run` instead
      def self.resolve_each_depth(lazies_at_depth, dataloader)
+        warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
+
        smallest_depth = nil
        lazies_at_depth.each_key do |depth_key|
          smallest_depth ||= depth_key

@@ -34,20 +39,9 @@ def self.resolve_each_depth(lazies_at_depth, dataloader)
        nil
      end
 
-      # After getting `results` back from an interpreter evaluation,
-      # continue it until you get a response-ready Ruby value.
-      #
-      # `results` is one level of _depth_ of a query or multiplex.
-      #
-      # Resolve all lazy values in that depth before moving on
-      # to the next level.
-      #
-      # It's assumed that the lazies will
-      # return {Lazy} instances if there's more work to be done,
-      # or return {Hash}/{Array} if the query should be continued.
-      #
-      # @return [void]
+      # @deprecated Call `dataloader.run` instead
      def self.resolve(results, dataloader)
+        warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
        # There might be pending jobs here that _will_ write lazies
        # into the result hash. We should run them out, so we
        # can be sure that all lazies will be present in the result hashes.
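
All three `Resolve` helpers keep working but warn on every call; callers should queue work with `append_job` and let `dataloader.run` drain jobs, sources, and lazies together. A migration sketch (the caller code is hypothetical):

# Before (now warns at runtime):
#   GraphQL::Execution::Interpreter::Resolve.resolve_all(results, dataloader)
#   dataloader.run
#
# After:
dataloader.append_job { continue_results(results) } # `continue_results` is a stand-in
dataloader.run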
