Skip to content

Commit b860d4a

Browse files
perf(zero): update zero's delta proposal frequency
1 parent 5d97bd0 commit b860d4a

File tree

2 files changed

+73
-5
lines changed

2 files changed

+73
-5
lines changed

dgraph/cmd/zero/oracle.go

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,19 +225,47 @@ func (o *Oracle) sendDeltasToSubscribers() {
225225
// Let's ensure that we have all the commits up until the max here.
226226
// Otherwise, we'll be sending commit timestamps out of order, which
227227
// would cause Alphas to drop some of them, during writes to Badger.
228+
229+
newDelta := &pb.OracleDelta{}
230+
useNewDelta := false
228231
if o.doneUntil.DoneUntil() < waitFor() {
229-
continue // The for loop doing blocking reads from o.updates.
230-
// We need at least one entry from the updates channel to pick up a missing update.
231-
// Don't goto slurp_loop, because it would break from select immediately.
232+
if len(delta.Txns) > 5 {
233+
replacementTxn := []*pb.TxnStatus{}
234+
235+
ts := o.doneUntil.DoneUntil()
236+
for _, txn := range delta.Txns {
237+
if txn.CommitTs > ts {
238+
replacementTxn = append(replacementTxn, txn)
239+
} else {
240+
newDelta.Txns = append(newDelta.Txns, txn)
241+
newDelta.MaxAssigned = x.Max(newDelta.MaxAssigned, txn.CommitTs)
242+
}
243+
}
244+
245+
if len(newDelta.Txns) == 0 {
246+
continue
247+
}
248+
249+
useNewDelta = true
250+
delta.Txns = replacementTxn
251+
} else {
252+
continue // The for loop doing blocking reads from o.updates.
253+
// We need at least one entry from the updates channel to pick up a missing update.
254+
// Don't goto slurp_loop, because it would break from select immediately.
255+
}
232256
}
233257

234258
if glog.V(3) {
235259
glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta)
236260
}
237261
o.Lock()
262+
k := *delta
263+
if useNewDelta {
264+
k = *newDelta
265+
}
238266
for id, ch := range o.subscribers {
239267
select {
240-
case ch <- *delta:
268+
case ch <- k:
241269
default:
242270
close(ch)
243271
delete(o.subscribers, id)

worker/draft.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ type node struct {
6868
opsLock sync.Mutex
6969
cdcTracker *CDC
7070
canCampaign bool
71+
72+
startTsKey map[uint64]uint64
7173
}
7274

7375
type op int
@@ -876,6 +878,12 @@ func (n *node) processApplyCh() {
876878
glog.V(3).Infof("handling element in applyCh with #entries %v", len(entries))
877879
defer glog.V(3).Infof("done handling element in applyCh")
878880

881+
_, spanHandler := otel.Tracer("applyCh").Start(context.Background(), "Alpha.processApplyCh")
882+
defer spanHandler.End()
883+
884+
spanHandler.AddEvent("handling element in applyCh with #entries %v", trace.WithAttributes(
885+
attribute.Int64("numEntries", int64(len(entries)))))
886+
879887
var totalSize int64
880888
for _, entry := range entries {
881889
x.AssertTrue(len(entry.Data) > 0)
@@ -909,7 +917,7 @@ func (n *node) processApplyCh() {
909917
p := &P{err: perr, size: psz, seen: time.Now()}
910918
previous[key] = p
911919
}
912-
span := trace.SpanFromContext(n.ctx)
920+
span := trace.SpanFromContext(n.Ctx(key))
913921
if perr != nil {
914922
glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, &proposal)
915923
span.AddEvent(fmt.Sprintf("Applying proposal failed. Error: %v Proposal: %q", perr, &proposal))
@@ -919,11 +927,18 @@ func (n *node) processApplyCh() {
919927
attribute.Int64("key", int64(key)),
920928
attribute.Int64("index", int64(proposal.Index)),
921929
))
930+
spanHandler.AddEvent("Applied proposal with key: %d, index: %d. Err: %v",
931+
trace.WithAttributes(
932+
attribute.Int64("key", int64(key)),
933+
attribute.Int64("index", int64(proposal.Index)),
934+
))
922935

923936
var tags []tag.Mutator
924937
switch {
925938
case proposal.Mutations != nil:
926939
tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Mutations"))
940+
span.SetAttributes(attribute.Int64("start_ts", int64(proposal.Mutations.StartTs)))
941+
n.startTsKey[proposal.Mutations.StartTs] = key
927942
case proposal.Delta != nil:
928943
tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Delta"))
929944
}
@@ -966,6 +981,9 @@ func (n *node) processApplyCh() {
966981

967982
// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
968983
func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
984+
_, span := otel.Tracer("alpha.CommitLoop").Start(context.Background(), "commitOrAbort")
985+
defer span.End()
986+
969987
x.PrintOracleDelta(delta)
970988
// First let's commit all mutations to disk.
971989
writer := posting.NewTxnWriter(pstore)
@@ -995,6 +1013,18 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
9951013
start, commit, err)
9961014
panic(err)
9971015
}
1016+
1017+
span.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1018+
attribute.Int64("start_ts", int64(start)),
1019+
attribute.Int64("commit_ts", int64(commit)),
1020+
))
1021+
1022+
spani := trace.SpanFromContext(n.Ctx(n.startTsKey[start]))
1023+
fmt.Println("FIRST", spani)
1024+
spani.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1025+
attribute.Int64("start_ts", int64(start)),
1026+
attribute.Int64("commit_ts", int64(commit)),
1027+
))
9981028
}
9991029

10001030
for _, status := range delta.Txns {
@@ -1004,6 +1034,16 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
10041034
return errors.Wrapf(err, "while flushing to disk")
10051035
}
10061036

1037+
span.AddEvent("Flushed to disk")
1038+
for _, status := range delta.Txns {
1039+
spani := trace.SpanFromContext(n.Ctx(n.startTsKey[status.StartTs]))
1040+
fmt.Println("Second", spani)
1041+
spani.AddEvent("Flushed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes(
1042+
attribute.Int64("start_ts", int64(status.StartTs)),
1043+
attribute.Int64("commit_ts", int64(status.CommitTs)),
1044+
))
1045+
}
1046+
10071047
if x.WorkerConfig.HardSync {
10081048
if err := pstore.Sync(); err != nil {
10091049
glog.Errorf("Error while calling Sync while commitOrAbort: %v", err)

0 commit comments

Comments
 (0)