Skip to content

Commit 2c74b98

Browse files
committed
Improve robustness of dependency tree.
The dependency tree grows unless pruned. We prune the dependency information when the stream is closed, and in addition, fix the tree manipulations to be robust in the face of missing stream dependencies.
1 parent 0c69ffa commit 2c74b98

File tree

4 files changed

+133
-76
lines changed

4 files changed

+133
-76
lines changed

lib/protocol/http2/connection.rb

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,6 @@ def id
6363
0
6464
end
6565

66-
def parent
67-
nil
68-
end
69-
70-
def children
71-
@dependency.streams
72-
end
73-
7466
def [] id
7567
if id.zero?
7668
self
@@ -113,10 +105,7 @@ def closed?
113105

114106
def delete(id)
115107
@streams.delete(id)
116-
117-
if dependency = @dependencies[id]
118-
dependency.delete! if dependency.irrelevant?
119-
end
108+
@dependencies[id]&.delete!
120109
end
121110

122111
# Close the underlying framer and all streams.
@@ -145,20 +134,10 @@ def next_stream_id
145134
end
146135

147136
attr :streams
137+
148138
attr :dependencies
149139

150-
# Fetch (or create) the flow control windows for the specified stream id.
151-
# @param id [Integer] the stream id.
152-
def dependency_for(id)
153-
@dependencies.fetch(id) do
154-
dependency = Dependency.new(self, id)
155-
156-
# TODO this might be irrelevant, if initially processing priority frame.
157-
@dependency.add_child(dependency)
158-
159-
@dependencies[id] = dependency
160-
end
161-
end
140+
attr :dependency
162141

163142
# 6.8. GOAWAY
164143
# There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client.
@@ -415,10 +394,31 @@ def send_priority(stream_id, priority)
415394
write_frame(frame)
416395
end
417396

397+
def idle_stream_id?(id)
398+
if id.even?
399+
# Server-initiated streams are even.
400+
if @local_stream_id.even?
401+
id >= @local_stream_id
402+
else
403+
id > @remote_stream_id
404+
end
405+
elsif id.odd?
406+
# Client-initiated streams are odd.
407+
if @local_stream_id.odd?
408+
id >= @local_stream_id
409+
else
410+
id > @remote_stream_id
411+
end
412+
end
413+
end
414+
418415
# Sets the priority for an incoming stream.
419416
def receive_priority(frame)
420-
dependency = dependency_for(frame.stream_id)
421-
dependency.receive_priority(frame)
417+
if dependency = @dependencies[frame.stream_id]
418+
dependency.receive_priority(frame)
419+
elsif idle_stream_id?(frame.stream_id)
420+
Dependency.create(self, frame.stream_id, frame.unpack)
421+
end
422422
end
423423

424424
def receive_push_promise(frame)
@@ -472,6 +472,16 @@ def consume_window(size = self.available_size)
472472
# Return if there is no window to consume:
473473
return unless size > 0
474474

475+
# Console.logger.debug(self) do |buffer|
476+
# @dependencies.each do |id, dependency|
477+
# buffer.puts "- #{dependency}"
478+
# end
479+
#
480+
# buffer.puts
481+
#
482+
# @dependency.print_hierarchy(buffer)
483+
# end
484+
475485
@dependency.consume_window(size)
476486
end
477487

lib/protocol/http2/dependency.rb

Lines changed: 91 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,44 @@ module HTTP2
2323
DEFAULT_WEIGHT = 16
2424

2525
class Dependency
26-
def initialize(connection, id, dependent_id = 0, weight = DEFAULT_WEIGHT)
26+
def self.create(connection, id, priority = nil)
27+
weight = DEFAULT_WEIGHT
28+
exclusive = false
29+
30+
if priority
31+
if parent = connection.dependencies[priority.stream_dependency]
32+
exclusive = priority.exclusive
33+
end
34+
35+
weight = priority.weight
36+
end
37+
38+
if parent.nil?
39+
parent = connection.dependency
40+
end
41+
42+
dependency = self.new(connection, id, weight)
43+
44+
connection.dependencies[id] = dependency
45+
46+
if exclusive
47+
parent.exclusive_child(dependency)
48+
else
49+
parent.add_child(dependency)
50+
end
51+
52+
return dependency
53+
end
54+
55+
def initialize(connection, id, weight = DEFAULT_WEIGHT)
2756
@connection = connection
2857
@id = id
2958

30-
# Stream priority:
31-
@dependent_id = dependent_id
32-
@weight = weight
33-
59+
@parent = nil
3460
@children = nil
3561

62+
@weight = weight
63+
3664
# Cache of any associated stream:
3765
@stream = nil
3866

@@ -45,26 +73,17 @@ def <=> other
4573
@weight <=> other.weight
4674
end
4775

48-
def irrelevant?
49-
(@weight == DEFAULT_WEIGHT) && (@children.nil? || @children.empty?)
50-
end
51-
52-
def delete!
53-
@connection.dependencies.delete(@id)
54-
self.parent&.remove_child(self)
55-
end
56-
57-
# Cache of dependent children.
58-
attr_accessor :children
59-
6076
# The connection this stream belongs to.
6177
attr :connection
6278

6379
# Stream ID (odd for client initiated streams, even otherwise).
6480
attr :id
6581

