Skip to content

Commit 0a6f811

Browse files
committed
Rework window handling improving both correctness and efficiency.
1 parent 7f2ddc9 commit 0a6f811

File tree

7 files changed

+94
-111
lines changed

7 files changed

+94
-111
lines changed

lib/protocol/http2/connection.rb

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@
2020

2121
require_relative 'framer'
2222
require_relative 'dependency'
23+
require_relative 'flow_controlled'
2324

2425
require 'protocol/hpack'
2526

2627
module Protocol
2728
module HTTP2
2829
class Connection
29-
include FlowControl
30+
include FlowControlled
3031

3132
def initialize(framer, local_stream_id)
3233
super()
@@ -465,6 +466,15 @@ def receive_reset_stream(frame)
465466
end
466467
end
467468

469+
# Traverse active streams in order of priority and allow them to consume the available flow-control window.
470+
# @param amount [Integer] the amount of data to write. Defaults to the current window capacity.
471+
def consume_window(size = self.available_size)
472+
# Return if there is no window to consume:
473+
return unless size > 0
474+
475+
@dependency.consume_window(size)
476+
end
477+
468478
def receive_window_update(frame)
469479
if frame.connection?
470480
super

lib/protocol/http2/dependency.rb

Lines changed: 52 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,31 @@
1818
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
1919
# THE SOFTWARE.
2020

21-
require_relative 'flow_control'
22-
2321
module Protocol
2422
module HTTP2
2523
DEFAULT_WEIGHT = 16
2624

2725
class Dependency
28-
def initialize(connection, id, dependent_id = 0, weight = DEFAULT_WEIGHT, children = nil)
26+
def initialize(connection, id, dependent_id = 0, weight = DEFAULT_WEIGHT)
2927
@connection = connection
3028
@id = id
3129

3230
# Stream priority:
3331
@dependent_id = dependent_id
3432
@weight = weight
3533

36-
# A cache of dependencies that have child.dependent_id = self.id
37-
@children = children
34+
@children = nil
35+
36+
# Cache of any associated stream:
37+
@stream = nil
38+
39+
# Cache of children for window allocation:
40+
@total_weight = 0
41+
@ordered_children = nil
42+
end
43+
44+
def <=> other
45+
@weight <=> other.weight
3846
end
3947

4048
def irrelevant?
@@ -62,34 +70,36 @@ def delete!
6270
attr_accessor :weight
6371

6472
def stream
65-
@connection.streams[@id]
73+
@stream ||= @connection.streams[@id]
6674
end
6775

68-
def streams
69-
if @children
70-
# TODO this O(N) operation affects performance.
71-
# It would be better to maintain a sorted list of children streams.
72-
@children.map{|id, dependency| dependency.stream}.compact
73-
end
76+
def clear_cache!
77+
@ordered_children = nil
7478
end
7579

7680
def add_child(dependency)
7781
@children ||= {}
7882
@children[dependency.id] = dependency
83+
84+
self.clear_cache!
7985
end
8086

8187
def remove_child(dependency)
8288
@children&.delete(dependency.id)
89+
90+
self.clear_cache!
8391
end
8492

8593
def exclusive_child(parent)
8694
parent.children = @children
95+
parent.clear_cache!
8796

8897
@children.each_value do |child|
8998
child.dependent_id = parent.id
9099
end
91100

92101
@children = {parent.id => parent}
102+
self.clear_cache!
93103

94104
parent.dependent_id = @id
95105
end
@@ -125,6 +135,8 @@ def process_priority priority
125135
@dependent_id = dependent_id
126136

127137
self.parent.add_child(self)
138+
else
139+
self.parent&.clear_cache!
128140
end
129141
end
130142

@@ -146,6 +158,34 @@ def send_priority(priority)
146158
def receive_priority(frame)
147159
self.process_priority(frame.unpack)
148160
end
161+
162+
def ordered_children
163+
unless @ordered_children
164+
if @children and !@children.empty?
165+
@ordered_children = @children.values.sort
166+
@total_weight = @ordered_children.sum(&:weight)
167+
end
168+
end
169+
170+
return @ordered_children
171+
end
172+
173+
# Traverse active streams in order of priority and allow them to consume the available flow-control window.
174+
# @param amount [Integer] the amount of data to write. Defaults to the current window capacity.
175+
def consume_window(size)
176+
# If there is an associated stream, give it priority:
177+
if stream = self.stream
178+
return if stream.window_updated(size)
179+
end
180+
181+
# Otherwise, allow the dependent children to use up the available window:
182+
self.ordered_children&.each do |child|
183+
# Compute the proportional allocation:
184+
allocated = (child.weight * size) / @total_weight
185+
186+
child.consume_window(allocated) if allocated > 0
187+
end
188+
end
149189
end
150190
end
151191
end

