Merged
Changes from 8 commits

Commits (22):
5ba154a
Extract dataloader changes from run-queue-3; merge lazy resolution in…
rmosolgo Aug 26, 2025
e79e04b
fix file endings
rmosolgo Sep 2, 2025
4d57c4d
Handle ExecutionErrors raised from leaf coercion + dataloader
rmosolgo Sep 2, 2025
43e8eb3
Fix stale references to lazies_at_depth; correctly re-enter execution…
rmosolgo Sep 2, 2025
e9dffaa
Quick test fixes
rmosolgo Sep 2, 2025
bb3f2da
Add deprecations to Execution::Resolve; better lazy/Dataloader integr…
rmosolgo Sep 2, 2025
e901a86
Debug mutation test
rmosolgo Sep 2, 2025
e9f8dc2
Update spec for non-depth-first behavior with FlatDataloader
rmosolgo Sep 2, 2025
4fb6307
Fix context for argument errors
rmosolgo Sep 2, 2025
c2257b8
Update specs for resolution order
rmosolgo Sep 2, 2025
fc40677
Add Lazy handling to AsyncDataloader
rmosolgo Sep 2, 2025
ba2712d
Fix initialization; rebuild snapshot
rmosolgo Sep 2, 2025
3493129
Remove steps_to_rerun_after_lazy since it's not used yet
rmosolgo Sep 2, 2025
78bd98f
Clean up lazies_at_depth, DRY FlatDataloader
rmosolgo Sep 2, 2025
d34262b
Update assertions
rmosolgo Sep 2, 2025
5c43bcc
Update more specs
rmosolgo Sep 2, 2025
2b9f736
Put back missing require
rmosolgo Sep 2, 2025
889bcf6
Add Lazy resolution to NullDataloader and remove FlatDataloader
rmosolgo Sep 3, 2025
d406d39
Use a fresh NullDataloader instance for run_isolated to support froze…
rmosolgo Sep 3, 2025
7834e2b
Revert some changes to test output
rmosolgo Sep 3, 2025
f5a87f3
Merge branch 'master' into merge-dataloader-and-lazy
rmosolgo Sep 3, 2025
add47cc
Update breadth-first spec
rmosolgo Sep 3, 2025
106 changes: 83 additions & 23 deletions lib/graphql/dataloader.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true

require "graphql/dataloader/flat_dataloader"
require "graphql/dataloader/null_dataloader"
require "graphql/dataloader/request"
require "graphql/dataloader/request_all"
@@ -64,8 +65,14 @@ def initialize(nonblocking: self.class.default_nonblocking, fiber_limit: self.cl
@nonblocking = nonblocking
end
@fiber_limit = fiber_limit
@steps_to_rerun_after_lazy = []
@lazies_at_depth = nil
end

attr_accessor :lazies_at_depth

attr_reader :steps_to_rerun_after_lazy

# @return [Integer, nil]
attr_reader :fiber_limit

@@ -140,10 +147,10 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
end

# @api private Nothing to see here
def append_job(&job)
def append_job(callable = nil, &job)
# Given a block, queue it up to be worked through when `#run` is called.
# (If the dataloader is already running, than a Fiber will pick this up later.)
@pending_jobs.push(job)
# (If the dataloader is already running, then a Fiber will pick this up later.)
@pending_jobs.push(callable || job)
nil
end

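A note on the `append_job` change above: jobs can now be passed as a positional callable instead of a block. A minimal standalone sketch, with made-up job bodies:

    require "graphql"

    dataloader = GraphQL::Dataloader.new
    dataloader.append_job { puts "block form" }   # works as before
    step = -> { puts "callable form" }
    dataloader.append_job(step)                   # new: pass an existing callable
    dataloader.run
    # Both forms land in @pending_jobs via (callable || job), so a stored step
    # (e.g. one kept in @steps_to_rerun_after_lazy) can be re-enqueued as-is
    # instead of being wrapped in a fresh block.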
@@ -188,7 +195,10 @@ def run_isolated
end
end

def run
# @param trace_query_lazy [nil, Execution::Multiplex]
def run(trace_query_lazy: nil)
# TODO: unify the initialization of lazies_at_depth
@lazies_at_depth ||= Hash.new { |h, k| h[k] = [] }
trace = Fiber[:__graphql_current_multiplex]&.current_trace
jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
@@ -201,26 +211,18 @@ def run
while first_pass || !job_fibers.empty?
first_pass = false

while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
join_queues(job_fibers, next_job_fibers)

while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
if f.alive?
finished = run_fiber(f)
if !finished
next_source_fibers << f
end
end
run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)

if !@lazies_at_depth.empty?
with_trace_query_lazy(trace_query_lazy) do
run_next_pending_lazies(job_fibers, trace)
run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
end
join_queues(source_fibers, next_source_fibers)
elsif !@steps_to_rerun_after_lazy.empty?
@pending_jobs.concat(@steps_to_rerun_after_lazy)
f = spawn_job_fiber(trace)
job_fibers << f
@steps_to_rerun_after_lazy.clear
end
end

@@ -248,6 +250,11 @@ def run_fiber(f)
f.resume
end

# @api private
def lazy_at_depth(depth, lazy)
@lazies_at_depth[depth] << lazy
end

def spawn_fiber
fiber_vars = get_fiber_variables
Fiber.new(blocking: !@nonblocking) {
@@ -275,6 +282,59 @@ def merge_records(records, index_by: :id)

private

def run_next_pending_lazies(job_fibers, trace)
smallest_depth = nil
@lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
if depth_key < smallest_depth
smallest_depth = depth_key
end
end

if smallest_depth
lazies = @lazies_at_depth.delete(smallest_depth)
if !lazies.empty?
lazies.each do |l|
append_job { l.value }
end
job_fibers.unshift(spawn_job_fiber(trace))
end
end
end

def run_pending_steps(trace, job_fibers, next_job_fibers, jobs_fiber_limit, source_fibers, next_source_fibers, total_fiber_limit)
while (f = (job_fibers.shift || (((next_job_fibers.size + job_fibers.size) < jobs_fiber_limit) && spawn_job_fiber(trace))))
if f.alive?
finished = run_fiber(f)
if !finished
next_job_fibers << f
end
end
end
join_queues(job_fibers, next_job_fibers)

while (!source_fibers.empty? || @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) })
while (f = source_fibers.shift || (((job_fibers.size + source_fibers.size + next_source_fibers.size + next_job_fibers.size) < total_fiber_limit) && spawn_source_fiber(trace)))
if f.alive?
finished = run_fiber(f)
if !finished
next_source_fibers << f
end
end
end
join_queues(source_fibers, next_source_fibers)
end
end

def with_trace_query_lazy(multiplex_or_nil, &block)
if (multiplex = multiplex_or_nil)
query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
multiplex.current_trace.execute_query_lazy(query: query, multiplex: multiplex, &block)
else
yield
end
end

def calculate_fiber_limit
total_fiber_limit = @fiber_limit || Float::INFINITY
if total_fiber_limit < 4
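To make the ordering in `run_next_pending_lazies` concrete, here is a self-contained sketch of its shallowest-depth-first selection (sample data invented; symbols stand in for `GraphQL::Execution::Lazy` instances):

    lazies_at_depth = Hash.new { |h, k| h[k] = [] }
    lazies_at_depth[3] << :grandchild_lazy
    lazies_at_depth[1] << :root_lazy
    lazies_at_depth[2] << :child_lazy

    # Same scan as above: find the smallest depth key...
    smallest_depth = nil
    lazies_at_depth.each_key do |depth_key|
      smallest_depth ||= depth_key
      smallest_depth = depth_key if depth_key < smallest_depth
    end

    # ...then drain only that depth; deeper lazies wait for a later pass.
    lazies_at_depth.delete(smallest_depth) # => [:root_lazy]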
2 changes: 1 addition & 1 deletion lib/graphql/dataloader/async_dataloader.rb
@@ -14,7 +14,7 @@ def yield(source = Fiber[:__graphql_current_dataloader_source])
nil
end

def run
def run(trace_query_lazy: nil)
trace = Fiber[:__graphql_current_multiplex]&.current_trace
jobs_fiber_limit, total_fiber_limit = calculate_fiber_limit
job_fibers = []
87 changes: 87 additions & 0 deletions lib/graphql/dataloader/flat_dataloader.rb
@@ -0,0 +1,87 @@
# frozen_string_literal: true
module GraphQL
class Dataloader
class FlatDataloader < Dataloader
def initialize(*)
# TODO: unify the initialization of lazies_at_depth
@lazies_at_depth ||= Hash.new { |h, k| h[k] = [] }
@steps_to_rerun_after_lazy = []
@queue = []
end

def run(trace_query_lazy: nil)
while !@queue.empty?
run_pending_steps
with_trace_query_lazy(trace_query_lazy) do
while @lazies_at_depth&.any?
run_next_pending_lazies
run_pending_steps
end
end

if !@steps_to_rerun_after_lazy.empty?
@steps_to_rerun_after_lazy.each(&:call)
@steps_to_rerun_after_lazy.clear
end
end
end

def run_isolated
prev_queue = @queue
prev_stral = @steps_to_rerun_after_lazy
prev_lad = @lazies_at_depth
@steps_to_rerun_after_lazy = []
@queue = []
@lazies_at_depth = @lazies_at_depth.dup&.clear
res = nil
append_job {
res = yield
}
run
res
ensure
@queue = prev_queue
@steps_to_rerun_after_lazy = prev_stral
@lazies_at_depth = prev_lad
end

def clear_cache; end

def yield(_source)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end

def append_job(callable = nil, &block)
@queue << (callable || block)
nil
end

def with(*)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end

private

def run_next_pending_lazies
smallest_depth = nil
@lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
if depth_key < smallest_depth
smallest_depth = depth_key
end
end

if smallest_depth
lazies = @lazies_at_depth.delete(smallest_depth)
lazies.each(&:value) # resolve these Lazy instances
end
end

def run_pending_steps
while (step = @queue.shift)
step.call
end
end
end
end
end
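A minimal usage sketch of `FlatDataloader` as it stands at this commit (per the commit list, 889bcf6 later removes it in favor of `NullDataloader`):

    require "graphql"

    dl = GraphQL::Dataloader::FlatDataloader.new
    dl.append_job { puts "step 1" }
    dl.append_job(-> { puts "step 2" })
    dl.run   # drains @queue synchronously, then any pending lazies, shallowest first

    dl.run_isolated { 2 + 2 } # => 4, run against fresh queue/lazy state, then restored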
6 changes: 3 additions & 3 deletions lib/graphql/dataloader/null_dataloader.rb
@@ -11,15 +11,15 @@ class NullDataloader < Dataloader
# executed synchronously.

def initialize(*); end
def run; end
def run(trace_query_lazy: nil); end
def run_isolated; yield; end
def clear_cache; end
def yield(_source)
raise GraphQL::Error, "GraphQL::Dataloader is not running -- add `use GraphQL::Dataloader` to your schema to use Dataloader sources."
end

def append_job
yield
def append_job(callable = nil)
callable ? callable.call : yield
nil
end

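`NullDataloader` still has no queue: `append_job` runs the work immediately in either form, which keeps it drop-in compatible with the new positional-callable calls. A small sketch:

    require "graphql"

    dl = GraphQL::Dataloader::NullDataloader.new
    dl.append_job { puts "runs immediately" }     # block form
    dl.append_job(-> { puts "also immediate" })   # new callable form
    dl.run                                        # still a no-op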
13 changes: 2 additions & 11 deletions lib/graphql/execution/interpreter.rb
@@ -5,7 +5,6 @@
require "graphql/execution/interpreter/arguments_cache"
require "graphql/execution/interpreter/execution_errors"
require "graphql/execution/interpreter/runtime"
require "graphql/execution/interpreter/resolve"
require "graphql/execution/interpreter/handles_raw_value"

module GraphQL
@@ -43,6 +42,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
schema = multiplex.schema
queries = multiplex.queries
lazies_at_depth = Hash.new { |h, k| h[k] = [] }
multiplex.dataloader.lazies_at_depth = lazies_at_depth
multiplex_analyzers = schema.multiplex_analyzers
if multiplex.max_complexity
multiplex_analyzers += [GraphQL::Analysis::MaxQueryComplexity]
@@ -88,16 +88,7 @@ def run_all(schema, query_options, context: {}, max_complexity: schema.max_compl
}
end

multiplex.dataloader.run

# Then, work through lazy results in a breadth-first way
multiplex.dataloader.append_job {
query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
multiplex.current_trace.execute_query_lazy(multiplex: multiplex, query: query) do
Interpreter::Resolve.resolve_each_depth(lazies_at_depth, multiplex.dataloader)
end
}
multiplex.dataloader.run
multiplex.dataloader.run(trace_query_lazy: multiplex)

# Then, find all errors and assign the result to the query object
results.each_with_index do |data_result, idx|
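Condensed, the hunk above replaces the old two-phase sequence (run, enqueue a traced lazy-resolution job, run again) with a single call; `Dataloader#run` now does the `execute_query_lazy` wrapping itself via `with_trace_query_lazy`:

    # Before (as removed above):
    multiplex.dataloader.run
    multiplex.dataloader.append_job {
      query = multiplex.queries.length == 1 ? multiplex.queries[0] : nil
      multiplex.current_trace.execute_query_lazy(multiplex: multiplex, query: query) do
        Interpreter::Resolve.resolve_each_depth(lazies_at_depth, multiplex.dataloader)
      end
    }
    multiplex.dataloader.run

    # After:
    multiplex.dataloader.run(trace_query_lazy: multiplex)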
20 changes: 7 additions & 13 deletions lib/graphql/execution/interpreter/resolve.rb
@@ -6,12 +6,17 @@ class Interpreter
module Resolve
# Continue field results in `results` until there's nothing else to continue.
# @return [void]
# @deprecated Call `dataloader.run` instead
def self.resolve_all(results, dataloader)
warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
dataloader.append_job { resolve(results, dataloader) }
nil
end

# @deprecated Call `dataloader.run` instead
def self.resolve_each_depth(lazies_at_depth, dataloader)
warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"

smallest_depth = nil
lazies_at_depth.each_key do |depth_key|
smallest_depth ||= depth_key
@@ -34,20 +39,9 @@ def self.resolve_each_depth(lazies_at_depth, dataloader)
nil
end

# After getting `results` back from an interpreter evaluation,
# continue it until you get a response-ready Ruby value.
#
# `results` is one level of _depth_ of a query or multiplex.
#
# Resolve all lazy values in that depth before moving on
# to the next level.
#
# It's assumed that the lazies will
# return {Lazy} instances if there's more work to be done,
# or return {Hash}/{Array} if the query should be continued.
#
# @return [void]
# @deprecated Call `dataloader.run` instead
def self.resolve(results, dataloader)
warn "#{self}.#{__method__} is deprecated; Use `dataloader.run` instead.#{caller(1, 5).map { |l| "\n #{l}"}.join}"
# There might be pending jobs here that _will_ write lazies
# into the result hash. We should run them out, so we
# can be sure that all lazies will be present in the result hashes.
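For downstream code that called these helpers directly, the deprecation warnings name the replacement; a migration sketch (caller-side variables assumed):

    # Deprecated -- each call now emits a warning with a backtrace excerpt:
    GraphQL::Execution::Interpreter::Resolve.resolve_all(results, dataloader)
    GraphQL::Execution::Interpreter::Resolve.resolve_each_depth(lazies_at_depth, dataloader)

    # Replacement named in the warnings -- let the dataloader drive resolution:
    dataloader.run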
13 changes: 8 additions & 5 deletions lib/graphql/execution/interpreter/runtime.rb
@@ -39,7 +39,6 @@ def initialize(query:, lazies_at_depth:)
@query = query
@current_trace = query.current_trace
@dataloader = query.multiplex.dataloader
@lazies_at_depth = lazies_at_depth
@schema = query.schema
@context = query.context
@response = nil
@@ -446,7 +445,7 @@ def evaluate_selection_with_resolved_keyword_args(kwarg_arguments, resolved_argu
}
end

field_result = call_method_on_directives(:resolve, object, directives) do
call_method_on_directives(:resolve, object, directives) do
if !directives.empty?
# This might be executed in a different context; reset this info
runtime_state = get_current_runtime_state
@@ -488,7 +487,7 @@ def evaluate_selection_with_resolved_keyword_args(kwarg_arguments, resolved_argu
# all of its child fields before moving on to the next root mutation field.
# (Subselections of this mutation will still be resolved level-by-level.)
if selection_result.graphql_is_eager
Interpreter::Resolve.resolve_all([field_result], @dataloader)
@dataloader.run
end
end

@@ -667,7 +666,11 @@ def continue_field(value, owner_type, field, current_type, ast_node, next_select
rescue GraphQL::ExecutionError => ex_err
return continue_value(ex_err, field, is_non_null, ast_node, result_name, selection_result)
rescue StandardError => err
query.handle_or_reraise(err)
begin
query.handle_or_reraise(err)
rescue GraphQL::ExecutionError => ex_err
return continue_value(ex_err, field, is_non_null, ast_node, result_name, selection_result)
end
end
set_result(selection_result, result_name, r, false, is_non_null)
r
@@ -928,7 +931,7 @@ def after_lazy(lazy_obj, field:, owner_object:, arguments:, ast_node:, result:,
current_depth += 1
result = result.graphql_parent
end
@lazies_at_depth[current_depth] << lazy
@dataloader.lazy_at_depth(current_depth, lazy)
lazy
end
else
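The nested rescue added to `continue_field` covers handlers that convert errors: previously, a `GraphQL::ExecutionError` raised from inside `query.handle_or_reraise` (e.g. by a `rescue_from` handler) escaped the field; now it flows into `continue_value` just like one raised by the resolver itself. A standalone sketch of the pattern, with stand-in classes rather than the real API:

    # FakeExecutionError stands in for GraphQL::ExecutionError:
    class FakeExecutionError < StandardError; end

    # Stands in for query.handle_or_reraise with a converting handler installed:
    def handle_or_reraise(err)
      raise FakeExecutionError, "converted: #{err.message}"
    end

    begin
      raise ArgumentError, "boom"
    rescue FakeExecutionError => ex_err
      puts "raised directly: #{ex_err.message}"
    rescue StandardError => err
      begin
        handle_or_reraise(err)                 # the handler converts the error...
      rescue FakeExecutionError => ex_err
        puts "converted: #{ex_err.message}"    # ...and it's now caught, not leaked
      end
    end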