diff --git a/faust/app/base.py b/faust/app/base.py index 9bb15a18d..881ca52b5 100644 --- a/faust/app/base.py +++ b/faust/app/base.py @@ -477,7 +477,7 @@ def __init__( # for the web server. self._monitor = monitor - # Any additional asyncio.Task's specified using @app.task decorator. + # Any additional asyncio.Tasks specified using @app.task decorator. self._app_tasks = [] # Called as soon as the a worker is fully operational. @@ -1138,6 +1138,7 @@ def Table( window: Optional[WindowT] = None, partitions: Optional[int] = None, help: Optional[str] = None, + value_serializer: str = None, **kwargs: Any, ) -> TableT: """Define new table. @@ -1169,6 +1170,7 @@ def Table( beacon=self.tables.beacon, partitions=partitions, help=help, + value_serializer=value_serializer, **kwargs, ), ) diff --git a/faust/tables/base.py b/faust/tables/base.py index ca26c0f11..d482b78bf 100644 --- a/faust/tables/base.py +++ b/faust/tables/base.py @@ -124,6 +124,7 @@ def __init__( on_window_close: Optional[WindowCloseCallback] = None, is_global: bool = False, synchronize_all_active_partitions: bool = False, + value_serializer: Optional[CodecArg] = None, **kwargs: Any, ) -> None: Service.__init__(self, loop=app.loop, **kwargs) @@ -157,7 +158,7 @@ def __init__( # Possible values json and raw # Fallback to json self.key_serializer = self._serializer_from_type(self.key_type) - self.value_serializer = self._serializer_from_type(self.value_type) + self.value_serializer = value_serializer # Table key expiration self._partition_timestamp_keys = defaultdict(set)