@@ -458,9 +458,10 @@ func (r *MigrateFromQdrantCmd) migrateDataParallel(ctx context.Context, sourceCl
458458
459459 var totalProcessed uint64
460460 // If not restarting, load the progress for each range.
461+ // The offset key includes NumWorkers, so changing num-workers will start a fresh migration.
461462 if ! r .Migration .Restart {
462463 for i := range ranges {
463- offsetKey := fmt .Sprintf ("%s-range-%d" , sourceCollection , ranges [i ].id )
464+ offsetKey := fmt .Sprintf ("%s-workers-%d- range-%d" , sourceCollection , r . NumWorkers , ranges [i ].id )
464465 offsetID , count , err := commons .GetStartOffset (ctx , r .Migration .OffsetsCollection , targetClient , offsetKey )
465466 if err != nil {
466467 return fmt .Errorf ("failed to get start offset: %w" , err )
@@ -507,7 +508,7 @@ func (r *MigrateFromQdrantCmd) migrateDataParallel(ctx context.Context, sourceCl
507508// migrateRange is the function executed by each worker in parallel migration.
508509// It scrolls through a specific range of points and upserts them to the target.
509510func (r * MigrateFromQdrantCmd ) migrateRange (ctx context.Context , sourceCollection , targetCollection string , sourceClient , targetClient * qdrant.Client , rg rangeSpec , shardKeys * sync.Map , bar * pterm.ProgressbarPrinter ) error {
510- offsetKey := fmt .Sprintf ("%s-range-%d" , sourceCollection , rg .id )
511+ offsetKey := fmt .Sprintf ("%s-workers-%d- range-%d" , sourceCollection , r . NumWorkers , rg .id )
511512 offset := rg .start
512513 var count uint64
513514
0 commit comments