Skip to content

Commit fd841da

Browse files
km-64jell-o-fishi
andauthored
Fix stream id generation (#331)
* Use python do-while loop equivalent in stream id allocation * Decrement _first_stream_id by 2 to retain behaviour of first_stream_id * Mask _self_stream_id after decrementing in __init__ * Remove comment and invert logic for do-while behaviour * Add test case for issue #330 --------- Co-authored-by: jell-o-fishi <[email protected]>
1 parent b16e727 commit fd841da

File tree

2 files changed

+22
-4
lines changed

2 files changed

+22
-4
lines changed

rsocket/stream_control.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,23 +13,27 @@
1313

1414
class StreamControl:
1515
def __init__(self, first_stream_id: int):
16-
self._first_stream_id = first_stream_id
16+
self._first_stream_id = (first_stream_id - 2) & MAX_STREAM_ID
1717
self._current_stream_id = self._first_stream_id
1818
self._streams: Dict[int, StreamHandler] = {}
1919
self._maximum_stream_id = MAX_STREAM_ID
2020

2121
def allocate_stream(self) -> int:
2222
attempt_counter = 0
2323

24-
while (self._current_stream_id == CONNECTION_STREAM_ID
25-
or self._current_stream_id in self._streams):
26-
24+
available_stream_id_found = False
25+
while not available_stream_id_found:
2726
if attempt_counter > self._maximum_stream_id / 2:
2827
raise RSocketStreamAllocationFailure()
2928

3029
self._increment_stream_id()
3130
attempt_counter += 1
3231

32+
available_stream_id_found = not (
33+
self._current_stream_id == CONNECTION_STREAM_ID
34+
or self._current_stream_id in self._streams
35+
)
36+
3337
return self._current_stream_id
3438

3539
def _increment_stream_id(self):

tests/rsocket/test_stream_control.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,20 @@ def test_stream_control_reuse_old_stream_ids():
6666
assert next_stream == 5
6767

6868

69+
@pytest.mark.parametrize('first_stream_id', (1, 2))
70+
def test_stream_id_increments_after_allocation_and_registration_followed_by_finishing(first_stream_id: int):
71+
control = StreamControl(first_stream_id)
72+
dummy_stream = object()
73+
74+
allocated_id = control.allocate_stream()
75+
control.register_stream(allocated_id, dummy_stream)
76+
77+
control.finish_stream(allocated_id)
78+
new_allocated_id = control.allocate_stream()
79+
80+
assert new_allocated_id != allocated_id
81+
82+
6983
def test_stream_in_use():
7084
control = StreamControl(1)
7185

0 commit comments

Comments
 (0)