Skip to content

Commit 6e6b88a

Browse files
committed
Add logs into the client + parallelize client upload.
1 parent c590095 commit 6e6b88a

File tree

3 files changed

+56
-17
lines changed

3 files changed

+56
-17
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
github.com/stretchr/testify v1.4.0
1919
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
2020
gopkg.in/VividCortex/ewma.v1 v1.1.1 // indirect
21-
gopkg.in/cheggaaa/pb.v2 v2.0.7 // indirect
21+
gopkg.in/cheggaaa/pb.v2 v2.0.7
2222
gopkg.in/fatih/color.v1 v1.7.0 // indirect
2323
gopkg.in/mattn/go-colorable.v0 v0.1.0 // indirect
2424
gopkg.in/mattn/go-isatty.v0 v0.0.4 // indirect

internal/client/transaction.go

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"sync"
77
"time"
88

9+
"gopkg.in/cheggaaa/pb.v2"
10+
911
"github.com/clems4ever/go-graphkb/internal/knowledge"
1012
"github.com/clems4ever/go-graphkb/internal/schema"
1113
)
@@ -64,34 +66,71 @@ func withRetryOnTooManyRequests(fn func() error, maxRetries int) error {
6466
func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
6567
sg := cgt.newGraph.ExtractSchema()
6668

69+
fmt.Println("Start uploading the schema of the graph...")
6770
if err := cgt.client.UpdateSchema(sg); err != nil {
6871
return nil, fmt.Errorf("Unable to update the schema of the graph")
6972
}
7073

74+
fmt.Println("Finished uploading the schema of the graph...")
75+
7176
bulk := knowledge.GenerateGraphUpdatesBulk(cgt.currentGraph, cgt.newGraph)
7277

73-
for _, r := range bulk.GetRelationRemovals() {
74-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteRelation(r) }, 10); err != nil {
75-
return nil, fmt.Errorf("Unable to remove the relation %v: %v", r, err)
78+
fmt.Println("Start uploading the graph...")
79+
80+
progress := pb.New(len(bulk.GetAssetRemovals()) + len(bulk.GetAssetUpserts()) + len(bulk.GetRelationRemovals()) + len(bulk.GetRelationUpserts()))
81+
progress.Start()
82+
defer progress.Finish()
83+
84+
var wg sync.WaitGroup
85+
86+
wg.Add(2)
87+
88+
var err1, err2 error
89+
90+
go func() {
91+
defer wg.Done()
92+
for _, r := range bulk.GetRelationRemovals() {
93+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteRelation(r) }, 10); err != nil {
94+
err1 = fmt.Errorf("Unable to remove the relation %v: %v", r, err)
95+
return
96+
}
97+
progress.Increment()
7698
}
77-
}
78-
for _, a := range bulk.GetAssetRemovals() {
79-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteAsset(a) }, 10); err != nil {
80-
return nil, fmt.Errorf("Unable to remove the asset %v: %v", a, err)
99+
for _, a := range bulk.GetAssetRemovals() {
100+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteAsset(a) }, 10); err != nil {
101+
err1 = fmt.Errorf("Unable to remove the asset %v: %v", a, err)
102+
}
103+
progress.Increment()
81104
}
82-
}
105+
}()
83106

84-
for _, a := range bulk.GetAssetUpserts() {
85-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.UpsertAsset(a) }, 10); err != nil {
86-
return nil, fmt.Errorf("Unable to upsert the asset %v: %v", a, err)
107+
go func() {
108+
defer wg.Done()
109+
for _, a := range bulk.GetAssetUpserts() {
110+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.UpsertAsset(a) }, 10); err != nil {
111+
err2 = fmt.Errorf("Unable to upsert the asset %v: %v", a, err)
112+
}
113+
progress.Increment()
87114
}
88-
}
89-
for _, r := range bulk.GetRelationUpserts() {
90-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.UpsertRelation(r) }, 10); err != nil {
91-
return nil, fmt.Errorf("Unable to upsert the relation %v: %v", r, err)
115+
for _, r := range bulk.GetRelationUpserts() {
116+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.UpsertRelation(r) }, 10); err != nil {
117+
err2 = fmt.Errorf("Unable to upsert the relation %v: %v", r, err)
118+
}
119+
progress.Increment()
92120
}
121+
}()
122+
123+
wg.Wait()
124+
125+
if err1 != nil {
126+
return nil, err1
127+
}
128+
if err2 != nil {
129+
return nil, err2
93130
}
94131

132+
fmt.Println("Finished uploading the graph...")
133+
95134
g := cgt.newGraph
96135
cgt.newGraph = knowledge.NewGraph()
97136
return g, nil // give ownership of the transaction graph so that it can be cached if needed

internal/database/mariadb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ CREATE TABLE IF NOT EXISTS relations (
8181
CONSTRAINT pk_relation PRIMARY KEY (id),
8282
CONSTRAINT fk_from FOREIGN KEY (from_id) REFERENCES assets (id),
8383
CONSTRAINT fk_to FOREIGN KEY (to_id) REFERENCES assets (id),
84-
CONSTRAINT fk_relation_source FOREIGN KEY (source_id) REFERENCES sources (id),
84+
CONSTRAINT fk_relation_source FOREIGN KEY (source_id) REFERENCES sources (id) ON DELETE CASCADE,
8585
8686
INDEX full_relation_type_from_to_idx (type, from_id, to_id),
8787
INDEX full_relation_type_to_from_idx (type, to_id, from_id),

0 commit comments

Comments
 (0)