@@ -26,22 +26,29 @@ var (
2626)
2727
2828const (
29- IndexStorageName = ".index"
30- delCollName = "deletionIndex"
31- hashCollName = "hashesIndex"
32- newHashKey = "nh"
33- oldHashKey = "oh"
34- statusKey = "s"
35- recordIdKey = "r"
29+ IndexStorageName = ".index"
30+ delCollName = "deletionIndex"
31+ hashCollName = "hashesIndex"
32+ migrationStateCollName = "migrationState"
33+ newHashKey = "nh"
34+ oldHashKey = "oh"
35+ statusKey = "s"
36+ recordIdKey = "r"
37+ diffMigrationKey = "diffState"
38+ diffVersionKey = "diffVersion"
3639)
3740
3841type IndexStorage interface {
3942 UpdateHash (ctx context.Context , update SpaceUpdate ) (err error )
4043 RemoveHash (ctx context.Context , spaceId string ) (err error )
4144 ReadHashes (ctx context.Context , iterFunc func (update SpaceUpdate ) (bool , error )) (err error )
45+ UpdateHashes (ctx context.Context , updateFunc func (spaceId , newHash , oldHash string ) (newNewHash , newOldHash string , shouldUpdate bool )) (err error )
4246 SetSpaceStatus (ctx context.Context , spaceId string , status SpaceStatus , recId string ) (err error )
4347 SpaceStatus (ctx context.Context , spaceId string ) (status SpaceStatus , err error )
4448 LastRecordId (ctx context.Context ) (id string , err error )
49+ GetDiffMigrationVersion (ctx context.Context ) (version int , err error )
50+ SetDiffMigrationVersion (ctx context.Context , version int ) (err error )
51+ RunMigrations (ctx context.Context ) (err error )
4552 Close () (err error )
4653}
4754
@@ -166,6 +173,100 @@ func (d *indexStorage) LastRecordId(ctx context.Context) (id string, err error)
166173 return "" , ErrNoLastRecordId
167174}
168175
176+ func (d * indexStorage ) RunMigrations (ctx context.Context ) (err error ) {
177+ diffMigration , err := newDiffMigration (d , log )
178+ if err != nil {
179+ return fmt .Errorf ("failed to create diff migration: %w" , err )
180+ }
181+
182+ if err := diffMigration .Run (ctx ); err != nil {
183+ return fmt .Errorf ("diff migration failed: %w" , err )
184+ }
185+
186+ return nil
187+ }
188+
189+ func (d * indexStorage ) UpdateHashes (ctx context.Context , updateFunc func (spaceId , newHash , oldHash string ) (newNewHash , newOldHash string , shouldUpdate bool )) (err error ) {
190+ iter , err := d .hashesColl .Find (query.All {}).Iter (ctx )
191+ if err != nil {
192+ return err
193+ }
194+ defer iter .Close ()
195+
196+ tx , err := d .hashesColl .WriteTx (ctx )
197+ if err != nil {
198+ return err
199+ }
200+ defer tx .Rollback ()
201+
202+ for iter .Next () {
203+ doc , err := iter .Doc ()
204+ if err != nil {
205+ return err
206+ }
207+
208+ value := doc .Value ()
209+ spaceId := value .GetString ("id" )
210+ newHash := value .GetString (newHashKey )
211+ oldHash := value .GetString (oldHashKey )
212+
213+ newNewHash , newOldHash , shouldUpdate := updateFunc (spaceId , newHash , oldHash )
214+ if ! shouldUpdate {
215+ continue
216+ }
217+
218+ arena := d .arenaPool .Get ()
219+ updatedDoc := arena .NewObject ()
220+ updatedDoc .Set ("id" , arena .NewString (spaceId ))
221+ updatedDoc .Set (newHashKey , arena .NewString (newNewHash ))
222+ updatedDoc .Set (oldHashKey , arena .NewString (newOldHash ))
223+
224+ err = d .hashesColl .UpsertOne (tx .Context (), updatedDoc )
225+ d .arenaPool .Put (arena )
226+ if err != nil {
227+ return err
228+ }
229+ }
230+
231+ return tx .Commit ()
232+ }
233+
234+ func (d * indexStorage ) GetDiffMigrationVersion (ctx context.Context ) (version int , err error ) {
235+ migrationColl , err := d .db .Collection (ctx , migrationStateCollName )
236+ if err != nil {
237+ return 0 , err
238+ }
239+
240+ doc , err := migrationColl .FindId (ctx , diffMigrationKey )
241+ if err != nil {
242+ if errors .Is (err , anystore .ErrDocNotFound ) {
243+ return 0 , nil
244+ }
245+ return 0 , err
246+ }
247+
248+ return int (doc .Value ().GetFloat64 (diffVersionKey )), nil
249+ }
250+
251+ func (d * indexStorage ) SetDiffMigrationVersion (ctx context.Context , version int ) (err error ) {
252+ migrationColl , err := d .db .Collection (ctx , migrationStateCollName )
253+ if err != nil {
254+ return err
255+ }
256+
257+ mod := query .ModifyFunc (func (a * anyenc.Arena , v * anyenc.Value ) (result * anyenc.Value , modified bool , err error ) {
258+ if v == nil {
259+ v = a .NewObject ()
260+ v .Set ("id" , a .NewString (diffMigrationKey ))
261+ }
262+ v .Set (diffVersionKey , a .NewNumberFloat64 (float64 (version )))
263+ return v , true , nil
264+ })
265+
266+ _ , err = migrationColl .UpsertId (ctx , diffMigrationKey , mod )
267+ return err
268+ }
269+
169270func (d * indexStorage ) Close () (err error ) {
170271 return d .db .Close ()
171272}
0 commit comments