|
1 | | -from _typeshed import Incomplete |
2 | | -from collections.abc import Generator |
| 1 | +import json |
| 2 | +from collections.abc import Callable, Generator, Iterator |
| 3 | +from typing import Any |
| 4 | +from typing_extensions import TypeAlias |
3 | 5 |
|
4 | | -json_decoder: Incomplete |
| 6 | +json_decoder: json.JSONDecoder |
5 | 7 |
|
6 | | -def stream_as_text(stream) -> Generator[Incomplete, None, None]: ... |
7 | | -def json_splitter(buffer): ... |
8 | | -def json_stream(stream): ... |
9 | | -def line_splitter(buffer, separator: str = "\n"): ... |
10 | | -def split_buffer(stream, splitter: Incomplete | None = None, decoder=...) -> Generator[Incomplete, None, Incomplete]: ... |
| 8 | +# Type alias for JSON, explained at: |
| 9 | +# https://github.com/python/typing/issues/182#issuecomment-1320974824. |
| 10 | +_JSON: TypeAlias = dict[str, _JSON] | list[_JSON] | str | int | float | bool | None |
| 11 | + |
| 12 | +def stream_as_text(stream: Iterator[str | bytes]) -> Generator[str, None, None]: ... |
| 13 | +def json_splitter(buffer: str) -> tuple[_JSON, str] | None: ... |
| 14 | +def json_stream(stream: Iterator[str]) -> Generator[_JSON, None, None]: ... |
| 15 | +def line_splitter(buffer: str, separator: str = "\n") -> tuple[str, str] | None: ... |
| 16 | +def split_buffer( |
| 17 | + stream: Iterator[str | bytes], splitter: Callable[[str], tuple[str, str]] | None = None, decoder: Callable[[str], Any] = ... |
| 18 | +) -> Generator[Any, None, None]: ... |
0 commit comments