 logger = logging.getLogger(__name__)
 
 
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
+    # there should be no leftover rows without a stream_ordering2, but just in case...
+    "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
+    # finally, we can drop the rule and switch the columns
+    "DROP RULE populate_stream_ordering2 ON events",
+    "ALTER TABLE events DROP COLUMN stream_ordering",
+    "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+)
+
+
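+# string constants naming the background updates registered by the store below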
+class _BackgroundUpdates:
+    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
+    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
+    POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
+    INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
+
+
 @attr.s(slots=True, frozen=True)
 class _CalculateChainCover:
     """Return value for _calculate_chain_cover_txn."""
@@ -48,19 +67,15 @@ class _CalculateChainCover:
 
 
 class EventsBackgroundUpdatesStore(SQLBaseStore):
-
-    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
-    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
-    DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
-
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
         self.db_pool.updates.register_background_update_handler(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
+            _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME,
+            self._background_reindex_origin_server_ts,
         )
         self.db_pool.updates.register_background_update_handler(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
+            _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
             self._background_reindex_fields_sender,
         )
 
@@ -85,7 +100,8 @@ def __init__(self, database: DatabasePool, db_conn, hs):
         )
 
         self.db_pool.updates.register_background_update_handler(
-            self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
+            _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES,
+            self._cleanup_extremities_bg_update,
         )
 
         self.db_pool.updates.register_background_update_handler(
@@ -139,6 +155,24 @@ def __init__(self, database: DatabasePool, db_conn, hs):
             self._purged_chain_cover_index,
         )
 
+        # bg updates for replacing stream_ordering with a BIGINT
+        # (these only run on postgres.)
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
+            self._background_populate_stream_ordering2,
+        )
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2,
+            index_name="events_stream_ordering",
+            table="events",
+            columns=["stream_ordering2"],
+            unique=True,
+        )
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
+            self._background_replace_stream_ordering_column,
+        )
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -190,18 +224,18 @@ def reindex_txn(txn):
             }
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
+                txn, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
             )
 
             return len(rows)
 
         result = await self.db_pool.runInteraction(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
+            _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
         )
 
         if not result:
             await self.db_pool.updates._end_background_update(
-                self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
+                _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
             )
 
         return result
@@ -264,18 +298,18 @@ def reindex_search_txn(txn):
             }
 
             self.db_pool.updates._background_update_progress_txn(
-                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
+                txn, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, progress
             )
 
             return len(rows_to_update)
 
         result = await self.db_pool.runInteraction(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
+            _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
         )
 
         if not result:
             await self.db_pool.updates._end_background_update(
-                self.EVENT_ORIGIN_SERVER_TS_NAME
+                _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME
             )
 
         return result
@@ -454,7 +488,7 @@ def _cleanup_extremities_bg_update_txn(txn):
 
         if not num_handled:
             await self.db_pool.updates._end_background_update(
-                self.DELETE_SOFT_FAILED_EXTREMITIES
+                _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES
             )
 
             def _drop_table_txn(txn):
@@ -1009,3 +1043,75 @@ def purged_chain_cover_txn(txn) -> int:
         await self.db_pool.updates._end_background_update("purged_chain_cover")
 
         return result
+
+    async def _background_populate_stream_ordering2(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Populate events.stream_ordering2, then replace stream_ordering
+
+        This is to deal with the fact that stream_ordering was initially created as a
+        32-bit integer field.
+        """
+        batch_size = max(batch_size, 1)
+
+        def process(txn: Cursor) -> int:
+            # if this is the first pass, find the minimum stream ordering
+            last_stream = progress.get("last_stream")
+            if last_stream is None:
+                txn.execute(
+                    """
+                    SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
+                    """
+                )
+                rows = txn.fetchall()
+                if not rows:
+                    return 0
+                last_stream = rows[0][0] - 1
+
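+            # copy stream_ordering into stream_ordering2 for the next batch of rows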
+            txn.execute(
+                """
+                UPDATE events SET stream_ordering2=stream_ordering
+                WHERE stream_ordering > ? AND stream_ordering <= ?
+                """,
+                (last_stream, last_stream + batch_size),
+            )
+            row_count = txn.rowcount
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
+                {"last_stream": last_stream + batch_size},
+            )
+            return row_count
+
+        result = await self.db_pool.runInteraction(
+            "_background_populate_stream_ordering2", process
+        )
+
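+        # a non-zero row count means there may be more rows to backfill; zero means
+        # we have caught up and can mark the update as done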
+        if result != 0:
+            return result
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.POPULATE_STREAM_ORDERING2
+        )
+        return 0
+
+    async def _background_replace_stream_ordering_column(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
+
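+        # runInteraction runs all of the replacement commands in a single transaction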
+        def process(txn: Cursor) -> None:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
+                logger.info("completing stream_ordering migration: %s", sql)
+                txn.execute(sql)
+
+        await self.db_pool.runInteraction(
+            "_background_replace_stream_ordering_column", process
+        )
+
+        await self.db_pool.updates._end_background_update(
+            _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN
+        )
+
+        return 0
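
Not shown in this diff is the companion schema delta which adds the `stream_ordering2` column, creates the `populate_stream_ordering2` rule that `_REPLACE_STREAM_ORDERING_SQL_COMMANDS` later drops, and schedules the three background updates registered above. The sketch below is illustrative only: the ordering values, the `background_updates` rows, and the exact rule syntax are assumptions rather than contents of this commit.

```sql
-- hypothetical sketch of the companion Postgres-only delta (not part of this diff)

-- add the replacement column; the rule keeps it populated for newly-inserted
-- events, so the background update only has to backfill pre-existing rows
ALTER TABLE events ADD COLUMN stream_ordering2 BIGINT;
CREATE RULE populate_stream_ordering2 AS ON INSERT TO events
    DO UPDATE events SET stream_ordering2 = NEW.stream_ordering
    WHERE stream_ordering = NEW.stream_ordering;

-- schedule the backfill, then the unique index, then the column swap
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
    (6001, 'populate_stream_ordering2', '{}');
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
    (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'),
    (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2');
```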