Skip to content

Commit 0685bd4

Browse files
committed
Implement lock-free DB transactions.
Lock-free write transactions allows GraphKB to have multiple instances running in parallel to scale out. To implement the feature I added the source breakdown in 'assets' and 'relations' tables. That way there is no concurrent access to pivot points anymore since each data source produces and manage its own entry in the DB. The reconciliation is done at query time once the entries are retrieved from DB.
1 parent 91c7053 commit 0685bd4

File tree

4 files changed

+343
-60
lines changed

4 files changed

+343
-60
lines changed

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3
335335
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
336336
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
337337
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
338+
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
338339
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
339340
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
340341
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=

internal/database/id_generator.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package database
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/clems4ever/go-graphkb/internal/knowledge"
7+
)
8+
9+
// AssetTemporaryIDGenerator is a kind of cache associating IDs to assets so that pivot points are joined by those IDs
10+
type AssetTemporaryIDGenerator struct {
11+
// map the DB ID with a temporarily generated ID.
12+
DBIDToTemporaryID map[int]int
13+
// map the Asset definition to the temporarily generated ID
14+
DefinitionToTemporaryID map[knowledge.Asset]int
15+
16+
// The next ID to be used.
17+
generator int
18+
}
19+
20+
// NewAssetTemporaryIDGenerator create an ID Generator
21+
func NewAssetTemporaryIDGenerator() *AssetTemporaryIDGenerator {
22+
return &AssetTemporaryIDGenerator{
23+
generator: 0,
24+
DBIDToTemporaryID: make(map[int]int),
25+
DefinitionToTemporaryID: make(map[knowledge.Asset]int),
26+
}
27+
}
28+
29+
// Push an asset into the ID generator to insert a retrieve a temporary ID
30+
func (atig *AssetTemporaryIDGenerator) Push(asset knowledge.Asset, DBID int) (int, error) {
31+
id, ok := atig.DefinitionToTemporaryID[asset]
32+
// If the asset is not in the DBID and Definition mappings, then we insert a new ID
33+
if !ok {
34+
_, ok2 := atig.DBIDToTemporaryID[DBID]
35+
// if the DBID has not been seen yet, assign the temporary ID
36+
if !ok2 {
37+
id = atig.generator
38+
atig.DefinitionToTemporaryID[asset] = id
39+
atig.DBIDToTemporaryID[DBID] = id
40+
atig.generator++
41+
} else {
42+
return 0, fmt.Errorf("DBID %d is already bound to another asset", DBID)
43+
}
44+
} else {
45+
id2, ok2 := atig.DBIDToTemporaryID[DBID]
46+
if ok2 && id != id2 {
47+
return 0, fmt.Errorf("DBID %d is already bound to another asset", DBID)
48+
}
49+
atig.DBIDToTemporaryID[DBID] = id
50+
}
51+
return id, nil
52+
}
53+
54+
// Get the temporary ID related to the given DBID
55+
func (atig *AssetTemporaryIDGenerator) Get(DBID int) (int, error) {
56+
id, ok := atig.DBIDToTemporaryID[DBID]
57+
if !ok {
58+
return 0, fmt.Errorf("DB ID %d does not exist in generator", DBID)
59+
}
60+
return id, nil
61+
}
62+
63+
// Count the number of items in the generator
64+
func (atig *AssetTemporaryIDGenerator) Count() int {
65+
return len(atig.DBIDToTemporaryID)
66+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package database
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/clems4ever/go-graphkb/internal/knowledge"
8+
"github.com/clems4ever/go-graphkb/internal/schema"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func GenerateAssetHelper(idx int) knowledge.Asset {
14+
return knowledge.Asset{
15+
Key: fmt.Sprintf("key%d", idx),
16+
Type: schema.AssetType(fmt.Sprintf("type%d", idx)),
17+
}
18+
}
19+
20+
func TestGeneratedIDsAreDifferent(t *testing.T) {
21+
atig := NewAssetTemporaryIDGenerator()
22+
23+
id1, _ := atig.Push(GenerateAssetHelper(1), 1)
24+
id2, _ := atig.Push(GenerateAssetHelper(2), 2)
25+
26+
assert.Equal(t, 2, atig.Count())
27+
assert.NotEqual(t, id1, id2)
28+
}
29+
30+
func TestPushMultipleTimesAssetWithDifferentDBID(t *testing.T) {
31+
atig := NewAssetTemporaryIDGenerator()
32+
33+
id1, _ := atig.Push(GenerateAssetHelper(2), 2)
34+
id2, _ := atig.Push(GenerateAssetHelper(2), 4)
35+
36+
assert.Equal(t, 1, atig.Count())
37+
assert.Equal(t, id1, id2)
38+
}
39+
40+
func TestPushMultipleAssetsWithSameBID(t *testing.T) {
41+
atig := NewAssetTemporaryIDGenerator()
42+
43+
atig.Push(GenerateAssetHelper(2), 2)
44+
_, err := atig.Push(GenerateAssetHelper(4), 2)
45+
46+
assert.EqualError(t, err, "DBID 2 is already bound to another asset")
47+
}
48+
49+
func TestGetExistingAssets(t *testing.T) {
50+
atig := NewAssetTemporaryIDGenerator()
51+
52+
id1, _ := atig.Push(GenerateAssetHelper(2), 2)
53+
id2, _ := atig.Push(GenerateAssetHelper(4), 4)
54+
55+
assert.NotEqual(t, id1, id2)
56+
57+
idg, err := atig.Get(2)
58+
assert.NoError(t, err)
59+
assert.Equal(t, id1, idg)
60+
61+
idg, err = atig.Get(4)
62+
assert.NoError(t, err)
63+
assert.Equal(t, id2, idg)
64+
65+
idg, err = atig.Get(5)
66+
assert.EqualError(t, err, "DB ID 5 does not exist in generator")
67+
}
68+
69+
func TestGetSameAsset(t *testing.T) {
70+
atig := NewAssetTemporaryIDGenerator()
71+
72+
id1, _ := atig.Push(GenerateAssetHelper(2), 2)
73+
id2, _ := atig.Push(GenerateAssetHelper(2), 4)
74+
75+
assert.Equal(t, id1, id2)
76+
77+
idg, err := atig.Get(2)
78+
assert.NoError(t, err)
79+
assert.Equal(t, id1, idg)
80+
81+
idg, err = atig.Get(4)
82+
assert.NoError(t, err)
83+
assert.Equal(t, id2, idg)
84+
}

0 commit comments

Comments
 (0)