Skip to content

Commit db8440e

Browse files
committed
Add streaming support on read side.
Streaming was implemented only on the write side. When multiple data sources requested a graph from the API, a huge graph could be loaded entirely in memory before being encoded and sent over the wire. With this encoder, each relation or asset is encoded and sent independently, and the whole graph is reconstructed on the client side. The encoding format is definitely not optimized yet: it uses JSON, and duplicated assets can be sent multiple times, but that is worth it considering the amount of memory saved. Deduplication is done on the client side anyway.
1 parent 63e5946 commit db8440e

File tree

8 files changed

+255
-17
lines changed

8 files changed

+255
-17
lines changed

cmd/go-graphkb/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"bytes"
45
"context"
56
"fmt"
67
"strings"
@@ -149,11 +150,15 @@ func listen(cmd *cobra.Command, args []string) {
149150

150151
// read loads the graph of the data source named by args[0] from the database,
// streaming it through a GraphEncoder into an in-memory buffer, then decodes
// that buffer back into a Graph and prints the number of assets and relations.
// Any failure while reading or decoding is fatal.
func read(cmd *cobra.Command, args []string) {
	g := knowledge.NewGraph()
	buff := bytes.NewBuffer(nil)

	encoder := knowledge.NewGraphEncoder(buff)
	err := Database.ReadGraph(context.Background(), args[0], encoder)
	if err != nil {
		logrus.Fatal(err)
	}

	// A decoding failure must not be ignored: otherwise the counts printed
	// below would describe a partially reconstructed graph.
	if err := knowledge.NewGraphDecoder(buff).Decode(g); err != nil {
		logrus.Fatal(err)
	}
	fmt.Printf("assets = %d\nrelations = %d\n", len(g.Assets()), len(g.Relations()))
}
159164

cmd/test/main.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package main
2+
3+
import (
	"database/sql"
	"fmt"
	"os"

	_ "github.com/go-sql-driver/mysql"
	"github.com/sirupsen/logrus"
)
10+
11+
// Connection settings used by main to build the MySQL data source name.
//
// The password is read from the CRITEOKB_DB_PASSWORD environment variable so
// that no secret lives in the source tree.
// NOTE(review): a plaintext password was previously committed here; it should
// be considered leaked and rotated.
var (
	username     = "svc-criteokb"
	password     = os.Getenv("CRITEOKB_DB_PASSWORD")
	host         = "security-exp01-pa4.central.criteo.preprod"
	databaseName = "criteokb"
)
17+
18+
func main() {
19+
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@(%s)/%s", username, password,
20+
host, databaseName))
21+
if err != nil {
22+
logrus.Fatal(err)
23+
}
24+
25+
tx, err := db.Begin()
26+
if err != nil {
27+
logrus.Fatal(err)
28+
}
29+
30+
_, err = tx.Exec("INSERT INTO test (value) VALUES (?)", uint64(12435887700123278845))
31+
if err != nil {
32+
tx.Rollback()
33+
logrus.Fatal(err)
34+
}
35+
36+
err = tx.Commit()
37+
if err != nil {
38+
logrus.Fatal(err)
39+
}
40+
}

internal/client/graph_client.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,9 @@ func (gc *GraphClient) ReadCurrentGraph() (*knowledge.Graph, error) {
6868
return nil, handleUnexpectedResponse(res)
6969
}
7070

71-
b, err := ioutil.ReadAll(res.Body)
72-
if err != nil {
73-
return nil, err
74-
}
75-
71+
graphDecoder := knowledge.NewGraphDecoder(res.Body)
7672
graph := knowledge.NewGraph()
77-
err = json.Unmarshal(b, graph)
73+
err = graphDecoder.Decode(graph)
7874
if err != nil {
7975
return nil, err
8076
}

internal/database/mariadb.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func (m *MariaDB) RemoveRelations(ctx context.Context, source string, relations
406406
}
407407

