Skip to content

Commit cba7b9e

Browse files
feat: Chroma Source (#33)
* refactor: Qdrant to Qdrant impl Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: qdrant.PtrOf Signed-off-by: Anush008 <anushshetty90@gmail.com> * Bump golangci/golangci-lint-action from 7 to 8 (#22) Bumps [golangci/golangci-lint-action](https://github.com/golangci/golangci-lint-action) from 7 to 8. - [Release notes](https://github.com/golangci/golangci-lint-action/releases) - [Commits](golangci/golangci-lint-action@v7...v8) --- updated-dependencies: - dependency-name: golangci/golangci-lint-action dependency-version: '8' dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * Bump github.com/grpc-ecosystem/go-grpc-middleware/v2 from 2.3.1 to 2.3.2 (#21) Bumps [github.com/grpc-ecosystem/go-grpc-middleware/v2](https://github.com/grpc-ecosystem/go-grpc-middleware) from 2.3.1 to 2.3.2. - [Release notes](https://github.com/grpc-ecosystem/go-grpc-middleware/releases) - [Commits](grpc-ecosystem/go-grpc-middleware@v2.3.1...v2.3.2) --- updated-dependencies: - dependency-name: github.com/grpc-ecosystem/go-grpc-middleware/v2 dependency-version: 2.3.2 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * chore: DRY Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Prettify table Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Nitfix Signed-off-by: Anush008 <anushshetty90@gmail.com> * docs: comment Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Don't change M Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Updated commons.go Signed-off-by: Anush008 <anushshetty90@gmail.com> * feat: Milvus Source Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Use nested flags Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: file renamed Signed-off-by: Anush008 <anushshetty90@gmail.com> * test: Milvus source Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Formatting Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Use time.RFC3339 Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Renamed PrepareMigrationOffsetsCollection Signed-off-by: Anush008 <anushshetty90@gmail.com> * DRY Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Use %q to log withing quotes Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Rename to offsetId Signed-off-by: Anush008 <anushshetty90@gmail.com> * Pinecone Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Pinecone source Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Lint fixes Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Vector names Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Updated docstring Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: No index name Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Move flags Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Set CreateCollection to true by default Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: If ID already a valid UUID, use it directly Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Adds Chroma source deps and go mod tidy Signed-off-by: Anush008 <anushshetty90@gmail.com> * feat: Chroma source (#34) * feat: Chroma source Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Options parsing Signed-off-by: Anush008 <anushshetty90@gmail.com> * refactor: Options Signed-off-by: Anush008 <anushshetty90@gmail.com> * chore: Update config options Signed-off-by: Anush008 <anushshetty90@gmail.com> --------- Signed-off-by: Anush008 <anushshetty90@gmail.com> --------- Signed-off-by: Anush008 <anushshetty90@gmail.com> Signed-off-by: dependabot[bot] <support@github.com> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
1 parent 7303353 commit cba7b9e

File tree

7 files changed

+653
-688
lines changed

7 files changed

+653
-688
lines changed

cmd/migrate_from_chroma.go

Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
package cmd
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"log"
9+
"os"
10+
"os/signal"
11+
"syscall"
12+
"time"
13+
14+
chroma "github.com/amikos-tech/chroma-go/pkg/api/v2"
15+
"github.com/pterm/pterm"
16+
17+
"github.com/qdrant/go-client/qdrant"
18+
19+
"github.com/qdrant/migration/pkg/commons"
20+
)
21+
22+
type MigrateFromChromaCmd struct {
23+
Chroma commons.ChromaConfig `embed:"" prefix:"chroma."`
24+
Qdrant commons.QdrantConfig `embed:"" prefix:"qdrant."`
25+
Migration commons.MigrationConfig `embed:"" prefix:"migration."`
26+
IdField string `prefix:"qdrant." help:"Field storing Chroma IDs in Qdrant." default:"__id__"`
27+
DenseVector string `prefix:"qdrant." help:"Name of the dense vector in Qdrant" default:"dense_vector"`
28+
Distance string `prefix:"qdrant." enum:"cosine,dot,euclid" help:"Distance metric for the Qdrant collection" default:"euclid"`
29+
DocumentField string `prefix:"qdrant." help:"Field storing Chroma documents in Qdrant." default:"document"`
30+
31+
targetHost string
32+
targetPort int
33+
targetTLS bool
34+
}
35+
36+
func (r *MigrateFromChromaCmd) Parse() error {
37+
var err error
38+
r.targetHost, r.targetPort, r.targetTLS, err = parseQdrantUrl(r.Qdrant.Url)
39+
if err != nil {
40+
return fmt.Errorf("failed to parse target URL: %w", err)
41+
}
42+
43+
return nil
44+
}
45+
46+
func (r *MigrateFromChromaCmd) Validate() error {
47+
return validateBatchSize(r.Migration.BatchSize)
48+
}
49+
50+
func (r *MigrateFromChromaCmd) Run(globals *Globals) error {
51+
pterm.DefaultHeader.WithFullWidth().Println("Chroma to Qdrant Data Migration")
52+
53+
err := r.Parse()
54+
if err != nil {
55+
return fmt.Errorf("failed to parse input: %w", err)
56+
}
57+
58+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
59+
defer stop()
60+
61+
sourceClient, sourceCollection, err := r.connectToChroma(ctx)
62+
if err != nil {
63+
return fmt.Errorf("failed to connect to Chroma source: %w", err)
64+
}
65+
defer sourceCollection.Close()
66+
defer sourceClient.Close()
67+
68+
targetClient, err := connectToQdrant(globals, r.targetHost, r.targetPort, r.Qdrant.APIKey, r.targetTLS)
69+
if err != nil {
70+
return fmt.Errorf("failed to connect to Qdrant target: %w", err)
71+
}
72+
73+
err = commons.PrepareOffsetsCollection(ctx, r.Migration.OffsetsCollection, targetClient)
74+
if err != nil {
75+
return fmt.Errorf("failed to prepare migration marker collection: %w", err)
76+
}
77+
78+
sourcePointCount, err := r.countChromaVectors(ctx, sourceCollection)
79+
if err != nil {
80+
return fmt.Errorf("failed to count points in source: %w", err)
81+
}
82+
83+
err = r.prepareTargetCollection(ctx, sourceCollection, targetClient)
84+
if err != nil {
85+
return fmt.Errorf("error preparing target collection: %w", err)
86+
}
87+
88+
displayMigrationStart("chroma", *r.Chroma.Collection, r.Qdrant.Collection)
89+
90+
err = r.migrateData(ctx, sourceCollection, targetClient, sourcePointCount)
91+
if err != nil {
92+
return fmt.Errorf("failed to migrate data: %w", err)
93+
}
94+
95+
targetPointCount, err := targetClient.Count(ctx, &qdrant.CountPoints{
96+
CollectionName: r.Qdrant.Collection,
97+
Exact: qdrant.PtrOf(true),
98+
})
99+
if err != nil {
100+
return fmt.Errorf("failed to count points in target: %w", err)
101+
}
102+
103+
pterm.Info.Printfln("Target collection has %d points\n", targetPointCount)
104+
105+
return nil
106+
}
107+
108+
func (r *MigrateFromChromaCmd) parseChromaOptions() ([]chroma.ClientOption, error) {
109+
clientOptions := []chroma.ClientOption{chroma.WithBaseURL(*r.Chroma.Url)}
110+
111+
if r.Chroma.Database != nil && r.Chroma.Tenant != nil {
112+
clientOptions = append(clientOptions, chroma.WithDatabaseAndTenant(*r.Chroma.Database, *r.Chroma.Tenant))
113+
} else if r.Chroma.Tenant != nil {
114+
clientOptions = append(clientOptions, chroma.WithTenant(*r.Chroma.Tenant))
115+
}
116+
117+
switch *r.Chroma.AuthType {
118+
case "basic":
119+
if r.Chroma.Username == nil || r.Chroma.Password == nil {
120+
return nil, errors.New("username and password are required for basic authentication")
121+
}
122+
authProvider := chroma.NewBasicAuthCredentialsProvider(*r.Chroma.Username, *r.Chroma.Password)
123+
clientOptions = append(clientOptions, chroma.WithAuth(authProvider))
124+
case "token":
125+
if r.Chroma.Token == nil {
126+
return nil, errors.New("token is required for token authentication")
127+
}
128+
authProvider := chroma.NewTokenAuthCredentialsProvider(*r.Chroma.Token, chroma.TokenTransportHeader(*r.Chroma.TokenHeader))
129+
clientOptions = append(clientOptions, chroma.WithAuth(authProvider))
130+
}
131+
132+
return clientOptions, nil
133+
}
134+
func (r *MigrateFromChromaCmd) connectToChroma(ctx context.Context) (chroma.Client, chroma.Collection, error) {
135+
clientOptions, err := r.parseChromaOptions()
136+
if err != nil {
137+
return nil, nil, fmt.Errorf("failed to get parse Chroma options: %w", err)
138+
}
139+
140+
client, err := chroma.NewHTTPClient(clientOptions...)
141+
if err != nil {
142+
return nil, nil, fmt.Errorf("failed to create Chroma client: %w", err)
143+
}
144+
145+
collection, err := client.GetOrCreateCollection(ctx, *r.Chroma.Collection)
146+
if err != nil {
147+
return nil, nil, fmt.Errorf("failed to get Chroma collection: %w", err)
148+
}
149+
150+
return client, collection, nil
151+
}
152+
153+
func (r *MigrateFromChromaCmd) countChromaVectors(ctx context.Context, collection chroma.Collection) (uint64, error) {
154+
count, err := collection.Count(ctx)
155+
if err != nil {
156+
return 0, fmt.Errorf("failed to get collection count: %w", err)
157+
}
158+
159+
return uint64(count), nil
160+
}
161+
162+
func (r *MigrateFromChromaCmd) prepareTargetCollection(ctx context.Context, collection chroma.Collection, targetClient *qdrant.Client) error {
163+
if !r.Migration.CreateCollection {
164+
return nil
165+
}
166+
167+
targetCollectionExists, err := targetClient.CollectionExists(ctx, r.Qdrant.Collection)
168+
if err != nil {
169+
return fmt.Errorf("failed to check if collection exists: %w", err)
170+
}
171+
172+
if targetCollectionExists {
173+
pterm.Info.Printfln("Target collection '%s' already exists. Skipping creation.", r.Qdrant.Collection)
174+
return nil
175+
}
176+
177+
distanceMapping := map[string]qdrant.Distance{
178+
"euclid": qdrant.Distance_Euclid,
179+
"cosine": qdrant.Distance_Cosine,
180+
"dot": qdrant.Distance_Dot,
181+
}
182+
183+
createReq := &qdrant.CreateCollection{
184+
CollectionName: r.Qdrant.Collection,
185+
VectorsConfig: qdrant.NewVectorsConfigMap(map[string]*qdrant.VectorParams{
186+
r.DenseVector: {
187+
Size: uint64(collection.Dimension()),
188+
Distance: distanceMapping[r.Distance],
189+
},
190+
}),
191+
}
192+
193+
if err := targetClient.CreateCollection(ctx, createReq); err != nil {
194+
return fmt.Errorf("failed to create target collection: %w", err)
195+
}
196+
197+
pterm.Success.Printfln("Created target collection '%s' with dimension", r.Qdrant.Collection)
198+
return nil
199+
}
200+
201+
func (r *MigrateFromChromaCmd) migrateData(ctx context.Context, collection chroma.Collection, targetClient *qdrant.Client, sourcePointCount uint64) error {
202+
startTime := time.Now()
203+
batchSize := r.Migration.BatchSize
204+
205+
var currentOffset uint64 = 0
206+
207+
if !r.Migration.Restart {
208+
_, offsetStored, err := commons.GetStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, *r.Chroma.Collection)
209+
if err != nil {
210+
return fmt.Errorf("failed to get start offset: %w", err)
211+
}
212+
currentOffset = offsetStored
213+
}
214+
215+
bar, _ := pterm.DefaultProgressbar.WithTotal(int(sourcePointCount)).Start()
216+
displayMigrationProgress(bar, currentOffset)
217+
218+
for {
219+
resp, err := collection.Get(
220+
ctx,
221+
chroma.WithLimitGet(int(batchSize)),
222+
chroma.WithOffsetGet(int(currentOffset)),
223+
chroma.WithIncludeGet("metadatas", "documents", "embeddings"),
224+
)
225+
if err != nil {
226+
return fmt.Errorf("failed to get vectors from Chroma: %w", err)
227+
}
228+
229+
count := resp.Count()
230+
if count == 0 {
231+
break
232+
}
233+
234+
targetPoints := make([]*qdrant.PointStruct, 0, count)
235+
ids := resp.GetIDs()
236+
embeddings := resp.GetEmbeddings()
237+
documents := resp.GetDocuments()
238+
239+
// The Chroma Go client's metadata type, `chroma.DocumentMetadatas`` is restrictive.
240+
// So we convert it to a list of generic maps, `[]map[string]any``.
241+
// That is later parse into Qdrant payload with `qdrant.NewValueMap(...)``
242+
metadatas := resp.GetMetadatas()
243+
jsonData, err := json.Marshal(metadatas)
244+
if err != nil {
245+
log.Fatalf("Error marshaling metadata: %v", err)
246+
}
247+
var metadatasGeneric []map[string]any
248+
err = json.Unmarshal(jsonData, &metadatasGeneric)
249+
if err != nil {
250+
log.Fatalf("Error unmarshaling metadata: %v", err)
251+
}
252+
253+
for i := 0; i < count; i++ {
254+
id := ids[i]
255+
embedding := embeddings[i]
256+
metadataValue := metadatasGeneric[i]
257+
258+
point := &qdrant.PointStruct{
259+
Id: arbitraryIDToUUID(string(id)),
260+
}
261+
262+
vectorMap := make(map[string]*qdrant.Vector)
263+
vectorMap[r.DenseVector] = qdrant.NewVectorDense(embedding.ContentAsFloat32())
264+
point.Vectors = qdrant.NewVectorsMap(vectorMap)
265+
266+
payload := qdrant.NewValueMap(metadataValue)
267+
payload[r.IdField] = qdrant.NewValueString(string(id))
268+
269+
if i < len(documents) && documents[i].ContentString() != "" {
270+
payload[r.DocumentField] = qdrant.NewValueString(documents[i].ContentString())
271+
}
272+
273+
point.Payload = payload
274+
275+
targetPoints = append(targetPoints, point)
276+
}
277+
278+
_, err = targetClient.Upsert(ctx, &qdrant.UpsertPoints{
279+
CollectionName: r.Qdrant.Collection,
280+
Points: targetPoints,
281+
Wait: qdrant.PtrOf(true),
282+
})
283+
if err != nil {
284+
return fmt.Errorf("failed to insert data into target: %w", err)
285+
}
286+
287+
currentOffset += uint64(count)
288+
// Just a placeholder ID for offset tracking.
289+
// We're only using the offset count
290+
offsetId := qdrant.NewIDNum(0)
291+
err = commons.StoreStartOffset(ctx, r.Migration.OffsetsCollection, targetClient, *r.Chroma.Collection, offsetId, currentOffset)
292+
if err != nil {
293+
return fmt.Errorf("failed to store offset: %w", err)
294+
}
295+
296+
bar.Add(count)
297+
298+
// If one minute elapsed get updated sourcePointCount
299+
// Useful if any new points were added to the source during migration
300+
if time.Since(startTime) > time.Minute {
301+
sourcePointCount, err = r.countChromaVectors(ctx, collection)
302+
if err != nil {
303+
return fmt.Errorf("failed to count vectors in Chroma: %w", err)
304+
}
305+
bar.Total = int(sourcePointCount)
306+
}
307+
}
308+
309+
pterm.Success.Printfln("Data migration finished successfully")
310+
return nil
311+
}

