@@ -50,6 +50,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
5050 pass
5151
5252
53+ OUTDATED_SCHEMA_ON_WORKER_ERROR = (
54+ "Expected database schema version %i but got %i: run the main synapse process to "
55+ "upgrade the database schema before starting worker processes."
56+ )
57+
58+ EMPTY_DATABASE_ON_WORKER_ERROR = (
59+ "Uninitialised database: run the main synapse process to prepare the database "
60+ "schema before starting worker processes."
61+ )
62+
63+ UNAPPLIED_DELTA_ON_WORKER_ERROR = (
64+ "Database schema delta %s has not been applied: run the main synapse process to "
65+ "upgrade the database schema before starting worker processes."
66+ )
67+
68+
5369def prepare_database (
5470 db_conn : Connection ,
5571 database_engine : BaseDatabaseEngine ,
@@ -83,30 +99,49 @@ def prepare_database(
8399 # at all, so this is redundant but harmless there.)
84100 cur .execute ("BEGIN TRANSACTION" )
85101
102+ logger .info ("%r: Checking existing schema version" , databases )
86103 version_info = _get_or_create_schema_state (cur , database_engine )
87104
88105 if version_info :
89106 user_version , delta_files , upgraded = version_info
107+ logger .info (
108+ "%r: Existing schema is %i (+%i deltas)" ,
109+ databases ,
110+ user_version ,
111+ len (delta_files ),
112+ )
90113
114+ # config should only be None when we are preparing an in-memory SQLite db,
115+ # which should be empty.
91116 if config is None :
92- if user_version != SCHEMA_VERSION :
93- # If we don't pass in a config file then we are expecting to
94- # have already upgraded the DB.
95- raise UpgradeDatabaseException (
96- "Expected database schema version %i but got %i"
97- % (SCHEMA_VERSION , user_version )
98- )
99- else :
100- _upgrade_existing_database (
101- cur ,
102- user_version ,
103- delta_files ,
104- upgraded ,
105- database_engine ,
106- config ,
107- databases = databases ,
117+ raise ValueError (
118+ "config==None in prepare_database, but databse is not empty"
108119 )
120+
121+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
122+ # workers doing it at once.
123+ if config .worker_app is not None and user_version != SCHEMA_VERSION :
124+ raise UpgradeDatabaseException (
125+ OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION , user_version )
126+ )
127+
128+ _upgrade_existing_database (
129+ cur ,
130+ user_version ,
131+ delta_files ,
132+ upgraded ,
133+ database_engine ,
134+ config ,
135+ databases = databases ,
136+ )
109137 else :
138+ logger .info ("%r: Initialising new database" , databases )
139+
140+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
141+ # workers doing it at once.
142+ if config and config .worker_app is not None :
143+ raise UpgradeDatabaseException (EMPTY_DATABASE_ON_WORKER_ERROR )
144+
110145 _setup_new_database (cur , database_engine , databases = databases )
111146
112147 # check if any of our configured dynamic modules want a database
@@ -312,6 +347,8 @@ def _upgrade_existing_database(
312347 else :
313348 assert config
314349
350+ is_worker = config and config .worker_app is not None
351+
315352 if current_version > SCHEMA_VERSION :
316353 raise ValueError (
317354 "Cannot use this database as it is too "
@@ -339,7 +376,7 @@ def _upgrade_existing_database(
339376 specific_engine_extensions = (".sqlite" , ".postgres" )
340377
341378 for v in range (start_ver , SCHEMA_VERSION + 1 ):
342- logger .info ("Upgrading schema to v%d" , v )
379+ logger .info ("Applying schema deltas for v%d" , v )
343380
344381 # We need to search both the global and per data store schema
345382 # directories for schema updates.
@@ -399,9 +436,15 @@ def _upgrade_existing_database(
399436 continue
400437
401438 root_name , ext = os .path .splitext (file_name )
439+
402440 if ext == ".py" :
403441 # This is a python upgrade module. We need to import into some
404442 # package and then execute its `run_upgrade` function.
443+ if is_worker :
444+ raise PrepareDatabaseException (
445+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
446+ )
447+
405448 module_name = "synapse.storage.v%d_%s" % (v , root_name )
406449 with open (absolute_path ) as python_file :
407450 module = imp .load_source (module_name , absolute_path , python_file )
@@ -416,10 +459,18 @@ def _upgrade_existing_database(
416459 continue
417460 elif ext == ".sql" :
418461 # A plain old .sql file, just read and execute it
462+ if is_worker :
463+ raise PrepareDatabaseException (
464+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
465+ )
419466 logger .info ("Applying schema %s" , relative_path )
420467 executescript (cur , absolute_path )
421468 elif ext == specific_engine_extension and root_name .endswith (".sql" ):
422469 # A .sql file specific to our engine; just read and execute it
470+ if is_worker :
471+ raise PrepareDatabaseException (
472+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
473+ )
423474 logger .info ("Applying engine-specific schema %s" , relative_path )
424475 executescript (cur , absolute_path )
425476 elif ext in specific_engine_extensions and root_name .endswith (".sql" ):
@@ -449,6 +500,8 @@ def _upgrade_existing_database(
449500 (v , True ),
450501 )
451502
503+ logger .info ("Schema now up to date" )
504+
452505
453506def _apply_module_schemas (txn , database_engine , config ):
454507 """Apply the module schemas for the dynamic modules, if any
0 commit comments