Skip to content

Commit c472f9b

Browse files
Add immutable btree.
1 parent 836ef31 commit c472f9b

File tree

14 files changed

+3078
-0
lines changed

14 files changed

+3078
-0
lines changed

btree/immutable/add.go

Lines changed: 303 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,303 @@
1+
package btree
2+
3+
import (
4+
"runtime"
5+
"sort"
6+
"sync"
7+
8+
terr "github.com/Workiva/go-datastructures/threadsafe/err"
9+
)
10+
11+
func (t *Tr) AddItems(its ...*Item) ([]*Item, error) {
12+
if len(its) == 0 {
13+
return nil, nil
14+
}
15+
16+
keys := make(Keys, 0, len(its))
17+
for _, item := range its {
18+
keys = append(keys, &Key{Value: item.Value, Payload: item.Payload})
19+
}
20+
21+
overwrittens, err := t.add(keys)
22+
if err != nil {
23+
return nil, err
24+
}
25+
26+
return overwrittens.toItems(), nil
27+
}
28+
29+
func (t *Tr) add(keys Keys) (Keys, error) {
30+
if t.Root == nil {
31+
n := t.createRoot()
32+
t.Root = n.ID
33+
t.context.addNode(n)
34+
}
35+
36+
nodes, err := t.determinePaths(keys)
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
var overwrittens Keys
42+
43+
var wg sync.WaitGroup
44+
wg.Add(len(nodes))
45+
var treeLock sync.Mutex
46+
localOverwrittens := make([]Keys, len(nodes))
47+
tree := make(map[string]*path, runtime.NumCPU())
48+
lerr := terr.New()
49+
50+
i := 0
51+
for id, bundles := range nodes {
52+
go func(i int, id string, bundles []*nodeBundle) {
53+
defer wg.Done()
54+
if len(bundles) == 0 {
55+
return
56+
}
57+
58+
n, err := t.contextOrCachedNode(ID(id), true)
59+
if err != nil {
60+
lerr.Set(err)
61+
return
62+
}
63+
64+
if !t.context.nodeExists(n.ID) {
65+
n = n.copy()
66+
t.context.addNode(n)
67+
}
68+
69+
overwrittens, err := insertLastDimension(t, n, bundles)
70+
71+
if err != nil {
72+
lerr.Set(err)
73+
return
74+
}
75+
76+
localOverwrittens[i] = overwrittens
77+
path := bundles[0].path
78+
treeLock.Lock()
79+
tree[string(n.ID)] = path
80+
treeLock.Unlock()
81+
}(i, id, bundles)
82+
i++
83+
}
84+
85+
wg.Wait()
86+
87+
if lerr.Get() != nil {
88+
return nil, lerr.Get()
89+
}
90+
91+
t.walkupInsert(tree)
92+
93+
for _, chunk := range localOverwrittens {
94+
overwrittens = append(overwrittens, chunk...)
95+
}
96+
97+
t.Count += len(keys) - len(overwrittens)
98+
99+
return overwrittens, nil
100+
}
101+
102+
func (t *Tr) determinePaths(keys Keys) (map[string][]*nodeBundle, error) {
103+
chunks := splitKeys(keys, runtime.NumCPU())
104+
var wg sync.WaitGroup
105+
wg.Add(len(chunks))
106+
chunkPaths := make([]map[interface{}]*nodeBundle, len(chunks))
107+
lerr := terr.New()
108+
109+
for i := range chunks {
110+
go func(i int) {
111+
defer wg.Done()
112+
keys := chunks[i]
113+
if len(keys) == 0 {
114+
return
115+
}
116+
mp := make(map[interface{}]*nodeBundle, len(keys))
117+
for _, key := range keys {
118+
path, err := t.iterativeFind(
119+
key.Value, t.Root,
120+
)
121+
122+
if err != nil {
123+
lerr.Set(err)
124+
return
125+
}
126+
mp[key.Value] = &nodeBundle{path: path, k: key}
127+
}
128+
chunkPaths[i] = mp
129+
}(i)
130+
}
131+
132+
wg.Wait()
133+
134+
if lerr.Get() != nil {
135+
return nil, lerr.Get()
136+
}
137+
138+
nodes := make(map[string][]*nodeBundle, 10)
139+
for _, chunk := range chunkPaths {
140+
for _, pb := range chunk {
141+
nodes[string(pb.path.peek().n.ID)] = append(nodes[string(pb.path.pop().n.ID)], pb)
142+
}
143+
}
144+
145+
return nodes, nil
146+
}
147+
148+
func insertByMerge(comparator Comparator, n *Node, bundles []*nodeBundle) (Keys, error) {
149+
positions := make(map[interface{}]int, len(n.ChildValues))
150+
overwrittens := make(Keys, 0, 10)
151+
152+
for i, value := range n.ChildValues {
153+
positions[value] = i
154+
}
155+
156+
for _, bundle := range bundles {
157+
if i, ok := positions[bundle.k.Value]; ok {
158+
overwrittens = append(overwrittens, n.ChildKeys[i])
159+
n.ChildKeys[i] = bundle.k
160+
} else {
161+
n.ChildValues = append(n.ChildValues, bundle.k.Value)
162+
n.ChildKeys = append(n.ChildKeys, bundle.k)
163+
}
164+
}
165+
166+
nsw := &nodeSortWrapper{
167+
values: n.ChildValues,
168+
keys: n.ChildKeys,
169+
comparator: comparator,
170+
}
171+
172+
sort.Sort(nsw)
173+
174+
for i := 0; i < len(nsw.values); i++ {
175+
if nsw.values[i] != nil {
176+
nsw.values = nsw.values[i:]
177+
nsw.keys = nsw.keys[i:]
178+
break
179+
}
180+
181+
nsw.keys[i] = nil
182+
}
183+
184+
n.ChildValues = nsw.values
185+
n.ChildKeys = nsw.keys
186+
return overwrittens, nil
187+
}
188+
189+
func insertLastDimension(t *Tr, n *Node, bundles []*nodeBundle) (Keys, error) {
190+
if n.IsLeaf && len(bundles) >= n.lenValues()/16 { // Found through empirical testing, it appears that the memmoves are more sensitive when dealing with interface{}'s.
191+
return insertByMerge(t.config.Comparator, n, bundles)
192+
}
193+
194+
overwrittens := make(Keys, 0, len(bundles))
195+
for _, bundle := range bundles {
196+
overwritten := n.insert(t.config.Comparator, bundle.k)
197+
if overwritten != nil {
198+
overwrittens = append(overwrittens, overwritten)
199+
}
200+
}
201+
202+
return overwrittens, nil
203+
}
204+
205+
func (t *Tr) iterativeSplit(n *Node) Keys {
206+
keys := make(Keys, 0, 10)
207+
for n.needsSplit(t.config.NodeWidth) {
208+
leftValue, leftNode := n.splitAt(t.config.NodeWidth / 2)
209+
t.context.addNode(leftNode)
210+
keys = append(keys, &Key{UUID: leftNode.ID, Value: leftValue})
211+
}
212+
213+
return keys
214+
}
215+
216+
// walkupInsert walks up nodes during the insertion process and adds
217+
// any new keys due to splits. Each layer of the tree can have insertions
218+
// performed in parallel as splits are local changes.
219+
func (t *Tr) walkupInsert(nodes map[string]*path) error {
220+
mapping := make(map[string]*Node, len(nodes))
221+
222+
for len(nodes) > 0 {
223+
splitNodes := make(map[string]Keys)
224+
newNodes := make(map[string]*path)
225+
for id, path := range nodes {
226+
node := t.context.getNode(ID(id))
227+
228+
parentPath := path.pop()
229+
if parentPath == nil {
230+
t.Root = node.ID
231+
continue
232+
}
233+
234+
parent := parentPath.n
235+
newNode := mapping[string(parent.ID)]
236+
if newNode == nil {
237+
if !t.context.nodeExists(parent.ID) {
238+
cp := parent.copy()
239+
if string(t.Root) == string(parent.ID) {
240+
t.Root = cp.ID
241+
}
242+
243+
t.context.addNode(cp)
244+
mapping[string(parent.ID)] = cp
245+
parent = cp
246+
} else {
247+
newNode = t.context.getNode(parent.ID)
248+
mapping[string(parent.ID)] = newNode
249+
parent = newNode
250+
}
251+
} else {
252+
parent = newNode
253+
}
254+
255+
i := parentPath.i
256+
257+
parent.replaceKeyAt(&Key{UUID: node.ID}, i)
258+
splitNodes[string(parent.ID)] = append(splitNodes[string(parent.ID)], t.iterativeSplit(node)...)
259+
newNodes[string(parent.ID)] = path
260+
}
261+
262+
var wg sync.WaitGroup
263+
wg.Add(len(splitNodes))
264+
lerr := terr.New()
265+
266+
for id, keys := range splitNodes {
267+
go func(id ID, keys Keys) {
268+
defer wg.Done()
269+
n, err := t.contextOrCachedNode(id, true)
270+
if err != nil {
271+
lerr.Set(err)
272+
return
273+
}
274+
for _, key := range keys {
275+
n.insert(t.config.Comparator, key)
276+
}
277+
}(ID(id), keys)
278+
}
279+
280+
wg.Wait()
281+
282+
if lerr.Get() != nil {
283+
return lerr.Get()
284+
}
285+
286+
nodes = newNodes
287+
}
288+
289+
n := t.context.getNode(t.Root)
290+
for n.needsSplit(t.config.NodeWidth) {
291+
root := newNode()
292+
t.Root = root.ID
293+
t.context.addNode(root)
294+
root.appendChild(&Key{UUID: n.ID})
295+
keys := t.iterativeSplit(n)
296+
for _, key := range keys {
297+
root.insert(t.config.Comparator, key)
298+
}
299+
n = root
300+
}
301+
302+
return nil
303+
}

0 commit comments

Comments
 (0)