File tree Expand file tree Collapse file tree 3 files changed +46
-0
lines changed Expand file tree Collapse file tree 3 files changed +46
-0
lines changed Original file line number Diff line number Diff line change @@ -28,6 +28,14 @@ func (t *Tr) DeleteItems(values ...interface{}) ([]*Item, error) {
2828 keys = append (keys , & Key {Value : item .Value , Payload : item .Payload })
2929 }, values ... )
3030
31+ if err != nil {
32+ return nil , err
33+ }
34+
35+ // we need to sort the keys to ensure that a multidelete
36+ // distributes deletes across a single node correctly
37+ keys = keys .sort (t .config .Comparator )
38+
3139 err = t .delete (keys )
3240 if err != nil {
3341 return nil , err
Original file line number Diff line number Diff line change @@ -59,6 +59,32 @@ func (k Keys) toItems() items {
5959 return items
6060}
6161
62+ func (k Keys ) sort (comparator Comparator ) Keys {
63+ return (& keySortWrapper {comparator , k }).sort ()
64+ }
65+
66+ type keySortWrapper struct {
67+ comparator Comparator
68+ keys Keys
69+ }
70+
71+ func (sw * keySortWrapper ) Len () int {
72+ return len (sw .keys )
73+ }
74+
75+ func (sw * keySortWrapper ) Swap (i , j int ) {
76+ sw .keys [i ], sw .keys [j ] = sw .keys [j ], sw .keys [i ]
77+ }
78+
79+ func (sw * keySortWrapper ) Less (i , j int ) bool {
80+ return sw .comparator (sw .keys [i ].Value , sw .keys [j ].Value ) < 0
81+ }
82+
83+ func (sw * keySortWrapper ) sort () Keys {
84+ sort .Sort (sw )
85+ return sw .keys
86+ }
87+
6288func splitKeys (keys Keys , numParts int ) []Keys {
6389 parts := make ([]Keys , numParts )
6490 for i := int64 (0 ); i < int64 (numParts ); i ++ {
Original file line number Diff line number Diff line change @@ -587,6 +587,18 @@ func TestDeleteAfterSplitIncreasing(t *testing.T) {
587587 }
588588}
589589
590+ func TestDeleteMultipleLevelsRandomlyBulk (t * testing.T ) {
591+ num := 200
592+ cfg := defaultConfig ()
593+ rt := New (cfg )
594+ mutable := rt .AsMutable ()
595+ items := generateRandomItems (num )
596+ mutable .AddItems (items ... )
597+ mutable .DeleteItems (itemsToValues (items [:100 ]... )... )
598+ result , _ := mutable .(* Tr ).toList (itemsToValues (items ... )... )
599+ assert .Len (t , result , 100 )
600+ }
601+
590602func TestDeleteAfterSplitDecreasing (t * testing.T ) {
591603 num := 11
592604 cfg := defaultConfig ()
You can’t perform that action at this time.
0 commit comments