Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/migrate_from_chroma.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

chroma "github.com/amikos-tech/chroma-go/pkg/api/v2"
"github.com/pterm/pterm"
Expand Down Expand Up @@ -269,13 +270,13 @@ func (r *MigrateFromChromaCmd) migrateData(ctx context.Context, collection chrom
targetPoints = append(targetPoints, point)
}

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

currentOffset += uint64(count)
Expand All @@ -288,6 +289,11 @@ func (r *MigrateFromChromaCmd) migrateData(ctx context.Context, collection chrom
}

bar.Add(count)

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down
10 changes: 8 additions & 2 deletions cmd/migrate_from_elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
Expand Down Expand Up @@ -353,13 +354,13 @@ func (r *MigrateFromElasticsearchCmd) migrateData(ctx context.Context, sourceCli
targetPoints = append(targetPoints, point)
}

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

offsetCount += uint64(len(targetPoints))
Expand Down Expand Up @@ -391,6 +392,11 @@ func (r *MigrateFromElasticsearchCmd) migrateData(ctx context.Context, sourceCli
}

bar.Add(len(targetPoints))

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down
9 changes: 7 additions & 2 deletions cmd/migrate_from_milvus.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"strconv"
"syscall"
"time"

"github.com/milvus-io/milvus/client/v2/column"
"github.com/milvus-io/milvus/client/v2/entity"
Expand Down Expand Up @@ -297,13 +298,13 @@ func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *mi
targetPoints = append(targetPoints, point)
}

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

