Skip to content

Commit 2cf04f1

Browse files
refactor: Don't alter m during upload, move offset management to commons (#24)
* refactor: Qdrant to Qdrant impl Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: qdrant.PtrOf Signed-off-by: Anush008 <anushshetty90@gmail.com> * Bump golangci/golangci-lint-action from 7 to 8 (#22) Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 7 to 8. - [Release notes](https://github.com/golangci/golangci-lint-action/releases) - [Commits](golangci/golangci-lint-action@v7...v8) --- updated-dependencies: - dependency-name: golangci/golangci-lint-action dependency-version: '8' dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump github.com/grpc-ecosystem/go-grpc-middleware/v2 from 2.3.1 to 2.3.2 (#21) Bumps [github.com/grpc-ecosystem/go-grpc-middleware/v2](https://github.com/grpc-ecosystem/go-grpc-middleware) from 2.3.1 to 2.3.2. - [Release notes](https://github.com/grpc-ecosystem/go-grpc-middleware/releases) - [Commits](grpc-ecosystem/go-grpc-middleware@v2.3.1...v2.3.2) --- updated-dependencies: - dependency-name: github.com/grpc-ecosystem/go-grpc-middleware/v2 dependency-version: 2.3.2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * chore: DRY Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Prettify table Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Nitfix Signed-off-by: Anush008 <anushshetty90@gmail.com> * docs: comment Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Don't change M Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Updated commons.go Signed-off-by: Anush008 <anushshetty90@gmail.com> --------- Signed-off-by: Anush008 <anushshetty90@gmail.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
1 parent b69c2ac commit 2cf04f1

File tree

2 files changed

+141
-172
lines changed

2 files changed

+141
-172
lines changed

cmd/migrate_from_qdrant.go

Lines changed: 13 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"google.golang.org/grpc"
1818

1919
"github.com/qdrant/go-client/qdrant"
20+
21+
"github.com/qdrant/migration/pkg/commons"
2022
)
2123

2224
const HTTPS = "https"
@@ -124,7 +126,7 @@ func (r *MigrateFromQdrantCmd) Run(globals *Globals) error {
124126
return fmt.Errorf("failed to connect to target: %w", err)
125127
}
126128

127-
err = r.prepareMigrationOffsetsCollection(ctx, targetClient)
129+
err = commons.PrepareMigrationOffsetsCollection(ctx, r.MigrationOffsetsCollectionName, targetClient)
128130
if err != nil {
129131
return fmt.Errorf("failed to prepare migration marker collection: %w", err)
130132
}
@@ -137,7 +139,7 @@ func (r *MigrateFromQdrantCmd) Run(globals *Globals) error {
137139
return fmt.Errorf("failed to count points in source: %w", err)
138140
}
139141

140-
existingM, err := r.perpareTargetCollection(ctx, sourceClient, r.SourceCollection, targetClient, r.TargetCollection)
142+
err = r.perpareTargetCollection(ctx, sourceClient, r.SourceCollection, targetClient, r.TargetCollection)
141143
if err != nil {
142144
return fmt.Errorf("error preparing target collection: %w", err)
143145
}
@@ -171,17 +173,6 @@ func (r *MigrateFromQdrantCmd) Run(globals *Globals) error {
171173
return fmt.Errorf("failed to count points in target: %w", err)
172174
}
173175

174-
// reset m to enable indexing again
175-
err = targetClient.UpdateCollection(ctx, &qdrant.UpdateCollection{
176-
CollectionName: r.TargetCollection,
177-
HnswConfig: &qdrant.HnswConfigDiff{
178-
M: existingM,
179-
},
180-
})
181-
if err != nil {
182-
return fmt.Errorf("failed disable indexing in target collection %w", err)
183-
}
184-
185176
pterm.Info.Printfln("Target collection has %d points\n", targetPointCount)
186177

187178
return nil
@@ -227,35 +218,16 @@ func (r *MigrateFromQdrantCmd) connect(globals *Globals, host string, port int,
227218
return client, nil
228219
}
229220

