Skip to content

Commit bd401a0

Browse files
committed
Pass auth token through headers instead of query param.
Also make the retry behavior customizable by clients.
1 parent f487696 commit bd401a0

File tree

6 files changed

+77
-35
lines changed

6 files changed

+77
-35
lines changed

internal/client/graph_api.go

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package client
22

33
import (
4+
"time"
5+
46
"github.com/clems4ever/go-graphkb/internal/knowledge"
57
)
68

79
// GraphAPI represent the graph API from a data source point of view
810
type GraphAPI struct {
911
client *GraphClient
1012

11-
parallelization int
12-
chunkSize int
13+
options GraphAPIOptions
1314
}
1415

1516
// GraphAPIOptions options to pass to build graph API
@@ -26,37 +27,63 @@ type GraphAPIOptions struct {
2627

2728
// The size of a chunk of updates, i.e., number of assets or relations sent in one HTTP request to the streaming API.
2829
ChunkSize int
30+
31+
// Max number of retries before giving up (default is 10)
32+
MaxRetries int
33+
// The base delay between retries (default is 5 seconds). This delay is multiplied by the backoff factor.
34+
RetryDelay time.Duration
35+
36+
// The backoff factor (default is 1.01)
37+
RetryBackoffFactor float64
2938
}
3039

3140
// NewGraphAPI create an emitter of graph
3241
func NewGraphAPI(options GraphAPIOptions) *GraphAPI {
3342
return &GraphAPI{
34-
client: NewGraphClient(options.URL, options.AuthToken, options.SkipVerify),
35-
parallelization: options.Parallelization,
36-
chunkSize: options.ChunkSize,
43+
client: NewGraphClient(options.URL, options.AuthToken, options.SkipVerify),
44+
options: options,
3745
}
3846
}
3947

4048
// CreateTransaction create a full graph transaction. This kind of transaction will diff the new graph
4149
// with previous version of it.
4250
func (gapi *GraphAPI) CreateTransaction(currentGraph *knowledge.Graph) *Transaction {
43-
var parallelization = gapi.parallelization
51+
var parallelization = gapi.options.Parallelization
4452
if parallelization == 0 {
4553
parallelization = 30
4654
}
4755

48-
var chunkSize = gapi.chunkSize
56+
var chunkSize = gapi.options.ChunkSize
4957
if chunkSize == 0 {
5058
chunkSize = 1000
5159
}
5260

61+
var maxRetries = gapi.options.MaxRetries
62+
if maxRetries == 0 {
63+
maxRetries = 10
64+
}
65+
66+
var retryDelay = gapi.options.RetryDelay
67+
if retryDelay == 0 {
68+
retryDelay = 5 * time.Second
69+
}
70+
71+
var retryBackoff = gapi.options.RetryBackoffFactor
72+
if retryBackoff == 0.0 {
73+
retryBackoff = 1.01
74+
}
75+
5376
transaction := new(Transaction)
5477
transaction.newGraph = knowledge.NewGraph()
5578
transaction.binder = knowledge.NewGraphBinder(transaction.newGraph)
5679
transaction.client = gapi.client
5780
transaction.currentGraph = currentGraph
5881
transaction.parallelization = parallelization
5982
transaction.chunkSize = chunkSize
83+
84+
transaction.retryCount = maxRetries
85+
transaction.retryDelay = retryDelay
86+
transaction.retryBackoffFactor = retryBackoff
6087
return transaction
6188
}
6289

internal/client/graph_client.go

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@ import (
55
"crypto/tls"
66
"encoding/json"
77
"fmt"
8+
"io"
89
"io/ioutil"
910
"net/http"
1011

1112
"github.com/clems4ever/go-graphkb/internal/knowledge"
1213
"github.com/clems4ever/go-graphkb/internal/schema"
14+
"github.com/clems4ever/go-graphkb/internal/utils"
1315
)
1416

1517
// ErrTooManyRequests error representing too many requests to the API
@@ -37,11 +39,18 @@ func NewGraphClient(URL, authToken string, skipVerify bool) *GraphClient {
3739
}
3840
}
3941