66-
# The stream id that this stream depends on, according to the priority.
67-
attr_accessor :dependent_id
82+
# The parent dependency.
83+
attr_accessor :parent
84+
85+
# The dependent children.
86+
attr_accessor :children
6887

6988
# The weight of the stream relative to other siblings.
7089
attr_accessor :weight
@@ -77,10 +96,26 @@ def clear_cache!
7796
@ordered_children = nil
7897
end
7998

99+
def delete!
100+
@connection.dependencies.delete(@id)
101+
102+
@parent.remove_child(self)
103+
104+
@children&.each do |id, child|
105+
parent.add_child(child)
106+
end
107+
108+
@connection = nil
109+
@parent = nil
110+
@children = nil
111+
end
112+
80113
def add_child(dependency)
81114
@children ||= {}
82115
@children[dependency.id] = dependency
83116

117+
dependency.parent = self
118+
84119
self.clear_cache!
85120
end
86121

@@ -90,33 +125,24 @@ def remove_child(dependency)
90125
self.clear_cache!
91126
end
92127

128+
# An exclusive flag allows for the insertion of a new level of dependencies. The exclusive flag causes the stream to become the sole dependency of its parent stream, causing other dependencies to become dependent on the exclusive stream.
129+
# @param parent [Dependency] the dependency which will be inserted, taking control of all current children.
93130
def exclusive_child(parent)
94131
parent.children = @children
95-
parent.clear_cache!
96132

97-
@children.each_value do |child|
98-
child.dependent_id = parent.id
133+
@children&.each_value do |child|
134+
child.parent = parent
99135
end
100136

137+
parent.clear_cache!
138+
101139
@children = {parent.id => parent}
102140
self.clear_cache!
103141

104-
parent.dependent_id = @id
105-
end
106-
107-
def parent(id = @dependent_id)
108-
@connection.dependency_for(id)
109-
end
110-
111-
def parent= dependency
112-
self.parent&.remove_child(self)
113-
114-
@dependent_id = dependency.id
115-
116-
dependency.add_child(self)
142+
parent.parent = self
117143
end
118144

119-
def process_priority priority
145+
def process_priority(priority)
120146
dependent_id = priority.stream_dependency
121147

122148
if dependent_id == @id
@@ -125,18 +151,17 @@ def process_priority priority
125151

126152
@weight = priority.weight
127153

128-
if priority.exclusive
129-
self.parent&.remove_child(self)
130-
131-
self.parent(dependent_id).exclusive_child(self)
132-
elsif dependent_id != @dependent_id
133-
self.parent&.remove_child(self)
134-
135-
@dependent_id = dependent_id
136-
137-
self.parent.add_child(self)
138-
else
139-
self.parent&.clear_cache!
154+
# We essentially ignore `dependent_id` if the dependency does not exist:
155+
if parent = @connection.dependencies[dependent_id]
156+
if priority.exclusive
157+
@parent.remove_child(self)
158+
159+
parent.exclusive_child(self)
160+
elsif !@parent.equal?(parent)
161+
@parent.remove_child(self)
162+
163+
parent.add_child(self)
164+
end
140165
end
141166
end
142167

@@ -148,7 +173,7 @@ def priority= priority
148173

149174
# The current local priority of the stream.
150175
def priority(exclusive = false)
151-
Priority.new(exclusive, @dependent_id, @weight)
176+
Priority.new(exclusive, @parent.id, @weight)
152177
end
153178

154179
def send_priority(priority)
@@ -159,6 +184,12 @@ def receive_priority(frame)
159184
self.process_priority(frame.unpack)
160185
end
161186

187+
def total_weight
188+
self.orderd_children
189+
190+
return @total_weight
191+
end
192+
162193
def ordered_children
163194
unless @ordered_children
164195
if @children and !@children.empty?
@@ -186,6 +217,17 @@ def consume_window(size)
186217
child.consume_window(allocated) if allocated > 0
187218
end
188219
end
220+
221+
def to_s
222+
"\#<#{self.class} id=#{@id} parent id=#{@parent&.id} weight=#{@weight} #{@children&.size || 0} children>"
223+
end
224+
225+
def print_hierarchy(buffer, indent: 0)
226+
buffer.puts "#{" " * indent}#{self}"
227+
@children&.each_value do |child|
228+
child.print_hierarchy(buffer, indent: indent+1)
229+
end
230+
end
189231
end
190232
end
191233
end

lib/protocol/http2/stream.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def initialize(connection, id, state = :idle)
9191
@local_window = Window.new(@connection.local_settings.initial_window_size)
9292
@remote_window = Window.new(@connection.remote_settings.initial_window_size)
9393

94-
@dependency = @connection.dependency_for(@id)
94+
@dependency = Dependency.create(@connection, @id)
9595
end
9696

9797
# The connection this stream belongs to.

spec/protocol/http2/dependency_spec.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@
5050
# /
5151
# b
5252
begin
53+
a.priority = a.priority
54+
server.read_frame
55+
5356
priority = b.priority
5457
priority.stream_dependency = a.id
5558
b.priority = priority
@@ -58,6 +61,8 @@
5861
end
5962

6063
expect(a.dependency.children).to be == {b.id => b.dependency}
64+
65+
expect(server.dependencies).to include(a.id)
6166
expect(server.dependencies[a.id].children).to be == {b.id => server.dependencies[b.id]}
6267

6368
# a

0 commit comments

Comments
 (0)