Skip to content

Commit d842d9c

Browse files
ShimmerGlass and clems4ever
authored and committed
Client: handle graph cache internally
This moves the graph caching to the library side to better handle failure cases: when the last tx was successful, we can reuse the last graph to avoid a read, but when a tx fails with an error, the remote graph state is unknown, so we must refresh it.
1 parent e407047 commit d842d9c

File tree

3 files changed

+48
-15
lines changed

3 files changed

+48
-15
lines changed

cmd/datasource-csv/main.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,11 @@ func (cs *CSVSource) Publish() error {
4141

4242
r := csv.NewReader(file)
4343

44-
previousGraph, err := cs.graphAPI.ReadCurrentGraph()
44+
tx, err := cs.graphAPI.CreateTransaction()
4545
if err != nil {
46-
return fmt.Errorf("Unable to read previous graph: %v", err)
46+
return err
4747
}
4848

49-
tx := cs.graphAPI.CreateTransaction(previousGraph)
50-
5149
header := true
5250

5351
for {
@@ -74,8 +72,10 @@ func (cs *CSVSource) Publish() error {
7472
tx.Relate(record[1], relationType, record[4])
7573
}
7674

77-
_, err = tx.Commit()
78-
fmt.Println("CSV data has been sent successfully")
75+
err = tx.Commit()
76+
if err == nil {
77+
fmt.Println("CSV data has been sent successfully")
78+
}
7979
return err
8080
}
8181

internal/client/graph_api.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package client
22

33
import (
4+
"fmt"
45
"time"
56

67
"github.com/clems4ever/go-graphkb/internal/knowledge"
@@ -11,6 +12,8 @@ type GraphAPI struct {
1112
client *GraphClient
1213

1314
options GraphAPIOptions
15+
16+
currentGraph *knowledge.Graph
1417
}
1518

1619
// GraphAPIOptions options to pass to build graph API
@@ -47,7 +50,16 @@ func NewGraphAPI(options GraphAPIOptions) *GraphAPI {
4750

4851
// CreateTransaction creates a full graph transaction. This kind of transaction will diff the new graph
4952
// with the previous version of it.
50-
func (gapi *GraphAPI) CreateTransaction(currentGraph *knowledge.Graph) *Transaction {
53+
func (gapi *GraphAPI) CreateTransaction() (*Transaction, error) {
54+
if gapi.currentGraph == nil {
55+
fmt.Println("transaction: fetching remote graph")
56+
g, err := gapi.ReadCurrentGraph()
57+
if err != nil {
58+
return nil, fmt.Errorf("create transaction: %w", err)
59+
}
60+
gapi.currentGraph = g
61+
}
62+
5163
var parallelization = gapi.options.Parallelization
5264
if parallelization == 0 {
5365
parallelization = 30
@@ -77,14 +89,28 @@ func (gapi *GraphAPI) CreateTransaction(currentGraph *knowledge.Graph) *Transact
7789
transaction.newGraph = knowledge.NewGraph()
7890
transaction.binder = knowledge.NewGraphBinder(transaction.newGraph)
7991
transaction.client = gapi.client
80-
transaction.currentGraph = currentGraph
92+
transaction.currentGraph = gapi.currentGraph
8193
transaction.parallelization = parallelization
8294
transaction.chunkSize = chunkSize
8395

8496
transaction.retryCount = maxRetries
8597
transaction.retryDelay = retryDelay
8698
transaction.retryBackoffFactor = retryBackoff
87-
return transaction
99+
100+
transaction.onError = func(err error) {
101+
// there was an error, we don't know the remote graph state.
102+
// we clear the cached copy to refresh it on the next run.
103+
fmt.Println("transaction: clearing graph cache because of error:", err)
104+
gapi.currentGraph = nil
105+
}
106+
107+
transaction.onSuccess = func(g *knowledge.Graph) {
108+
// tx was successful, we update the local graph cache to
109+
// speed up the next tx.
110+
gapi.currentGraph = g
111+
}
112+
113+
return transaction, nil
88114
}
89115

90116
// ReadCurrentGraph reads the current graph stored in graph kb

internal/client/transaction.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ type Transaction struct {
3636
retryCount int
3737
retryDelay time.Duration
3838
retryBackoffFactor float64
39+
40+
onSuccess func(*knowledge.Graph)
41+
onError func(error)
3942
}
4043

4144
// Relate creates a relation between two assets
@@ -72,12 +75,14 @@ func withRetryOnTooManyRequests(fn func() error, backoffFactor float64, maxRetri
7275
}
7376

7477
// Commit commits the transaction and, on success, hands the new graph to the onSuccess callback for caching.
75-
func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
78+
func (cgt *Transaction) Commit() error {
7679
sg := cgt.newGraph.ExtractSchema()
7780

7881
fmt.Println("Start uploading the schema of the graph...")
7982
if err := cgt.client.UpdateSchema(sg); err != nil {
80-
return nil, fmt.Errorf("Unable to update the schema of the graph: %v", err)
83+
err := fmt.Errorf("Unable to update the schema of the graph: %v", err)
84+
cgt.onError(err)
85+
return err
8186
}
8287

8388
fmt.Println("Finished uploading the schema of the graph...")
@@ -141,7 +146,8 @@ func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
141146
for _, f := range futures {
142147
err := <-f
143148
if err != nil {
144-
return nil, err
149+
cgt.onError(err)
150+
return err
145151
}
146152
}
147153

@@ -181,13 +187,14 @@ func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
181187
for _, f := range futures {
182188
err := <-f
183189
if err != nil {
184-
return nil, err
190+
cgt.onError(err)
191+
return err
185192
}
186193
}
187194

188195
fmt.Println("Finished uploading the graph...")
189196

190-
g := cgt.newGraph
197+
cgt.onSuccess(cgt.newGraph)
191198
cgt.newGraph = knowledge.NewGraph()
192-
return g, nil // give ownership of the transaction graph so that it can be cached if needed
199+
return nil
193200
}

0 commit comments

Comments
 (0)