Skip to content

Commit ecbd662

Browse files
committed
Separate stream and priority logic to improve memory efficiency.
1 parent 2b5f3e9 commit ecbd662

File tree

8 files changed

+311
-251
lines changed

8 files changed

+311
-251
lines changed

lib/protocol/http2/connection.rb

Lines changed: 32 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
# THE SOFTWARE.
2020

2121
require_relative 'framer'
22-
require_relative 'flow_control'
22+
require_relative 'dependency'
2323

2424
require 'protocol/hpack'
2525

@@ -32,10 +32,13 @@ def initialize(framer, local_stream_id)
3232
super()
3333

3434
@state = :new
35+
36+
# Hash(Integer, Stream)
3537
@streams = {}
36-
@children = {}
3738

38-
@active = 0
39+
# Hash(Integer, Dependency)
40+
@dependency = Dependency.new(self, 0)
41+
@dependencies = {0 => @dependency}
3942

4043
@framer = framer
4144
@local_stream_id = local_stream_id
@@ -59,6 +62,10 @@ def parent
5962
nil
6063
end
6164

65+
def children
66+
@dependency.streams
67+
end
68+
6269
def [] id
6370
if id.zero?
6471
self
@@ -99,24 +106,18 @@ def closed?
99106
@state == :closed
100107
end
101108

102-
# The stream has become active (i.e. not idle or closed).
103-
def activate(stream)
104-
@active += 1
105-
end
106-
107-
# The stream is no longer active (i.e. has become closed).
108-
def deactivate(stream)
109-
@active -= 1
110-
end
111-
112-
# The number of active streams.
113-
def active_streams
114-
@active
109+
def delete(id)
110+
@streams.delete(id)
111+
112+
if dependency = @dependencies[id]
113+
dependency.delete! if dependency.irrelevant?
114+
end
115115
end
116116

117117
# Close the underlying framer and all streams.
118118
def close(error = nil)
119119
@framer.close
120+
# @framer = nil
120121

121122
@streams.each_value{|stream| stream.close(error)}
122123
end
@@ -139,26 +140,19 @@ def next_stream_id
139140
end
140141

141142
attr :streams
142-
attr :children
143-
144-
def add_child(stream)
145-
@children[stream.id] = stream
146-
end
143+
attr :dependencies
147144

148-
def remove_child(stream)
149-
@children.delete(stream.id)
150-
end
151-
152-
def exclusive_child(stream)
153-
stream.children = @children
154-
155-
@children.each_value do |child|
156-
child.dependent_id = stream.id
145+
# Fetch (or create) the flow control windows for the specified stream id.
146+
# @param id [Integer] the stream id.
147+
def dependency_for(id)
148+
@dependencies.fetch(id) do
149+
dependency = Dependency.new(self, id)
150+
151+
# TODO this might be irrelevant, if initially processing priority frame.
152+
@dependency.add_child(dependency)
153+
154+
@dependencies[id] = dependency
157155
end
158-
159-
@children = {stream.id => stream}
160-
161-
stream.dependent_id = 0
162156
end
163157

164158
# 6.8. GOAWAY
@@ -398,7 +392,7 @@ def receive_headers(frame)
398392
raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!"
399393
end
400394

401-
if self.active_streams.size < self.maximum_concurrent_streams
395+
if @streams.size < self.maximum_concurrent_streams
402396
stream = accept_stream(stream_id)
403397
@remote_stream_id = stream_id
404398

@@ -418,13 +412,8 @@ def send_priority(stream_id, priority)
418412

419413
# Sets the priority for an incoming stream.
420414
def receive_priority(frame)
421-
if stream = @streams[frame.stream_id]
422-
stream.receive_priority(frame)
423-
else
424-
# Stream doesn't exist yet.
425-
stream = accept_stream(frame.stream_id)
426-
stream.receive_priority(frame)
427-
end
415+
dependency = dependency_for(frame.stream_id)
416+
dependency.receive_priority(frame)
428417
end
429418

430419
def receive_push_promise(frame)
@@ -450,7 +439,7 @@ def receive_window_update(frame)
450439
rescue ProtocolError => error
451440
stream.send_reset_stream(error.code)
452441
end
453-
else
442+
elsif frame.stream_id > @remote_stream_id
454443
# Receiving any frame other than HEADERS or PRIORITY on a stream in this state MUST be treated as a connection error of type PROTOCOL_ERROR.
455444
raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
456445
end

