Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/17893.bugfix
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've created a spec issue to clarify whether events for v3+ rooms containing an event_id field should be considered valid: matrix-org/matrix-spec#2027

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@morguldir it seems that this sytest test is currently failing with this PR. That is due to the test expecting the homeserver to return a 400 when a bad event is included in a transaction, whereas we're now just silently dropping them (and returning a 200).

Could you create a PR to update that test please? I consider that a blocking change to merging this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, perl didn't live up to the bad rumors so far :D, added in matrix-org/sytest#1391

Although according to halfie it sounds like you need to push the branch to sytest if you want CI to pass matrix-org/synapse#12754 👀

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug where all messages from a server could be blocked because of one bad event. Contributed by @morguldir.
9 changes: 8 additions & 1 deletion synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,14 @@ def __init__(
for name, sigs in event_dict.pop("signatures", {}).items()
}

assert "event_id" not in event_dict
# An event should only have an event_id at this point if it's for a v1/v2 like room.
# In future room versions, the `event_id` is derived from the event canonical JSON.

# So if we see a `event_id` but the room version doesn't support
# v1/v2 events, then it's invalid and we should reject it.
assert (
"event_id" not in event_dict
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assert has always been there, this just adds an error message and a comment

), "Event ID should not be supplied in non-v1/v2 room"

unsigned = dict(event_dict.pop("unsigned", {}))

Expand Down
29 changes: 24 additions & 5 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@
SynapseError,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@morguldir Are you up for adding a test for this? Probably something in tests/federation/test_federation_server.py, make a request like this and then assert that the other PDU's in the transaction besides the corrupted one were persisted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is b6b5d81 what you had in mind? See also matrix-org/complement#743

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, I completely missed your Complement test in my initial review. The Synapse test is looking good 👍

UnsupportedRoomVersionError,
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.api.room_versions import (
KNOWN_ROOM_VERSIONS,
RoomVersion,
)
from synapse.crypto.event_signing import compute_event_signature
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
Expand Down Expand Up @@ -129,6 +132,8 @@
# federation.
_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu"

_UNKNOWN_EVENT_ID = "<Unknown>"


class FederationServer(FederationBase):
def __init__(self, hs: "HomeServer"):
Expand Down Expand Up @@ -432,6 +437,8 @@ async def _handle_pdus_in_txn(

newest_pdu_ts = 0

pdu_results = {}

for p in transaction.pdus:
# FIXME (richardv): I don't think this works:
# https://github.com/matrix-org/synapse/issues/8429
Expand All @@ -446,7 +453,7 @@ async def _handle_pdus_in_txn(
# We try and pull out an event ID so that if later checks fail we
# can log something sensible. We don't mandate an event ID here in
# case future event formats get rid of the key.
possible_event_id = p.get("event_id", "<Unknown>")
possible_event_id = p.get("event_id", _UNKNOWN_EVENT_ID)

# Now we get the room ID so that we can check that we know the
# version of the room.
Expand All @@ -469,14 +476,26 @@ async def _handle_pdus_in_txn(
logger.info("Ignoring PDU: %s", e)
continue

event = event_from_pdu_json(p, room_version)
try:
event = event_from_pdu_json(p, room_version)
except Exception as e:
# We can only provide feedback to the federating server if we can determine what the event_id is
# but since we failed to parse the event, we can't derive the `event_id` so there is nothing
# to use as the `pdu_results` key. Best we can do is just log for our own record and move on.
if possible_event_id != _UNKNOWN_EVENT_ID:
pdu_results[possible_event_id] = {
"error": f"Failed to convert JSON into event: {e}"
}
logger.warning(
f"Failed to parse event {possible_event_id} in transaction from {origin}, due to: {e}"
)
continue

pdus_by_room.setdefault(room_id, []).append(event)

if event.origin_server_ts > newest_pdu_ts:
newest_pdu_ts = event.origin_server_ts

pdu_results = {}

# we can process different rooms in parallel (which is useful if they
# require callouts to other servers to fetch missing events), but
# impose a limit to avoid going too crazy with ram/cpu.
Expand Down
72 changes: 71 additions & 1 deletion tests/federation/test_federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@

from twisted.test.proto_helpers import MemoryReactor

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.api.constants import EventTypes
from synapse.api.errors import NotFoundError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.config.server import DEFAULT_ROOM_VERSION
from synapse.events import EventBase, make_event_from_dict
from synapse.events.builder import EventBuilder
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
Expand Down Expand Up @@ -84,6 +87,73 @@ async def failing_handler(_origin: str, _content: JsonDict) -> None:
)
self.assertEqual(500, channel.code, channel.result)

def test_accept_valid_pdus_and_ignore_invalid(self) -> None:
user = self.register_user("user1", "test")
tok = self.login("user1", "test")
room_id = self.helper.create_room_as("user1", tok=tok)

def builder(message: str) -> EventBuilder:
return self.hs.get_event_builder_factory().for_room_version(
RoomVersions.V10,
{
"type": EventTypes.Message,
"sender": user,
"room_id": room_id,
"content": {"body": message, "msgtype": "m.text"},
},
)

def make_event(message: str) -> EventBase:
event, _ = self.get_success(
self.hs.get_event_creation_handler().create_new_client_event(
builder(message),
)
)
return event

event1 = make_event("event1")
event2 = make_event("event2")
event3 = make_event("event3")
event1_json = event1.get_pdu_json()
event2_json = event2.get_pdu_json()
event3_json = event3.get_pdu_json()
logging.info(event1_json)

# Purposely adding event id that shouldn't be there
event2_json["event_id"] = event2.event_id

channel = self.make_signed_federation_request(
"PUT",
"/_matrix/federation/v1/send/txn",
{"pdus": [event1_json, event2_json, event3_json]},
)
body = channel.json_body
# Ensure the response indicates an error for the corrupt event
# and that it indicates success for valid events
pdus: JsonDict = body["pdus"]
self.assertIncludes(
set(pdus.keys()), {event1.event_id, event2.event_id, event3.event_id}
)
self.assertEqual(pdus[event1.event_id], {})
self.assertNotEqual(body["pdus"][event2.event_id]["error"], "")
self.assertEqual(pdus[event3.event_id], {})

# Make sure other valid events from the send transaction were persisted successfully
self.get_success(
self.hs.get_storage_controllers().main.get_event(event1.event_id)
)

# Make sure the corrupt event isn't persisted
self.get_failure(
self.hs.get_storage_controllers().main.get_event(event2.event_id),
NotFoundError,
)

# Verify that we continue looking at events that come after the corrupted one
self.get_success(
self.hs.get_storage_controllers().main.get_event(event3.event_id)
)


class ServerACLsTestCase(unittest.TestCase):
def test_blocked_server(self) -> None:
Expand Down
Loading