Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 51a2091

Browse files
authored
Limit the size of HTTP responses read over federation. (#9833)
1 parent c1ddbbd commit 51a2091

File tree

4 files changed

+110
-8
lines changed

4 files changed

+110
-8
lines changed

changelog.d/9833.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Limit the size of HTTP responses read over federation.

synapse/http/client.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from canonicaljson import encode_canonical_json
3434
from netaddr import AddrFormatError, IPAddress, IPSet
3535
from prometheus_client import Counter
36+
from typing_extensions import Protocol
3637
from zope.interface import implementer, provider
3738

3839
from OpenSSL import SSL
@@ -754,6 +755,16 @@ def _timeout_to_request_timed_out_error(f: Failure):
754755
return f
755756

756757

758+
class ByteWriteable(Protocol):
    """The type of object which must be passed into read_body_with_max_size.

    This is a structural type: any object with a compatible ``write`` method
    satisfies it. Typically this is a file object.
    """

    def write(self, data: bytes) -> int:
        """Accept a chunk of bytes.

        Args:
            data: the bytes to be written.

        Returns:
            An int. (Presumably the number of bytes consumed, as with file
            objects -- nothing in this definition enforces that.)
        """
        # Protocol stub: there is deliberately no implementation here.
        ...
766+
767+
757768
class BodyExceededMaxSize(Exception):
758769
"""The maximum allowed size of the HTTP body was exceeded."""
759770

@@ -790,7 +801,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
790801
transport = None # type: Optional[ITCPTransport]
791802

792803
def __init__(
793-
self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
804+
self, stream: ByteWriteable, deferred: defer.Deferred, max_size: Optional[int]
794805
):
795806
self.stream = stream
796807
self.deferred = deferred
@@ -830,7 +841,7 @@ def connectionLost(self, reason: Failure = connectionDone) -> None:
830841

831842

832843
def read_body_with_max_size(
833-
response: IResponse, stream: BinaryIO, max_size: Optional[int]
844+
response: IResponse, stream: ByteWriteable, max_size: Optional[int]
834845
) -> defer.Deferred:
835846
"""
836847
Read an HTTP response body to a file-object, optionally enforcing a maximum file size.

synapse/http/matrixfederationclient.py

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
# Copyright 2014-2016 OpenMarket Ltd
2-
# Copyright 2018 New Vector Ltd
1+
# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
32
#
43
# Licensed under the Apache License, Version 2.0 (the "License");
54
# you may not use this file except in compliance with the License.
@@ -13,11 +12,13 @@
1312
# See the License for the specific language governing permissions and
1413
# limitations under the License.
1514
import cgi
15+
import codecs
1616
import logging
1717
import random
1818
import sys
19+
import typing
1920
import urllib.parse
20-
from io import BytesIO
21+
from io import BytesIO, StringIO
2122
from typing import Callable, Dict, List, Optional, Tuple, Union
2223

2324
import attr
@@ -72,6 +73,9 @@
7273
"synapse_http_matrixfederationclient_responses", "", ["method", "code"]
7374
)
7475

76+
# a federation response can be rather large (eg a big state_ids is 50M or so), so we
77+
# need a generous limit here.
78+
MAX_RESPONSE_SIZE = 100 * 1024 * 1024
7579

7680
MAX_LONG_RETRIES = 10
7781
MAX_SHORT_RETRIES = 3
@@ -167,12 +171,27 @@ async def _handle_json_response(
167171
try:
168172
check_content_type_is_json(response.headers)
169173

170-
# Use the custom JSON decoder (partially re-implements treq.json_content).
171-
d = treq.text_content(response, encoding="utf-8")
172-
d.addCallback(json_decoder.decode)
174+
buf = StringIO()
175+
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
173176
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
174177

178+
def parse(_len: int):
179+
return json_decoder.decode(buf.getvalue())
180+
181+
d.addCallback(parse)
182+
175183
body = await make_deferred_yieldable(d)
184+
except BodyExceededMaxSize as e:
185+
# The response was too big.
186+
logger.warning(
187+
"{%s} [%s] JSON response exceeded max size %i - %s %s",
188+
request.txn_id,
189+
request.destination,
190+
MAX_RESPONSE_SIZE,
191+
request.method,
192+
request.uri.decode("ascii"),
193+
)
194+
raise RequestSendFailed(e, can_retry=False) from e
176195
except ValueError as e:
177196
# The JSON content was invalid.
178197
logger.warning(
@@ -218,6 +237,18 @@ async def _handle_json_response(
218237
return body
219238

220239

240+
class BinaryIOWrapper:
    """A wrapper for a TextIO which converts from bytes on the fly.

    Each chunk handed to `write` is passed through an incremental decoder and
    the resulting text is written to the wrapped file. Because the decoder is
    incremental, a multi-byte character may be split across successive
    `write` calls: the incomplete prefix is held back inside the decoder and
    emitted once the remaining bytes arrive.

    NOTE: nothing in this class finalises the decoder, so if the input ends
    part-way through a multi-byte sequence those trailing bytes are never
    written to the file and no error is raised for them.
    """

    def __init__(
        self, file: typing.TextIO, encoding: str = "utf-8", errors: str = "strict"
    ) -> None:
        """
        Args:
            file: the text stream which decoded output is written to.
            encoding: name of the codec used to decode incoming bytes.
            errors: the codec error-handling scheme (e.g. "strict", "replace").
        """
        self.decoder = codecs.getincrementaldecoder(encoding)(errors)
        self.file = file

    def write(self, b: Union[bytes, bytearray]) -> int:
        """Decode a chunk of bytes and write the resulting text to the file.

        Args:
            b: the next chunk of encoded data.

        Returns:
            The number of *bytes* accepted (``len(b)``), which may differ from
            the number of characters written to the underlying file.

        Raises:
            UnicodeDecodeError: if the data is invalid for the codec and the
                error scheme is "strict".
        """
        self.file.write(self.decoder.decode(b))
        return len(b)
250+
251+
221252
class MatrixFederationHttpClient:
222253
"""HTTP client used to talk to other homeservers over the federation
223254
protocol. Send client certificates and signs requests.

tests/http/test_fedclient.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from synapse.api.errors import RequestSendFailed
2828
from synapse.http.matrixfederationclient import (
29+
MAX_RESPONSE_SIZE,
2930
MatrixFederationHttpClient,
3031
MatrixFederationRequest,
3132
)
@@ -560,3 +561,61 @@ def test_json_error(self, return_value):
560561

561562
f = self.failureResultOf(test_d)
562563
self.assertIsInstance(f.value, RequestSendFailed)
564+
565+
def test_too_big(self):
    """
    Test what happens if a huge response is returned from the remote endpoint.

    The body is fed to the client in fixed-size chunks until the request
    deferred fires. The test then checks that exactly MAX_RESPONSE_SIZE bytes
    had been sent at that point, that the request failed with
    RequestSendFailed, and that the transport is being disconnected.
    """

    test_d = defer.ensureDeferred(self.cl.get_json("testserv:8008", "foo/bar"))

    self.pump()

    # Nothing happened yet
    self.assertNoResult(test_d)

    # Make sure treq is trying to connect
    clients = self.reactor.tcpClients
    self.assertEqual(len(clients), 1)
    (host, port, factory, _timeout, _bindAddress) = clients[0]
    self.assertEqual(host, "1.2.3.4")
    self.assertEqual(port, 8008)

    # complete the connection and wire it up to a fake transport
    protocol = factory.buildProtocol(None)
    transport = StringTransport()
    protocol.makeConnection(transport)

    # that should have made it send the request to the transport
    self.assertRegex(transport.value(), b"^GET /foo/bar")
    self.assertRegex(transport.value(), b"Host: testserv:8008")

    # Deferred is still without a result
    self.assertNoResult(test_d)

    # Send the response headers only; the (huge) body follows in chunks below.
    protocol.dataReceived(
        b"HTTP/1.1 200 OK\r\n"
        b"Server: Fake\r\n"
        b"Content-Type: application/json\r\n"
        b"\r\n"
    )

    self.pump()

    # should still be waiting
    self.assertNoResult(test_d)

    # Keep feeding body data until the request gives up. The in-loop
    # assertion stops the test from looping forever if no limit is enforced.
    sent = 0
    chunk_size = 1024 * 512
    while not test_d.called:
        protocol.dataReceived(b"a" * chunk_size)
        sent += chunk_size
        self.assertLessEqual(sent, MAX_RESPONSE_SIZE)

    # The request must have fired exactly when the limit was reached.
    self.assertEqual(sent, MAX_RESPONSE_SIZE)

    f = self.failureResultOf(test_d)
    self.assertIsInstance(f.value, RequestSendFailed)

    # The connection should be being torn down rather than left open.
    self.assertTrue(transport.disconnecting)

0 commit comments

Comments
 (0)