cmd/migrate_from_pinecone.go

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"syscall"
1010
"time"
1111

12-
"github.com/google/uuid"
1312
"github.com/pinecone-io/go-pinecone/v3/pinecone"
1413
"github.com/pterm/pterm"
1514

@@ -263,7 +262,7 @@ func (r *MigrateFromPineconeCmd) migrateData(ctx context.Context, sourceIndexCon
263262
// Ref: https://qdrant.tech/documentation/concepts/points/#point-ids
264263
// So we create a deterministic UUID based on the original ID.
265264
// A copy of the original ID is stored in the payload.
266-
Id: pineconeIDToUUID(id),
265+
Id: arbitraryIDToUUID(id),
267266
}
268267
vectorMap := make(map[string]*qdrant.Vector)
269268

@@ -327,13 +326,3 @@ func (r *MigrateFromPineconeCmd) migrateData(ctx context.Context, sourceIndexCon
327326
pterm.Success.Printfln("Data migration finished successfully")
328327
return nil
329328
}
330-
331-
func pineconeIDToUUID(id string) *qdrant.PointId {
332-
// If already a valid UUID, return it directly
333-
if _, err := uuid.Parse(id); err == nil {
334-
return qdrant.NewIDUUID(id)
335-
}
336-
337-
deterministicUUID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(id))
338-
return qdrant.NewIDUUID(deterministicUUID.String())
339-
}

cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type CLI struct {
2020
Qdrant MigrateFromQdrantCmd `cmd:"" help:"Migrate data from a Qdrant database to Qdrant."`
2121
Milvus MigrateFromMilvusCmd `cmd:"" help:"Migrate data from a Milvus database to Qdrant."`
2222
Pinecone MigrateFromPineconeCmd `cmd:"" help:"Migrate data from a Pinecone database to Qdrant."`
23+
Chroma MigrateFromChromaCmd `cmd:"" help:"Migrate data from a Chroma database to Qdrant."`
2324
}
2425

2526
func Execute(projectVersion, projectBuild string) {

cmd/utils.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"net/url"
88
"strconv"
99

10+
"github.com/google/uuid"
1011
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
1112
"github.com/pterm/pterm"
1213
"google.golang.org/grpc"
@@ -121,3 +122,14 @@ func displayMigrationProgress(bar *pterm.ProgressbarPrinter, offsetCount uint64)
121122
}
122123
fmt.Print("\n")
123124
}
125+
126+
func arbitraryIDToUUID(id string) *qdrant.PointId {
127+
// If already a valid UUID, use it directly
128+
if _, err := uuid.Parse(id); err == nil {
129+
return qdrant.NewIDUUID(id)
130+
}
131+
132+
// Otherwise create a deterministic UUID based on the ID
133+
deterministicUUID := uuid.NewSHA1(uuid.NameSpaceURL, []byte(id))
134+
return qdrant.NewIDUUID(deterministicUUID.String())
135+
}

0 commit comments

Comments
 (0)