Skip to content

Commit aa8a51a

Browse files
authored
Merge pull request #93 from rsocket/quic_transport
fix concurrent frames in quic and http3
2 parents c5ca3aa + db28386 commit aa8a51a

27 files changed

+38
-34
lines changed

.github/workflows/python-package.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ jobs:
3636
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
3737
- name: Test with pytest
3838
run: |
39-
pytest -n 1 --cov-report=html --cov --ignore=examples tests
39+
pytest -n 4 --cov-report=html --cov --ignore=examples tests
4040
- name: Archive code coverage html report
4141
uses: actions/upload-artifact@v2
4242
with:

examples/tutorial/reactivex/chat_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ async def download(self, file_name):
129129
)
130130

131131
return await ReactiveXClient(self._rsocket).request_response(request).pipe(
132-
operators.map(lambda _:_.data),
132+
operators.map(lambda _: _.data),
133133
operators.last()
134134
)
135135

examples/tutorial/reactivex/chat_server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dataclasses import dataclass, field
88
from typing import Dict, Optional, Set, Callable
99
from weakref import WeakValueDictionary, WeakSet
10+
1011
import reactivex
1112
from more_itertools import first
1213
from reactivex import Observable, operators, Subject, Observer

examples/tutorial/step4/chat_server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
from typing import Dict, Optional, Set, Awaitable
99
from weakref import WeakValueDictionary, WeakSet
1010

11-
from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload)
1211
from more_itertools import first
12+
13+
from examples.tutorial.step4.models import (Message, chat_filename_mimetype, dataclass_to_payload)
1314
from reactivestreams.publisher import DefaultPublisher, Publisher
1415
from reactivestreams.subscriber import Subscriber
1516
from reactivestreams.subscription import DefaultSubscription

examples/tutorial/step6/chat_client.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ async def send_statistics(self):
100100
await self._rsocket.fire_and_forget(payload)
101101

102102
def listen_for_statistics(self) -> StatisticsHandler:
103-
104103
self._statistics_subscriber = StatisticsHandler()
105104
self._rsocket.request_channel(Payload(metadata=composite(
106105
route('statistics')
@@ -152,7 +151,6 @@ async def main():
152151
async with RSocketClient(single_transport_provider(TransportTCP(*connection2)),
153152
metadata_encoding=WellKnownMimeTypes.MESSAGE_RSOCKET_COMPOSITE_METADATA,
154153
fragment_size_bytes=1_000_000) as client2:
155-
156154
user1 = ChatClient(client1)
157155
user2 = ChatClient(client2)
158156

examples/tutorial/step6/chat_server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dataclasses import dataclass, field
88
from typing import Dict, Optional, Set, Awaitable, Tuple
99
from weakref import WeakValueDictionary, WeakSet
10+
1011
from more_itertools import first
1112

1213
from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest,

examples/tutorial/step7/chat_server.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dataclasses import dataclass, field
88
from typing import Dict, Optional, Set, Awaitable, Tuple
99
from weakref import WeakValueDictionary, WeakSet
10+
1011
from more_itertools import first
1112

1213
from examples.tutorial.step6.models import (Message, chat_filename_mimetype, ClientStatistics, ServerStatisticsRequest,
@@ -29,6 +30,7 @@
2930
class SessionId(str): # allow weak reference
3031
pass
3132

33+
3234
@dataclass()
3335
class UserSessionData:
3436
username: str

rsocket/frame.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,5 +759,6 @@ def serialize_with_frame_size_header(frame: Frame) -> bytes:
759759
RequestChannelFrame: 10,
760760
}
761761

762+
762763
def get_header_length(frame: FragmentableFrame) -> int:
763764
return frame_header_length[frame.__class__]

rsocket/frame_helpers.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
to avoid circular dependencies.
44
"""
55

6-
76
import struct
87
from typing import Union, Tuple, Optional
98

rsocket/rsocket_base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -381,8 +381,8 @@ async def _handle_next_frame(self, frame: Frame, async_frame_handler_by_type):
381381
elif self._stream_control.handle_stream(complete_frame):
382382
return
383383
else:
384-
logger().debug('%s: Dropping frame from unknown stream %d', self._log_identifier(),
385-
complete_frame.stream_id)
384+
logger().warning('%s: Dropping frame from unknown stream %d', self._log_identifier(),
385+
complete_frame.stream_id)
386386

387387
async def _handle_frame_by_type(self, frame: Frame, async_frame_handler_by_type):
388388
frame_handler = async_frame_handler_by_type.get(type(frame), async_noop)

0 commit comments

Comments
 (0)