|
37 | 37 | AuthError, |
38 | 38 | Codes, |
39 | 39 | ConsentNotGivenError, |
40 | | - LimitExceededError, |
41 | 40 | NotFoundError, |
42 | 41 | ShadowBanError, |
43 | 42 | SynapseError, |
@@ -999,60 +998,73 @@ async def create_and_send_nonmember_event( |
999 | 998 | event.internal_metadata.stream_ordering, |
1000 | 999 | ) |
1001 | 1000 |
|
1002 | | - event, context = await self.create_event( |
1003 | | - requester, |
1004 | | - event_dict, |
1005 | | - txn_id=txn_id, |
1006 | | - allow_no_prev_events=allow_no_prev_events, |
1007 | | - prev_event_ids=prev_event_ids, |
1008 | | - state_event_ids=state_event_ids, |
1009 | | - outlier=outlier, |
1010 | | - historical=historical, |
1011 | | - depth=depth, |
1012 | | - ) |
| 1001 | + # Try several times, it could fail with PartialStateConflictError |
| 1002 | + # in handle_new_client_event, cf comment in except block. |
| 1003 | + max_retries = 5 |
| 1004 | + for i in range(max_retries): |
| 1005 | + try: |
| 1006 | + event, context = await self.create_event( |
| 1007 | + requester, |
| 1008 | + event_dict, |
| 1009 | + txn_id=txn_id, |
| 1010 | + allow_no_prev_events=allow_no_prev_events, |
| 1011 | + prev_event_ids=prev_event_ids, |
| 1012 | + state_event_ids=state_event_ids, |
| 1013 | + outlier=outlier, |
| 1014 | + historical=historical, |
| 1015 | + depth=depth, |
| 1016 | + ) |
1013 | 1017 |
|
1014 | | - assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( |
1015 | | - event.sender, |
1016 | | - ) |
| 1018 | + assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( |
| 1019 | + event.sender, |
| 1020 | + ) |
1017 | 1021 |
|
1018 | | - spam_check_result = await self.spam_checker.check_event_for_spam(event) |
1019 | | - if spam_check_result != self.spam_checker.NOT_SPAM: |
1020 | | - if isinstance(spam_check_result, tuple): |
1021 | | - try: |
1022 | | - [code, dict] = spam_check_result |
1023 | | - raise SynapseError( |
1024 | | - 403, |
1025 | | - "This message had been rejected as probable spam", |
1026 | | - code, |
1027 | | - dict, |
1028 | | - ) |
1029 | | - except ValueError: |
1030 | | - logger.error( |
1031 | | - "Spam-check module returned invalid error value. Expecting [code, dict], got %s", |
1032 | | - spam_check_result, |
1033 | | - ) |
| 1022 | + spam_check_result = await self.spam_checker.check_event_for_spam(event) |
| 1023 | + if spam_check_result != self.spam_checker.NOT_SPAM: |
| 1024 | + if isinstance(spam_check_result, tuple): |
| 1025 | + try: |
| 1026 | + [code, dict] = spam_check_result |
| 1027 | + raise SynapseError( |
| 1028 | + 403, |
| 1029 | + "This message had been rejected as probable spam", |
| 1030 | + code, |
| 1031 | + dict, |
| 1032 | + ) |
| 1033 | + except ValueError: |
| 1034 | + logger.error( |
| 1035 | + "Spam-check module returned invalid error value. Expecting [code, dict], got %s", |
| 1036 | + spam_check_result, |
| 1037 | + ) |
1034 | 1038 |
|
1035 | | - raise SynapseError( |
1036 | | - 403, |
1037 | | - "This message has been rejected as probable spam", |
1038 | | - Codes.FORBIDDEN, |
1039 | | - ) |
| 1039 | + raise SynapseError( |
| 1040 | + 403, |
| 1041 | + "This message has been rejected as probable spam", |
| 1042 | + Codes.FORBIDDEN, |
| 1043 | + ) |
1040 | 1044 |
|
1041 | | - # Backwards compatibility: if the return value is not an error code, it |
1042 | | - # means the module returned an error message to be included in the |
1043 | | - # SynapseError (which is now deprecated). |
1044 | | - raise SynapseError( |
1045 | | - 403, |
1046 | | - spam_check_result, |
1047 | | - Codes.FORBIDDEN, |
| 1045 | + # Backwards compatibility: if the return value is not an error code, it |
| 1046 | + # means the module returned an error message to be included in the |
| 1047 | + # SynapseError (which is now deprecated). |
| 1048 | + raise SynapseError( |
| 1049 | + 403, |
| 1050 | + spam_check_result, |
| 1051 | + Codes.FORBIDDEN, |
| 1052 | + ) |
| 1053 | + |
| 1054 | + ev = await self.handle_new_client_event( |
| 1055 | + requester=requester, |
| 1056 | + events_and_context=[(event, context)], |
| 1057 | + ratelimit=ratelimit, |
| 1058 | + ignore_shadow_ban=ignore_shadow_ban, |
1048 | 1059 | ) |
1049 | 1060 |
|
1050 | | - ev = await self.handle_new_client_event( |
1051 | | - requester=requester, |
1052 | | - events_and_context=[(event, context)], |
1053 | | - ratelimit=ratelimit, |
1054 | | - ignore_shadow_ban=ignore_shadow_ban, |
1055 | | - ) |
| 1061 | + break |
| 1062 | + except PartialStateConflictError as e: |
| 1063 | + # Persisting couldn't happen because the room got un-partial stated |
| 1064 | + # in the meantime and context needs to be recomputed, so let's do so. |
| 1065 | + if i == max_retries - 1: |
| 1066 | + raise e |
| 1067 | + pass |
1056 | 1068 |
|
1057 | 1069 | # we know it was persisted, so must have a stream ordering |
1058 | 1070 | assert ev.internal_metadata.stream_ordering |
@@ -1356,7 +1368,7 @@ async def handle_new_client_event( |
1356 | 1368 |
|
1357 | 1369 | Raises: |
1358 | 1370 | ShadowBanError if the requester has been shadow-banned. |
1359 | | - SynapseError(503) if attempting to persist a partial state event in |
| 1371 | + PartialStateConflictError if attempting to persist a partial state event in |
1360 | 1372 | a room that has been un-partial stated. |
1361 | 1373 | """ |
1362 | 1374 | extra_users = extra_users or [] |
@@ -1418,34 +1430,23 @@ async def handle_new_client_event( |
1418 | 1430 | # We now persist the event (and update the cache in parallel, since we |
1419 | 1431 | # don't want to block on it). |
1420 | 1432 | event, context = events_and_context[0] |
1421 | | - try: |
1422 | | - result, _ = await make_deferred_yieldable( |
1423 | | - gather_results( |
1424 | | - ( |
1425 | | - run_in_background( |
1426 | | - self._persist_events, |
1427 | | - requester=requester, |
1428 | | - events_and_context=events_and_context, |
1429 | | - ratelimit=ratelimit, |
1430 | | - extra_users=extra_users, |
1431 | | - ), |
1432 | | - run_in_background( |
1433 | | - self.cache_joined_hosts_for_events, events_and_context |
1434 | | - ).addErrback( |
1435 | | - log_failure, "cache_joined_hosts_for_event failed" |
1436 | | - ), |
| 1433 | + result, _ = await make_deferred_yieldable( |
| 1434 | + gather_results( |
| 1435 | + ( |
| 1436 | + run_in_background( |
| 1437 | + self._persist_events, |
| 1438 | + requester=requester, |
| 1439 | + events_and_context=events_and_context, |
| 1440 | + ratelimit=ratelimit, |
| 1441 | + extra_users=extra_users, |
1437 | 1442 | ), |
1438 | | - consumeErrors=True, |
1439 | | - ) |
1440 | | - ).addErrback(unwrapFirstError) |
1441 | | - except PartialStateConflictError as e: |
1442 | | - # The event context needs to be recomputed. |
1443 | | - # Turn the error into a 429, as a hint to the client to try again. |
1444 | | - logger.info( |
1445 | | - "Room %s was un-partial stated while persisting client event.", |
1446 | | - event.room_id, |
| 1443 | + run_in_background( |
| 1444 | + self.cache_joined_hosts_for_events, events_and_context |
| 1445 | + ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), |
| 1446 | + ), |
| 1447 | + consumeErrors=True, |
1447 | 1448 | ) |
1448 | | - raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) |
| 1449 | + ).addErrback(unwrapFirstError) |
1449 | 1450 |
|
1450 | 1451 | return result |
1451 | 1452 |
|
@@ -2012,26 +2013,39 @@ async def _send_dummy_event_for_room(self, room_id: str) -> bool: |
2012 | 2013 | for user_id in members: |
2013 | 2014 | requester = create_requester(user_id, authenticated_entity=self.server_name) |
2014 | 2015 | try: |
2015 | | - event, context = await self.create_event( |
2016 | | - requester, |
2017 | | - { |
2018 | | - "type": EventTypes.Dummy, |
2019 | | - "content": {}, |
2020 | | - "room_id": room_id, |
2021 | | - "sender": user_id, |
2022 | | - }, |
2023 | | - ) |
| 2016 | + # Try several times, it could fail with PartialStateConflictError |
| 2017 | + # in handle_new_client_event, cf comment in except block. |
| 2018 | + max_retries = 5 |
| 2019 | + for i in range(max_retries): |
| 2020 | + try: |
| 2021 | + event, context = await self.create_event( |
| 2022 | + requester, |
| 2023 | + { |
| 2024 | + "type": EventTypes.Dummy, |
| 2025 | + "content": {}, |
| 2026 | + "room_id": room_id, |
| 2027 | + "sender": user_id, |
| 2028 | + }, |
| 2029 | + ) |
2024 | 2030 |
|
2025 | | - event.internal_metadata.proactively_send = False |
| 2031 | + event.internal_metadata.proactively_send = False |
2026 | 2032 |
|
2027 | | - # Since this is a dummy-event it is OK if it is sent by a |
2028 | | - # shadow-banned user. |
2029 | | - await self.handle_new_client_event( |
2030 | | - requester, |
2031 | | - events_and_context=[(event, context)], |
2032 | | - ratelimit=False, |
2033 | | - ignore_shadow_ban=True, |
2034 | | - ) |
| 2033 | + # Since this is a dummy-event it is OK if it is sent by a |
| 2034 | + # shadow-banned user. |
| 2035 | + await self.handle_new_client_event( |
| 2036 | + requester, |
| 2037 | + events_and_context=[(event, context)], |
| 2038 | + ratelimit=False, |
| 2039 | + ignore_shadow_ban=True, |
| 2040 | + ) |
| 2041 | + |
| 2042 | + break |
| 2043 | + except PartialStateConflictError as e: |
| 2044 | + # Persisting couldn't happen because the room got un-partial stated |
| 2045 | + # in the meantime and context needs to be recomputed, so let's do so. |
| 2046 | + if i == max_retries - 1: |
| 2047 | + raise e |
| 2048 | + pass |
2035 | 2049 | return True |
2036 | 2050 | except AuthError: |
2037 | 2051 | logger.info( |
|
0 commit comments