42+
func (gc *GraphClient) newRequest(method, path string, body io.Reader) (*http.Request, error) {
43+
req, err := http.NewRequest(method, fmt.Sprintf("%s%s", gc.url, path), body)
44+
if err != nil {
45+
return nil, err
46+
}
47+
req.Header.Add(utils.XAuthTokenHeader, gc.authToken)
48+
return req, nil
49+
}
50+
4051
// ReadCurrentGraph read the current graph stored in graph kb
4152
func (gc *GraphClient) ReadCurrentGraph() (*knowledge.Graph, error) {
42-
url := fmt.Sprintf("%s/api/graph/read?token=%s", gc.url, gc.authToken)
43-
44-
req, err := http.NewRequest("GET", url, nil)
53+
req, err := gc.newRequest("GET", "/api/graph/read", nil)
4554
if err != nil {
4655
return nil, err
4756
}
@@ -81,8 +90,7 @@ func (gc *GraphClient) UpdateSchema(sg schema.SchemaGraph) error {
8190
return fmt.Errorf("Unable to marshall request body")
8291
}
8392

84-
url := fmt.Sprintf("%s/api/graph/schema?token=%s", gc.url, gc.authToken)
85-
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(b))
93+
req, err := gc.newRequest("PUT", "/api/graph/schema", bytes.NewBuffer(b))
8694
if err != nil {
8795
return err
8896
}
@@ -113,8 +121,7 @@ func (gc *GraphClient) InsertAssets(assets []knowledge.Asset) error {
113121
return fmt.Errorf("Unable to marshall request body")
114122
}
115123

116-
url := fmt.Sprintf("%s/api/graph/asset?token=%s", gc.url, gc.authToken)
117-
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(b))
124+
req, err := gc.newRequest("PUT", "/api/graph/assets", bytes.NewBuffer(b))
118125
if err != nil {
119126
return err
120127
}
@@ -145,8 +152,7 @@ func (gc *GraphClient) DeleteAssets(assets []knowledge.Asset) error {
145152
return fmt.Errorf("Unable to marshall request body")
146153
}
147154

148-
url := fmt.Sprintf("%s/api/graph/asset?token=%s", gc.url, gc.authToken)
149-
req, err := http.NewRequest("DELETE", url, bytes.NewBuffer(b))
155+
req, err := gc.newRequest("DELETE", "/api/graph/assets", bytes.NewBuffer(b))
150156
if err != nil {
151157
return err
152158
}
@@ -177,8 +183,7 @@ func (gc *GraphClient) InsertRelations(relations []knowledge.Relation) error {
177183
return fmt.Errorf("Unable to marshall request body")
178184
}
179185

180-
url := fmt.Sprintf("%s/api/graph/relation?token=%s", gc.url, gc.authToken)
181-
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(b))
186+
req, err := gc.newRequest("PUT", "/api/graph/relations", bytes.NewBuffer(b))
182187
if err != nil {
183188
return err
184189
}
@@ -209,8 +214,7 @@ func (gc *GraphClient) DeleteRelations(relations []knowledge.Relation) error {
209214
return fmt.Errorf("Unable to marshall request body")
210215
}
211216

212-
url := fmt.Sprintf("%s/api/graph/relation?token=%s", gc.url, gc.authToken)
213-
req, err := http.NewRequest("DELETE", url, bytes.NewBuffer(b))
217+
req, err := gc.newRequest("DELETE", "/api/graph/relations", bytes.NewBuffer(b))
214218
if err != nil {
215219
return err
216220
}

internal/client/transaction.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ type Transaction struct {
3232

3333
// The number of items to send to the streaming API in one request
3434
chunkSize int
35+
36+
retryCount int
37+
retryDelay time.Duration
38+
retryBackoffFactor float64
3539
}
3640

3741
// Relate create a relation between two assets
@@ -49,12 +53,12 @@ func (cgt *Transaction) Bind(asset string, assetType schema.AssetType) {
4953
}
5054

