Merged
Changes from 16 commits
1 change: 1 addition & 0 deletions changelog.d/18944.misc
@@ -0,0 +1 @@
Introduce `Clock.call_when_running(...)` to wrap startup code in a logcontext, ensuring we can identify which server generated the logs.
1 change: 1 addition & 0 deletions changelog.d/18945.misc
@@ -0,0 +1 @@
Introduce `Clock.add_system_event_trigger(...)` to wrap system event callback code in a logcontext, ensuring we can identify which server generated the logs.
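
To make these entries concrete, here is a minimal sketch of the kind of wrapper they describe. This is a hypothetical simplification rather than the actual `synapse.util.Clock` implementation (which is not part of this diff); it only assumes the `LoggingContext` API already imported elsewhere in this PR:

from typing import Any, Callable

from twisted.internet.interfaces import IReactorCore

from synapse.logging.context import LoggingContext


class Clock:
    """Hypothetical sketch of the wrappers the changelog entries describe."""

    def __init__(self, reactor: IReactorCore) -> None:
        # The real Clock presumably also records which server it belongs to.
        self._reactor = reactor

    def call_when_running(
        self, callback: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> None:
        # Run the startup callback inside a named logcontext rather than the
        # sentinel context, so its log lines can be attributed to a server.
        def wrapper() -> None:
            with LoggingContext("call_when_running"):
                callback(*args, **kwargs)

        self._reactor.callWhenRunning(wrapper)

    def add_system_event_trigger(
        self, phase: str, event_type: str, callback: Callable[..., Any], *args: Any
    ) -> None:
        # Same idea for system event triggers such as ("before", "shutdown").
        def wrapper() -> None:
            with LoggingContext("add_system_event_trigger"):
                callback(*args)

        self._reactor.addSystemEventTrigger(phase, event_type, wrapper)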
80 changes: 80 additions & 0 deletions scripts-dev/mypy_synapse_plugin.py
@@ -68,6 +68,18 @@
category="per-homeserver-tenant-metrics",
)

PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING = ErrorCode(
"prefer-synapse-clock-call-when-running",
"`synapse.util.Clock.call_when_running` should be used instead of `reactor.callWhenRunning`",
category="synapse-reactor-clock",
)

PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER = ErrorCode(
"prefer-synapse-clock-add-system-event-trigger",
"`synapse.util.Clock.add_system_event_trigger` should be used instead of `reactor.addSystemEventTrigger`",
category="synapse-reactor-clock",
)


class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
@@ -229,9 +241,77 @@ def get_method_signature_hook(
):
return check_is_cacheable_wrapper

if fullname in (
"twisted.internet.interfaces.IReactorCore.callWhenRunning",
"synapse.types.ISynapseThreadlessReactor.callWhenRunning",
"synapse.types.ISynapseReactor.callWhenRunning",
):
return check_call_when_running

if fullname in (
"twisted.internet.interfaces.IReactorCore.addSystemEventTrigger",
"synapse.types.ISynapseThreadlessReactor.addSystemEventTrigger",
"synapse.types.ISynapseReactor.addSystemEventTrigger",
):
return check_add_system_event_trigger

return None


def check_call_when_running(ctx: MethodSigContext) -> CallableType:
"""
Ensure that `reactor.callWhenRunning` is not called directly.

`synapse.util.Clock.call_when_running` should always be used instead of
`reactor.callWhenRunning`.

Since `reactor.callWhenRunning` registers a bare reactor callback, the callback will
start out with the sentinel logcontext. `synapse.util.Clock.call_when_running` instead
starts a default logcontext so we can tell which server the logs came from.

Args:
ctx: The `MethodSigContext` from mypy.
"""
signature: CallableType = ctx.default_signature
ctx.api.fail(
(
"Expected all `reactor.callWhenRunning` calls to use `synapse.util.Clock.call_when_running` instead. "
"This is so all Synapse code runs with a logcontext as we want to know which server the logs came from."
),
ctx.context,
code=PREFER_SYNAPSE_CLOCK_CALL_WHEN_RUNNING,
)

return signature


def check_add_system_event_trigger(ctx: MethodSigContext) -> CallableType:
"""
Ensure that `reactor.addSystemEventTrigger` is not called directly.

`synapse.util.Clock.add_system_event_trigger` should always be used instead of
`reactor.addSystemEventTrigger`.

Since `reactor.addSystemEventTrigger` registers a bare reactor callback, the callback
will start out with the sentinel logcontext. `synapse.util.Clock.add_system_event_trigger`
instead starts a default logcontext so we can tell which server the logs came from.

Args:
ctx: The `MethodSigContext` from mypy.
"""
signature: CallableType = ctx.default_signature
ctx.api.fail(
(
"Expected all `reactor.addSystemEventTrigger` calls to use `synapse.util.Clock.add_system_event_trigger` instead. "
"This is so all Synapse code runs with a logcontext as we want to know which server the logs came from."
),
ctx.context,
code=PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER,
)

return signature


def analyze_prometheus_metric_classes(ctx: ClassDefContext) -> None:
"""
Cross-check the list of Prometheus metric classes against the
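
For illustration, here is what the new checks catch in practice. A sketch assuming a configured `hs: HomeServer` and the Twisted `reactor` are in scope; the commented-out calls show the pattern the plugin now rejects:

def main() -> None:
    print("started")


def cleanup() -> None:
    print("shutting down")


# Rejected by mypy with `prefer-synapse-clock-call-when-running`, since the
# callback would run under the sentinel logcontext:
#     reactor.callWhenRunning(main)
#
# Rejected with `prefer-synapse-clock-add-system-event-trigger`:
#     reactor.addSystemEventTrigger("before", "shutdown", cleanup)

# Preferred: both callbacks run inside a logcontext that identifies the server.
hs.get_clock().call_when_running(main)
hs.get_clock().add_system_event_trigger("before", "shutdown", cleanup)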
2 changes: 1 addition & 1 deletion scripts-dev/sign_json.py
@@ -30,7 +30,7 @@

from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.util import json_encoder
from synapse.util.json import json_encoder


def main() -> None:
64 changes: 24 additions & 40 deletions synapse/_scripts/synapse_port_db.py
@@ -54,11 +54,11 @@
from synapse.config.database import DatabaseConnectionConfig
from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import (
LoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.notifier import ReplicationNotifier
from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.database import DatabasePool, LoggingTransaction, make_conn
from synapse.storage.databases.main import FilteringWorkerStore
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
@@ -98,8 +98,7 @@
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.types import ISynapseReactor
from synapse.util import SYNAPSE_VERSION, Clock
from synapse.util.stringutils import random_string
from synapse.util import SYNAPSE_VERSION

# Cast safety: Twisted does some naughty magic which replaces the
# twisted.internet.reactor module with a Reactor instance at runtime.
@@ -318,31 +317,16 @@ def set_room_is_public(self, room_id: str, is_public: bool) -> NoReturn:
)


class MockHomeserver:
def __init__(self, config: HomeServerConfig):
self.clock = Clock(reactor)
self.config = config
self.hostname = config.server.server_name
self.version_string = SYNAPSE_VERSION
self.instance_id = random_string(5)

def get_clock(self) -> Clock:
return self.clock

def get_reactor(self) -> ISynapseReactor:
return reactor

def get_instance_id(self) -> str:
return self.instance_id

def get_instance_name(self) -> str:
return "master"
class MockHomeserver(HomeServer):
DATASTORE_CLASS = DataStore

def should_send_federation(self) -> bool:
return False

def get_replication_notifier(self) -> ReplicationNotifier:
return ReplicationNotifier()
def __init__(self, config: HomeServerConfig):
super().__init__(
hostname=config.server.server_name,
config=config,
reactor=reactor,
version_string=f"Synapse/{SYNAPSE_VERSION}",
)


class Porter:
@@ -351,12 +335,12 @@ def __init__(
sqlite_config: Dict[str, Any],
progress: "Progress",
batch_size: int,
hs_config: HomeServerConfig,
hs: HomeServer,
):
self.sqlite_config = sqlite_config
self.progress = progress
self.batch_size = batch_size
self.hs_config = hs_config
self.hs = hs

async def setup_table(self, table: str) -> Tuple[str, int, int, int, int]:
if table in APPEND_ONLY_TABLES:
@@ -676,8 +660,7 @@ def build_db_store(

engine = create_engine(db_config.config)

hs = MockHomeserver(self.hs_config)
server_name = hs.hostname
server_name = self.hs.hostname

with make_conn(
db_config=db_config,
@@ -688,16 +671,16 @@
engine.check_database(
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
prepare_database(db_conn, engine, config=self.hs.config)
# Type safety: ignore that we're using Mock homeservers here.
store = Store(
DatabasePool(
hs, # type: ignore[arg-type]
self.hs,
db_config,
engine,
),
db_conn,
hs, # type: ignore[arg-type]
self.hs,
)
db_conn.commit()

@@ -795,7 +778,7 @@ async def run(self) -> None:
return

self.postgres_store = self.build_db_store(
self.hs_config.database.get_single_database()
self.hs.config.database.get_single_database()
)

await self.remove_ignored_background_updates_from_database()
@@ -1584,6 +1567,8 @@ def main() -> None:
config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")

hs = MockHomeserver(config)

def start(stdscr: Optional["curses.window"] = None) -> None:
progress: Progress
if stdscr:
@@ -1595,15 +1580,14 @@ def start(stdscr: Optional["curses.window"] = None) -> None:
sqlite_config=sqlite_config,
progress=progress,
batch_size=args.batch_size,
hs_config=config,
hs=hs,
)

@defer.inlineCallbacks
def run() -> Generator["defer.Deferred[Any]", Any, None]:
with LoggingContext("synapse_port_db_run"):
yield defer.ensureDeferred(porter.run())
yield defer.ensureDeferred(porter.run())

reactor.callWhenRunning(run)
hs.get_clock().call_when_running(run)

reactor.run()

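
Taken together, the port script's startup path now reads roughly as follows (condensed from the hunks above; `hs_config`, `sqlite_config`, `progress`, and `args` are set up earlier in `main` as before):

config = HomeServerConfig()
config.parse_config_dict(hs_config, "", "")

# MockHomeserver is now a real HomeServer subclass, so it can be passed to
# DatabasePool and the Store directly, without `type: ignore[arg-type]`.
hs = MockHomeserver(config)

porter = Porter(
    sqlite_config=sqlite_config,
    progress=progress,
    batch_size=args.batch_size,
    hs=hs,
)

@defer.inlineCallbacks
def run() -> Generator["defer.Deferred[Any]", Any, None]:
    # No explicit LoggingContext here any more: call_when_running
    # establishes one before invoking this callback.
    yield defer.ensureDeferred(porter.run())

hs.get_clock().call_when_running(run)
reactor.run()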
2 changes: 1 addition & 1 deletion synapse/_scripts/update_synapse_database.py
@@ -74,7 +74,7 @@ def run() -> None:
)
)

reactor.callWhenRunning(run)
hs.get_clock().call_when_running(run)

reactor.run()

2 changes: 1 addition & 1 deletion synapse/api/auth/mas.py
@@ -43,9 +43,9 @@
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.json import json_decoder

from . import introspection_response_timer

2 changes: 1 addition & 1 deletion synapse/api/auth/msc3861_delegated.py
@@ -48,9 +48,9 @@
from synapse.metrics import SERVER_NAME_LABEL
from synapse.synapse_rust.http_client import HttpClient
from synapse.types import Requester, UserID, create_requester
from synapse.util import json_decoder
from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
from synapse.util.json import json_decoder

from . import introspection_response_timer

2 changes: 1 addition & 1 deletion synapse/api/errors.py
@@ -30,7 +30,7 @@

from twisted.web import http

from synapse.util import json_decoder
from synapse.util.json import json_decoder

if typing.TYPE_CHECKING:
from synapse.config.homeserver import HomeServerConfig
2 changes: 1 addition & 1 deletion synapse/api/ratelimiting.py
@@ -26,7 +26,7 @@
from synapse.config.ratelimiting import RatelimitSettings
from synapse.storage.databases.main import DataStore
from synapse.types import Requester
from synapse.util import Clock
from synapse.util.clock import Clock

if TYPE_CHECKING:
# To avoid circular imports:
13 changes: 8 additions & 5 deletions synapse/app/_base.py
@@ -241,7 +241,7 @@ def redirect_stdio_to_logs() -> None:


def register_start(
cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
hs: "HomeServer", cb: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
) -> None:
"""Register a callback with the reactor, to be called once it is running

@@ -278,7 +278,8 @@ async def wrapper() -> None:
# on as normal.
os._exit(1)

reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
clock = hs.get_clock()
clock.call_when_running(lambda: defer.ensureDeferred(wrapper()))


def listen_metrics(bind_addresses: StrCollection, port: int) -> None:
@@ -517,7 +518,9 @@ async def start(hs: "HomeServer") -> None:
# numbers of DNS requests don't starve out other users of the threadpool.
resolver_threadpool = ThreadPool(name="gai_resolver")
resolver_threadpool.start()
reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)
hs.get_clock().add_system_event_trigger(
"during", "shutdown", resolver_threadpool.stop
)
reactor.installNameResolver(
GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
)
@@ -604,7 +607,7 @@ def log_shutdown() -> None:
logger.info("Shutting down...")

# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger("before", "shutdown", log_shutdown)
hs.get_clock().add_system_event_trigger("before", "shutdown", log_shutdown)

setup_sentry(hs)
setup_sdnotify(hs)
@@ -719,7 +722,7 @@ def setup_sdnotify(hs: "HomeServer") -> None:
# we're not using systemd.
sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))

hs.get_reactor().addSystemEventTrigger(
hs.get_clock().add_system_event_trigger(
"before", "shutdown", sdnotify, b"STOPPING=1"
)

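
Downstream, callers pass the homeserver as the first argument so that `register_start` can route the callback through `hs.get_clock().call_when_running(...)`; a sketch matching the worker and homeserver apps below:

async def start() -> None:
    await _base.start(hs)

# Previously `register_start(start)`, and the callback had to open its own
# LoggingContext to escape the sentinel context. The Clock wrapper now
# supplies the logcontext, so those wrappers are removed below.
register_start(hs, start)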
6 changes: 2 additions & 4 deletions synapse/app/generic_worker.py
@@ -356,11 +356,9 @@ def start(config_options: List[str]) -> None:
handle_startup_exception(e)

async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
await _base.start(hs)
await _base.start(hs)

register_start(start)
register_start(hs, start)

# redirect stdio to the logs, if configured.
if not hs.config.logging.no_redirect_stdio:
18 changes: 8 additions & 10 deletions synapse/app/homeserver.py
@@ -377,19 +377,17 @@ def setup(config_options: List[str]) -> SynapseHomeServer:
handle_startup_exception(e)

async def start() -> None:
# Re-establish log context now that we're back from the reactor
with LoggingContext("start"):
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc.oidc_enabled:
oidc = hs.get_oidc_handler()
# Loading the provider metadata also ensures the provider config is valid.
await oidc.load_metadata()

await _base.start(hs)
await _base.start(hs)

hs.get_datastores().main.db_pool.updates.start_doing_background_updates()
hs.get_datastores().main.db_pool.updates.start_doing_background_updates()

register_start(start)
register_start(hs, start)

return hs

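
The net effect in `synapse/app/homeserver.py`: the startup coroutine no longer opens its own logcontext. A reconstruction of the new side of the hunk above:

async def start() -> None:
    # The explicit LoggingContext("start") wrapper is gone;
    # register_start -> Clock.call_when_running now provides the context.

    # Load the OIDC provider metadata, if OIDC is enabled.
    if hs.config.oidc.oidc_enabled:
        oidc = hs.get_oidc_handler()
        # Loading the provider metadata also ensures the provider config is valid.
        await oidc.load_metadata()

    await _base.start(hs)

    hs.get_datastores().main.db_pool.updates.start_doing_background_updates()

register_start(hs, start)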