Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 707d5e4

Browse files
authored
Encode JSON responses on a thread in C, mk2 (#10905)
Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` functions, which is backed by a C library. Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this: 1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or 2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types. I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
1 parent d378417 commit 707d5e4

File tree

4 files changed

+76
-18
lines changed

4 files changed

+76
-18
lines changed

changelog.d/10905.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Speed up responding with large JSON objects to requests.

synapse/http/server.py

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import urllib
2222
from http import HTTPStatus
2323
from inspect import isawaitable
24-
from io import BytesIO
2524
from typing import (
2625
Any,
2726
Awaitable,
@@ -37,15 +36,15 @@
3736
)
3837

3938
import jinja2
40-
from canonicaljson import iterencode_canonical_json
39+
from canonicaljson import encode_canonical_json
4140
from typing_extensions import Protocol
4241
from zope.interface import implementer
4342

4443
from twisted.internet import defer, interfaces
4544
from twisted.python import failure
4645
from twisted.web import resource
4746
from twisted.web.server import NOT_DONE_YET, Request
48-
from twisted.web.static import File, NoRangeStaticProducer
47+
from twisted.web.static import File
4948
from twisted.web.util import redirectTo
5049

5150
from synapse.api.errors import (
@@ -56,10 +55,11 @@
5655
UnrecognizedRequestError,
5756
)
5857
from synapse.http.site import SynapseRequest
59-
from synapse.logging.context import preserve_fn
58+
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
6059
from synapse.logging.opentracing import trace_servlet
6160
from synapse.util import json_encoder
6261
from synapse.util.caches import intern_dict
62+
from synapse.util.iterutils import chunk_seq
6363

6464
logger = logging.getLogger(__name__)
6565

@@ -620,12 +620,11 @@ def stopProducing(self) -> None:
620620
self._request = None
621621

622622

623-
def _encode_json_bytes(json_object: Any) -> bytes:
    """Encode an object into JSON, returning the UTF-8 encoded bytes.

    Note: this uses `JsonEncoder.encode` (backed by the C extension) rather
    than `iterencode`, which falls back to a *much* slower pure-Python
    encoder. The stale claim that this returns an iterator of bytes has been
    removed: callers receive a single `bytes` object.
    """
    return json_encoder.encode(json_object).encode("utf-8")
629628

630629

631630
def respond_with_json(
@@ -659,7 +658,7 @@ def respond_with_json(
659658
return None
660659

661660
if canonical_json:
662-
encoder = iterencode_canonical_json
661+
encoder = encode_canonical_json
663662
else:
664663
encoder = _encode_json_bytes
665664

@@ -670,7 +669,9 @@ def respond_with_json(
670669
if send_cors:
671670
set_cors_headers(request)
672671

673-
_ByteProducer(request, encoder(json_object))
672+
run_in_background(
673+
_async_write_json_to_request_in_thread, request, encoder, json_object
674+
)
674675
return NOT_DONE_YET
675676

676677

@@ -706,15 +707,56 @@ def respond_with_json_bytes(
706707
if send_cors:
707708
set_cors_headers(request)
708709

709-
# note that this is zero-copy (the bytesio shares a copy-on-write buffer with
710-
# the original `bytes`).
711-
bytes_io = BytesIO(json_bytes)
712-
713-
producer = NoRangeStaticProducer(request, bytes_io)
714-
producer.start()
710+
_write_bytes_to_request(request, json_bytes)
715711
return NOT_DONE_YET
716712

717713

714+
async def _async_write_json_to_request_in_thread(
    request: SynapseRequest,
    json_encoder: Callable[[Any], bytes],
    json_object: Any,
):
    """Encode `json_object` on a worker thread, then stream it to `request`.

    Serializing a large JSON object can take a long time; doing it on the
    reactor thread would stall the whole server, so the encoding work is
    pushed onto a threadpool instead.

    Note: `JsonEncoder.iterencode` is deliberately not used here, since it
    falls back to the pure-Python encoder (rather than the C backend), which
    is *much* more expensive.
    """
    encoded = await defer_to_thread(request.reactor, json_encoder, json_object)
    _write_bytes_to_request(request, encoded)
733+
734+
735+
def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
    """Write a response body to the request via a producer.

    This should be used instead of calling `Request.write` directly, so that
    large response bodies are handled correctly.

    Dumping the whole response into the `Request` with a single
    `Request.write` starts the timeout for the *next* request to be received:
    if streaming the response back to the client takes longer than 60s, the
    client never gets it. Going through a producer means the timeout only
    starts once all of the content has been sent over the TCP connection.
    """
    # Hand the body to the producer in fixed-size chunks rather than as one
    # giant write.
    chunks = chunk_seq(bytes_to_write, 4096)

    # A `_ByteProducer` (push producer) is used here instead of
    # `NoRangeStaticProducer` because the unit tests can't cope with being
    # given a pull producer.
    _ByteProducer(request, chunks)
758+
759+
718760
def set_cors_headers(request: Request):
719761
"""Set the CORS headers so that javascript running in a web browsers can
720762
use this API

synapse/push/emailpusher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ async def _unsafe_process(self) -> None:
184184

185185
should_notify_at = max(notif_ready_at, room_ready_at)
186186

187-
if should_notify_at < self.clock.time_msec():
187+
if should_notify_at <= self.clock.time_msec():
188188
# one of our notifications is ready for sending, so we send
189189
# *one* email updating the user on their notifications,
190190
# we then consider all previously outstanding notifications

synapse/util/iterutils.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,28 @@
2121
Iterable,
2222
Iterator,
2323
Mapping,
24-
Sequence,
2524
Set,
25+
Sized,
2626
Tuple,
2727
TypeVar,
2828
)
2929

30+
from typing_extensions import Protocol
31+
3032
T = TypeVar("T")
33+
S = TypeVar("S", bound="_SelfSlice")
34+
35+
36+
class _SelfSlice(Sized, Protocol):
    """A helper protocol that matches types where taking a slice results in the
    same type being returned.

    This is more specific than `Sequence`, which allows another `Sequence` to be
    returned.
    """

    # Slicing must produce the same concrete type as the receiver (e.g.
    # slicing `bytes` yields `bytes`), which is what lets callers keep the
    # original type through chunking helpers.
    def __getitem__(self: S, i: slice) -> S:
        ...
3146

3247

3348
def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
@@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
4661
return iter(lambda: tuple(islice(sourceiter, size)), ())
4762

4863

49-
def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
64+
def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
5065
"""Split the given sequence into chunks of the given size
5166
5267
The last chunk may be shorter than the given size.

0 commit comments

Comments
 (0)