diff --git a/go.mod b/go.mod index 47bffb9dc4b18..04d7491df4cb4 100644 --- a/go.mod +++ b/go.mod @@ -92,6 +92,7 @@ require ( github.com/tidwall/pretty v1.2.1 github.com/tmc/langchaingo v0.1.13 github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9 + github.com/viterin/partial v1.1.0 go.starlark.net v0.0.0-20250701195324-d457b4515e0e go.uber.org/automaxprocs v1.5.3 go.uber.org/ratelimit v0.2.0 @@ -242,6 +243,7 @@ replace ( github.com/lni/dragonboat/v4 v4.0.0-20220815145555-6f622e8bcbef => github.com/matrixorigin/dragonboat/v4 v4.0.0-20251203123148-9dcde946363f github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 => github.com/matrixorigin/goutils v1.3.1-0.20220604063047-388d67b4dbc4 github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 => github.com/matrixorigin/vfs v0.2.1-0.20220616104132-8852fd867376 + github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9 => github.com/cpegeric/usearch/golang v0.0.0-20251212130039-afde3fa5e527 ) replace github.com/shoenig/go-m1cpu => github.com/shoenig/go-m1cpu v0.1.7 diff --git a/go.sum b/go.sum index a371965827208..c597294dbb33e 100644 --- a/go.sum +++ b/go.sum @@ -195,6 +195,8 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpegeric/pdftotext-go v0.0.0-20241112123704-49cb86a3790e h1:tQSCiEjYPRU+AuuVR+zd+xYVOsEqX1clPhmIAM6FCHU= github.com/cpegeric/pdftotext-go v0.0.0-20241112123704-49cb86a3790e/go.mod h1:zt7uTOYu0EEeKatGaTi9JiP0I9ePHpDvjAwpfPXh/N0= +github.com/cpegeric/usearch/golang v0.0.0-20251212130039-afde3fa5e527 h1:A8qDoMLqBZRv5Fi7RC+KA9hMngZAIK4cSuL1ME3Jy7w= +github.com/cpegeric/usearch/golang v0.0.0-20251212130039-afde3fa5e527/go.mod h1:3SN8SakyyBWzb14DNZn4t5yX8dOa7ae45KpqDioi4RA= github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E= github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= @@ -847,8 +849,6 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= -github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9 h1:JrHCee+uqpF2zXooiKu7ymvKgnzlUIXtTlZ7vi21Tr0= -github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9/go.mod h1:NxBpQibuBBeA/V8RGbrNzVAv4OyWWL5yNao7mVz656k= github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w= @@ -861,6 +861,8 @@ github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tz github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8= github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM= +github.com/viterin/partial v1.1.0 h1:iH1l1xqBlapXsYzADS1dcbizg3iQUKTU1rbwkHv/80E= +github.com/viterin/partial v1.1.0/go.mod 
h1:oKGAo7/wylWkJTLrWX8n+f4aDPtQMQ6VG4dd2qur5QA= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo= github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/pkg/container/types/types.go b/pkg/container/types/types.go index 275cc93cfa25a..d1379969157eb 100644 --- a/pkg/container/types/types.go +++ b/pkg/container/types/types.go @@ -565,6 +565,16 @@ func (t Type) DescString() string { return t.Oid.String() } +func (t Type) GetArrayElementSize() int { + switch t.Oid { + case T_array_float32: + return 4 + case T_array_float64: + return 8 + } + panic(moerr.NewInternalErrorNoCtx(fmt.Sprintf("unknown array type %d", t))) +} + func (t Type) Eq(b Type) bool { switch t.Oid { // XXX need to find out why these types have different size/width diff --git a/pkg/sql/colexec/productl2/product_l2.go b/pkg/sql/colexec/productl2/product_l2.go index ad40bdf674c4f..c9038cf2ae3d1 100644 --- a/pkg/sql/colexec/productl2/product_l2.go +++ b/pkg/sql/colexec/productl2/product_l2.go @@ -18,14 +18,17 @@ import ( "bytes" "runtime" "strings" - "sync" "time" "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" + "github.com/matrixorigin/matrixone/pkg/vectorindex" + "github.com/matrixorigin/matrixone/pkg/vectorindex/brute_force" + "github.com/matrixorigin/matrixone/pkg/vectorindex/cache" "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" + "github.com/matrixorigin/matrixone/pkg/vectorindex/sqlexec" "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/message" "github.com/matrixorigin/matrixone/pkg/vm/process" @@ -65,6 +68,7 @@ func (productl2 *Productl2) Call(proc *process.Process) (vm.CallResult, error) { ctr := &ap.ctr result := vm.NewCallResult() var err error + for { switch ctr.state { case Build: @@ -120,6 +124,47 @@ func (productl2 *Productl2) Call(proc *process.Process) (vm.CallResult, error) { return result, nil } } + +} + +func NewNullVector[T types.RealNumbers](dim int32) []T { + // null vector with magnitude 1 + nullvec := make([]T, dim) + nullvec[0] = 1 + return nullvec +} + +func getIndex[T types.RealNumbers](ap *Productl2, proc *process.Process, analyzer process.Analyzer) (cache.VectorIndexSearchIf, error) { + ctr := &ap.ctr + buildCount := ctr.bat.RowCount() + centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos() + + dim := ctr.bat.Vecs[centroidColPos].GetType().Width + elemSize := uint(ctr.bat.Vecs[centroidColPos].GetType().GetArrayElementSize()) + centers := make([][]T, buildCount) + nullvec := NewNullVector[T](dim) + + for i := 0; i < buildCount; i++ { + if ctr.bat.Vecs[centroidColPos].IsNull(uint64(i)) { + centers[i] = nullvec + continue + } + + c := types.BytesToArray[T](ctr.bat.Vecs[centroidColPos].GetBytesAt(i)) + centers[i] = c + } + + algo, err := brute_force.NewBruteForceIndex[T](centers, uint(dim), ctr.metrictype, elemSize) + if err != nil { + return nil, err + } + + err = algo.Load(sqlexec.NewSqlProcess(proc)) + if err != nil { + return nil, err + } + + return algo, nil } func (productl2 *Productl2) build(proc *process.Process, analyzer process.Analyzer) error { @@ -142,6 +187,21 @@ func (productl2 *Productl2) 
build(proc *process.Process, analyzer process.Analyz } } mp.Free() + + centroidColPos := productl2.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos() + switch ctr.bat.Vecs[centroidColPos].GetType().Oid { + case types.T_array_float32: + ctr.brute_force, err = getIndex[float32](productl2, proc, analyzer) + if err != nil { + return err + } + case types.T_array_float64: + ctr.brute_force, err = getIndex[float64](productl2, proc, analyzer) + if err != nil { + return err + } + } + return nil } @@ -160,56 +220,33 @@ func (productl2 *Productl2) build(proc *process.Process, analyzer process.Analyz // } //) -func newMat[T types.RealNumbers](ctr *container, ap *Productl2) ([][]T, [][]T) { - buildCount := ctr.bat.RowCount() +func newMat[T types.RealNumbers](ctr *container, ap *Productl2) ([][]T, error) { probeCount := ctr.inBat.RowCount() - centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos() tblColPos := ap.OnExpr.GetF().GetArgs()[1].GetCol().GetColPos() - centroidmat := make([][]T, buildCount) - for i := 0; i < buildCount; i++ { - switch ctr.bat.Vecs[centroidColPos].GetType().Oid { - case types.T_array_float32: - if ctr.bat.Vecs[centroidColPos].IsNull(uint64(i)) { - centroidmat[i] = nil - continue - } - centroidmat[i] = types.BytesToArray[T](ctr.bat.Vecs[centroidColPos].GetBytesAt(i)) - case types.T_array_float64: - if ctr.bat.Vecs[centroidColPos].IsNull(uint64(i)) { - centroidmat[i] = nil - continue - } - centroidmat[i] = types.BytesToArray[T](ctr.bat.Vecs[centroidColPos].GetBytesAt(i)) - } - } + + // dimension can only get from centroid column. probe column input values can be null and dimension is 0. + centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos() + dim := ctr.bat.Vecs[centroidColPos].GetType().Width + nullvec := NewNullVector[T](dim) // embedding mat - embedmat := make([][]T, probeCount) + probes := make([][]T, probeCount) for j := 0; j < probeCount; j++ { - switch ctr.bat.Vecs[centroidColPos].GetType().Oid { - case types.T_array_float32: - if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) { - embedmat[j] = nil - continue - } - embedmat[j] = types.BytesToArray[T](ctr.inBat.Vecs[tblColPos].GetBytesAt(j)) - case types.T_array_float64: - if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) { - embedmat[j] = nil - continue - } - embedmat[j] = types.BytesToArray[T](ctr.inBat.Vecs[tblColPos].GetBytesAt(j)) + if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) { + probes[j] = nullvec + continue } - + v := types.BytesToArray[T](ctr.inBat.Vecs[tblColPos].GetBytesAt(j)) + probes[j] = v } - return centroidmat, embedmat + return probes, nil } func (ctr *container) probe(ap *Productl2, proc *process.Process, result *vm.CallResult) error { - centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos() - switch ctr.bat.Vecs[centroidColPos].GetType().Oid { + tblColPos := ap.OnExpr.GetF().GetArgs()[1].GetCol().GetColPos() + switch ctr.inBat.Vecs[tblColPos].GetType().Oid { case types.T_array_float32: return probeRun[float32](ctr, ap, proc, result) case types.T_array_float64: @@ -218,9 +255,16 @@ func (ctr *container) probe(ap *Productl2, proc *process.Process, result *vm.Cal return nil } +func (ctr *container) release() { + if ctr.brute_force != nil { + ctr.brute_force.Destroy() + ctr.brute_force = nil + } +} + func probeRun[T types.RealNumbers](ctr *container, ap *Productl2, proc *process.Process, result *vm.CallResult) error { - buildCount := ctr.bat.RowCount() probeCount := ctr.inBat.RowCount() + tblColPos := ap.OnExpr.GetF().GetArgs()[1].GetCol().GetColPos() ncpu := 
runtime.NumCPU() if probeCount < ncpu { @@ -234,85 +278,40 @@ func probeRun[T types.RealNumbers](ctr *container, ap *Productl2, proc *process. } } - leastClusterIndex := make([]int, probeCount) - leastDistance := make([]T, probeCount) - - for i := 0; i < probeCount; i++ { - leastClusterIndex[i] = 0 - leastDistance[i] = metric.MaxFloat[T]() - } - - centroidmat, embedmat := newMat[T](ctr, ap) - distfn, err := metric.ResolveDistanceFn[T](ctr.metrictype) + probes, err := newMat[T](ctr, ap) if err != nil { - return moerr.NewInternalError(proc.Ctx, "ProductL2: failed to get distance function") + return err } - errs := make(chan error, ncpu) - var mutex sync.Mutex - var wg sync.WaitGroup + rt := vectorindex.RuntimeConfig{Limit: 1, NThreads: uint(ncpu)} - for n := 0; n < ncpu; n++ { + anykeys, distances, err := ctr.brute_force.Search(sqlexec.NewSqlProcess(proc), probes, rt) + if err != nil { + return err + } + _ = distances - wg.Add(1) - go func(tid int) { - defer wg.Done() - for j := 0; j < probeCount; j++ { + leastClusterIndex := anykeys.([]int64) - if j%ncpu != tid { - continue - } + //os.Stderr.WriteString(fmt.Sprintf("keys %v\n", keys)) + //os.Stderr.WriteString(fmt.Sprintf("distances %v\n", distances)) - // for each row in probe table, - // find the nearest cluster center from the build table. - for i := 0; i < buildCount; i++ { + for j := 0; j < probeCount; j++ { - if embedmat[j] == nil || centroidmat[i] == nil { - leastDistance[j] = 0 - leastClusterIndex[j] = i - } else { - dist, err := distfn(centroidmat[i], embedmat[j]) - if err != nil { - errs <- err - return - } - if dist < leastDistance[j] { - leastDistance[j] = dist - leastClusterIndex[j] = i - } - } + if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) { + leastClusterIndex[j] = 0 + } + for k, rp := range ap.Result { + if rp.Rel == 0 { + if err := ctr.rbat.Vecs[k].UnionOne(ctr.inBat.Vecs[rp.Pos], int64(j), proc.Mp()); err != nil { + return err } - - err := func() error { - mutex.Lock() - defer mutex.Unlock() - for k, rp := range ap.Result { - if rp.Rel == 0 { - if err := ctr.rbat.Vecs[k].UnionOne(ctr.inBat.Vecs[rp.Pos], int64(j), proc.Mp()); err != nil { - return err - } - } else { - if err := ctr.rbat.Vecs[k].UnionOne(ctr.bat.Vecs[rp.Pos], int64(leastClusterIndex[j]), proc.Mp()); err != nil { - return err - } - } - } - - return nil - }() - - if err != nil { - errs <- err - return + } else { + if err := ctr.rbat.Vecs[k].UnionOne(ctr.bat.Vecs[rp.Pos], int64(leastClusterIndex[j]), proc.Mp()); err != nil { + return err } } - }(n) - } - - wg.Wait() - - if len(errs) > 0 { - return <-errs + } } // ctr.rbat.AddRowCount(count * count2) diff --git a/pkg/sql/colexec/productl2/types.go b/pkg/sql/colexec/productl2/types.go index 621fb5ce7b558..6effcf0a7d824 100644 --- a/pkg/sql/colexec/productl2/types.go +++ b/pkg/sql/colexec/productl2/types.go @@ -20,7 +20,8 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/batch" "github.com/matrixorigin/matrixone/pkg/pb/plan" "github.com/matrixorigin/matrixone/pkg/sql/colexec" - vmetric "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" + "github.com/matrixorigin/matrixone/pkg/vectorindex/cache" + "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" "github.com/matrixorigin/matrixone/pkg/vm" "github.com/matrixorigin/matrixone/pkg/vm/process" ) @@ -34,11 +35,12 @@ const ( ) type container struct { - state int - bat *batch.Batch // build batch - rbat *batch.Batch - inBat *batch.Batch // probe batch - metrictype vmetric.MetricType + state int + bat *batch.Batch // build batch + rbat 
*batch.Batch + inBat *batch.Batch // probe batch + metrictype metric.MetricType + brute_force cache.VectorIndexSearchIf // brute_force.BruteForceIndex } type Productl2 struct { @@ -89,11 +91,13 @@ func (productl2 *Productl2) Reset(proc *process.Process, pipelineFailed bool, er if productl2.ctr.rbat != nil { productl2.ctr.rbat.CleanOnlyData() } + productl2.ctr.release() productl2.ctr.inBat = nil productl2.ctr.state = Build } func (productl2 *Productl2) Free(proc *process.Process, pipelineFailed bool, err error) { + productl2.ctr.release() productl2.ctr.cleanBatch(proc.Mp()) } diff --git a/pkg/vectorindex/brute_force/brute_force.go b/pkg/vectorindex/brute_force/brute_force.go new file mode 100644 index 0000000000000..415d020e83c78 --- /dev/null +++ b/pkg/vectorindex/brute_force/brute_force.go @@ -0,0 +1,295 @@ +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package brute_force + +import ( + "fmt" + "runtime" + "slices" + "sync" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/common/util" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/vectorindex" + "github.com/matrixorigin/matrixone/pkg/vectorindex/cache" + "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" + "github.com/matrixorigin/matrixone/pkg/vectorindex/sqlexec" + usearch "github.com/unum-cloud/usearch/golang" + "github.com/viterin/partial" +) + +type UsearchBruteForceIndex[T types.RealNumbers] struct { + Dataset []T // flattend vector + Metric usearch.Metric + Dimension uint + Count uint + Quantization usearch.Quantization + ElementSize uint +} + +type GoBruteForceIndex[T types.RealNumbers] struct { + Dataset [][]T // flattend vector + Metric metric.MetricType + Dimension uint + Count uint +} + +var _ cache.VectorIndexSearchIf = &UsearchBruteForceIndex[float32]{} +var _ cache.VectorIndexSearchIf = &GoBruteForceIndex[float32]{} + +func GetUsearchQuantizationFromType(v any) (usearch.Quantization, error) { + switch v.(type) { + case float32: + return usearch.F32, nil + case float64: + return usearch.F64, nil + default: + return 0, moerr.NewInternalErrorNoCtx(fmt.Sprintf("usearch not support type %T", v)) + } +} + +func NewCpuBruteForceIndex[T types.RealNumbers](dataset [][]T, + dimension uint, + m metric.MetricType, + elemsz uint) (cache.VectorIndexSearchIf, error) { + + switch m { + case metric.Metric_L1Distance: + return NewGoBruteForceIndex(dataset, dimension, m, elemsz) + default: + return NewUsearchBruteForceIndex(dataset, dimension, m, elemsz) + } +} + +func NewGoBruteForceIndex[T types.RealNumbers](dataset [][]T, + dimension uint, + m metric.MetricType, + elemsz uint) (cache.VectorIndexSearchIf, error) { + + idx := &GoBruteForceIndex[T]{} + idx.Metric = m + idx.Dimension = dimension + idx.Count = uint(len(dataset)) + idx.Dataset = dataset + return idx, nil +} + +func NewUsearchBruteForceIndex[T types.RealNumbers](dataset [][]T, + dimension uint, + m metric.MetricType, + elemsz 
uint) (cache.VectorIndexSearchIf, error) { + var err error + + idx := &UsearchBruteForceIndex[T]{} + idx.Metric = metric.MetricTypeToUsearchMetric[m] + idx.Quantization, err = GetUsearchQuantizationFromType(T(0)) + if err != nil { + return nil, err + } + idx.Dimension = dimension + idx.Count = uint(len(dataset)) + idx.ElementSize = elemsz + + idx.Dataset = make([]T, idx.Count*idx.Dimension) + for i := 0; i < len(dataset); i++ { + offset := i * int(dimension) + copy(idx.Dataset[offset:], dataset[i]) + } + + return idx, nil +} + +func (idx *UsearchBruteForceIndex[T]) Load(sqlproc *sqlexec.SqlProcess) error { + return nil +} + +func (idx *UsearchBruteForceIndex[T]) Search(proc *sqlexec.SqlProcess, _queries any, rt vectorindex.RuntimeConfig) (keys any, distances []float64, err error) { + queries, ok := _queries.([][]T) + if !ok { + return nil, nil, moerr.NewInternalErrorNoCtx("queries type invalid") + } + + var flatten []T + if len(queries) == 1 { + flatten = queries[0] + } else { + flatten = make([]T, len(queries)*int(idx.Dimension)) + for i := 0; i < len(queries); i++ { + offset := i * int(idx.Dimension) + copy(flatten[offset:], queries[i]) + } + } + //fmt.Printf("flattened %v\n", flatten) + + // limit must not exceed idx.Count + limit := rt.Limit + if limit > idx.Count { + limit = idx.Count + } + + keys_ui64, distances_f32, err := usearch.ExactSearchUnsafe( + util.UnsafePointer(&(idx.Dataset[0])), + util.UnsafePointer(&(flatten[0])), + uint(idx.Count), + uint(len(queries)), + idx.Dimension*idx.ElementSize, + idx.Dimension*idx.ElementSize, + idx.Dimension, + idx.Metric, + idx.Quantization, + limit, + rt.NThreads) + + if err != nil { + return + } + + distances = make([]float64, len(distances_f32)) + for i, dist := range distances_f32 { + distances[i] = float64(dist) + } + + keys_i64 := make([]int64, len(keys_ui64)) + for i, key := range keys_ui64 { + keys_i64[i] = int64(key) + } + keys = keys_i64 + + runtime.KeepAlive(flatten) + runtime.KeepAlive(idx.Dataset) + return +} + +func (idx *UsearchBruteForceIndex[T]) UpdateConfig(sif cache.VectorIndexSearchIf) error { + return nil +} + +func (idx *UsearchBruteForceIndex[T]) Destroy() { +} + +func (idx *GoBruteForceIndex[T]) Load(sqlproc *sqlexec.SqlProcess) error { + return nil +} + +func (idx *GoBruteForceIndex[T]) UpdateConfig(sif cache.VectorIndexSearchIf) error { + return nil +} + +func (idx *GoBruteForceIndex[T]) Destroy() { +} + +func (idx *GoBruteForceIndex[T]) Search(proc *sqlexec.SqlProcess, _queries any, rt vectorindex.RuntimeConfig) (keys any, distances []float64, err error) { + queries, ok := _queries.([][]T) + if !ok { + return nil, nil, moerr.NewInternalErrorNoCtx("queries type invalid") + } + + distfn, err := metric.ResolveDistanceFn[T](idx.Metric) + if err != nil { + return nil, nil, err + } + + nthreads := rt.NThreads + if nthreads == 0 { + nthreads = uint(runtime.NumCPU()) + } + + // buffered so a failing worker never blocks on send + errs := make(chan error, int(nthreads)) + + nqueries := len(queries) + ndataset := len(idx.Dataset) + + // create the distance matrix (nqueries x ndataset) + results := make([]vectorindex.SearchResult, nqueries*ndataset) + + var wg sync.WaitGroup + for n := 0; n < int(nthreads); n++ { + + wg.Add(1) + go func(tid int) { + defer wg.Done() + for i := 0; i < nqueries; i++ { + if i%int(nthreads) != tid { + continue + } + for j := 0; j < ndataset; j++ { + dist, err := distfn(queries[i], idx.Dataset[j]) + if err != nil { + errs <- err + return + } + results[i*ndataset+j].Id = int64(j) + results[i*ndataset+j].Distance = float64(dist) + } + } + + }(n) + } + + wg.Wait() + + if len(errs) > 0 { + 
return nil, nil, <-errs + } + + cmpfn := func(a, b vectorindex.SearchResult) int { + if a.Distance < b.Distance { + return -1 + } else if a.Distance == b.Distance { + return 0 + } + return 1 + } + + // get min + keys64 := make([]int64, nqueries*int(rt.Limit)) + distances = make([]float64, nqueries*int(rt.Limit)) + var wg2 sync.WaitGroup + for n := 0; n < int(nthreads); n++ { + wg2.Add(1) + go func(tid int) { + defer wg2.Done() + for i := 0; i < nqueries; i++ { + if i%int(nthreads) != tid { + continue + } + + if rt.Limit == 1 { + // min + first := slices.MinFunc(results[i*ndataset:(i+1)*ndataset], cmpfn) + results[i*ndataset] = first + + } else { + // partial sort + partial.SortFunc(results[i*ndataset:(i+1)*ndataset], int(rt.Limit), cmpfn) + + } + } + }(n) + } + + wg2.Wait() + + for i := 0; i < nqueries; i++ { + for j := 0; j < int(rt.Limit); j++ { + keys64[i*int(rt.Limit)+j] = results[i*ndataset+j].Id + distances[i*int(rt.Limit)+j] = results[i*ndataset+j].Distance + } + } + + return keys64, distances, nil +} diff --git a/pkg/vectorindex/brute_force/brute_force_test.go b/pkg/vectorindex/brute_force/brute_force_test.go new file mode 100644 index 0000000000000..9c7677e68d10f --- /dev/null +++ b/pkg/vectorindex/brute_force/brute_force_test.go @@ -0,0 +1,66 @@ +//go:build !gpu + +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package brute_force + +import ( + "fmt" + "testing" + + "github.com/matrixorigin/matrixone/pkg/vectorindex" + "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" + "github.com/stretchr/testify/require" +) + +func TestBruteForce(t *testing.T) { + + dataset := [][]float32{{1, 2, 3}, {3, 4, 5}} + query := [][]float32{{1, 2, 3}, {3, 4, 5}} + dimension := uint(3) + ncpu := uint(1) + limit := uint(2) + elemsz := uint(4) // float32 + + idx, err := NewUsearchBruteForceIndex[float32](dataset, dimension, metric.Metric_L2sqDistance, elemsz) + require.NoError(t, err) + + rt := vectorindex.RuntimeConfig{Limit: limit, NThreads: ncpu} + + keys, distances, err := idx.Search(nil, query, rt) + require.NoError(t, err) + fmt.Printf("keys %v, dist %v\n", keys, distances) + +} + +func TestGoBruteForce(t *testing.T) { + + dataset := [][]float32{{1, 2, 3}, {3, 4, 5}} + query := [][]float32{{1, 2, 3}, {3, 4, 5}} + dimension := uint(3) + ncpu := uint(1) + limit := uint(2) + elemsz := uint(4) // float32 + + idx, err := NewGoBruteForceIndex[float32](dataset, dimension, metric.Metric_L2sqDistance, elemsz) + require.NoError(t, err) + + rt := vectorindex.RuntimeConfig{Limit: limit, NThreads: ncpu} + + keys, distances, err := idx.Search(nil, query, rt) + require.NoError(t, err) + fmt.Printf("keys %v, dist %v\n", keys, distances) + +} diff --git a/pkg/vectorindex/brute_force/cpu.go b/pkg/vectorindex/brute_force/cpu.go new file mode 100644 index 0000000000000..b60f8e5b68a4b --- /dev/null +++ b/pkg/vectorindex/brute_force/cpu.go @@ -0,0 +1,31 @@ +//go:build !gpu + +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package brute_force + +import ( + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/vectorindex/cache" + "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" +) + +func NewBruteForceIndex[T types.RealNumbers](dataset [][]T, + dimension uint, + m metric.MetricType, + elemsz uint) (cache.VectorIndexSearchIf, error) { + + return NewCpuBruteForceIndex[T](dataset, dimension, m, elemsz) +} diff --git a/pkg/vectorindex/brute_force/gpu.go b/pkg/vectorindex/brute_force/gpu.go new file mode 100644 index 0000000000000..9a1bc7a7b213e --- /dev/null +++ b/pkg/vectorindex/brute_force/gpu.go @@ -0,0 +1,202 @@ +//go:build gpu + +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package brute_force + +import ( + // "fmt" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/vectorindex" + "github.com/matrixorigin/matrixone/pkg/vectorindex/cache" + "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" + "github.com/matrixorigin/matrixone/pkg/vectorindex/sqlexec" + cuvs "github.com/rapidsai/cuvs/go" + "github.com/rapidsai/cuvs/go/brute_force" +) + +type GpuBruteForceIndex[T cuvs.TensorNumberType] struct { + Resource *cuvs.Resource // shared resource for read-only index + Dataset *cuvs.Tensor[T] + Index *brute_force.BruteForceIndex + Metric cuvs.Distance + Dimension uint + Count uint + ElementSize uint +} + +var _ cache.VectorIndexSearchIf = &GpuBruteForceIndex[float32]{} + +func NewBruteForceIndex[T types.RealNumbers](dataset [][]T, + dimension uint, + m metric.MetricType, + elemsz uint) (cache.VectorIndexSearchIf, error) { + + return NewCpuBruteForceIndex[T](dataset, dimension, m, elemsz) +} + +// cuvs library has bug. comment out the GPU version until cuvs fix the bug +/* +func NewBruteForceIndex[T types.RealNumbers](dataset [][]T, + dimension uint, + m metric.MetricType, + elemsz uint) (cache.VectorIndexSearchIf, error) { + + switch dset := any(dataset).(type) { + case [][]float64: + return NewCpuBruteForceIndex[T](dataset, dimension, m, elemsz) + case [][]float32: + idx := &GpuBruteForceIndex[float32]{} + + resource, _ := cuvs.NewResource(nil) + idx.Resource = &resource + + tensor, err := cuvs.NewTensor(dset) + if err != nil { + return nil, err + } + idx.Dataset = &tensor + + idx.Metric = metric.MetricTypeToCuvsMetric[m] + idx.Dimension = dimension + idx.Count = uint(len(dataset)) + idx.ElementSize = elemsz + return idx, nil + default: + return nil, moerr.NewInternalErrorNoCtx("type not supported for BruteForceIndex") + } + +} +*/ + +func (idx *GpuBruteForceIndex[T]) Load(sqlproc *sqlexec.SqlProcess) (err error) { + if _, err = idx.Dataset.ToDevice(idx.Resource); err != nil { + return err + } + + idx.Index, err = brute_force.CreateIndex() + if err != nil { + return + } + + err = brute_force.BuildIndex[T](*idx.Resource, idx.Dataset, idx.Metric, 0, idx.Index) + if err != nil { + return + } + + if err = idx.Resource.Sync(); err != nil { + return + } + + return +} + +func (idx *GpuBruteForceIndex[T]) Search(proc *sqlexec.SqlProcess, _queries any, rt vectorindex.RuntimeConfig) (retkeys any, retdistances []float64, err error) { + queriesvec, ok := _queries.([][]T) + if !ok { + return nil, nil, moerr.NewInternalErrorNoCtx("queries type invalid") + } + + // local resource for concurrent search + resource, err := cuvs.NewResource(nil) + if err != nil { + return nil, nil, err + } + defer resource.Close() + + queries, err := cuvs.NewTensor(queriesvec) + if err != nil { + return nil, nil, err + } + defer queries.Close() + + neighbors, err := cuvs.NewTensorOnDevice[int64](&resource, []int64{int64(len(queriesvec)), int64(rt.Limit)}) + if err != nil { + return nil, nil, err + } + defer neighbors.Close() + + distances, err := cuvs.NewTensorOnDevice[float32](&resource, []int64{int64(len(queriesvec)), int64(rt.Limit)}) + if err != nil { + return nil, nil, err + } + defer distances.Close() + + if _, err = queries.ToDevice(&resource); err != nil { + return nil, nil, err + } + + err = brute_force.SearchIndex(resource, *idx.Index, &queries, &neighbors, &distances) + if err != nil { + return nil, nil, err + } + + if _, err = neighbors.ToHost(&resource); err != nil { + return 
nil, nil, err + } + + if _, err = distances.ToHost(&resource); err != nil { + return nil, nil, err + } + + if err = resource.Sync(); err != nil { + return nil, nil, err + } + + neighborsSlice, err := neighbors.Slice() + if err != nil { + return nil, nil, err + } + + distancesSlice, err := distances.Slice() + if err != nil { + return nil, nil, err + } + + //fmt.Printf("flattened %v\n", flatten) + retdistances = make([]float64, len(distancesSlice)*int(rt.Limit)) + for i := range distancesSlice { + for j, dist := range distancesSlice[i] { + retdistances[i*int(rt.Limit)+j] = float64(dist) + } + } + + keys := make([]int64, len(neighborsSlice)*int(rt.Limit)) + for i := range neighborsSlice { + for j, key := range neighborsSlice[i] { + keys[i*int(rt.Limit)+j] = int64(key) + } + } + retkeys = keys + return +} + +func (idx *GpuBruteForceIndex[T]) UpdateConfig(sif cache.VectorIndexSearchIf) error { + return nil +} + +func (idx *GpuBruteForceIndex[T]) Destroy() { + if idx.Dataset != nil { + idx.Dataset.Close() + } + if idx.Resource != nil { + idx.Resource.Close() + } + if idx.Index != nil { + idx.Index.Close() + } +} diff --git a/pkg/vectorindex/brute_force/gpu_test.go b/pkg/vectorindex/brute_force/gpu_test.go new file mode 100644 index 0000000000000..a54fbb0caed49 --- /dev/null +++ b/pkg/vectorindex/brute_force/gpu_test.go @@ -0,0 +1,72 @@ +//go:build gpu + +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package brute_force + +import ( + //"fmt" + "sync" + "testing" + + "github.com/matrixorigin/matrixone/pkg/vectorindex" + "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" + "github.com/stretchr/testify/require" +) + +func TestGpuBruteForce(t *testing.T) { + + dataset := [][]float32{{1, 2, 3}, {3, 4, 5}} + query := [][]float32{{1, 2, 3}, {3, 4, 5}} + dimension := uint(3) + ncpu := uint(1) + limit := uint(1) + elemsz := uint(4) // float32 + + idx, err := NewBruteForceIndex[float32](dataset, dimension, metric.Metric_L2sqDistance, elemsz) + require.NoError(t, err) + defer idx.Destroy() + + err = idx.Load(nil) + require.NoError(t, err) + + rt := vectorindex.RuntimeConfig{Limit: limit, NThreads: ncpu} + + var wg sync.WaitGroup + + for n := 0; n < 4; n++ { + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + keys, distances, err := idx.Search(nil, query, rt) + require.NoError(t, err) + + keys_i64, ok := keys.([]int64) + require.Equal(t, ok, true) + + for j, key := range keys_i64 { + require.Equal(t, key, int64(j)) + require.Equal(t, distances[j], float64(0)) + } + // fmt.Printf("keys %v, dist %v\n", keys, distances) + } + }() + } + + wg.Wait() + +} diff --git a/pkg/vectorindex/ivfflat/kmeans/device/gpu.go b/pkg/vectorindex/ivfflat/kmeans/device/gpu.go index 59bbca5cc18e6..e10f339d29d69 100644 --- a/pkg/vectorindex/ivfflat/kmeans/device/gpu.go +++ b/pkg/vectorindex/ivfflat/kmeans/device/gpu.go @@ -17,6 +17,9 @@ package device import ( + //"os" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/vectorindex/ivfflat/kmeans" "github.com/matrixorigin/matrixone/pkg/vectorindex/ivfflat/kmeans/elkans" @@ -26,11 +29,10 @@ import ( ) type GpuClusterer[T cuvs.TensorNumberType] struct { - resource *cuvs.Resource - index *ivf_flat.IvfFlatIndex indexParams *ivf_flat.IndexParams - dataset *cuvs.Tensor[T] - centroids *cuvs.Tensor[T] + nlist int + dim int + vectors [][]T } func (c *GpuClusterer[T]) InitCentroids() error { @@ -40,43 +42,51 @@ func (c *GpuClusterer[T]) InitCentroids() error { func (c *GpuClusterer[T]) Cluster() (any, error) { - if _, err := c.dataset.ToDevice(c.resource); err != nil { + resource, err := cuvs.NewResource(nil) + if err != nil { return nil, err } + defer resource.Close() - if err := ivf_flat.BuildIndex(*c.resource, c.indexParams, c.dataset, c.index); err != nil { + dataset, err := cuvs.NewTensor(c.vectors) + if err != nil { return nil, err } + defer dataset.Close() - if err := c.resource.Sync(); err != nil { + index, err := ivf_flat.CreateIndex(c.indexParams, &dataset) + if err != nil { return nil, err } + defer index.Close() - nlist, err := ivf_flat.GetNLists(c.index) - if err != nil { + if _, err := dataset.ToDevice(&resource); err != nil { return nil, err } - dim, err := ivf_flat.GetDim(c.index) + centers, err := cuvs.NewTensorOnDevice[T](&resource, []int64{int64(c.nlist), int64(c.dim)}) if err != nil { return nil, err } + defer centers.Close() - centers, err := cuvs.NewTensorOnDevice[T](c.resource, []int64{int64(nlist), int64(dim)}) - if err != nil { + if err := ivf_flat.BuildIndex(resource, c.indexParams, &dataset, index); err != nil { return nil, err } - c.centroids = ¢ers - if _, err := centers.ToDevice(c.resource); err != nil { + if err := resource.Sync(); err != nil { return nil, err } - if err := ivf_flat.GetCenters(c.index, ¢ers); err != nil { + if err := ivf_flat.GetCenters(index, ¢ers); err != nil { return nil, err } - if _, err := 
centers.ToHost(c.resource); err != nil { + if _, err := centers.ToHost(&resource); err != nil { + return nil, err + } + + if err := resource.Sync(); err != nil { return nil, err } @@ -93,23 +103,9 @@ func (c *GpuClusterer[T]) SSE() (float64, error) { } func (c *GpuClusterer[T]) Close() error { - if c.indexParams != nil { c.indexParams.Close() } - if c.dataset != nil { - c.dataset.Close() - - } - if c.resource != nil { - c.resource.Close() - } - if c.index != nil { - c.index.Close() - } - if c.centroids != nil { - c.centroids.Close() - } return nil } @@ -140,11 +136,12 @@ func NewKMeans[T types.RealNumbers](vectors [][]T, clusterCnt, case [][]float32: c := &GpuClusterer[float32]{} - resources, err := cuvs.NewResource(nil) - if err != nil { - return nil, err + c.nlist = clusterCnt + if len(vectors) == 0 { + return nil, moerr.NewInternalErrorNoCtx("empty dataset") } - c.resource = &resources + c.vectors = vecs + c.dim = len(vecs[0]) indexParams, err := ivf_flat.CreateIndexParams() if err != nil { @@ -155,15 +152,6 @@ func NewKMeans[T types.RealNumbers](vectors [][]T, clusterCnt, indexParams.SetKMeansNIters(uint32(maxIterations)) indexParams.SetKMeansTrainsetFraction(1) // train all sample c.indexParams = indexParams - - dataset, err := cuvs.NewTensor(vecs) - if err != nil { - return nil, err - } - c.dataset = &dataset - - c.index, _ = ivf_flat.CreateIndex(c.indexParams, c.dataset) - return c, nil default: return elkans.NewKMeans(vectors, clusterCnt, maxIterations, deltaThreshold, distanceType, initType, spherical, nworker) diff --git a/pkg/vectorindex/ivfflat/kmeans/device/gpu_test.go b/pkg/vectorindex/ivfflat/kmeans/device/gpu_test.go index b683cb40d178f..5b4c30301d38a 100644 --- a/pkg/vectorindex/ivfflat/kmeans/device/gpu_test.go +++ b/pkg/vectorindex/ivfflat/kmeans/device/gpu_test.go @@ -17,12 +17,16 @@ package device import ( - "fmt" + //"fmt" "math/rand/v2" + "sync" "testing" + "github.com/matrixorigin/matrixone/pkg/vectorindex" + mobf "github.com/matrixorigin/matrixone/pkg/vectorindex/brute_force" "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" "github.com/stretchr/testify/require" + ) func TestGpu(t *testing.T) { @@ -44,10 +48,81 @@ func TestGpu(t *testing.T) { centers, err := c.Cluster() require.NoError(t, err) + _, ok := centers.([][]float32) + require.True(t, ok) + + /* + for k, center := range centroids { + fmt.Printf("center[%d] = %v\n", k, center) + } + */ +} + +func TestIVFAndBruteForce(t *testing.T) { + + dimension := uint(128) + ncpu := uint(1) + limit := uint(1) + elemsz := uint(4) // float32 + + dsize := 100000 + nlist := 128 + vecs := make([][]float32, dsize) + for i := range vecs { + vecs[i] = make([]float32, dimension) + for j := range vecs[i] { + vecs[i][j] = rand.Float32() + } + } + + c, err := NewKMeans[float32](vecs, nlist, 10, 0, metric.Metric_L2Distance, 0, false, 0) + require.NoError(t, err) + + centers, err := c.Cluster() + require.NoError(t, err) + centroids, ok := centers.([][]float32) require.True(t, ok) + /* for k, center := range centroids { fmt.Printf("center[%d] = %v\n", k, center) } + */ + + queries := vecs[:8192] + idx, err := mobf.NewBruteForceIndex[float32](centroids, dimension, metric.Metric_L2sqDistance, elemsz) + require.NoError(t, err) + defer idx.Destroy() + + err = idx.Load(nil) + require.NoError(t, err) + + rt := vectorindex.RuntimeConfig{Limit: limit, NThreads: ncpu} + + var wg sync.WaitGroup + + for n := 0; n < 4; n++ { + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + keys, distances, err := 
idx.Search(nil, queries, rt) + require.NoError(t, err) + + keys_i64, ok := keys.([]int64) + require.Equal(t, ok, true) + + for j, key := range keys_i64 { + require.Equal(t, key, int64(j)) + require.Equal(t, distances[j], float64(0)) + } + // fmt.Printf("keys %v, dist %v\n", keys, distances) + } + }() + } + + wg.Wait() + } diff --git a/pkg/vectorindex/ivfflat/kmeans/device/issue_test.go b/pkg/vectorindex/ivfflat/kmeans/device/issue_test.go new file mode 100644 index 0000000000000..a8256307a8fec --- /dev/null +++ b/pkg/vectorindex/ivfflat/kmeans/device/issue_test.go @@ -0,0 +1,264 @@ +//go:build gpu + +// Copyright 2023 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package device + +import ( + //"fmt" + "math/rand/v2" + "sync" + "testing" + //"os" + + "github.com/stretchr/testify/require" + + cuvs "github.com/rapidsai/cuvs/go" + "github.com/rapidsai/cuvs/go/brute_force" + "github.com/rapidsai/cuvs/go/ivf_flat" +) + +func getCenters(vecs [][]float32, dim int, clusterCnt int, distanceType cuvs.Distance, maxIterations int) ([][]float32, error) { + + resource, err := cuvs.NewResource(nil) + if err != nil { + return nil, err + } + defer resource.Close() + + indexParams, err := ivf_flat.CreateIndexParams() + if err != nil { + return nil, err + } + defer indexParams.Close() + + indexParams.SetNLists(uint32(clusterCnt)) + indexParams.SetMetric(distanceType) + indexParams.SetKMeansNIters(uint32(maxIterations)) + indexParams.SetKMeansTrainsetFraction(1) // train all sample + + dataset, err := cuvs.NewTensor(vecs) + if err != nil { + return nil, err + } + defer dataset.Close() + + index, _ := ivf_flat.CreateIndex(indexParams, &dataset) + defer index.Close() + + if _, err := dataset.ToDevice(&resource); err != nil { + return nil, err + } + + centers, err := cuvs.NewTensorOnDevice[float32](&resource, []int64{int64(clusterCnt), int64(dim)}) + if err != nil { + return nil, err + } + + if err := ivf_flat.BuildIndex(resource, indexParams, &dataset, index); err != nil { + return nil, err + } + + if err := resource.Sync(); err != nil { + return nil, err + } + + if err := ivf_flat.GetCenters(index, ¢ers); err != nil { + return nil, err + } + + if _, err := centers.ToHost(&resource); err != nil { + return nil, err + } + + if err := resource.Sync(); err != nil { + return nil, err + } + + result, err := centers.Slice() + if err != nil { + return nil, err + } + + return result, nil + +} + +func Search(datasetvec [][]float32, queriesvec [][]float32, limit uint, distanceType cuvs.Distance) (retkeys any, retdistances []float64, err error) { + //os.Stderr.WriteString(fmt.Sprintf("probe set %d\n", len(queriesvec))) + //os.Stderr.WriteString("brute force index search start\n") + + resource, err := cuvs.NewResource(nil) + if err != nil { + return + } + defer resource.Close() + + dataset, err := cuvs.NewTensor(datasetvec) + if err != nil { + return + } + defer dataset.Close() + + index, err := brute_force.CreateIndex() + if err != nil { + return + } + defer index.Close() + + queries, err := 
cuvs.NewTensor(queriesvec) + if err != nil { + return + } + defer queries.Close() + + neighbors, err := cuvs.NewTensorOnDevice[int64](&resource, []int64{int64(len(queriesvec)), int64(limit)}) + if err != nil { + return + } + defer neighbors.Close() + + distances, err := cuvs.NewTensorOnDevice[float32](&resource, []int64{int64(len(queriesvec)), int64(limit)}) + if err != nil { + return + } + defer distances.Close() + + if _, err = dataset.ToDevice(&resource); err != nil { + return + } + + if err = resource.Sync(); err != nil { + return + } + + err = brute_force.BuildIndex(resource, &dataset, distanceType, 2.0, index) + if err != nil { + //os.Stderr.WriteString(fmt.Sprintf("BruteForceIndex: build index failed %v\n", err)) + //os.Stderr.WriteString(fmt.Sprintf("BruteForceIndex: build index failed centers %v\n", datasetvec)) + return + } + + if err = resource.Sync(); err != nil { + return + } + //os.Stderr.WriteString("built brute force index\n") + + if _, err = queries.ToDevice(&resource); err != nil { + return + } + + //os.Stderr.WriteString("brute force index search Runing....\n") + err = brute_force.SearchIndex(resource, *index, &queries, &neighbors, &distances) + if err != nil { + return + } + //os.Stderr.WriteString("brute force index search finished Runing....\n") + + if _, err = neighbors.ToHost(&resource); err != nil { + return + } + //os.Stderr.WriteString("brute force index search neighbour to host done....\n") + + if _, err = distances.ToHost(&resource); err != nil { + return + } + //os.Stderr.WriteString("brute force index search distances to host done....\n") + + if err = resource.Sync(); err != nil { + return + } + + //os.Stderr.WriteString("brute force index search return result....\n") + neighborsSlice, err := neighbors.Slice() + if err != nil { + return + } + + distancesSlice, err := distances.Slice() + if err != nil { + return + } + + //fmt.Printf("flattened %v\n", flatten) + retdistances = make([]float64, len(distancesSlice)*int(limit)) + for i := range distancesSlice { + for j, dist := range distancesSlice[i] { + retdistances[i*int(limit)+j] = float64(dist) + } + } + + keys := make([]int64, len(neighborsSlice)*int(limit)) + for i := range neighborsSlice { + for j, key := range neighborsSlice[i] { + keys[i*int(limit)+j] = int64(key) + } + } + retkeys = keys + //os.Stderr.WriteString("brute force index search RETURN NOW....\n") + return +} + +func TestIvfAndBruteForceForIssue(t *testing.T) { + + dimension := uint(128) + limit := uint(1) + /* + ncpu := uint(1) + elemsz := uint(4) // float32 + */ + + dsize := 100000 + nlist := 128 + vecs := make([][]float32, dsize) + for i := range vecs { + vecs[i] = make([]float32, dimension) + for j := range vecs[i] { + vecs[i][j] = rand.Float32() + } + } + queries := vecs[:8192] + + centers, err := getCenters(vecs, int(dimension), nlist, cuvs.DistanceL2, 10) + require.NoError(t, err) + + var wg sync.WaitGroup + + for n := 0; n < 4; n++ { + + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 1000; i++ { + _, _, err := Search(centers, queries, limit, cuvs.DistanceL2) + require.NoError(t, err) + + /* + keys_i64, ok := keys.([]int64) + require.Equal(t, ok, true) + + for j, key := range keys_i64 { + require.Equal(t, key, int64(j)) + require.Equal(t, distances[j], float64(0)) + } + */ + // fmt.Printf("keys %v, dist %v\n", keys, distances) + } + }() + } + + wg.Wait() + +} diff --git a/pkg/vectorindex/ivfflat/search.go b/pkg/vectorindex/ivfflat/search.go index 7335b03876998..26b0120d4c5b3 100644 --- a/pkg/vectorindex/ivfflat/search.go +++ 
b/pkg/vectorindex/ivfflat/search.go @@ -15,7 +15,6 @@ package ivfflat import ( - "container/heap" "fmt" "strconv" @@ -25,6 +24,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/vectorindex" + "github.com/matrixorigin/matrixone/pkg/vectorindex/brute_force" "github.com/matrixorigin/matrixone/pkg/vectorindex/cache" "github.com/matrixorigin/matrixone/pkg/vectorindex/metric" "github.com/matrixorigin/matrixone/pkg/vectorindex/sqlexec" @@ -32,15 +32,10 @@ import ( var runSql = sqlexec.RunSql -type Centroid[T types.RealNumbers] struct { - Id int64 - Vec []T -} - // Ivf search index struct to hold the usearch index type IvfflatSearchIndex[T types.RealNumbers] struct { Version int64 - Centroids []Centroid[T] + Centroids cache.VectorIndexSearchIf } // This is the Ivf search implementation that implement VectorIndexSearchIf interface @@ -74,7 +69,9 @@ func (idx *IvfflatSearchIndex[T]) LoadIndex(proc *sqlexec.SqlProcess, idxcfg vec return nil } - idx.Centroids = make([]Centroid[T], 0, idxcfg.Ivfflat.Lists) + ncenters := 0 + centroids := make([][]T, idxcfg.Ivfflat.Lists) + elemsz := res.Batches[0].Vecs[1].GetType().GetArrayElementSize() for _, bat := range res.Batches { idVec := bat.Vecs[0] faVec := bat.Vecs[1] @@ -82,57 +79,52 @@ func (idx *IvfflatSearchIndex[T]) LoadIndex(proc *sqlexec.SqlProcess, idxcfg vec hasNull := faVec.HasNull() for i, id := range ids { if hasNull && faVec.IsNull(uint64(i)) { - //os.Stderr.WriteString("Centroid is NULL\n") continue } val := faVec.GetStringAt(i) vec := types.BytesToArray[T](util.UnsafeStringToBytes(val)) - idx.Centroids = append(idx.Centroids, Centroid[T]{Id: id, Vec: vec}) + centroids[id] = vec + ncenters += 1 } } + if ncenters == 0 { + return nil + } + + if uint(ncenters) != idxcfg.Ivfflat.Lists { + return moerr.NewInternalErrorNoCtx("number of centroids in db != Nlist") + } + + bfidx, err := brute_force.NewBruteForceIndex[T](centroids, idxcfg.Ivfflat.Dimensions, metric.MetricType(idxcfg.Ivfflat.Metric), uint(elemsz)) + if err != nil { + return err + } + err = bfidx.Load(proc) + if err != nil { + return err + } + + idx.Centroids = bfidx //os.Stderr.WriteString(fmt.Sprintf("%d centroids loaded... 
lists = %d, centroid %v\n", len(idx.Centroids), idxcfg.Ivfflat.Lists, idx.Centroids)) return nil } func (idx *IvfflatSearchIndex[T]) findCentroids(sqlproc *sqlexec.SqlProcess, query []T, distfn metric.DistanceFunction[T], _ vectorindex.IndexConfig, probe uint, _ int64) ([]int64, error) { - if len(idx.Centroids) == 0 { + if idx.Centroids == nil { // empty index has id = 1 return []int64{1}, nil } - hp := make(vectorindex.SearchResultMaxHeap, 0, int(probe)) - for _, c := range idx.Centroids { - dist, err := distfn(query, c.Vec) - if err != nil { - return nil, err - } - dist64 := float64(dist) - - if len(hp) >= int(probe) { - if hp[0].GetDistance() > dist64 { - hp[0] = &vectorindex.SearchResult{Id: c.Id, Distance: dist64} - heap.Fix(&hp, 0) - } - } else { - heap.Push(&hp, &vectorindex.SearchResult{Id: c.Id, Distance: dist64}) - } - } - - n := hp.Len() - res := make([]int64, 0, n) - for range n { - srif := heap.Pop(&hp) - sr, ok := srif.(*vectorindex.SearchResult) - if !ok { - return nil, moerr.NewInternalError(sqlproc.GetContext(), "findCentroids: heap return key is not int64") - } - res = append(res, sr.Id) + queries := [][]T{query} + rt := vectorindex.RuntimeConfig{Limit: probe, NThreads: 1} + keys, _, err := idx.Centroids.Search(sqlproc, queries, rt) + if err != nil { + return nil, err } - //os.Stderr.WriteString(fmt.Sprintf("probe %d... return centroid ids %v\n", probe, res)) - return res, nil + return keys.([]int64), nil } // Call usearch.Search @@ -220,7 +212,10 @@ func (idx *IvfflatSearchIndex[T]) Search( } func (idx *IvfflatSearchIndex[T]) Destroy() { - idx.Centroids = nil + if idx.Centroids != nil { + idx.Centroids.Destroy() + idx.Centroids = nil + } } func NewIvfflatSearch[T types.RealNumbers]( diff --git a/pkg/vectorindex/metric/gpu.go b/pkg/vectorindex/metric/gpu.go new file mode 100644 index 0000000000000..a061c563ad1cd --- /dev/null +++ b/pkg/vectorindex/metric/gpu.go @@ -0,0 +1,32 @@ +//go:build gpu + +// Copyright 2022 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metric + +import ( + cuvs "github.com/rapidsai/cuvs/go" +) + +var ( + MetricTypeToCuvsMetric = map[MetricType]cuvs.Distance{ + Metric_L2sqDistance: cuvs.DistanceSQEuclidean, + Metric_L2Distance: cuvs.DistanceSQEuclidean, + Metric_InnerProduct: cuvs.DistanceInnerProduct, + Metric_CosineDistance: cuvs.DistanceCosine, + Metric_L1Distance: cuvs.DistanceL1, + } +) + diff --git a/pkg/vectorindex/metric/types.go b/pkg/vectorindex/metric/types.go index 7e1576a24498d..86df29c3254cb 100644 --- a/pkg/vectorindex/metric/types.go +++ b/pkg/vectorindex/metric/types.go @@ -83,6 +83,13 @@ var ( */ } + MetricTypeToUsearchMetric = map[MetricType]usearch.Metric{ + Metric_L2Distance: usearch.L2sq, + Metric_L2sqDistance: usearch.L2sq, + Metric_InnerProduct: usearch.InnerProduct, + Metric_CosineDistance: usearch.Cosine, + } + MetricTypeToDistFuncName = map[MetricType]string{ Metric_L2Distance: DistFn_L2Distance, Metric_L2sqDistance: DistFn_L2sqDistance, diff --git a/pkg/vectorindex/types.go b/pkg/vectorindex/types.go index 05df94e64feb0..e2d60d92aef10 100644 --- a/pkg/vectorindex/types.go +++ b/pkg/vectorindex/types.go @@ -113,6 +113,7 @@ type RuntimeConfig struct { Probe uint OrigFuncName string BackgroundQueries []*plan.Query + NThreads uint // Brute Force Index } type VectorIndexCdc[T types.RealNumbers] struct { diff --git a/thirdparties/Makefile b/thirdparties/Makefile index b25a4f48b9bd5..786083a2ef305 100644 --- a/thirdparties/Makefile +++ b/thirdparties/Makefile @@ -15,8 +15,8 @@ PWD=$(shell dirname $(realpath $(lastword $(MAKEFILE_LIST)))) UNAME_S=$(shell uname -s | tr A-Z a-z) UNAME_M=$(shell uname -m) -USEARCH_DIR=USearch-2.21.1 -USEARCH_TAR=usearch-2.21.1.tar.gz +USEARCH_DIR=usearch-2.21.1-hotfix-20251212 +USEARCH_TAR=usearch-2.21.1-hotfix-20251212.tar.gz STRINGZILLA_DIR=StringZilla-4.2.1 STRINGZILLA_TAR=$(STRINGZILLA_DIR).tar.gz SIMSIMD_DIR=SimSIMD-6.5.3 @@ -59,13 +59,13 @@ ifeq ($(UNAME_S),linux) ifeq ($(MUSL),) USEARCH_CMAKE_FLAG += -DUSEARCH_USE_OPENMP=1 endif - AVX512FP16 := $(shell lscpu | grep avx512fp16) + AVX512FP16 := $(shell lscpu | grep -e avx512fp16 -e avx512_bf16) endif ifeq ($(UNAME_M),x86_64) ifneq ($(AVX512FP16),) ifeq ($(MUSL),) - USEARCH_CMAKE_FLAG += -DUSEARCH_USE_SIMSIMD=1 + USEARCH_CMAKE_FLAG += -DUSEARCH_USE_SIMSIMD=1 -DUSEARCH_DEFINED_CLANG=1 endif endif endif diff --git a/thirdparties/usearch-2.21.1-hotfix-20251212.tar.gz b/thirdparties/usearch-2.21.1-hotfix-20251212.tar.gz new file mode 100644 index 0000000000000..38ac7d13c9eeb Binary files /dev/null and b/thirdparties/usearch-2.21.1-hotfix-20251212.tar.gz differ diff --git a/thirdparties/usearch-2.21.1.tar.gz b/thirdparties/usearch-2.21.1.tar.gz deleted file mode 100644 index 3e70aaf663688..0000000000000 Binary files a/thirdparties/usearch-2.21.1.tar.gz and /dev/null differ
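A minimal usage sketch of the brute-force index introduced in pkg/vectorindex/brute_force, mirroring the call sequence exercised by brute_force_test.go in this change. The constructor, Load/Search/Destroy methods, the Metric_L2sqDistance constant and the RuntimeConfig fields come from this diff; the example function, sample vectors and parameter values are illustrative only.

package main

import (
	"fmt"

	"github.com/matrixorigin/matrixone/pkg/vectorindex"
	"github.com/matrixorigin/matrixone/pkg/vectorindex/brute_force"
	"github.com/matrixorigin/matrixone/pkg/vectorindex/metric"
)

func main() {
	// Build-side vectors (e.g. IVF centroids) and probe-side queries; values are made up.
	centroids := [][]float32{{1, 2, 3}, {3, 4, 5}, {7, 8, 9}}
	queries := [][]float32{{1, 2, 3}, {7, 8, 9}}

	dim := uint(3)
	elemsz := uint(4) // sizeof(float32)

	// NewBruteForceIndex picks the usearch-backed CPU path for L2sq;
	// the pure-Go fallback is only used for L1 distance.
	idx, err := brute_force.NewBruteForceIndex[float32](centroids, dim, metric.Metric_L2sqDistance, elemsz)
	if err != nil {
		panic(err)
	}
	defer idx.Destroy()

	// Load is a no-op on the CPU paths, so no SQL process is needed here.
	if err := idx.Load(nil); err != nil {
		panic(err)
	}

	// Limit = nearest neighbours per query, NThreads = worker count.
	rt := vectorindex.RuntimeConfig{Limit: 1, NThreads: 1}
	keys, dists, err := idx.Search(nil, queries, rt)
	if err != nil {
		panic(err)
	}

	// keys is returned as `any`; the concrete type is []int64 holding Limit
	// ids per query, parallel to dists.
	fmt.Println(keys.([]int64), dists)
}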