230-
func (r *MigrateFromQdrantCmd) prepareMigrationOffsetsCollection(ctx context.Context, targetClient *qdrant.Client) error {
231-
migrationOffsetCollectionExists, err := targetClient.CollectionExists(ctx, r.MigrationOffsetsCollectionName)
232-
if err != nil {
233-
return fmt.Errorf("failed to check if collection exists: %w", err)
234-
}
235-
if migrationOffsetCollectionExists {
236-
return nil
237-
}
238-
return targetClient.CreateCollection(ctx, &qdrant.CreateCollection{
239-
CollectionName: r.MigrationOffsetsCollectionName,
240-
ReplicationFactor: qdrant.PtrOf(uint32(1)),
241-
ShardNumber: qdrant.PtrOf(uint32(1)),
242-
VectorsConfig: qdrant.NewVectorsConfigMap(map[string]*qdrant.VectorParams{}),
243-
StrictModeConfig: &qdrant.StrictModeConfig{
244-
Enabled: qdrant.PtrOf(false),
245-
},
246-
})
247-
}
248-
249-
func (r *MigrateFromQdrantCmd) perpareTargetCollection(ctx context.Context, sourceClient *qdrant.Client, sourceCollection string, targetClient *qdrant.Client, targetCollection string) (*uint64, error) {
221+
func (r *MigrateFromQdrantCmd) perpareTargetCollection(ctx context.Context, sourceClient *qdrant.Client, sourceCollection string, targetClient *qdrant.Client, targetCollection string) error {
250222
sourceCollectionInfo, err := sourceClient.GetCollectionInfo(ctx, sourceCollection)
251223
if err != nil {
252-
return nil, fmt.Errorf("failed to get source collection info: %w", err)
224+
return fmt.Errorf("failed to get source collection info: %w", err)
253225
}
254226

255227
if r.CreateTargetCollection {
256228
targetCollectionExists, err := targetClient.CollectionExists(ctx, targetCollection)
257229
if err != nil {
258-
return nil, fmt.Errorf("failed to check if collection exists: %w", err)
230+
return fmt.Errorf("failed to check if collection exists: %w", err)
259231
}
260232

261233
if targetCollectionExists {
@@ -278,42 +250,14 @@ func (r *MigrateFromQdrantCmd) perpareTargetCollection(ctx context.Context, sour
278250
StrictModeConfig: sourceCollectionInfo.Config.GetStrictModeConfig(),
279251
})
280252
if err != nil {
281-
return nil, fmt.Errorf("failed to create target collection: %w", err)
253+
return fmt.Errorf("failed to create target collection: %w", err)
282254
}
283255
}
284256
}
285257

286-
// get current m
287258
targetCollectionInfo, err := targetClient.GetCollectionInfo(ctx, targetCollection)
288259
if err != nil {
289-
return nil, fmt.Errorf("failed to get target collection information: %w", err)
290-
}
291-
existingM := targetCollectionInfo.Config.HnswConfig.M
292-
293-
// set m to 0 to disable indexing
294-
err = targetClient.UpdateCollection(ctx, &qdrant.UpdateCollection{
295-
CollectionName: targetCollection,
296-
HnswConfig: &qdrant.HnswConfigDiff{
297-
M: qdrant.PtrOf(uint64(0)),
298-
},
299-
})
300-
if err != nil {
301-
return nil, fmt.Errorf("failed disable indexing in target collection %w", err)
302-
}
303-
304-
// if m is 0, set it to default after wards
305-
if existingM == nil || *existingM == uint64(0) {
306-
existingM = qdrant.PtrOf(uint64(16))
307-
}
308-
309-
// add payload index for migration marker to source collection
310-
_, err = sourceClient.CreateFieldIndex(ctx, &qdrant.CreateFieldIndexCollection{
311-
CollectionName: sourceCollection,
312-
FieldName: "migrationMarker",
313-
FieldType: qdrant.FieldType_FieldTypeKeyword.Enum(),
314-
})
315-
if err != nil {
316-
return nil, fmt.Errorf("failed creating index on source collection %w", err)
260+
return fmt.Errorf("failed to get target collection information: %w", err)
317261
}
318262

319263
if r.EnsurePayloadIndexes {
@@ -339,12 +283,12 @@ func (r *MigrateFromQdrantCmd) perpareTargetCollection(ctx context.Context, sour
339283
},
340284
)
341285
if err != nil {
342-
return nil, fmt.Errorf("failed creating index on tagrget collection %w", err)
286+
return fmt.Errorf("failed creating index on tagrget collection %w", err)
343287
}
344288
}
345289
}
346290

