|
4 | 4 |
|
5 | 5 | import asyncio |
6 | 6 | import dataclasses |
7 | | -import json |
| 7 | +import pickle |
8 | 8 | import uuid |
9 | 9 | from abc import ABC, abstractmethod |
10 | 10 | from collections.abc import AsyncIterator, Callable, Coroutine |
|
14 | 14 | from reflex.istate.manager.redis import StateManagerRedis |
15 | 15 | from reflex.state import BaseState, StateUpdate |
16 | 16 | from reflex.utils import console, prerequisites |
17 | | -from reflex.utils.format import json_dumps |
18 | 17 | from reflex.utils.tasks import ensure_task |
19 | 18 |
|
20 | 19 | if TYPE_CHECKING: |
@@ -329,7 +328,7 @@ async def link_token_to_sid(self, token: str, sid: str) -> str | None: |
329 | 328 | try: |
330 | 329 | await self.redis.set( |
331 | 330 | redis_key, |
332 | | - json.dumps(dataclasses.asdict(socket_record)), |
| 331 | + pickle.dumps(socket_record), |
333 | 332 | ex=self.token_expiration, |
334 | 333 | ) |
335 | 334 | except Exception as e: |
@@ -387,11 +386,7 @@ async def _subscribe_lost_and_found_updates( |
387 | 386 | ) |
388 | 387 | async for message in pubsub.listen(): |
389 | 388 | if message["type"] == "pmessage": |
390 | | - record_dict = json.loads(message["data"].decode()) |
391 | | - record = LostAndFoundRecord( |
392 | | - token=record_dict["token"], |
393 | | - update=StateUpdate(**record_dict["update"]), |
394 | | - ) |
| 389 | + record = pickle.loads(message["data"]) |
395 | 390 | await emit_update(record.update, record.token) |
396 | 391 |
|
397 | 392 | def ensure_lost_and_found_task( |
@@ -429,10 +424,9 @@ async def _get_token_owner(self, token: str, refresh: bool = False) -> str | Non |
429 | 424 |
|
430 | 425 | redis_key = self._get_redis_key(token) |
431 | 426 | try: |
432 | | - record_json = await self.redis.get(redis_key) |
433 | | - if record_json: |
434 | | - record_data = json.loads(record_json) |
435 | | - socket_record = SocketRecord(**record_data) |
| 427 | + record_pkl = await self.redis.get(redis_key) |
| 428 | + if record_pkl: |
| 429 | + socket_record = pickle.loads(record_pkl) |
436 | 430 | self.token_to_socket[token] = socket_record |
437 | 431 | self.sid_to_token[socket_record.sid] = token |
438 | 432 | return socket_record.instance_id |
@@ -463,7 +457,7 @@ async def emit_lost_and_found( |
463 | 457 | try: |
464 | 458 | await self.redis.publish( |
465 | 459 | f"channel:{self._get_lost_and_found_key(owner_instance_id)}", |
466 | | - json_dumps(record), |
| 460 | + pickle.dumps(record), |
467 | 461 | ) |
468 | 462 | except Exception as e: |
469 | 463 | console.error(f"Redis error publishing lost and found delta: {e}") |
|
0 commit comments