@@ -1055,32 +1055,28 @@ async def _background_populate_stream_ordering2(
10551055 batch_size = max (batch_size , 1 )
10561056
10571057 def process (txn : Cursor ) -> int :
1058- # if this is the first pass, find the minimum stream ordering
1059- last_stream = progress .get ("last_stream" )
1060- if last_stream is None :
1061- txn .execute (
1062- """
1063- SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1
1064- """
1065- )
1066- rows = txn .fetchall ()
1067- if not rows :
1068- return 0
1069- last_stream = rows [0 ][0 ] - 1
1070-
1058+ last_stream = progress .get ("last_stream" , - (1 << 31 ))
10711059 txn .execute (
10721060 """
10731061 UPDATE events SET stream_ordering2=stream_ordering
1074- WHERE stream_ordering > ? AND stream_ordering <= ?
1062+ WHERE stream_ordering IN (
1063+ SELECT stream_ordering FROM events WHERE stream_ordering > ?
1064+ ORDER BY stream_ordering LIMIT ?
1065+ )
1066+ RETURNING stream_ordering;
10751067 """ ,
1076- (last_stream , last_stream + batch_size ),
1068+ (last_stream , batch_size ),
10771069 )
10781070 row_count = txn .rowcount
1071+ if row_count == 0 :
1072+ return 0
1073+ last_stream = max (row [0 ] for row in txn )
1074+ logger .info ("populated stream_ordering2 up to %i" , last_stream )
10791075
10801076 self .db_pool .updates ._background_update_progress_txn (
10811077 txn ,
10821078 _BackgroundUpdates .POPULATE_STREAM_ORDERING2 ,
1083- {"last_stream" : last_stream + batch_size },
1079+ {"last_stream" : last_stream },
10841080 )
10851081 return row_count
10861082
0 commit comments