Skip to content

Commit 38b8495

Browse files
Implement action retry (#3375)
1 parent b0de9a7 commit 38b8495

File tree

5 files changed

+95
-67
lines changed

5 files changed

+95
-67
lines changed

openslides_backend/action/action_handler.py

Lines changed: 61 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
from collections.abc import Callable, Iterable
22
from copy import deepcopy
33
from http import HTTPStatus
4+
from time import sleep
45
from typing import Any, TypeVar, cast
56

67
import fastjsonschema
7-
from psycopg.errors import RaiseException
8+
from psycopg.errors import RaiseException, SerializationFailure
89

910
from openslides_backend.services.database.extended_database import ExtendedDatabase
1011
from openslides_backend.services.postgresql.db_connection_handling import (
1112
get_new_os_conn,
1213
)
14+
from openslides_backend.shared.exceptions import DatabaseException
1315

1416
from ..shared.exceptions import (
1517
ActionException,
18+
BadCodingException,
1619
DatastoreLockedException,
1720
RelationException,
1821
View400Exception,
@@ -122,52 +125,66 @@ def handle_request(
122125
except fastjsonschema.JsonSchemaException as exception:
123126
raise ActionException(exception.message)
124127

125-
try:
126-
with get_new_os_conn() as conn:
127-
self.post_edit_necessary = False
128-
self.datastore = ExtendedDatabase(conn, self.logging, self.env)
129-
results: ActionsResponseResults = []
130-
if atomic:
131-
results = self.execute_write_requests(
132-
self.parse_actions, payload
133-
)
134-
else:
135-
136-
def transform_to_list(
137-
tuple: tuple[WriteRequest | None, ActionResults | None],
138-
) -> tuple[list[WriteRequest], ActionResults | None]:
139-
return (
140-
[tuple[0]] if tuple[0] is not None else [],
141-
tuple[1],
128+
retry_count = int(self.env.ACTION_MAX_RETRIES or 1)
129+
retry_timeout = float(self.env.ACTION_RETRY_TIMEOUT or 0.4)
130+
for attempt in range(1, retry_count + 1):
131+
try:
132+
with get_new_os_conn() as conn:
133+
self.post_edit_necessary = False
134+
self.datastore = ExtendedDatabase(conn, self.logging, self.env)
135+
results: ActionsResponseResults = []
136+
if atomic:
137+
results = self.execute_write_requests(
138+
self.parse_actions, payload
142139
)
143-
144-
for element in payload:
145-
try:
146-
result = self.execute_write_requests(
147-
lambda e: transform_to_list(self.perform_action(e)),
148-
element,
140+
else:
141+
142+
def transform_to_list(
143+
tuple: tuple[WriteRequest | None, ActionResults | None],
144+
) -> tuple[list[WriteRequest], ActionResults | None]:
145+
return (
146+
[tuple[0]] if tuple[0] is not None else [],
147+
tuple[1],
149148
)
150-
results.append(result)
151-
except ActionException as exception:
152-
error = cast(ActionError, exception.get_json())
153-
results.append(error)
154-
self.datastore.reset()
155-
156-
# execute cleanup methods
157-
for on_success in self.on_success:
158-
on_success()
159-
160-
# Return action result
161-
self.logger.info("Request was successful. Send response now.")
162-
return ActionsResponse(
163-
status_code=HTTPStatus.OK.value,
164-
success=True,
165-
message="Actions handled successfully",
166-
results=results,
149+
150+
for element in payload:
151+
try:
152+
result = self.execute_write_requests(
153+
lambda e: transform_to_list(
154+
self.perform_action(e)
155+
),
156+
element,
157+
)
158+
results.append(result)
159+
except ActionException as exception:
160+
error = cast(ActionError, exception.get_json())
161+
results.append(error)
162+
self.datastore.reset()
163+
164+
# execute cleanup methods
165+
for on_success in self.on_success:
166+
on_success()
167+
168+
# Return action result
169+
self.logger.info("Request was successful. Send response now.")
170+
return ActionsResponse(
171+
status_code=HTTPStatus.OK.value,
172+
success=True,
173+
message="Actions handled successfully",
174+
results=results,
175+
)
176+
except RaiseException as e:
177+
# This is raised at the end of transaction as the constraint trigger has to be initially deferred.
178+
raise RelationException(
179+
f"Relation violates required constraint: {e}"
167180
)
168-
except RaiseException as e:
169-
# This is raised at the end of transaction as the constraint trigger has to be initially deferred.
170-
raise RelationException(f"Relation violates required constraint: {e}")
181+
except SerializationFailure:
182+
if attempt == retry_count:
183+
raise DatabaseException(
184+
"Database operation failed due to concurring actions. Please try again later."
185+
)
186+
sleep(retry_timeout)
187+
raise BadCodingException("This code should never execute")
171188

172189
def execute_internal_action(self, action: str, data: dict[str, Any]) -> None:
173190
"""Helper function to execute an internal action with user id -1."""

openslides_backend/action/mixins/tree_sort_mixin.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ def sort_tree(
3232
3333
This function traverses this tree in preorder to assign the weight.
3434
"""
35-
# TODO: Check if instances exist in DB and is not deleted.
3635

3736
# Get all item ids to verify, that the user send all ids.
3837
filter = FilterOperator("meeting_id", "=", meeting_id)

openslides_backend/services/database/database_reader.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
from typing import Any
22

33
from psycopg import Connection, rows, sql
4-
from psycopg.errors import UndefinedColumn, UndefinedFunction, UndefinedTable
4+
from psycopg.errors import (
5+
SerializationFailure,
6+
UndefinedColumn,
7+
UndefinedFunction,
8+
UndefinedTable,
9+
)
510

611
from openslides_backend.services.postgresql.db_connection_handling import (
712
retry_on_db_failure,
@@ -245,6 +250,8 @@ def execute_query(
245250
)
246251
except UndefinedFunction as e:
247252
raise InvalidFormat(e.diag.message_primary or "")
253+
except SerializationFailure as e:
254+
raise e
248255
except Exception as e:
249256
raise DatabaseException(f"Unexpected error reading from database: {e}")
250257

openslides_backend/shared/env.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ class Environment(Env):
2727

2828
vars = {
2929
"ACTION_PORT": "9002",
30+
"ACTION_MAX_RETRIES": "3",
31+
"ACTION_RETRY_TIMEOUT": "0.4",
3032
"INTERNAL_AUTH_PASSWORD_FILE": "",
3133
"MEDIA_HOST": "localhost",
3234
"MEDIA_PATH": "/internal/media",

tests/system/action/test_tree_sort_mixin.py

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import threading
22
from typing import Any
33

4-
import pytest
5-
64
from tests.system.action.base import BaseActionTestCase
75

86
CategoryStructure = list[tuple[int, "CategoryStructure"]]
@@ -51,9 +49,6 @@ def set_mass_test_data(self) -> None:
5149
)
5250
)
5351

54-
@pytest.mark.skip(
55-
"TODO: unskip later. Currently runs into 'Unexpected error reading from database: could not serialize access due to concurrent update' error."
56-
)
5752
def test_sort_and_delete_at_once(self) -> None:
5853
self.set_mass_test_data()
5954
sort_thread = threading.Thread(target=self.thread_sort_method)
@@ -64,12 +59,9 @@ def test_sort_and_delete_at_once(self) -> None:
6459
delete_thread.start()
6560
sort_thread.join()
6661
delete_thread.join()
67-
self.assert_sort_thread_results()
62+
self.assert_sort_thread_results(expect_error_id=5, error_optional=True)
6863
self.assert_delete_thread_results(5)
6964

70-
@pytest.mark.skip(
71-
"TODO: unskip later. Currently runs into 'Unexpected error reading from database: could not serialize access due to concurrent update' error during a database request with raise_exception=False, which then crashes the transaction (SingleRelationHandler ln. 134)."
72-
)
7365
def test_sort_and_create_at_once(self) -> None:
7466
self.set_mass_test_data()
7567
sort_thread = threading.Thread(target=self.thread_sort_method)
@@ -83,9 +75,6 @@ def test_sort_and_create_at_once(self) -> None:
8375
self.assert_sort_thread_results()
8476
self.assert_create_thread_results("TIM", 42)
8577

86-
@pytest.mark.skip(
87-
"TODO: unskip later. Currently runs into 'Unexpected error reading from database: could not serialize access due to concurrent update' error."
88-
)
8978
def test_sort_and_delete_at_once_reverse(self) -> None:
9079
self.set_mass_test_data()
9180
delete_thread = threading.Thread(
@@ -97,11 +86,8 @@ def test_sort_and_delete_at_once_reverse(self) -> None:
9786
delete_thread.join()
9887
sort_thread.join()
9988
self.assert_delete_thread_results(5)
100-
self.assert_sort_thread_results()
89+
self.assert_sort_thread_results(expect_error_id=5, error_optional=True)
10190

102-
@pytest.mark.skip(
103-
"TODO: unskip later. Currently runs into 'Unexpected error reading from database: could not serialize access due to concurrent update' error during a database request with raise_exception=False, which then crashes the transaction (SingleRelationHandler ln. 134)."
104-
)
10591
def test_sort_and_create_at_once_reverse(self) -> None:
10692
self.set_mass_test_data()
10793
create_thread = threading.Thread(
@@ -134,9 +120,17 @@ def thread_sort_method(self) -> None:
134120
},
135121
)
136122

137-
def assert_sort_thread_results(self, expect_error: bool = False) -> None:
138-
if expect_error:
123+
def assert_sort_thread_results(
124+
self, expect_error_id: int | None = None, error_optional: bool = False
125+
) -> None:
126+
if expect_error_id and (
127+
not error_optional or self.sort_response.json.get("status_code") != 200
128+
):
139129
self.assert_status_code(self.sort_response, 400)
130+
self.assertIn(
131+
f"Id in sort tree does not exist: {expect_error_id}",
132+
self.sort_response.json["message"],
133+
)
140134
else:
141135
assert self.sort_response.json == {
142136
"message": "Actions handled successfully",
@@ -176,11 +170,20 @@ def assert_create_thread_results(
176170
else:
177171
assert self.create_response.json == {
178172
"message": "Actions handled successfully",
179-
"results": [[{"id": 101}]],
173+
"results": [[{"id": 102, "sequential_number": 101}]],
180174
"status_code": 200,
181175
"success": True,
182176
}
177+
self.assert_model_exists("motion_category/100")
178+
self.assert_model_not_exists(
179+
"motion_category/101"
180+
) # skipped because of retry
183181
self.assert_model_exists(
184-
"motion_category/101",
185-
{"meeting_id": 1, "name": name, "parent_id": parent_id},
182+
"motion_category/102",
183+
{
184+
"meeting_id": 1,
185+
"name": name,
186+
"parent_id": parent_id,
187+
"sequential_number": 101,
188+
},
186189
)

0 commit comments

Comments
 (0)