Skip to content

Commit a24c873

Browse files
added filter for vector lengths
1 parent abe93cc commit a24c873

File tree

3 files changed

+10
-12
lines changed

3 files changed

+10
-12
lines changed

posting/oracle.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/dgraph-io/badger/v4"
1717
"github.com/golang/glog"
1818
ostats "go.opencensus.io/stats"
19+
"go.opentelemetry.io/otel/trace"
1920

2021
"github.com/hypermodeinc/dgraph/v25/protos/pb"
2122
"github.com/hypermodeinc/dgraph/v25/tok/index"
@@ -54,6 +55,8 @@ type Txn struct {
5455
lastUpdate time.Time
5556

5657
cache *LocalCache // This pointer does not get modified.
58+
59+
Span trace.Span
5760
}
5861

5962
// struct to implement Txn interface from vector-indexer

worker/draft.go

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ type node struct {
6868
opsLock sync.Mutex
6969
cdcTracker *CDC
7070
canCampaign bool
71-
72-
startTsKey map[uint64]uint64
7371
}
7472

7573
type op int
@@ -279,7 +277,6 @@ func newNode(store *raftwal.DiskStorage, gid uint32, id uint64, myAddr string) *
279277
closer: z.NewCloser(4), // Matches CLOSER:1
280278
ops: make(map[op]operation),
281279
cdcTracker: newCDC(),
282-
startTsKey: make(map[uint64]uint64),
283280
}
284281
return n
285282
}
@@ -939,8 +936,8 @@ func (n *node) processApplyCh() {
939936
case proposal.Mutations != nil:
940937
tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Mutations"))
941938
span.SetAttributes(attribute.Int64("start_ts", int64(proposal.Mutations.StartTs)))
942-
n.startTsKey[proposal.Mutations.StartTs] = key
943-
fmt.Println("Setting proposal key", proposal.Mutations.StartTs, key)
939+
posting.Oracle().GetTxn(proposal.Mutations.StartTs).Span = span
940+
944941
case proposal.Delta != nil:
945942
tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Delta"))
946943
}
@@ -1021,9 +1018,7 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
10211018
attribute.Int64("commit_ts", int64(commit)),
10221019
))
10231020

1024-
spani := trace.SpanFromContext(n.Ctx(n.startTsKey[start]))
1025-
fmt.Println("FIRST", spani, n.startTsKey[start], start)
1026-
spani.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1021+
txn.Span.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
10271022
attribute.Int64("start_ts", int64(start)),
10281023
attribute.Int64("commit_ts", int64(commit)),
10291024
))
@@ -1038,13 +1033,11 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
10381033

10391034
span.AddEvent("Flushed to disk")
10401035
for _, status := range delta.Txns {
1041-
spani := trace.SpanFromContext(n.Ctx(n.startTsKey[status.StartTs]))
1042-
fmt.Println("Second", spani)
1043-
spani.AddEvent("Flushed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1036+
txn := posting.Oracle().GetTxn(status.StartTs)
1037+
txn.Span.AddEvent("Flushed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
10441038
attribute.Int64("start_ts", int64(status.StartTs)),
10451039
attribute.Int64("commit_ts", int64(status.CommitTs)),
10461040
))
1047-
delete(n.startTsKey, status.StartTs)
10481041
}
10491042

10501043
if x.WorkerConfig.HardSync {

worker/proposal.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr
221221
cctx, cancel := context.WithCancel(context.Background())
222222
defer cancel()
223223

224+
cctx = trace.ContextWithSpan(cctx, span)
225+
224226
errCh := make(chan error, 1)
225227
pctx := &conn.ProposalCtx{
226228
ErrCh: errCh,

0 commit comments

Comments
 (0)