5155
// withRetryOnTooManyRequests helper retrying the function when too many request error has been received
52-
func withRetryOnTooManyRequests(fn func() error, maxRetries int) error {
56+
func withRetryOnTooManyRequests(fn func() error, backoffFactor float64, maxRetries int, delay time.Duration) error {
5357
trials := 0
5458
for {
5559
err := fn()
5660
if err != nil {
57-
backoffTime := time.Duration(int(math.Pow(1.01, float64(trials)))*15) * time.Second
61+
backoffTime := time.Duration(int(math.Pow(backoffFactor, float64(trials)))) * delay
5862
fmt.Printf("Sleeping for %d seconds\n", backoffTime/time.Second)
5963
time.Sleep(backoffTime)
6064
} else {
@@ -104,7 +108,7 @@ func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
104108
relations = append(relations, r.(knowledge.Relation))
105109
}
106110
f := p.Exec(func() error {
107-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteRelations(relations) }, 10); err != nil {
111+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteRelations(relations) }, cgt.retryBackoffFactor, cgt.retryCount, cgt.retryDelay); err != nil {
108112
return fmt.Errorf("Unable to remove the relations %v: %v", relations, err)
109113
}
110114
progress.Add(len(relations))
@@ -119,7 +123,7 @@ func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
119123
assets = append(assets, a.(knowledge.Asset))
120124
}
121125
f := p.Exec(func() error {
122-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.InsertAssets(assets) }, 10); err != nil {
126+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.InsertAssets(assets) }, cgt.retryBackoffFactor, cgt.retryCount, cgt.retryDelay); err != nil {
123127
return fmt.Errorf("Unable to upsert the asset %v: %v", assets, err)
124128
}
125129
progress.Add(len(assets))
@@ -144,7 +148,7 @@ func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
144148
assets = append(assets, a.(knowledge.Asset))
145149
}
146150
f := p.Exec(func() error {
147-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteAssets(assets) }, 10); err != nil {
151+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.DeleteAssets(assets) }, cgt.retryBackoffFactor, cgt.retryCount, cgt.retryDelay); err != nil {
148152
return fmt.Errorf("Unable to remove the asset %v: %v", assets, err)
149153
}
150154
progress.Add(len(assets))
@@ -159,7 +163,7 @@ func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
159163
relations = append(relations, r.(knowledge.Relation))
160164
}
161165
f := p.Exec(func() error {
162-
if err := withRetryOnTooManyRequests(func() error { return cgt.client.InsertRelations(relations) }, 10); err != nil {
166+
if err := withRetryOnTooManyRequests(func() error { return cgt.client.InsertRelations(relations) }, cgt.retryBackoffFactor, cgt.retryCount, cgt.retryDelay); err != nil {
163167
return fmt.Errorf("Unable to upsert the relation %v: %v", relations, err)
164168
}
165169
progress.Add(len(relations))

internal/handlers/token.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,25 @@ import (
55
"net/http"
66

77
"github.com/clems4ever/go-graphkb/internal/sources"
8+
"github.com/clems4ever/go-graphkb/internal/utils"
89
)
910

1011
// IsTokenValid is the token valid
1112
func IsTokenValid(registry sources.Registry, r *http.Request) (bool, string, error) {
12-
token, ok := r.URL.Query()["token"]
13+
token := r.Header.Get(utils.XAuthTokenHeader)
1314

14-
if !ok || len(token) != 1 {
15-
return false, "", fmt.Errorf("Unable to detect token query parameter")
15+
if token == "" {
16+
return false, "", fmt.Errorf("No auth token provided")
1617
}
1718

1819
sourceToToken, err := registry.ListSources(r.Context())
1920

2021
if err != nil {
21-
return false, "", err
22+
return false, "", fmt.Errorf("Unable to list the sources: %v", err)
2223
}
2324

2425
for sn, t := range sourceToToken {
25-
if t == token[0] {
26+
if t == token {
2627
return true, sn, nil
2728
}
2829
}

internal/server/server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -320,10 +320,10 @@ func StartServer(listenInterface string,
320320
sem := semaphore.NewWeighted(concurrency)
321321

322322
r.HandleFunc("/api/graph/schema", handlers.PutSchema(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
323-
r.HandleFunc("/api/graph/asset", handlers.PutAsset(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
324-
r.HandleFunc("/api/graph/relation", handlers.PutRelation(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
325-
r.HandleFunc("/api/graph/asset", handlers.DeleteAsset(sourcesRegistry, graphUpdater, sem)).Methods("DELETE")
326-
r.HandleFunc("/api/graph/relation", handlers.DeleteRelation(sourcesRegistry, graphUpdater, sem)).Methods("DELETE")
323+
r.HandleFunc("/api/graph/assets", handlers.PutAsset(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
324+
r.HandleFunc("/api/graph/assets", handlers.DeleteAsset(sourcesRegistry, graphUpdater, sem)).Methods("DELETE")
325+
r.HandleFunc("/api/graph/relations", handlers.PutRelation(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
326+
r.HandleFunc("/api/graph/relations", handlers.DeleteRelation(sourcesRegistry, graphUpdater, sem)).Methods("DELETE")
327327

328328
r.HandleFunc("/api/query", postQueryHandler).Methods("POST")
329329
r.PathPrefix("/").Handler(http.FileServer(http.Dir("./web/build/")))

internal/utils/constant.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package utils
2+
3+
var (
4+
// XAuthTokenHeader is the name of the header supposed to contain the auth token
5+
XAuthTokenHeader = "X-Auth-Token"
6+
)

0 commit comments

Comments
 (0)