@@ -26,23 +26,14 @@ import (
2626// - Grab attachment migration manager and assert it has run upon db startup
2727// - Assert job has written syncInfo metaVersion as expected to the bucket
2828func TestMigrationJobStartOnDbStart (t * testing.T ) {
29- if base .UnitTestUrlIsWalrus () {
30- t .Skip ("rosmar does not support DCP client, pending CBG-4249" )
31- }
29+ base .TestRequiresOneShotDCPClient (t )
3230 rt := rest .NewRestTesterPersistentConfig (t )
3331 defer rt .Close ()
34- ctx := rt .Context ()
35-
36- ds := rt .GetSingleDataStore ()
37- dbCtx := rt .GetDatabase ()
3832
39- mgr := dbCtx .AttachmentMigrationManager
40-
41- // wait for migration job to finish
42- db .RequireBackgroundManagerState (t , ctx , mgr , db .BackgroundProcessStateCompleted )
33+ waitForAttachmentMigrationState (rt , db .BackgroundProcessStateCompleted )
4334
4435 // assert that sync info with metadata version written to the collection
45- db .AssertSyncInfoMetaVersion (t , ds )
36+ db .AssertSyncInfoMetaVersion (t , rt . GetSingleDataStore () )
4637}
4738
4839// TestChangeDbCollectionsRestartMigrationJob:
@@ -55,9 +46,7 @@ func TestMigrationJobStartOnDbStart(t *testing.T) {
5546// to be processed twice in the job, so we can assert that the job has processed more docs than we added
5647// - Assert sync info: metaVersion is written to BOTH collections in the db config
5748func TestChangeDbCollectionsRestartMigrationJob (t * testing.T ) {
58- if base .UnitTestUrlIsWalrus () {
59- t .Skip ("rosmar does not support DCP client, pending CBG-4249" )
60- }
49+ base .TestRequiresOneShotDCPClient (t )
6150 base .TestRequiresCollections (t )
6251 base .RequireNumTestDataStores (t , 2 )
6352
@@ -119,11 +108,9 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) {
119108 rest .RequireStatus (t , resp , http .StatusCreated )
120109
121110 dbCtx := rt .GetDatabase ()
122- mgr := dbCtx .AttachmentMigrationManager
123111 scNames := base.ScopeAndCollectionNames {base.ScopeAndCollectionName {Scope : scope , Collection : collection1 }}
124112 assert .ElementsMatch (t , scNames , dbCtx .RequireAttachmentMigration )
125- // wait for migration job to start
126- db .RequireBackgroundManagerState (t , ctx , mgr , db .BackgroundProcessStateRunning )
113+ waitForAttachmentMigrationState (rt , db .BackgroundProcessStateRunning )
127114
128115 // update db config to include second collection
129116 dbConfig = rt .NewDbConfig ()
@@ -132,19 +119,11 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) {
132119 resp = rt .UpsertDbConfig (dbName , dbConfig )
133120 rest .RequireStatus (t , resp , http .StatusCreated )
134121
135- // wait for attachment migration job to start and finish
136122 dbCtx = rt .GetDatabase ()
137- mgr = dbCtx .AttachmentMigrationManager
138123 scNames = append (scNames , base.ScopeAndCollectionName {Scope : scope , Collection : collection2 })
139124 assert .ElementsMatch (t , scNames , dbCtx .RequireAttachmentMigration )
140- db . RequireBackgroundManagerState ( t , ctx , mgr , db .BackgroundProcessStateRunning )
125+ mgrStatus := waitForAttachmentMigrationState ( rt , db .BackgroundProcessStateCompleted )
141126
142- db .RequireBackgroundManagerState (t , ctx , mgr , db .BackgroundProcessStateCompleted )
143-
144- var mgrStatus db.AttachmentMigrationManagerResponse
145- stat , err := mgr .GetStatus (ctx )
146- require .NoError (t , err )
147- require .NoError (t , base .JSONUnmarshal (stat , & mgrStatus ))
148127 // assert that number of docs precessed is greater than the total docs added, this will be because when updating
149128 // the db config to include a new collection this should force reset of DCP checkpoints and start DCP feed from 0 again
150129 assert .Greater (t , mgrStatus .DocsProcessed , int64 (totalDocsAdded ))
@@ -164,9 +143,7 @@ func TestChangeDbCollectionsRestartMigrationJob(t *testing.T) {
164143// after update to db config + assert on collections requiring migration
165144// - Assert that syncInfo: metaVersion is written for new collection (and is still present in original collection)
166145func TestMigrationNewCollectionToDbNoRestart (t * testing.T ) {
167- if base .UnitTestUrlIsWalrus () {
168- t .Skip ("rosmar does not support DCP client, pending CBG-4249" )
169- }
146+ base .TestRequiresOneShotDCPClient (t )
170147 base .TestRequiresCollections (t )
171148 base .RequireNumTestDataStores (t , 2 )
172149 base .SetUpTestLogging (t , base .LevelInfo , base .KeyAll )
@@ -223,17 +200,13 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) {
223200 rest .RequireStatus (t , resp , http .StatusCreated )
224201
225202 dbCtx := rt .GetDatabase ()
226- mgr := dbCtx .AttachmentMigrationManager
227203 assert .Len (t , dbCtx .RequireAttachmentMigration , 1 )
228204 // wait for migration job to finish on single collection
229- db . RequireBackgroundManagerState ( t , ctx , mgr , db .BackgroundProcessStateCompleted )
205+ mgrStatus := waitForAttachmentMigrationState ( rt , db .BackgroundProcessStateCompleted )
230206
231- var mgrStatus db.AttachmentMigrationManagerResponse
232- stat , err := mgr .GetStatus (ctx )
233- require .NoError (t , err )
234- require .NoError (t , base .JSONUnmarshal (stat , & mgrStatus ))
235- // assert that number of docs precessed is equal to docs in collection 1
236- assert .Equal (t , int64 (totalDocsAddedCollOne ), mgrStatus .DocsProcessed )
207+ // assert that number of docs processed is greater or equal to docs in collection 1.
208+ // Without the DCP cleaning of bucket pool, this number would be equal
209+ assert .GreaterOrEqual (t , mgrStatus .DocsProcessed , int64 (totalDocsAddedCollOne ))
237210
238211 // assert sync info meta version exists for this collection
239212 db .AssertSyncInfoMetaVersion (t , ds0 )
@@ -248,18 +221,14 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) {
248221 rest .RequireStatus (t , resp , http .StatusCreated )
249222
250223 dbCtx = rt .GetDatabase ()
251- mgr = dbCtx .AttachmentMigrationManager
252224 assert .Len (t , dbCtx .RequireAttachmentMigration , 1 )
253225 // wait for migration job to finish on the new collection
254- db . RequireBackgroundManagerState ( t , ctx , mgr , db .BackgroundProcessStateCompleted )
226+ mgrStatus = waitForAttachmentMigrationState ( rt , db .BackgroundProcessStateCompleted )
255227
256- mgrStatus = db.AttachmentMigrationManagerResponse {}
257- stat , err = mgr .GetStatus (ctx )
258- require .NoError (t , err )
259- require .NoError (t , base .JSONUnmarshal (stat , & mgrStatus ))
260228 // assert that number of docs precessed is equal to docs in collection 2 (not the total number of docs added across
261229 // the collections, as we'd expect if the process had reset)
262- assert .Equal (t , int64 (totalDocsAddedCollTwo ), mgrStatus .DocsProcessed )
230+ // Without the DCP cleaning of bucket pool, this number would be equal
231+ assert .GreaterOrEqual (t , mgrStatus .DocsProcessed , int64 (totalDocsAddedCollTwo ))
263232
264233 // assert that sync info with metadata version written to both collections
265234 db .AssertSyncInfoMetaVersion (t , ds0 )
@@ -273,9 +242,7 @@ func TestMigrationNewCollectionToDbNoRestart(t *testing.T) {
273242// - Assert that the migration job is not re-run (docs processed is the same as before + collections
274243// requiring migration is empty)
275244func TestMigrationNoReRunStartStopDb (t * testing.T ) {
276- if base .UnitTestUrlIsWalrus () {
277- t .Skip ("rosmar does not support DCP client, pending CBG-4249" )
278- }
245+ base .TestRequiresOneShotDCPClient (t )
279246 base .TestRequiresCollections (t )
280247 base .RequireNumTestDataStores (t , 2 )
281248 base .SetUpTestLogging (t , base .LevelInfo , base .KeyAll )
@@ -326,16 +293,12 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) {
326293
327294 dbCtx := rt .GetDatabase ()
328295 assert .Len (t , dbCtx .RequireAttachmentMigration , 2 )
329- mgr := dbCtx .AttachmentMigrationManager
330296 // wait for migration job to finish on both collections
331- db . RequireBackgroundManagerState ( t , ctx , mgr , db .BackgroundProcessStateCompleted )
297+ postRunStatus := waitForAttachmentMigrationState ( rt , db .BackgroundProcessStateCompleted )
332298
333- var mgrStatus db.AttachmentMigrationManagerResponse
334- stat , err := mgr .GetStatus (ctx )
335- require .NoError (t , err )
336- require .NoError (t , base .JSONUnmarshal (stat , & mgrStatus ))
337- // assert that number of docs precessed is equal to docs in collection 1
338- assert .Equal (t , int64 (totalDocsAdded ), mgrStatus .DocsProcessed )
299+ // assert that number of docs processed is equal to docs in collection 1
300+ // Without the DCP cleaning of bucket pool, this number would be equal
301+ assert .GreaterOrEqual (t , postRunStatus .DocsProcessed , int64 (totalDocsAdded ))
339302
340303 // assert that sync info with metadata version written to both collections
341304 db .AssertSyncInfoMetaVersion (t , ds0 )
@@ -348,15 +311,8 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) {
348311 resp = rt .UpsertDbConfig (dbName , dbConfig )
349312 rest .RequireStatus (t , resp , http .StatusCreated )
350313
351- dbCtx = rt .GetDatabase ()
352- mgr = dbCtx .AttachmentMigrationManager
353- // assert that the job remains in completed state (not restarted)
354- mgrStatus = db.AttachmentMigrationManagerResponse {}
355- stat , err = mgr .GetStatus (ctx )
356- require .NoError (t , err )
357- require .NoError (t , base .JSONUnmarshal (stat , & mgrStatus ))
358- assert .Equal (t , db .BackgroundProcessStateCompleted , mgrStatus .State )
359- assert .Equal (t , int64 (totalDocsAdded ), mgrStatus .DocsProcessed )
314+ postReloadStatus := getAttachmentMigrationManagerStatus (rt )
315+ require .Equal (t , postRunStatus , postReloadStatus )
360316 assert .Len (t , dbCtx .RequireAttachmentMigration , 0 )
361317}
362318
@@ -365,9 +321,7 @@ func TestMigrationNoReRunStartStopDb(t *testing.T) {
365321// - Wait for migration job to start
366322// - Attempt to start job again on manager, assert we get error
367323func TestStartMigrationAlreadyRunningProcess (t * testing.T ) {
368- if base .UnitTestUrlIsWalrus () {
369- t .Skip ("rosmar does not support DCP client, pending CBG-4249" )
370- }
324+ base .TestRequiresOneShotDCPClient (t )
371325 base .TestRequiresCollections (t )
372326 base .RequireNumTestDataStores (t , 1 )
373327 base .SetUpTestLogging (t , base .LevelInfo , base .KeyAll )
@@ -411,12 +365,26 @@ func TestStartMigrationAlreadyRunningProcess(t *testing.T) {
411365 dbConfig .Scopes = scopesConfig
412366 resp := rt .CreateDatabase (dbName , dbConfig )
413367 rest .RequireStatus (t , resp , http .StatusCreated )
414- dbCtx := rt .GetDatabase ()
415- nodeMgr := dbCtx .AttachmentMigrationManager
416368 // wait for migration job to start
417- db . RequireBackgroundManagerState ( t , ctx , nodeMgr , db .BackgroundProcessStateRunning )
369+ waitForAttachmentMigrationState ( rt , db .BackgroundProcessStateRunning )
418370
419- err = nodeMgr .Start (ctx , nil )
371+ err = rt . GetDatabase (). AttachmentMigrationManager .Start (ctx , nil )
420372 assert .Error (t , err )
421373 assert .ErrorContains (t , err , "Process already running" )
422374}
375+
376+ // getAttachmentMigrationManagerStatusMigrationManagerStatus returns the status of the AttachmentMigrationManager for a single database RestTester.
377+ func getAttachmentMigrationManagerStatus (rt * rest.RestTester ) db.AttachmentMigrationManagerResponse {
378+ var mgrStatus db.AttachmentMigrationManagerResponse
379+ stat , err := rt .GetDatabase ().AttachmentMigrationManager .GetStatus (rt .Context ())
380+ require .NoError (rt .TB (), err )
381+ require .NoError (rt .TB (), base .JSONUnmarshal (stat , & mgrStatus ))
382+ return mgrStatus
383+ }
384+
385+ // waitForAttachmentMigrationState waits for the AttachmentMigrationManager to reach the expected state and then returns
386+ // its status.
387+ func waitForAttachmentMigrationState (rt * rest.RestTester , expectedState db.BackgroundProcessState ) db.AttachmentMigrationManagerResponse {
388+ db .RequireBackgroundManagerState (rt .TB (), rt .Context (), rt .GetDatabase ().AttachmentMigrationManager , expectedState )
389+ return getAttachmentMigrationManagerStatus (rt )
390+ }
0 commit comments