lib/protocol/http2/flow_control.rb renamed to lib/protocol/http2/flow_controlled.rb

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
module Protocol
2525
module HTTP2
26-
module FlowControl
26+
module FlowControlled
2727
def available_size
2828
@remote_window.available
2929
end
@@ -98,35 +98,9 @@ def receive_window_update(frame)
9898
# The window has been expanded by the given amount.
9999
# @param size [Integer] the maximum amount of data to send.
100100
# @return [Boolean] whether the window update was used or not.
101-
def window_updated(size = self.available_size)
101+
def window_updated(size)
102102
return false
103103
end
104-
105-
# Traverse active streams in order of priority and allow them to consume the available flow-control window.
106-
# @todo This function can get slow when there are a lot of children [INEFFICIENT].
107-
# @param amount [Integer] the amount of data to write. Defaults to the current window capacity.
108-
def consume_window(size = self.available_size)
109-
# Don't consume more than the available window size:
110-
size = [self.available_size, size].min
111-
# puts "consume_window(#{size}) local_window=#{@local_window} remote_window=#{@remote_window}"
112-
113-
# Return if there is no window to consume:
114-
return unless size > 0
115-
116-
# Allow the current flow-controlled instance to use up the window:
117-
if !self.window_updated(size) and children = self.children
118-
children = children.sort_by(&:weight)
119-
120-
# This must always be at least >= `children.size`, since stream weight can't be 0.
121-
total = children.sum(&:weight)
122-
123-
children.each do |child|
124-
# Compute the proportional allocation:
125-
allocated = (child.weight * size) / total
126-
child.consume_window(allocated)
127-
end
128-
end
129-
end
130104
end
131105
end
132106
end

lib/protocol/http2/stream.rb

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ module HTTP2
7272
#
7373
# State transition methods use a trailing "!".
7474
class Stream
75-
include FlowControl
75+
include FlowControlled
7676

7777
def self.create(connection, id)
7878
stream = self.new(connection, id)
@@ -124,10 +124,6 @@ def parent=(stream)
124124
@dependency.parent = stream.dependency
125125
end
126126

127-
def children
128-
@dependency&.streams
129-
end
130-
131127
# The stream is being closed because the connection is being closed.
132128
def close(error = nil)
133129
end

lib/protocol/http2/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,6 @@
2020

2121
module Protocol
2222
module HTTP2
23-
VERSION = "0.11.3"
23+
VERSION = "0.11.4"
2424
end
2525
end

spec/protocol/http2/dependency_spec.rb

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
server.read_frame
5858
end
5959

60-
expect(a.children).to be == [b]
60+
expect(a.dependency.children).to be == {b.id => b.dependency}
6161
expect(server.dependencies[a.id].children).to be == {b.id => server.dependencies[b.id]}
6262

6363
# a
@@ -71,7 +71,7 @@
7171
server.read_frame
7272
end
7373

74-
expect(a.children).to be == [b, c]
74+
expect(a.dependency.children).to be == {b.id => b.dependency, c.id => c.dependency}
7575
expect(server.dependencies[a.id].children).to be == {b.id => server.dependencies[b.id], c.id => server.dependencies[c.id]}
7676

7777
# a
@@ -88,9 +88,32 @@
8888
server.read_frame
8989
end
9090

91-
expect(a.children).to be == [d]
92-
expect(d.children).to be == [b, c]
91+
expect(a.dependency.children).to be == {d.id => d.dependency}
92+
expect(d.dependency.children).to be == {b.id => b.dependency, c.id => c.dependency}
9393
expect(server.dependencies[a.id].children).to be == {d.id => server.dependencies[d.id]}
9494
expect(server.dependencies[d.id].children).to be == {b.id => server.dependencies[b.id], c.id => server.dependencies[c.id]}
9595
end
96+
97+
it "correctly allocates window" do
98+
parent = client.create_stream
99+
children = 2.times.collect {client.create_stream}
100+
101+
children.each do |child|
102+
priority = child.priority
103+
priority.stream_dependency = parent.id
104+
child.priority = priority
105+
end
106+
107+
2.times {server.read_frame}
108+
109+
# Now we have this prioritization on the server:
110+
# a
111+
# / \
112+
# b c
113+
114+
expect(server.dependencies[children[0].id]).to receive(:consume_window).with(0xFFFF / 2).once
115+
expect(server.dependencies[children[1].id]).to receive(:consume_window).with(0xFFFF / 2).once
116+
117+
server.consume_window
118+
end
96119
end

spec/protocol/http2/flow_control_spec.rb

Lines changed: 0 additions & 60 deletions
This file was deleted.

0 commit comments

Comments
 (0)