@@ -47,6 +47,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
4747 pass
4848
4949
50+ OUTDATED_SCHEMA_ON_WORKER_ERROR = (
51+ "Expected database schema version %i but got %i: run the main synapse process to "
52+ "upgrade the database schema before starting worker processes."
53+ )
54+
55+ EMPTY_DATABASE_ON_WORKER_ERROR = (
56+ "Uninitialised database: run the main synapse process to prepare the database "
57+ "schema before starting worker processes."
58+ )
59+
60+ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
61+ "Database schema delta %s has not been applied: run the main synapse process to "
62+ "upgrade the database schema before starting worker processes."
63+ )
64+
65+
5066def prepare_database (db_conn , database_engine , config , databases = ["main" , "state" ]):
5167 """Prepares a physical database for usage. Will either create all necessary tables
5268 or upgrade from an older schema version.
@@ -71,25 +87,35 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
7187 if version_info :
7288 user_version , delta_files , upgraded = version_info
7389
90+ # config should only be None when we are preparing an in-memory SQLite db,
91+ # which should be empty.
7492 if config is None :
75- if user_version != SCHEMA_VERSION :
76- # If we don't pass in a config file then we are expecting to
77- # have already upgraded the DB.
78- raise UpgradeDatabaseException (
79- "Expected database schema version %i but got %i"
80- % (SCHEMA_VERSION , user_version )
81- )
82- else :
83- _upgrade_existing_database (
84- cur ,
85- user_version ,
86- delta_files ,
87- upgraded ,
88- database_engine ,
89- config ,
90- databases = databases ,
93+ raise ValueError (
94+ "config==None in prepare_database, but databse is not empty"
9195 )
96+
97+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
98+ # workers doing it at once.
99+ if config .worker_app is not None and user_version != SCHEMA_VERSION :
100+ raise UpgradeDatabaseException (
101+ OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION , user_version )
102+ )
103+
104+ _upgrade_existing_database (
105+ cur ,
106+ user_version ,
107+ delta_files ,
108+ upgraded ,
109+ database_engine ,
110+ config ,
111+ databases = databases ,
112+ )
92113 else :
114+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
115+ # workers doing it at once.
116+ if config and config .worker_app is not None :
117+ raise UpgradeDatabaseException (EMPTY_DATABASE_ON_WORKER_ERROR )
118+
93119 _setup_new_database (cur , database_engine , databases = databases )
94120
95121 # check if any of our configured dynamic modules want a database
@@ -295,6 +321,8 @@ def _upgrade_existing_database(
295321 else :
296322 assert config
297323
324+ is_worker = config and config .worker_app is not None
325+
298326 if current_version > SCHEMA_VERSION :
299327 raise ValueError (
300328 "Cannot use this database as it is too "
@@ -322,7 +350,7 @@ def _upgrade_existing_database(
322350 specific_engine_extensions = (".sqlite" , ".postgres" )
323351
324352 for v in range (start_ver , SCHEMA_VERSION + 1 ):
325- logger .info ("Upgrading schema to v%d" , v )
353+ logger .info ("Applying schema deltas for v%d" , v )
326354
327355 # We need to search both the global and per data store schema
328356 # directories for schema updates.
@@ -382,9 +410,15 @@ def _upgrade_existing_database(
382410 continue
383411
384412 root_name , ext = os .path .splitext (file_name )
413+
385414 if ext == ".py" :
386415 # This is a python upgrade module. We need to import into some
387416 # package and then execute its `run_upgrade` function.
417+ if is_worker :
418+ raise PrepareDatabaseException (
419+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
420+ )
421+
388422 module_name = "synapse.storage.v%d_%s" % (v , root_name )
389423 with open (absolute_path ) as python_file :
390424 module = imp .load_source (module_name , absolute_path , python_file )
@@ -399,10 +433,18 @@ def _upgrade_existing_database(
399433 continue
400434 elif ext == ".sql" :
401435 # A plain old .sql file, just read and execute it
436+ if is_worker :
437+ raise PrepareDatabaseException (
438+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
439+ )
402440 logger .info ("Applying schema %s" , relative_path )
403441 executescript (cur , absolute_path )
404442 elif ext == specific_engine_extension and root_name .endswith (".sql" ):
405443 # A .sql file specific to our engine; just read and execute it
444+ if is_worker :
445+ raise PrepareDatabaseException (
446+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
447+ )
406448 logger .info ("Applying engine-specific schema %s" , relative_path )
407449 executescript (cur , absolute_path )
408450 elif ext in specific_engine_extensions and root_name .endswith (".sql" ):
@@ -432,6 +474,8 @@ def _upgrade_existing_database(
432474 (v , True ),
433475 )
434476
477+ logger .info ("Schema now up to date" )
478+
435479
436480def _apply_module_schemas (txn , database_engine , config ):
437481 """Apply the module schemas for the dynamic modules, if any
0 commit comments