408408
// ReadGraph read source subgraph
409-
func (m *MariaDB) ReadGraph(ctx context.Context, sourceName string, graph *knowledge.Graph) error {
409+
func (m *MariaDB) ReadGraph(ctx context.Context, sourceName string, encoder *knowledge.GraphEncoder) error {
410410
logrus.Debugf("Start reading graph of data source with name %s", sourceName)
411411
sourceID, err := m.resolveSourceID(sourceName)
412412
if err != nil {
@@ -437,18 +437,25 @@ func (m *MariaDB) ReadGraph(ctx context.Context, sourceName string, graph *knowl
437437
if err := rows.Scan(&FromType, &FromKey, &ToType, &ToKey, &Type); err != nil {
438438
return err
439439
}
440-
fromAsset := knowledge.Asset{
440+
fromAsset := knowledge.AssetKey{
441441
Type: schema.AssetType(FromType),
442442
Key: FromKey,
443443
}
444-
toAsset := knowledge.Asset{
444+
toAsset := knowledge.AssetKey{
445445
Type: schema.AssetType(ToType),
446446
Key: ToKey,
447447
}
448-
graph.AddRelation(
449-
graph.AddAsset(fromAsset.Type, fromAsset.Key),
450-
schema.RelationKeyType(Type),
451-
graph.AddAsset(toAsset.Type, toAsset.Key))
448+
449+
relation := knowledge.Relation{
450+
Type: schema.RelationKeyType(Type),
451+
From: fromAsset,
452+
To: toAsset,
453+
}
454+
455+
err = encoder.EncodeRelation(relation)
456+
if err != nil {
457+
return fmt.Errorf("Unable to write relation %v: %v", relation, err)
458+
}
452459
}
453460
}
454461

@@ -472,7 +479,16 @@ func (m *MariaDB) ReadGraph(ctx context.Context, sourceName string, graph *knowl
472479
if err := rows.Scan(&Type, &Key); err != nil {
473480
return fmt.Errorf("Unable to read standalone asset: %v", err)
474481
}
475-
graph.AddAsset(schema.AssetType(Type), Key)
482+
483+
asset := knowledge.Asset{
484+
Type: schema.AssetType(Type),
485+
Key: Key,
486+
}
487+
488+
err := encoder.EncodeAsset(asset)
489+
if err != nil {
490+
return fmt.Errorf("Unable to write asset %v: %v", asset, err)
491+
}
476492
}
477493
}
478494

internal/handlers/handler_read_graph.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func GetGraphRead(registry sources.Registry, graphDB knowledge.GraphDB) http.Han
2323
}
2424

2525
g := knowledge.NewGraph()
26-
if err := graphDB.ReadGraph(r.Context(), source, g); err != nil {
26+
if err := graphDB.ReadGraph(r.Context(), source, knowledge.NewGraphEncoder(w)); err != nil {
2727
ReplyWithInternalError(w, err)
2828
return
2929
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package knowledge
2+
3+
import (
4+
"bufio"
5+
"encoding/json"
6+
"fmt"
7+
"io"
8+
)
9+
10+
// ******************************* ENCODER ***********************************
11+
12+
// GraphEncoder serializes graph elements (assets and relations) onto a
// writer, one element at a time.
type GraphEncoder struct {
	// writer receives the one-character record tag written before each
	// JSON payload.
	writer io.Writer

	// jsonEncoder writes the JSON payload of each record.
	jsonEncoder *json.Encoder
}
18+
19+
// NewGraphEncoder create an instance of a graph encoder
20+
func NewGraphEncoder(w io.Writer) *GraphEncoder {
21+
return &GraphEncoder{
22+
writer: w,
23+
jsonEncoder: json.NewEncoder(w),
24+
}
25+
}
26+
27+
// EncodeRelation encode a relation
28+
func (ge *GraphEncoder) EncodeRelation(relation Relation) error {
29+
_, err := fmt.Fprint(ge.writer, "R")
30+
if err != nil {
31+
return err
32+
}
33+
err = ge.jsonEncoder.Encode(relation)
34+
if err != nil {
35+
return err
36+
}
37+
return nil
38+
}
39+
40+
// EncodeAsset encode an asset
41+
func (ge *GraphEncoder) EncodeAsset(asset Asset) error {
42+
_, err := fmt.Fprint(ge.writer, "A")
43+
if err != nil {
44+
return err
45+
}
46+
err = ge.jsonEncoder.Encode(asset)
47+
if err != nil {
48+
return err
49+
}
50+
return nil
51+
}
52+
53+
// ******************************* DECODER ***********************************
54+
55+
// GraphDecoder rebuilds a graph from a stream of encoded assets and
// relations.
type GraphDecoder struct {
	// reader is the stream the encoded records are read from.
	reader io.Reader

	// jsonDecoder is a JSON decoder bound to the same reader.
	jsonDecoder *json.Decoder
}
61+
62+
// NewGraphDecoder create an instance of a graph encoder
63+
func NewGraphDecoder(r io.Reader) *GraphDecoder {
64+
return &GraphDecoder{
65+
reader: r,
66+
jsonDecoder: json.NewDecoder(r),
67+
}
68+
}
69+
70+
// Decode decode a graph
71+
func (ge *GraphDecoder) Decode(graph *Graph) error {
72+
scanner := bufio.NewScanner(ge.reader)
73+
for scanner.Scan() {
74+
line := scanner.Text()
75+
switch line[0] {
76+
case 'A':
77+
var asset Asset
78+
err := json.Unmarshal([]byte(line[1:]), &asset)
79+
if err != nil {
80+
return err
81+
}
82+
graph.AddAsset(asset.Type, asset.Key)
83+
case 'R':
84+
var relation Relation
85+
err := json.Unmarshal([]byte(line[1:]), &relation)
86+
if err != nil {
87+
return err
88+
}
89+
graph.AddAsset(relation.From.Type, relation.From.Key)
90+
graph.AddAsset(relation.To.Type, relation.To.Key)
91+
graph.AddRelation(relation.From, relation.Type, relation.To)
92+
}
93+
}
94+
return nil
95+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package knowledge
2+
3+
import (
4+
"bytes"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
var (
11+
Asset1 = Asset{Type: "type1", Key: "value1"}
12+
Asset2 = Asset{Type: "type2", Key: "value2"}
13+
Asset3 = Asset{Type: "type3", Key: "value3"}
14+
)
15+
16+
var (
17+
Relation1 = Relation{From: AssetKey(Asset1), Type: "is_linked_to", To: AssetKey(Asset2)}
18+
Relation2 = Relation{From: AssetKey(Asset1), Type: "has_relation_with", To: AssetKey(Asset2)}
19+
Relation3 = Relation{From: AssetKey(Asset1), Type: "has_weird_relation_with", To: AssetKey(Asset3)}
20+
)
21+
22+
func TestEncodeAssets(t *testing.T) {
23+
buff := bytes.NewBuffer(nil)
24+
25+
encoder := NewGraphEncoder(buff)
26+
27+
assert.NoError(t, encoder.EncodeAsset(Asset1))
28+
assert.NoError(t, encoder.EncodeAsset(Asset2))
29+
30+
assert.Equal(t,
31+
"A{\"type\":\"type1\",\"key\":\"value1\"}\nA{\"type\":\"type2\",\"key\":\"value2\"}\n",
32+
string(buff.Bytes()))
33+
}
34+
35+
func TestEncodeRelations(t *testing.T) {
36+
buff := bytes.NewBuffer(nil)
37+
38+
encoder := NewGraphEncoder(buff)
39+
40+
assert.NoError(t, encoder.EncodeRelation(Relation1))
41+
assert.NoError(t, encoder.EncodeRelation(Relation2))
42+
43+
assert.Equal(t,
44+
"R{\"type\":\"is_linked_to\",\"from\":{\"type\":\"type1\",\"key\":\"value1\"},\"to\":{\"type\":\"type2\",\"key\":\"value2\"}}\nR{\"type\":\"has_relation_with\",\"from\":{\"type\":\"type1\",\"key\":\"value1\"},\"to\":{\"type\":\"type2\",\"key\":\"value2\"}}\n",
45+
string(buff.Bytes()))
46+
}
47+
48+
func TestEncodeAssetsAndRelations(t *testing.T) {
49+
buff := bytes.NewBuffer(nil)
50+
51+
encoder := NewGraphEncoder(buff)
52+
53+
assert.NoError(t, encoder.EncodeRelation(Relation1))
54+
assert.NoError(t, encoder.EncodeRelation(Relation2))
55+
56+
assert.Equal(t,
57+
"A{\"type\":\"type3\",\"key\":\"value3\"}\nR{\"type\":\"has_relation_with\",\"from\":{\"type\":\"type1\",\"key\":\"value1\"},\"to\":{\"type\":\"type2\",\"key\":\"value2\"}}\n",
58+
string(buff.Bytes()))
59+
}
60+
61+
func TestEncodeAndDecode(t *testing.T) {
62+
buff := bytes.NewBuffer(nil)
63+
64+
encoder := NewGraphEncoder(buff)
65+
66+
assert.NoError(t, encoder.EncodeRelation(Relation1))
67+
assert.NoError(t, encoder.EncodeRelation(Relation2))
68+
assert.NoError(t, encoder.EncodeRelation(Relation3))
69+
70+
decoder := NewGraphDecoder(buff)
71+
72+
graph := NewGraph()
73+
74+
assert.NoError(t, decoder.Decode(graph))
75+
76+
assert.Len(t, graph.Assets(), 3)
77+
assert.Len(t, graph.Relations(), 3)
78+
79+
assert.True(t, graph.HasAsset(Asset1))
80+
assert.True(t, graph.HasAsset(Asset2))
81+
assert.True(t, graph.HasAsset(Asset3))
82+
83+
assert.True(t, graph.HasRelation(Relation1))
84+
assert.True(t, graph.HasRelation(Relation2))
85+
assert.True(t, graph.HasRelation(Relation3))
86+
}

internal/knowledge/graphdb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type GraphDB interface {
1919

2020
InitializeSchema() error
2121

22-
ReadGraph(ctx context.Context, source string, graph *Graph) error
22+
ReadGraph(ctx context.Context, source string, encoder *GraphEncoder) error
2323

2424
// Atomic operations on the graph
2525
InsertAssets(ctx context.Context, source string, assets []Asset) error

0 commit comments

Comments
 (0)