offsetCount += uint64(len(targetPoints))
Expand All @@ -314,6 +315,10 @@ func (r *MigrateFromMilvusCmd) migrateData(ctx context.Context, sourceClient *mi

bar.Add(len(targetPoints))

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down
10 changes: 8 additions & 2 deletions cmd/migrate_from_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"slices"
"syscall"
"time"

"github.com/pterm/pterm"
"go.mongodb.org/mongo-driver/v2/bson"
Expand Down Expand Up @@ -222,13 +223,13 @@ func (r *MigrateFromMongoDBCmd) migrateData(ctx context.Context, sourceClient *m
targetPoints = append(targetPoints, point)
}

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

offsetCount += uint64(len(targetPoints))
Expand All @@ -240,6 +241,11 @@ func (r *MigrateFromMongoDBCmd) migrateData(ctx context.Context, sourceClient *m

bar.Add(len(targetPoints))
page++

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down
10 changes: 8 additions & 2 deletions cmd/migrate_from_opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"strings"
"syscall"
"time"

"github.com/opensearch-project/opensearch-go"
"github.com/pterm/pterm"
Expand Down Expand Up @@ -339,13 +340,13 @@ func (r *MigrateFromOpenSearchCmd) migrateData(ctx context.Context, sourceClient
targetPoints = append(targetPoints, point)
}

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

offsetCount += uint64(len(targetPoints))
Expand Down Expand Up @@ -374,6 +375,11 @@ func (r *MigrateFromOpenSearchCmd) migrateData(ctx context.Context, sourceClient
}

bar.Add(len(targetPoints))

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down
34 changes: 18 additions & 16 deletions cmd/migrate_from_pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,13 @@ func (r *MigrateFromPGCmd) migrateDataSequential(ctx context.Context, sourceConn

targetPoints := r.convertRowsToPoints(batchRows)

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

// Just a placeholder ID.
Expand All @@ -316,6 +316,11 @@ func (r *MigrateFromPGCmd) migrateDataSequential(ctx context.Context, sourceConn
}

bar.Add(len(targetPoints))

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down Expand Up @@ -553,20 +558,12 @@ func (r *MigrateFromPGCmd) migrateRange(ctx context.Context, pool *pgxpool.Pool,
targetPoints := r.convertRowsToPoints(batchRows)

// Upsert with retries to handle Qdrant's transient consistency errors during parallel writes.
var upsertErr error
for attempt := 0; attempt < MAX_RETRIES; attempt++ {
_, upsertErr = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if upsertErr == nil || !strings.Contains(upsertErr.Error(), "Please retry") {
break
}
time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
}
if upsertErr != nil {
return fmt.Errorf("failed to insert data into target: %w", upsertErr)
if err := upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
}); err != nil {
return err
}

lastRow := batchRows[len(batchRows)-1]
Expand All @@ -581,6 +578,11 @@ func (r *MigrateFromPGCmd) migrateRange(ctx context.Context, pool *pgxpool.Pool,
if err := commons.StoreStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, offsetKey, qdrant.NewIDUUID(*lastKey), count); err != nil {
return fmt.Errorf("failed to store offset: %w", err)
}

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

return nil
Expand Down
10 changes: 8 additions & 2 deletions cmd/migrate_from_pinecone.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/pinecone-io/go-pinecone/v3/pinecone"
"github.com/pterm/pterm"
Expand Down Expand Up @@ -280,13 +281,13 @@ func (r *MigrateFromPineconeCmd) migrateData(ctx context.Context, sourceIndexCon
targetPoints = append(targetPoints, point)
}

_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}

if listRes.NextPaginationToken != nil {
Expand All @@ -300,6 +301,11 @@ func (r *MigrateFromPineconeCmd) migrateData(ctx context.Context, sourceIndexCon

bar.Add(len(targetPoints))

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}

if listRes.NextPaginationToken == nil {
break
}
Expand Down
27 changes: 12 additions & 15 deletions cmd/migrate_from_qdrant.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
)

const (
// MAX_RETRIES is the maximum number of retries for upsert operations on transient errors.
MAX_RETRIES = 3
// SAMPLE_SIZE_PER_WORKER is the number of points to sample per worker to determine ranges for parallel migration.
SAMPLE_SIZE_PER_WORKER = 10
)
Expand Down Expand Up @@ -356,19 +354,8 @@ func (r *MigrateFromQdrantCmd) processBatch(ctx context.Context, points []*qdran
// Specify the shard key for the upsert request.
req.ShardKeySelector = &qdrant.ShardKeySelector{ShardKeys: []*qdrant.ShardKey{shardKeyObjs[key]}}
}
var err error
// Upsert with retries.
// This is to handle Qdrant's transient consistency errors during parallel writes.
for attempt := 0; attempt < MAX_RETRIES; attempt++ {
_, err = targetClient.Upsert(ctx, req)
if err == nil || !strings.Contains(err.Error(), "Please retry") {
break
}
// Linear backoff for retries: wait (attempt+1) * 100ms before the next attempt.
time.Sleep(time.Duration(attempt+1) * 100 * time.Millisecond)
}
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
if err := upsertWithRetry(ctx, targetClient, req); err != nil {
return err
}
}
return nil
Expand Down Expand Up @@ -428,6 +415,11 @@ func (r *MigrateFromQdrantCmd) migrateDataSequential(ctx context.Context, source
if offset == nil {
break
}

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down Expand Up @@ -557,6 +549,11 @@ func (r *MigrateFromQdrantCmd) migrateRange(ctx context.Context, sourceCollectio
if offset == nil || (rg.end != nil && !comparePointIDs(points[len(points)-1].Id, rg.end)) {
break
}

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}
return nil
}
10 changes: 8 additions & 2 deletions cmd/migrate_from_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os/signal"
"strconv"
"syscall"
"time"

"github.com/pterm/pterm"
"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -197,13 +198,13 @@ func (r *MigrateFromRedisCmd) migrateData(ctx context.Context, rdb *redis.Client
}

if len(targetPoints) > 0 {
_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
err = upsertWithRetry(ctx, targetClient, &qdrant.UpsertPoints{
CollectionName: r.Qdrant.Collection,
Points: targetPoints,
Wait: qdrant.PtrOf(true),
})
if err != nil {
return fmt.Errorf("failed to insert data into target: %w", err)
return err
}
}

Expand All @@ -217,6 +218,11 @@ func (r *MigrateFromRedisCmd) migrateData(ctx context.Context, rdb *redis.Client
}

bar.Add(count)

// Apply batch delay if configured (helps with rate limiting)
if r.Migration.BatchDelay > 0 {
time.Sleep(time.Duration(r.Migration.BatchDelay) * time.Millisecond)
}
}

pterm.Success.Printfln("Data migration finished successfully")
Expand Down
Loading
Loading