Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
59e86c4
opt
cpegeric Dec 4, 2025
2d8f0aa
bug fix
cpegeric Dec 4, 2025
8b34225
update
cpegeric Dec 4, 2025
67f8f7c
fix usearch bug
cpegeric Dec 4, 2025
97e5853
turn on simsimd
cpegeric Dec 5, 2025
29f8a2f
exact search
cpegeric Dec 5, 2025
abb8cb9
reuse vector set
cpegeric Dec 5, 2025
1b3d4a2
gpu
cpegeric Dec 5, 2025
7181bc8
brute force index
cpegeric Dec 5, 2025
5f7da9f
brute force index
cpegeric Dec 8, 2025
3a923f2
brute force
cpegeric Dec 8, 2025
53af241
gpu brute force index
cpegeric Dec 8, 2025
46ee1ce
gpu
cpegeric Dec 8, 2025
aa1d036
productl2 release
cpegeric Dec 8, 2025
e3f0726
multi threads test
cpegeric Dec 8, 2025
54e8187
bug fix free
cpegeric Dec 8, 2025
55d6921
reuse resource
cpegeric Dec 9, 2025
98e46bb
bug fix center
cpegeric Dec 9, 2025
7613294
add test
cpegeric Dec 9, 2025
892f2f3
reset release brute_force index
cpegeric Dec 9, 2025
456480e
cpu brute force by default
cpegeric Dec 9, 2025
d9d1b63
ivf search use brute force index to centroid computation
cpegeric Dec 10, 2025
cd3872e
Merge branch 'gpu_exactsearch' of github.com:cpegeric/matrixone into …
cpegeric Dec 10, 2025
67bf867
destroy brute force index to free up memory
cpegeric Dec 10, 2025
7505152
remove unused code
cpegeric Dec 10, 2025
a98ef5b
remove unused code
cpegeric Dec 10, 2025
eb42801
bug fix: use limit instead of rt.Limit
cpegeric Dec 10, 2025
425882e
merge fix
cpegeric Dec 10, 2025
406a49d
only enable simsimd when instruction exist
cpegeric Dec 10, 2025
b8908e5
use hotfix
cpegeric Dec 11, 2025
c6a7fd9
use cpegeric usearch hotfix
cpegeric Dec 11, 2025
e53c0c8
hotfix
cpegeric Dec 11, 2025
d9e4891
hotfix
cpegeric Dec 11, 2025
937d840
add license
cpegeric Dec 11, 2025
83e8d04
update go.sim
cpegeric Dec 11, 2025
0b8908e
Merge branch 'main' into gpu_exactsearch
cpegeric Dec 12, 2025
43e1eeb
Fix: fix usearch
cpegeric Dec 12, 2025
4f56903
fix usearch
cpegeric Dec 12, 2025
d98d526
fix: get dimension from centroid column
cpegeric Dec 12, 2025
6a12e41
exact search in golang
cpegeric Dec 12, 2025
24a0ce6
speed up min
cpegeric Dec 12, 2025
91957bc
cleanup
cpegeric Dec 12, 2025
53cf9c5
cleanup
cpegeric Dec 12, 2025
27b6f46
local resource for search and shared resource for read-only index
cpegeric Dec 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ require (
github.com/tidwall/pretty v1.2.1
github.com/tmc/langchaingo v0.1.13
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9
github.com/viterin/partial v1.1.0
go.starlark.net v0.0.0-20250701195324-d457b4515e0e
go.uber.org/automaxprocs v1.5.3
go.uber.org/ratelimit v0.2.0
Expand Down Expand Up @@ -242,6 +243,7 @@ replace (
github.com/lni/dragonboat/v4 v4.0.0-20220815145555-6f622e8bcbef => github.com/matrixorigin/dragonboat/v4 v4.0.0-20251203123148-9dcde946363f
github.com/lni/goutils v1.3.1-0.20220604063047-388d67b4dbc4 => github.com/matrixorigin/goutils v1.3.1-0.20220604063047-388d67b4dbc4
github.com/lni/vfs v0.2.1-0.20220616104132-8852fd867376 => github.com/matrixorigin/vfs v0.2.1-0.20220616104132-8852fd867376
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9 => github.com/cpegeric/usearch/golang v0.0.0-20251212130039-afde3fa5e527
)

replace github.com/shoenig/go-m1cpu => github.com/shoenig/go-m1cpu v0.1.7
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzA
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/cpegeric/pdftotext-go v0.0.0-20241112123704-49cb86a3790e h1:tQSCiEjYPRU+AuuVR+zd+xYVOsEqX1clPhmIAM6FCHU=
github.com/cpegeric/pdftotext-go v0.0.0-20241112123704-49cb86a3790e/go.mod h1:zt7uTOYu0EEeKatGaTi9JiP0I9ePHpDvjAwpfPXh/N0=
github.com/cpegeric/usearch/golang v0.0.0-20251212130039-afde3fa5e527 h1:A8qDoMLqBZRv5Fi7RC+KA9hMngZAIK4cSuL1ME3Jy7w=
github.com/cpegeric/usearch/golang v0.0.0-20251212130039-afde3fa5e527/go.mod h1:3SN8SakyyBWzb14DNZn4t5yX8dOa7ae45KpqDioi4RA=
github.com/cpuguy83/dockercfg v0.3.1 h1:/FpZ+JaygUR/lZP2NlFI2DVfrOEMAIKP5wWEJdoYe9E=
github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHfjj5/jFyUJc=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
Expand Down Expand Up @@ -847,8 +849,6 @@ github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGr
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9 h1:JrHCee+uqpF2zXooiKu7ymvKgnzlUIXtTlZ7vi21Tr0=
github.com/unum-cloud/usearch/golang v0.0.0-20251010193336-541e882da5a9/go.mod h1:NxBpQibuBBeA/V8RGbrNzVAv4OyWWL5yNao7mVz656k=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=
Expand All @@ -861,6 +861,8 @@ github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tz
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/vishvananda/netns v0.0.4/go.mod h1:SpkAiCQRtJ6TvvxPnOSyH3BMl6unz3xZlaprSwhNNJM=
github.com/viterin/partial v1.1.0 h1:iH1l1xqBlapXsYzADS1dcbizg3iQUKTU1rbwkHv/80E=
github.com/viterin/partial v1.1.0/go.mod h1:oKGAo7/wylWkJTLrWX8n+f4aDPtQMQ6VG4dd2qur5QA=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
10 changes: 10 additions & 0 deletions pkg/container/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,16 @@ func (t Type) DescString() string {
return t.Oid.String()
}

func (t Type) GetArrayElementSize() int {
switch t.Oid {
case T_array_float32:
return 4
case T_array_float64:
return 8
}
panic(moerr.NewInternalErrorNoCtx(fmt.Sprintf("unknown array type %d", t)))
}

func (t Type) Eq(b Type) bool {
switch t.Oid {
// XXX need to find out why these types have different size/width
Expand Down
215 changes: 107 additions & 108 deletions pkg/sql/colexec/productl2/product_l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ import (
"bytes"
"runtime"
"strings"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/vectorindex"
"github.com/matrixorigin/matrixone/pkg/vectorindex/brute_force"
"github.com/matrixorigin/matrixone/pkg/vectorindex/cache"
"github.com/matrixorigin/matrixone/pkg/vectorindex/metric"
"github.com/matrixorigin/matrixone/pkg/vectorindex/sqlexec"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/message"
"github.com/matrixorigin/matrixone/pkg/vm/process"
Expand Down Expand Up @@ -65,6 +68,7 @@ func (productl2 *Productl2) Call(proc *process.Process) (vm.CallResult, error) {
ctr := &ap.ctr
result := vm.NewCallResult()
var err error

for {
switch ctr.state {
case Build:
Expand Down Expand Up @@ -120,6 +124,47 @@ func (productl2 *Productl2) Call(proc *process.Process) (vm.CallResult, error) {
return result, nil
}
}

}

func NewNullVector[T types.RealNumbers](dim int32) []T {
// null vector with magnitude 1
nullvec := make([]T, dim)
nullvec[0] = 1
return nullvec
}

func getIndex[T types.RealNumbers](ap *Productl2, proc *process.Process, analyzer process.Analyzer) (cache.VectorIndexSearchIf, error) {
ctr := &ap.ctr
buildCount := ctr.bat.RowCount()
centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos()

dim := ctr.bat.Vecs[centroidColPos].GetType().Width
elemSize := uint(ctr.bat.Vecs[centroidColPos].GetType().GetArrayElementSize())
centers := make([][]T, buildCount)
nullvec := NewNullVector[T](dim)

for i := 0; i < buildCount; i++ {
if ctr.bat.Vecs[centroidColPos].IsNull(uint64(i)) {
centers[i] = nullvec
continue
}

c := types.BytesToArray[T](ctr.bat.Vecs[centroidColPos].GetBytesAt(i))
centers[i] = c
}

algo, err := brute_force.NewBruteForceIndex[T](centers, uint(dim), ctr.metrictype, elemSize)
if err != nil {
return nil, err
}

err = algo.Load(sqlexec.NewSqlProcess(proc))
if err != nil {
return nil, err
}

return algo, nil
}

func (productl2 *Productl2) build(proc *process.Process, analyzer process.Analyzer) error {
Expand All @@ -142,6 +187,21 @@ func (productl2 *Productl2) build(proc *process.Process, analyzer process.Analyz
}
}
mp.Free()

centroidColPos := productl2.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos()
switch ctr.bat.Vecs[centroidColPos].GetType().Oid {
case types.T_array_float32:
ctr.brute_force, err = getIndex[float32](productl2, proc, analyzer)
if err != nil {
return err
}
case types.T_array_float64:
ctr.brute_force, err = getIndex[float64](productl2, proc, analyzer)
if err != nil {
return err
}
}

return nil
}

Expand All @@ -160,56 +220,33 @@ func (productl2 *Productl2) build(proc *process.Process, analyzer process.Analyz
// }
//)

func newMat[T types.RealNumbers](ctr *container, ap *Productl2) ([][]T, [][]T) {
buildCount := ctr.bat.RowCount()
func newMat[T types.RealNumbers](ctr *container, ap *Productl2) ([][]T, error) {
probeCount := ctr.inBat.RowCount()
centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos()
tblColPos := ap.OnExpr.GetF().GetArgs()[1].GetCol().GetColPos()
centroidmat := make([][]T, buildCount)
for i := 0; i < buildCount; i++ {
switch ctr.bat.Vecs[centroidColPos].GetType().Oid {
case types.T_array_float32:
if ctr.bat.Vecs[centroidColPos].IsNull(uint64(i)) {
centroidmat[i] = nil
continue
}
centroidmat[i] = types.BytesToArray[T](ctr.bat.Vecs[centroidColPos].GetBytesAt(i))
case types.T_array_float64:
if ctr.bat.Vecs[centroidColPos].IsNull(uint64(i)) {
centroidmat[i] = nil
continue
}
centroidmat[i] = types.BytesToArray[T](ctr.bat.Vecs[centroidColPos].GetBytesAt(i))
}
}

// dimension can only get from centroid column. probe column input values can be null and dimension is 0.
centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos()
dim := ctr.bat.Vecs[centroidColPos].GetType().Width
nullvec := NewNullVector[T](dim)

// embedding mat
embedmat := make([][]T, probeCount)
probes := make([][]T, probeCount)
for j := 0; j < probeCount; j++ {

switch ctr.bat.Vecs[centroidColPos].GetType().Oid {
case types.T_array_float32:
if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) {
embedmat[j] = nil
continue
}
embedmat[j] = types.BytesToArray[T](ctr.inBat.Vecs[tblColPos].GetBytesAt(j))
case types.T_array_float64:
if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) {
embedmat[j] = nil
continue
}
embedmat[j] = types.BytesToArray[T](ctr.inBat.Vecs[tblColPos].GetBytesAt(j))
if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) {
probes[j] = nullvec
continue
}

v := types.BytesToArray[T](ctr.inBat.Vecs[tblColPos].GetBytesAt(j))
probes[j] = v
}

return centroidmat, embedmat
return probes, nil
}

func (ctr *container) probe(ap *Productl2, proc *process.Process, result *vm.CallResult) error {
centroidColPos := ap.OnExpr.GetF().GetArgs()[0].GetCol().GetColPos()
switch ctr.bat.Vecs[centroidColPos].GetType().Oid {
tblColPos := ap.OnExpr.GetF().GetArgs()[1].GetCol().GetColPos()
switch ctr.inBat.Vecs[tblColPos].GetType().Oid {
case types.T_array_float32:
return probeRun[float32](ctr, ap, proc, result)
case types.T_array_float64:
Expand All @@ -218,9 +255,16 @@ func (ctr *container) probe(ap *Productl2, proc *process.Process, result *vm.Cal
return nil
}

func (ctr *container) release() {
if ctr.brute_force != nil {
ctr.brute_force.Destroy()
ctr.brute_force = nil
}
}

func probeRun[T types.RealNumbers](ctr *container, ap *Productl2, proc *process.Process, result *vm.CallResult) error {
buildCount := ctr.bat.RowCount()
probeCount := ctr.inBat.RowCount()
tblColPos := ap.OnExpr.GetF().GetArgs()[1].GetCol().GetColPos()

ncpu := runtime.NumCPU()
if probeCount < ncpu {
Expand All @@ -234,85 +278,40 @@ func probeRun[T types.RealNumbers](ctr *container, ap *Productl2, proc *process.
}
}

leastClusterIndex := make([]int, probeCount)
leastDistance := make([]T, probeCount)

for i := 0; i < probeCount; i++ {
leastClusterIndex[i] = 0
leastDistance[i] = metric.MaxFloat[T]()
}

centroidmat, embedmat := newMat[T](ctr, ap)
distfn, err := metric.ResolveDistanceFn[T](ctr.metrictype)
probes, err := newMat[T](ctr, ap)
if err != nil {
return moerr.NewInternalError(proc.Ctx, "ProductL2: failed to get distance function")
return err
}

errs := make(chan error, ncpu)
var mutex sync.Mutex
var wg sync.WaitGroup
rt := vectorindex.RuntimeConfig{Limit: 1, NThreads: uint(ncpu)}

for n := 0; n < ncpu; n++ {
anykeys, distances, err := ctr.brute_force.Search(sqlexec.NewSqlProcess(proc), probes, rt)
if err != nil {
return err
}
_ = distances

wg.Add(1)
go func(tid int) {
defer wg.Done()
for j := 0; j < probeCount; j++ {
leastClusterIndex := anykeys.([]int64)

if j%ncpu != tid {
continue
}
//os.Stderr.WriteString(fmt.Sprintf("keys %v\n", keys))
//os.Stderr.WriteString(fmt.Sprintf("distances %v\n", distances))

// for each row in probe table,
// find the nearest cluster center from the build table.
for i := 0; i < buildCount; i++ {
for j := 0; j < probeCount; j++ {

if embedmat[j] == nil || centroidmat[i] == nil {
leastDistance[j] = 0
leastClusterIndex[j] = i
} else {
dist, err := distfn(centroidmat[i], embedmat[j])
if err != nil {
errs <- err
return
}
if dist < leastDistance[j] {
leastDistance[j] = dist
leastClusterIndex[j] = i
}
}
if ctr.inBat.Vecs[tblColPos].IsNull(uint64(j)) {
leastClusterIndex[j] = 0
}
for k, rp := range ap.Result {
if rp.Rel == 0 {
if err := ctr.rbat.Vecs[k].UnionOne(ctr.inBat.Vecs[rp.Pos], int64(j), proc.Mp()); err != nil {
return err
}

err := func() error {
mutex.Lock()
defer mutex.Unlock()
for k, rp := range ap.Result {
if rp.Rel == 0 {
if err := ctr.rbat.Vecs[k].UnionOne(ctr.inBat.Vecs[rp.Pos], int64(j), proc.Mp()); err != nil {
return err
}
} else {
if err := ctr.rbat.Vecs[k].UnionOne(ctr.bat.Vecs[rp.Pos], int64(leastClusterIndex[j]), proc.Mp()); err != nil {
return err
}
}
}

return nil
}()

if err != nil {
errs <- err
return
} else {
if err := ctr.rbat.Vecs[k].UnionOne(ctr.bat.Vecs[rp.Pos], int64(leastClusterIndex[j]), proc.Mp()); err != nil {
return err
}
}
}(n)
}

wg.Wait()

if len(errs) > 0 {
return <-errs
}
}

// ctr.rbat.AddRowCount(count * count2)
Expand Down
16 changes: 10 additions & 6 deletions pkg/sql/colexec/productl2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/container/batch"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/sql/colexec"
vmetric "github.com/matrixorigin/matrixone/pkg/vectorindex/metric"
"github.com/matrixorigin/matrixone/pkg/vectorindex/cache"
"github.com/matrixorigin/matrixone/pkg/vectorindex/metric"
"github.com/matrixorigin/matrixone/pkg/vm"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)
Expand All @@ -34,11 +35,12 @@ const (
)

type container struct {
state int
bat *batch.Batch // build batch
rbat *batch.Batch
inBat *batch.Batch // probe batch
metrictype vmetric.MetricType
state int
bat *batch.Batch // build batch
rbat *batch.Batch
inBat *batch.Batch // probe batch
metrictype metric.MetricType
brute_force cache.VectorIndexSearchIf // brute_force.BruteForceIndex
}

type Productl2 struct {
Expand Down Expand Up @@ -89,11 +91,13 @@ func (productl2 *Productl2) Reset(proc *process.Process, pipelineFailed bool, er
if productl2.ctr.rbat != nil {
productl2.ctr.rbat.CleanOnlyData()
}
productl2.ctr.release()
productl2.ctr.inBat = nil
productl2.ctr.state = Build
}

func (productl2 *Productl2) Free(proc *process.Process, pipelineFailed bool, err error) {
productl2.ctr.release()
productl2.ctr.cleanBatch(proc.Mp())
}

Expand Down
Loading
Loading