lib/protocol/http2/dependency.rb

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
2+
#
3+
# Permission is hereby granted, free of charge, to any person obtaining a copy
4+
# of this software and associated documentation files (the "Software"), to deal
5+
# in the Software without restriction, including without limitation the rights
6+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
# copies of the Software, and to permit persons to whom the Software is
8+
# furnished to do so, subject to the following conditions:
9+
#
10+
# The above copyright notice and this permission notice shall be included in
11+
# all copies or substantial portions of the Software.
12+
#
13+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
# THE SOFTWARE.
20+
21+
require_relative 'flow_control'
22+
23+
module Protocol
24+
module HTTP2
25+
DEFAULT_WEIGHT = 16
26+
27+
class Dependency
28+
def initialize(connection, id, dependent_id = 0, weight = DEFAULT_WEIGHT, children = nil)
29+
@connection = connection
30+
@id = id
31+
32+
# Stream priority:
33+
@dependent_id = dependent_id
34+
@weight = weight
35+
36+
# A cache of dependencies that have child.dependent_id = self.id
37+
@children = children
38+
end
39+
40+
def irrelevant?
41+
(@weight == DEFAULT_WEIGHT) && (@children.nil? || @children.empty?)
42+
end
43+
44+
def delete!
45+
@connection.dependencies.delete(@id)
46+
self.parent&.remove_child(self)
47+
end
48+
49+
# Cache of dependent children.
50+
attr_accessor :children
51+
52+
# The connection this stream belongs to.
53+
attr :connection
54+
55+
# Stream ID (odd for client initiated streams, even otherwise).
56+
attr :id
57+
58+
# The stream id that this stream depends on, according to the priority.
59+
attr_accessor :dependent_id
60+
61+
# The weight of the stream relative to other siblings.
62+
attr_accessor :weight
63+
64+
def stream
65+
@connection.streams[@id]
66+
end
67+
68+
def streams
69+
if @children
70+
# TODO this O(N) operation affects performance.
71+
# It would be better to maintain a sorted list of children streams.
72+
@children.map{|id, dependency| dependency.stream}.compact
73+
end
74+
end
75+
76+
def add_child(dependency)
77+
@children ||= {}
78+
@children[dependency.id] = dependency
79+
end
80+
81+
def remove_child(dependency)
82+
@children&.delete(dependency.id)
83+
end
84+
85+
def exclusive_child(parent)
86+
parent.children = @children
87+
88+
@children.each_value do |child|
89+
child.dependent_id = parent.id
90+
end
91+
92+
@children = {parent.id => parent}
93+
94+
parent.dependent_id = @id
95+
end
96+
97+
def parent(id = @dependent_id)
98+
@connection.dependency_for(id)
99+
end
100+
101+
def parent= dependency
102+
self.parent&.remove_child(self)
103+
104+
@dependent_id = dependency.id
105+
106+
dependency.add_child(self)
107+
end
108+
109+
def process_priority priority
110+
dependent_id = priority.stream_dependency
111+
112+
if dependent_id == @id
113+
raise ProtocolError, "Stream priority for stream id #{@id} cannot depend on itself!"
114+
end
115+
116+
@weight = priority.weight
117+
118+
if priority.exclusive
119+
self.parent&.remove_child(self)
120+
121+
self.parent(dependent_id).exclusive_child(self)
122+
elsif dependent_id != @dependent_id
123+
self.parent&.remove_child(self)
124+
125+
@dependent_id = dependent_id
126+
127+
self.parent.add_child(self)
128+
end
129+
end
130+
131+
# Change the priority of the stream both locally and remotely.
132+
def priority= priority
133+
send_priority(priority)
134+
process_priority(priority)
135+
end
136+
137+
# The current local priority of the stream.
138+
def priority(exclusive = false)
139+
Priority.new(exclusive, @dependent_id, @weight)
140+
end
141+
142+
def send_priority(priority)
143+
@connection.send_priority(@id, priority)
144+
end
145+
146+
def receive_priority(frame)
147+
self.process_priority(frame.unpack)
148+
end
149+
end
150+
end
151+
end

lib/protocol/http2/flow_control.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def consume_window(size = self.available_size)
115115

116116
# Allow the current flow-controlled instance to use up the window:
117117
if !self.window_updated(size) and children = self.children
118-
children = children.values.sort_by(&:weight)
118+
children = children.sort_by(&:weight)
119119

120120
# This must always be at least >= `children.size`, since stream weight can't be 0.
121121
total = children.sum(&:weight)

0 commit comments

Comments
 (0)