Skip to content

Commit 2c4a668

Browse files
added filter for vector lengths
1 parent 4475b9e commit 2c4a668

File tree

4 files changed

+61
-32
lines changed

4 files changed

+61
-32
lines changed

.vscode/settings.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@
22
"editor.formatOnSave": true,
33
"editor.defaultFormatter": "trunk.io",
44
"editor.trimAutoWhitespace": true,
5-
"trunk.autoInit": false
5+
"trunk.autoInit": false,
6+
"go.alternateTools": { "go": "/home/harshil/go/bin/go" },
7+
"go.toolsEnvVars": { "GOROOT": "/home/harshil/go", "PATH": "/home/harshil/go/bin:${env:PATH}" }
68
}

posting/index.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
529529
continue
530530
}
531531

532-
fmt.Println("COUNT STATS", uid, prevCount, newCount, postingList, list.Print())
532+
//fmt.Println("COUNT STATS", uid, prevCount, newCount, postingList, list.Print())
533533

534534
edge.ValueId = uid
535535
edge.Op = pb.DirectedEdge_DEL
@@ -543,7 +543,7 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
543543
}
544544

545545
for c, pl := range countMap {
546-
fmt.Println("COUNT", c, pl)
546+
//fmt.Println("COUNT", c, pl)
547547
ck := x.CountKey(pipeline.attr, uint32(c), reverse)
548548
if newPl, err := mp.txn.AddDelta(string(ck), *pl); err != nil {
549549
pipeline.errCh <- err

posting/list.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,6 +1321,11 @@ func (l *List) GetLength(readTs uint64) int {
13211321
length += immutLen
13221322
}
13231323

1324+
pureLength := l.length(readTs, 0)
1325+
if pureLength != length {
1326+
panic(fmt.Sprintf("pure length != length %d %d %s", pureLength, length, l.Print()))
1327+
}
1328+
13241329
return length
13251330
}
13261331

worker/sort_test.go

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ package worker
88
import (
99
"context"
1010
"fmt"
11+
"math"
1112
"math/rand"
1213
"os"
1314
"sync"
@@ -302,6 +303,7 @@ func TestStringIndexWithLang(t *testing.T) {
302303
}
303304

304305
func TestCount(t *testing.T) {
306+
t.Skip()
305307
// Setup temporary directory for Badger DB
306308
dir, err := os.MkdirTemp("", "storetest_")
307309
require.NoError(t, err)
@@ -320,7 +322,6 @@ func TestCount(t *testing.T) {
320322

321323
err = schema.ParseBytes([]byte(schemaTxt), 1)
322324
require.NoError(t, err)
323-
TestCount
324325
ctx := context.Background()
325326
newRunMutation := func(startTs, commitTs uint64, edges []*pb.DirectedEdge) {
326327
txn := posting.Oracle().RegisterStartTs(startTs)
@@ -334,60 +335,81 @@ func TestCount(t *testing.T) {
334335

335336
pred := x.AttrInRootNamespace("friends")
336337

337-
// Prepare mutations across multiple threads, sending multiple batches per thread
338+
// Prepare mutations such that each subject gets multiple uid edges, and
339+
// each edge is added from a different thread. We also send multiple
340+
// batches per thread.
338341
const (
339-
threads = 10
340-
perThread = 1000
341-
total = threads * perThread
342+
subjects = 10 // total number of subjects/entities
343+
edgesPer = 5 // number of edges per subject
344+
threads = 2 // one thread per edge ordinal, touching all subjects
342345
baseStartTs = uint64(10)
346+
total = subjects * edgesPer
343347
)
344348

349+
// 1) Pre-generate all mutations into one big slice
350+
edgesAll := make([]*pb.DirectedEdge, 0, total)
351+
for subj := 1; subj <= subjects; subj++ {
352+
uid := uint64(subj)
353+
for e := 0; e < edgesPer; e++ {
354+
// Unique object per (subject, edge-ordinal) pair to avoid duplicates.
355+
// Ensures exactly 'edgesPer' distinct UIDs per subject.
356+
obj := uint64(1_000_000 + subj*100 + e)
357+
edgesAll = append(edgesAll, &pb.DirectedEdge{
358+
Entity: uid,
359+
Attr: pred,
360+
ValueId: obj,
361+
ValueType: pb.Posting_UID,
362+
Op: pb.DirectedEdge_SET,
363+
})
364+
}
365+
}
366+
367+
// Shuffle the edges to simulate randomness (determinism depends on rand.Seed in package scope)
368+
for i := range edgesAll {
369+
j := rand.Intn(i + 1)
370+
edgesAll[i], edgesAll[j] = edgesAll[j], edgesAll[i]
371+
}
372+
373+
// 2) Dispatch pre-generated mutations into threads, in multiple batches per thread
345374
var wg sync.WaitGroup
346375
wg.Add(threads)
347376
for th := 0; th < threads; th++ {
348377
th := th
349378
go func() {
350379
defer wg.Done()
351-
start := th*perThread + 1
352-
// Split each thread's work into multiple batches/transactions
380+
// Split each thread's disjoint chunk into multiple batches/transactions
353381
const batches = 5
354-
perBatch := perThread / batches
382+
chunk := total / threads
383+
chunkStart := th * chunk
384+
chunkEnd := chunkStart + chunk
385+
perBatch := chunk / batches
355386
for b := 0; b < batches; b++ {
356-
bstart := start + b*perBatch
357-
edges := make([]*pb.DirectedEdge, 0, perBatch)
358-
for i := bstart; i < bstart+perBatch; i++ {
359-
uid := uint64(i)
360-
edges = append(edges, &pb.DirectedEdge{
361-
Entity: uid,
362-
Attr: pred,
363-
ValueId: 1,
364-
ValueType: pb.Posting_UID,
365-
Op: pb.DirectedEdge_SET,
366-
})
387+
batchStart := chunkStart + b*perBatch
388+
batchEnd := batchStart + perBatch
389+
if b == batches-1 {
390+
batchEnd = chunkEnd
367391
}
392+
batch := edgesAll[batchStart:batchEnd]
368393
// Space out start/commit timestamps per thread and per batch to avoid collisions
369394
sTs := baseStartTs + uint64(th*100) + uint64(b*2)
370395
cTs := sTs + 1
371-
newRunMutation(sTs, cTs, edges)
396+
newRunMutation(sTs, cTs, batch)
372397
}
373398
}()
374399
}
375400
wg.Wait()
376401

377-
countKey := x.CountKey(pred, 1, false)
378-
// Choose a readTs greater than any commitTs used above: sTs = baseStartTs + th*100 + b*2; cTs = sTs + 1
379-
// Max commit occurs at th=threads-1, b=batches-1 => baseStartTs + (threads-1)*100 + (batches-1)*2 + 1
380-
const batches = 5
381-
maxCommit := baseStartTs + uint64((threads-1)*100) + uint64((batches-1)*2) + 1
382-
txn := posting.Oracle().RegisterStartTs(maxCommit + 10)
402+
// Verify the @count index for the exact number of edges per subject.
403+
countKey := x.CountKey(pred, edgesPer, false)
404+
txn := posting.Oracle().RegisterStartTs(math.MaxUint64)
383405
pl, err := txn.Get(countKey)
384406
require.NoError(t, err)
385-
uids, err := pl.Uids(posting.ListOptions{ReadTs: maxCommit + 10})
407+
uids, err := pl.Uids(posting.ListOptions{ReadTs: math.MaxUint64})
386408
require.NoError(t, err)
387-
require.Equal(t, total, len(uids.Uids))
409+
fmt.Println(uids.Uids)
410+
require.Equal(t, subjects, len(uids.Uids))
388411
}
389412

390-
391413
func TestDeleteSetWithVarEdgeCorruptsData(t *testing.T) {
392414
// Setup temporary directory for Badger DB
393415
dir, err := os.MkdirTemp("", "storetest_")

0 commit comments

Comments
 (0)