@@ -99,7 +99,7 @@ func (pp *PredicatePipeline) close() {
9999 pp .wg .Done ()
100100}
101101
102- func (mp * MutationPipeline ) InsertTokenizerIndexes (ctx context.Context , pipeline * PredicatePipeline , postings * map [uint64 ]* pb.PostingList ) error {
102+ func (mp * MutationPipeline ) InsertTokenizerIndexes (ctx context.Context , pipeline * PredicatePipeline , postings * map [uint64 ]* pb.PostingList , info predicateInfo ) error {
103103 startTime := time .Now ()
104104 defer func () {
105105 fmt .Println ("Inserting tokenizer indexes for predicate" , pipeline .attr , "took" , time .Since (startTime ))
@@ -230,8 +230,17 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
230230 }
231231
232232 globalMapI := f (100 )
233+
233234 mp .txn .cache .Lock ()
234235 defer mp .txn .cache .Unlock ()
236+
237+ if info .hasUpsert {
238+ globalMapI .Iterate (func (key string , value * pb.PostingList ) error {
239+ mp .txn .addConflictKey (farm .Fingerprint64 ([]byte (key )))
240+ return nil
241+ })
242+ }
243+
235244 globalMap := mp .txn .cache .deltas .GetIndexMapForPredicate (pipeline .attr )
236245 if globalMap == nil {
237246 globalMap = types .NewLockedShardedMap [string , * pb.PostingList ]()
@@ -248,7 +257,16 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
248257 return nil
249258}
250259
251- func (mp * MutationPipeline ) ProcessList (ctx context.Context , pipeline * PredicatePipeline , index bool , reverse bool , count bool , noConflict bool ) error {
260+ type predicateInfo struct {
261+ isList bool
262+ index bool
263+ reverse bool
264+ count bool
265+ noConflict bool
266+ hasUpsert bool
267+ }
268+
269+ func (mp * MutationPipeline ) ProcessList (ctx context.Context , pipeline * PredicatePipeline , info predicateInfo ) error {
252270 su , schemaExists := schema .State ().Get (ctx , pipeline .attr )
253271
254272 mutations := make (map [uint64 ]* MutableLayer , 1000 )
@@ -285,20 +303,20 @@ func (mp *MutationPipeline) ProcessList(ctx context.Context, pipeline *Predicate
285303 postings [uid ] = pl .currentEntries
286304 }
287305
288- if reverse {
289- if err := mp .ProcessReverse (ctx , pipeline , & postings , true , reverse , count ); err != nil {
306+ if info . reverse {
307+ if err := mp .ProcessReverse (ctx , pipeline , & postings , info ); err != nil {
290308 return err
291309 }
292310 }
293311
294- if index {
295- if err := mp .InsertTokenizerIndexes (ctx , pipeline , & postings ); err != nil {
312+ if info . index {
313+ if err := mp .InsertTokenizerIndexes (ctx , pipeline , & postings , info ); err != nil {
296314 return err
297315 }
298316 }
299317
300- if count {
301- if err := mp .ProcessCount (ctx , pipeline , & postings , false , false ); err != nil {
318+ if info . count {
319+ if err := mp .ProcessCount (ctx , pipeline , & postings , info ); err != nil {
302320 return err
303321 }
304322 }
@@ -315,8 +333,8 @@ func (mp *MutationPipeline) ProcessList(ctx context.Context, pipeline *Predicate
315333 if newPl , err := mp .txn .AddDelta (baseKey + string (dataKey [len (dataKey )- 8 :]), * pl ); err != nil {
316334 return err
317335 } else {
318- if ! noConflict {
319- mp .txn .addConflictKeyWithUid (dataKey , newPl )
336+ if ! info . noConflict {
337+ mp .txn .addConflictKeyWithUid (dataKey , newPl , info . hasUpsert , info . noConflict )
320338 }
321339 }
322340 }
@@ -336,7 +354,7 @@ func findSingleValueInPostingList(pb *pb.PostingList) *pb.Posting {
336354 return nil
337355}
338356
339- func (mp * MutationPipeline ) ProcessReverse (ctx context.Context , pipeline * PredicatePipeline , postings * map [uint64 ]* pb.PostingList , list bool , reverse bool , count bool ) error {
357+ func (mp * MutationPipeline ) ProcessReverse (ctx context.Context , pipeline * PredicatePipeline , postings * map [uint64 ]* pb.PostingList , info predicateInfo ) error {
340358 key := x .ReverseKey (pipeline .attr , 0 )
341359 edge := & pb.DirectedEdge {
342360 Attr : pipeline .attr ,
@@ -359,8 +377,8 @@ func (mp *MutationPipeline) ProcessReverse(ctx context.Context, pipeline *Predic
359377 }
360378 }
361379
362- if count {
363- return mp .ProcessCount (ctx , pipeline , & reverseredMap , false , true )
380+ if info . count {
381+ return mp .ProcessCount (ctx , pipeline , & reverseredMap , info )
364382 }
365383
366384 for uid , pl := range reverseredMap {
@@ -371,7 +389,7 @@ func (mp *MutationPipeline) ProcessReverse(ctx context.Context, pipeline *Predic
371389 if newPl , err := mp .txn .AddDelta (string (key ), * pl ); err != nil {
372390 return err
373391 } else {
374- mp .txn .addConflictKeyWithUid (key , newPl )
392+ mp .txn .addConflictKeyWithUid (key , newPl , info . hasUpsert , info . noConflict )
375393 }
376394 }
377395
@@ -430,21 +448,28 @@ func (mp *MutationPipeline) handleOldDeleteForSingle(pipeline *PredicatePipeline
430448 return nil
431449}
432450
433- func (txn * Txn ) addConflictKeyWithUid (key []byte , pl * pb.PostingList ) {
451+ func (txn * Txn ) addConflictKeyWithUid (key []byte , pl * pb.PostingList , hasUpsert bool , hasNoConflict bool ) {
452+ if hasNoConflict {
453+ return
454+ }
434455 txn .Lock ()
435456 defer txn .Unlock ()
436457 if txn .conflicts == nil {
437458 txn .conflicts = make (map [uint64 ]struct {})
438459 }
439460 keyHash := farm .Fingerprint64 (key )
461+ if hasUpsert {
462+ txn .conflicts [keyHash ] = struct {}{}
463+ return
464+ }
440465 for _ , post := range pl .Postings {
441466 txn .conflicts [keyHash ^ post .Uid ] = struct {}{}
442467 }
443468}
444469
445- func (mp * MutationPipeline ) ProcessCount (ctx context.Context , pipeline * PredicatePipeline , postings * map [uint64 ]* pb.PostingList , isSingle bool , reverse bool ) error {
470+ func (mp * MutationPipeline ) ProcessCount (ctx context.Context , pipeline * PredicatePipeline , postings * map [uint64 ]* pb.PostingList , info predicateInfo ) error {
446471 dataKey := x .DataKey (pipeline .attr , 0 )
447- if reverse {
472+ if info . reverse {
448473 dataKey = x .ReverseKey (pipeline .attr , 0 )
449474 }
450475 edge := pb.DirectedEdge {
@@ -485,18 +510,20 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
485510 }
486511 }
487512
488- list .updateMutationLayer (post , isSingle , true )
513+ list .updateMutationLayer (post , ! info . isList , true )
489514 }
490515
491516 newCount := list .GetLength (mp .txn .StartTs )
492517 updated := list .mutationMap .currentEntries != nil
493518 list .Unlock ()
494519
495520 if updated {
496- if isSingle {
497- mp .txn .addConflictKey (farm .Fingerprint64 (dataKey ))
521+ if ! info .isList {
522+ if ! info .noConflict {
523+ mp .txn .addConflictKey (farm .Fingerprint64 (dataKey ))
524+ }
498525 } else {
499- mp .txn .addConflictKeyWithUid (dataKey , postingList )
526+ mp .txn .addConflictKeyWithUid (dataKey , postingList , info . hasUpsert , info . noConflict )
500527 }
501528 }
502529
@@ -519,24 +546,24 @@ func (mp *MutationPipeline) ProcessCount(ctx context.Context, pipeline *Predicat
519546
520547 for c , pl := range countMap {
521548 //fmt.Println("COUNT", c, pl)
522- ck := x .CountKey (pipeline .attr , uint32 (c ), reverse )
549+ ck := x .CountKey (pipeline .attr , uint32 (c ), info . reverse )
523550 if newPl , err := mp .txn .AddDelta (string (ck ), * pl ); err != nil {
524551 return err
525552 } else {
526- mp .txn .addConflictKeyWithUid (ck , newPl )
553+ mp .txn .addConflictKeyWithUid (ck , newPl , info . hasUpsert , info . noConflict )
527554 }
528555 }
529556
530557 return nil
531558}
532559
533- func (mp * MutationPipeline ) ProcessSingle (ctx context.Context , pipeline * PredicatePipeline , index bool , reverse bool , count bool , noConflict bool ) error {
560+ func (mp * MutationPipeline ) ProcessSingle (ctx context.Context , pipeline * PredicatePipeline , info predicateInfo ) error {
534561 su , schemaExists := schema .State ().Get (ctx , pipeline .attr )
535562
536563 postings := make (map [uint64 ]* pb.PostingList , 1000 )
537564
538565 dataKey := x .DataKey (pipeline .attr , 0 )
539- insertDeleteAllEdge := ! (index || reverse || count )
566+ insertDeleteAllEdge := ! (info . index || info . reverse || info . count )
540567
541568 var oldVal * pb.Posting
542569 for edge := range pipeline .edges {
@@ -611,27 +638,27 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica
611638 }
612639 }
613640
614- if index || reverse || count {
641+ if info . index || info . reverse || info . count {
615642 if err := mp .handleOldDeleteForSingle (pipeline , postings ); err != nil {
616643 return err
617644 }
618645 }
619646
620- if index {
621- if err := mp .InsertTokenizerIndexes (ctx , pipeline , & postings ); err != nil {
647+ if info . index {
648+ if err := mp .InsertTokenizerIndexes (ctx , pipeline , & postings , info ); err != nil {
622649 return err
623650 }
624651 }
625652
626- if reverse {
627- if err := mp .ProcessReverse (ctx , pipeline , & postings , false , reverse , count ); err != nil {
653+ if info . reverse {
654+ if err := mp .ProcessReverse (ctx , pipeline , & postings , info ); err != nil {
628655 return err
629656 }
630657 }
631658
632- if count {
659+ if info . count {
633660 // Count should take care of updating the posting list
634- return mp .ProcessCount (ctx , pipeline , & postings , true , false )
661+ return mp .ProcessCount (ctx , pipeline , & postings , info )
635662 }
636663
637664 baseKey := string (dataKey [:len (dataKey )- 8 ]) // Avoid repeated conversion
@@ -640,7 +667,7 @@ func (mp *MutationPipeline) ProcessSingle(ctx context.Context, pipeline *Predica
640667 binary .BigEndian .PutUint64 (dataKey [len (dataKey )- 8 :], uid )
641668 key := baseKey + string (dataKey [len (dataKey )- 8 :])
642669
643- if ! noConflict {
670+ if ! info . noConflict {
644671 mp .txn .addConflictKey (farm .Fingerprint64 ([]byte (key )))
645672 }
646673
@@ -704,36 +731,32 @@ func (mp *MutationPipeline) ProcessPredicate(ctx context.Context, pipeline *Pred
704731 // We shouldn't check whether this Alpha serves this predicate or not. Membership information
705732 // isn't consistent across the entire cluster. We should just apply whatever is given to us.
706733 su , ok := schema .State ().Get (ctx , pipeline .attr )
707- hasCountIndex := false
708- hasReversedIndex := false
709- hasIndex := false
710- isList := false
711- hasNoConflict := false
734+ info := predicateInfo {}
712735
713736 if ok {
714- isList = su . GetList ( )
715- hasIndex = schema .State ().IsIndexed (ctx , pipeline .attr )
716- hasCountIndex = schema .State ().HasCount (ctx , pipeline .attr )
717- hasReversedIndex = schema .State ().IsReversed ( ctx , pipeline .attr )
718- hasNoConflict = schema .State ().HasNoConflict (pipeline .attr )
737+ info . index = schema . State (). IsIndexed ( ctx , pipeline . attr )
738+ info . count = schema .State ().HasCount (ctx , pipeline .attr )
739+ info . reverse = schema .State ().IsReversed (ctx , pipeline .attr )
740+ info . noConflict = schema .State ().HasNoConflict ( pipeline .attr )
741+ info . hasUpsert = schema .State ().HasUpsert (pipeline .attr )
719742 }
720743
721744 runListFn := false
722745
723746 if ok {
724- if isList || su .Lang {
747+ if info . isList || su .Lang {
725748 runListFn = true
726749 }
727750 }
728751
729752 if runListFn {
730- if err := mp .ProcessList (ctx , pipeline , hasIndex , hasReversedIndex , hasCountIndex , hasNoConflict ); err != nil {
753+ if err := mp .ProcessList (ctx , pipeline , info ); err != nil {
731754 return err
732755 }
733756 }
734757
735758 if ok && ! runListFn {
736- if err := mp .ProcessSingle (ctx , pipeline , hasIndex , hasReversedIndex , hasCountIndex , hasNoConflict ); err != nil {
759+ if err := mp .ProcessSingle (ctx , pipeline , info ); err != nil {
737760 return err
738761 }
739762 }
0 commit comments