347-
return existingM, nil
291+
return nil
348292
}
349293

350294
func getFieldType(dataType qdrant.PayloadSchemaType) *qdrant.FieldType {
@@ -372,7 +316,7 @@ func getFieldType(dataType qdrant.PayloadSchemaType) *qdrant.FieldType {
372316
func (r *MigrateFromQdrantCmd) migrateData(ctx context.Context, sourceClient *qdrant.Client, sourceCollection string, targetClient *qdrant.Client, targetCollection string, sourcePointCount uint64) error {
373317
startTime := time.Now()
374318
limit := r.BatchSize
375-
offset, offsetCount, err := r.getStartOffset(ctx, targetClient, sourceCollection)
319+
offset, offsetCount, err := commons.GetStartOffset(ctx, r.MigrationOffsetsCollectionName, targetClient, sourceCollection, r.RestartMigration)
376320
if err != nil {
377321
return fmt.Errorf("failed to get start offset: %w", err)
378322
}
@@ -469,7 +413,7 @@ func (r *MigrateFromQdrantCmd) migrateData(ctx context.Context, sourceClient *qd
469413

470414
offsetCount += uint64(len(points))
471415

472-
err = r.storeStartOffset(ctx, targetClient, sourceCollection, offset, offsetCount)
416+
err = commons.StoreStartOffset(ctx, r.MigrationOffsetsCollectionName, targetClient, sourceCollection, offset, offsetCount)
473417
if err != nil {
474418
return fmt.Errorf("failed to store offset: %w", err)
475419
}
@@ -498,106 +442,3 @@ func (r *MigrateFromQdrantCmd) migrateData(ctx context.Context, sourceClient *qd
498442

499443
return nil
500444
}
501-
502-
func (r *MigrateFromQdrantCmd) getStartOffset(ctx context.Context, targetClient *qdrant.Client, sourceCollection string) (*qdrant.PointId, uint64, error) {
503-
if r.RestartMigration {
504-
return nil, 0, nil
505-
}
506-
point, err := r.getOffsetPoint(ctx, targetClient, sourceCollection)
507-
if err != nil {
508-
return nil, 0, fmt.Errorf("failed to get start offset point: %w", err)
509-
}
510-
if point == nil {
511-
return nil, 0, nil
512-
}
513-
offset, ok := point.Payload[sourceCollection+"_offset"]
514-
if !ok {
515-
return nil, 0, nil
516-
}
517-
offsetCount, ok := point.Payload[sourceCollection+"_offsetCount"]
518-
if !ok {
519-
return nil, 0, nil
520-
}
521-
522-
offsetCountValue, ok := offsetCount.GetKind().(*qdrant.Value_IntegerValue)
523-
if !ok {
524-
return nil, 0, fmt.Errorf("failed to get offset count: %w", err)
525-
}
526-
527-
offsetIntegerValue, ok := offset.GetKind().(*qdrant.Value_IntegerValue)
528-
if ok {
529-
return qdrant.NewIDNum(uint64(offsetIntegerValue.IntegerValue)), uint64(offsetCountValue.IntegerValue), nil
530-
}
531-
532-
offsetStringValue, ok := offset.GetKind().(*qdrant.Value_StringValue)
533-
if ok {
534-
return qdrant.NewIDUUID(offsetStringValue.StringValue), uint64(offsetCountValue.IntegerValue), nil
535-
}
536-
537-
return nil, 0, nil
538-
}
539-
540-
func getPointID(offset *qdrant.PointId) (interface{}, error) {
541-
switch pointID := offset.GetPointIdOptions().(type) {
542-
case *qdrant.PointId_Num:
543-
return pointID.Num, nil
544-
case *qdrant.PointId_Uuid:
545-
return pointID.Uuid, nil
546-
default:
547-
return nil, fmt.Errorf("unsupported offset type: %T", pointID)
548-
}
549-
}
550-
551-
func (r *MigrateFromQdrantCmd) storeStartOffset(ctx context.Context, targetClient *qdrant.Client, sourceCollection string, offset *qdrant.PointId, offsetCount uint64) error {
552-
if offset == nil {
553-
return nil
554-
}
555-
offsetId, err := getPointID(offset)
556-
if err != nil {
557-
return err
558-
}
559-
560-
payload := qdrant.NewValueMap(map[string]any{
561-
sourceCollection + "_offset": offsetId,
562-
sourceCollection + "_offsetCount": offsetCount,
563-
sourceCollection + "_lastUpsertAt": time.Now().Format("2006-01-02 15:04:05"),
564-
})
565-
566-
_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
567-
CollectionName: r.MigrationOffsetsCollectionName,
568-
Points: []*qdrant.PointStruct{
569-
{
570-
Id: r.getOffsetPointId(sourceCollection),
571-
Payload: payload,
572-
Vectors: qdrant.NewVectorsMap(map[string]*qdrant.Vector{}),
573-
},
574-
},
575-
})
576-
577-
if err != nil {
578-
return fmt.Errorf("failed to store offset: %w", err)
579-
}
580-
return nil
581-
}
582-
583-
func (r *MigrateFromQdrantCmd) getOffsetPoint(ctx context.Context, targetClient *qdrant.Client, sourceCollection string) (*qdrant.RetrievedPoint, error) {
584-
points, err := targetClient.Get(ctx, &qdrant.GetPoints{
585-
CollectionName: r.MigrationOffsetsCollectionName,
586-
Ids: []*qdrant.PointId{r.getOffsetPointId(sourceCollection)},
587-
WithPayload: qdrant.NewWithPayload(true),
588-
})
589-
if err != nil {
590-
return nil, fmt.Errorf("failed to get start offset: %w", err)
591-
}
592-
if len(points) == 0 {
593-
return nil, nil
594-
}
595-
596-
return points[0], nil
597-
}
598-
599-
func (r *MigrateFromQdrantCmd) getOffsetPointId(sourceCollection string) *qdrant.PointId {
600-
deterministicUUID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(sourceCollection))
601-
602-
return qdrant.NewIDUUID(deterministicUUID.String())
603-
}

pkg/commons/commons.go

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package commons
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/google/uuid"
9+
10+
"github.com/qdrant/go-client/qdrant"
11+
)
12+
13+
func PrepareMigrationOffsetsCollection(ctx context.Context, migrationOffsetsCollectionName string, targetClient *qdrant.Client) error {
14+
migrationOffsetCollectionExists, err := targetClient.CollectionExists(ctx, migrationOffsetsCollectionName)
15+
if err != nil {
16+
return fmt.Errorf("failed to check if collection exists: %w", err)
17+
}
18+
if migrationOffsetCollectionExists {
19+
return nil
20+
}
21+
return targetClient.CreateCollection(ctx, &qdrant.CreateCollection{
22+
CollectionName: migrationOffsetsCollectionName,
23+
VectorsConfig: qdrant.NewVectorsConfigMap(map[string]*qdrant.VectorParams{}),
24+
})
25+
}
26+
27+
func GetStartOffset(ctx context.Context, migrationOffsetsCollectionName string, targetClient *qdrant.Client, sourceCollection string, restartMigration bool) (*qdrant.PointId, uint64, error) {
28+
if restartMigration {
29+
return nil, 0, nil
30+
}
31+
point, err := getOffsetPoint(ctx, migrationOffsetsCollectionName, targetClient, sourceCollection)
32+
if err != nil {
33+
return nil, 0, fmt.Errorf("failed to get start offset point: %w", err)
34+
}
35+
if point == nil {
36+
return nil, 0, nil
37+
}
38+
offset, ok := point.Payload[sourceCollection+"_offset"]
39+
if !ok {
40+
return nil, 0, nil
41+
}
42+
offsetCount, ok := point.Payload[sourceCollection+"_offsetCount"]
43+
if !ok {
44+
return nil, 0, nil
45+
}
46+
47+
offsetCountValue, ok := offsetCount.GetKind().(*qdrant.Value_IntegerValue)
48+
if !ok {
49+
return nil, 0, fmt.Errorf("failed to get offset count: %w", err)
50+
}
51+
52+
offsetIntegerValue, ok := offset.GetKind().(*qdrant.Value_IntegerValue)
53+
if ok {
54+
return qdrant.NewIDNum(uint64(offsetIntegerValue.IntegerValue)), uint64(offsetCountValue.IntegerValue), nil
55+
}
56+
57+
offsetStringValue, ok := offset.GetKind().(*qdrant.Value_StringValue)
58+
if ok {
59+
return qdrant.NewIDUUID(offsetStringValue.StringValue), uint64(offsetCountValue.IntegerValue), nil
60+
}
61+
62+
return nil, 0, nil
63+
}
64+
65+
func getOffsetIdAsValue(offset *qdrant.PointId) (interface{}, error) {
66+
switch pointID := offset.GetPointIdOptions().(type) {
67+
case *qdrant.PointId_Num:
68+
return pointID.Num, nil
69+
case *qdrant.PointId_Uuid:
70+
return pointID.Uuid, nil
71+
default:
72+
return nil, fmt.Errorf("unsupported offset type: %T", pointID)
73+
}
74+
}
75+
76+
func StoreStartOffset(ctx context.Context, migrationOffsetsCollectionName string, targetClient *qdrant.Client, sourceCollection string, offset *qdrant.PointId, offsetCount uint64) error {
77+
if offset == nil {
78+
return nil
79+
}
80+
offsetId, err := getOffsetIdAsValue(offset)
81+
if err != nil {
82+
return err
83+
}
84+
85+
payload := qdrant.NewValueMap(map[string]any{
86+
sourceCollection + "_offset": offsetId,
87+
sourceCollection + "_offsetCount": offsetCount,
88+
sourceCollection + "_lastUpsertAt": time.Now().Format("2006-01-02 15:04:05"),
89+
})
90+
91+
_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
92+
CollectionName: migrationOffsetsCollectionName,
93+
Points: []*qdrant.PointStruct{
94+
{
95+
Id: getOffsetPointId(sourceCollection),
96+
Payload: payload,
97+
Vectors: qdrant.NewVectorsMap(map[string]*qdrant.Vector{}),
98+
},
99+
},
100+
})
101+
102+
if err != nil {
103+
return fmt.Errorf("failed to store offset: %w", err)
104+
}
105+
return nil
106+
}
107+
108+
func getOffsetPoint(ctx context.Context, migrationOffsetsCollectionName string, targetClient *qdrant.Client, sourceCollection string) (*qdrant.RetrievedPoint, error) {
109+
points, err := targetClient.Get(ctx, &qdrant.GetPoints{
110+
CollectionName: migrationOffsetsCollectionName,
111+
Ids: []*qdrant.PointId{getOffsetPointId(sourceCollection)},
112+
WithPayload: qdrant.NewWithPayload(true),
113+
})
114+
if err != nil {
115+
return nil, fmt.Errorf("failed to get start offset: %w", err)
116+
}
117+
if len(points) == 0 {
118+
return nil, nil
119+
}
120+
121+
return points[0], nil
122+
}
123+
124+
func getOffsetPointId(sourceCollection string) *qdrant.PointId {
125+
deterministicUUID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(sourceCollection))
126+
127+
return qdrant.NewIDUUID(deterministicUUID.String())
128+
}

0 commit comments

Comments
 (0)