Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/17847.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix a bug in the admin redact endpoint where the background task would not run if a worker was specified in
the config option `run_background_tasks_on`.
6 changes: 5 additions & 1 deletion synapse/handlers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ def __init__(self, hs: "HomeServer"):
self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME
)

self.hs = hs

async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]:
"""Get the current status of an active redaction process

Expand Down Expand Up @@ -423,8 +425,10 @@ async def _redact_all_events(
user_id = task.params.get("user_id")
assert user_id is not None

# puppet the user if they're ours, otherwise use admin to redact
requester = create_requester(
user_id, authenticated_entity=admin.user.to_string()
user_id if self.hs.is_mine_id(user_id) else admin.user.to_string(),
authenticated_entity=admin.user.to_string(),
)

reason = task.params.get("reason")
Expand Down
1 change: 1 addition & 0 deletions synapse/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"""

REQUIRED_ON_BACKGROUND_TASK_STARTUP = [
"admin",
"account_validity",
"auth",
"deactivate_account",
Expand Down
100 changes: 99 additions & 1 deletion tests/rest/admin/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from synapse.util import Clock

from tests import unittest
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.test_utils import SMALL_PNG
from tests.unittest import override_config

Expand Down Expand Up @@ -5127,7 +5128,6 @@ def test_redact_messages_all_rooms(self) -> None:
"""
Test that request to redact events in all rooms user is member of is successful
"""

# join rooms, send some messages
originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
Expand Down Expand Up @@ -5404,3 +5404,101 @@ def test_admin_redact_works_if_user_kicked_or_banned(self) -> None:
matches.append((event_id, event))
# we redacted 6 messages
self.assertEqual(len(matches), 6)


class UserRedactionBackgroundTaskTestCase(BaseMultiWorkerStreamTestCase):
servlets = [
synapse.rest.admin.register_servlets,
login.register_servlets,
admin.register_servlets,
room.register_servlets,
sync.register_servlets,
]

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
    """Create the users, handles and rooms shared by the tests in this class.

    Registers and logs in two users ("thomas" and "teresa"), keeps references
    to the main datastore and the spam-checker callbacks, and creates three
    rooms as "thomas" using a mix of room versions.
    """
    password = "pass"

    # `True` is passed only for this account (presumably the admin flag).
    self.admin = self.register_user("thomas", password, True)
    self.admin_tok = self.login("thomas", password)

    self.bad_user = self.register_user("teresa", password)
    self.bad_user_tok = self.login("teresa", password)

    self.store = hs.get_datastores().main

    self.spam_checker = hs.get_module_api_callbacks().spam_checker

    def make_room(**room_kwargs):
        # Every room is created by the admin user with the admin's token.
        return self.helper.create_room_as(
            self.admin, tok=self.admin_tok, **room_kwargs
        )

    # Room versions 11+ store the `redacts` key in content while earlier ones
    # don't, so the rooms deliberately use a mix of versions: an explicit old
    # version, the default, and an explicit new version.
    self.rm1 = make_room(room_version="7")
    self.rm2 = make_room()
    self.rm3 = make_room(room_version="11")

@override_config({"run_background_tasks_on": "worker1"})
def test_redact_messages_all_rooms(self) -> None:
"""
Test that redact task successfully runs when `run_background_tasks_on` is specified
"""
self.make_worker_hs(
"synapse.app.generic_worker",
extra_config={
"worker_name": "worker1",
"run_background_tasks_on": "worker1",
"redis": {"enabled": True},
},
)

# join rooms, send some messages
originals = []
for rm in [self.rm1, self.rm2, self.rm3]:
join = self.helper.join(rm, self.bad_user, tok=self.bad_user_tok)
originals.append(join["event_id"])
for i in range(15):
event = {"body": f"hello{i}", "msgtype": "m.text"}
res = self.helper.send_event(
rm, "m.room.message", event, tok=self.bad_user_tok, expect_code=200
)
originals.append(res["event_id"])

# redact all events in all rooms
channel = self.make_request(
"POST",
f"/_synapse/admin/v1/user/{self.bad_user}/redact",
content={"rooms": []},
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)
id = channel.json_body.get("redact_id")

for _ in range(100):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very janky way of saying "please main process wait until the worker process has finished executing the redaction task before running the rest of the test code." I would love to know the actual way to do this.

Copy link
Contributor

@MadLittleMods MadLittleMods Oct 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sort of thing might be good enough. For example, look at how wait_for_background_updates() works.

Perhaps just change this to a while loop with a timeout (example in wait_on_thread()).

Could be updated to use the TaskScheduler functions directly instead of the HTTP requests but keeping it as a black box test is also great 👍

One alternative: because this particular background task produces a bunch of redactions which you end up checking anyway, you could just /sync until you see a redaction for all of the events. I don't know if we have something for that in Synapse, but Complement has MustSyncUntil, for example, which we could emulate over here.


It looks like the TaskScheduler uses run_as_background_process(...) behind the scenes. Here is a previous PR where I wanted to wait for things that ran in a background process:

Although, I'm unable to get the trick suggested there to work here. Here is what I tried:

        worker = self.make_worker_hs(...)

        [...]

        # Wait for the next run of the scheduler loop
        worker.get_reactor().advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
        # Ensure `run_as_background_process(...)` has a chance to run (essentially
        # `wait_for_background_processes()`)
        worker.get_reactor().pump((0.1,))
        # Even adding this for good measure, doesn't work
        self.reactor.pump((0.1,))

As an aside (for my own reference), reactor.pump([1]) vs reactor.advance(1) seem to be the same thing. I wonder why both exist.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opted to use a while loop - hopefully this is sufficient? I went down the rabbit hole of trying to poke at the reactor before resorting to what I originally submitted, so if the while loop is acceptable let's leave it at that :)

channel2 = self.make_request(
"GET",
f"/_synapse/admin/v1/user/redact_status/{id}",
access_token=self.admin_tok,
)
redact_result = channel2.json_body["status"]
if redact_result == "complete":
break
if redact_result == "failed":
self.fail("Redaction task failed.")

matched = []
for rm in [self.rm1, self.rm2, self.rm3]:
filter = json.dumps({"types": [EventTypes.Redaction]})
channel = self.make_request(
"GET",
f"rooms/{rm}/messages?filter={filter}&limit=50",
access_token=self.admin_tok,
)
self.assertEqual(channel.code, 200)

for event in channel.json_body["chunk"]:
for event_id in originals:
if (
event["type"] == "m.room.redaction"
and event["redacts"] == event_id
):
matched.append(event_id)
self.assertEqual(len(matched), len(originals))
Loading