11import asyncio
2- import json
32import logging
43import uuid
54from asyncio import Queue
1312from reactivex import Observable , operators , Subject , Observer
1413
1514from examples .tutorial .reactivex .models import (Message , chat_filename_mimetype , ClientStatistics ,
16- ServerStatisticsRequest , ServerStatistics , dataclass_to_payload )
15+ ServerStatisticsRequest , ServerStatistics , dataclass_to_payload ,
16+ decode_dataclass )
1717from rsocket .extensions .composite_metadata import CompositeMetadata
1818from rsocket .extensions .helpers import composite , metadata_item
1919from rsocket .frame_helpers import ensure_bytes
@@ -119,14 +119,14 @@ async def login(payload: Payload) -> Observable:
119119
120120 @router .response ('channel.join' )
121121 async def join_channel (payload : Payload ) -> Observable :
122- channel_name = payload .data . decode ( 'utf-8' )
122+ channel_name = utf8_decode ( payload .data )
123123 ensure_channel_exists (channel_name )
124124 chat_data .channel_users [channel_name ].add (self ._session .session_id )
125125 return reactivex .empty ()
126126
127127 @router .response ('channel.leave' )
128128 async def leave_channel (payload : Payload ) -> Observable :
129- channel_name = payload .data . decode ( 'utf-8' )
129+ channel_name = utf8_decode ( payload .data )
130130 chat_data .channel_users [channel_name ].discard (self ._session .session_id )
131131 return reactivex .empty ()
132132
@@ -159,7 +159,7 @@ async def get_channels() -> Observable:
159159
160160 @router .fire_and_forget ('statistics' )
161161 async def receive_statistics (payload : Payload ):
162- statistics = ClientStatistics ( ** json . loads ( utf8_decode ( payload .data )) )
162+ statistics = decode_dataclass ( payload .data , ClientStatistics )
163163
164164 logging .info ('Received client statistics. memory usage: %s' , statistics .memory_usage )
165165
@@ -176,8 +176,8 @@ async def statistics_generator():
176176 except Exception :
177177 logging .error ('Statistics' , exc_info = True )
178178
179- def on_next (value : Payload ):
180- request = ServerStatisticsRequest ( ** json . loads ( utf8_decode ( value . data )) )
179+ def on_next (payload : Payload ):
180+ request = decode_dataclass ( payload . data , ServerStatisticsRequest )
181181
182182 logging .info (f'Received statistics request { request .ids } , { request .period_seconds } ' )
183183
@@ -199,7 +199,7 @@ def on_next(value: Payload):
199199
200200 @router .response ('message' )
201201 async def send_message (payload : Payload ) -> Observable :
202- message = Message ( ** json . loads ( payload .data ) )
202+ message = decode_dataclass ( payload .data , Message )
203203
204204 logging .info ('Received message for user: %s, channel: %s' , message .user , message .channel )
205205
0 commit comments