Skip to content

Commit 5260640

Browse files
committed
Define the level of concurrency from configuration file.
1 parent 50106e9 commit 5260640

File tree

5 files changed

+35
-60
lines changed

5 files changed

+35
-60
lines changed

cmd/go-graphkb/config.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,6 @@ mariadb_username: graphkb
66
mariadb_password: password
77
mariadb_host: db
88
mariadb_database: graphkb
9+
10+
# The level of concurrency allowed by the graph update API.
11+
concurrency: 32

cmd/go-graphkb/main.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,12 @@ func listen(cmd *cobra.Command, args []string) {
118118
}
119119

120120
listenInterface := viper.GetString("server_listen")
121+
concurrency := viper.GetInt64("concurrency")
122+
if concurrency == 0 {
123+
concurrency = 32
124+
}
121125

122-
server.StartServer(listenInterface, Database, Database, Database, Database)
126+
server.StartServer(listenInterface, Database, Database, Database, Database, concurrency)
123127
}
124128

125129
func read(cmd *cobra.Command, args []string) {

internal/client/transaction.go

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,10 @@ func withRetryOnTooManyRequests(fn func() error, maxRetries int) error {
5050
trials := 0
5151
for {
5252
err := fn()
53-
if err == ErrTooManyRequests {
53+
if err != nil {
5454
backoffTime := time.Duration(int(math.Pow(1.01, float64(trials)))*15) * time.Second
5555
fmt.Printf("Sleeping for %d seconds", backoffTime/time.Second)
5656
time.Sleep(backoffTime)
57-
} else if err != nil {
58-
return err
5957
} else {
6058
return nil
6159
}
@@ -66,38 +64,6 @@ func withRetryOnTooManyRequests(fn func() error, maxRetries int) error {
6664
}
6765
}
6866

69-
func runTasksInParallel(fn func(data interface{}) error, workers int, items []interface{}) error {
70-
var wg sync.WaitGroup
71-
var err error
72-
73-
dataC := make(chan interface{})
74-
75-
wg.Add(workers)
76-
77-
for i := 0; i < workers; i++ {
78-
go func() {
79-
for {
80-
select {
81-
case d, more := <-dataC:
82-
// if d is nil, it means the channel has been closed
83-
if !more || err != nil {
84-
return
85-
}
86-
err = fn(d)
87-
}
88-
}
89-
}()
90-
}
91-
92-
for _, d := range items {
93-
dataC <- d
94-
}
95-
96-
close(dataC)
97-
wg.Wait()
98-
return err
99-
}
100-
10167
// Commit commit the transaction and gives ownership to the source for caching.
10268
func (cgt *Transaction) Commit() (*knowledge.Graph, error) {
10369
sg := cgt.newGraph.ExtractSchema()

internal/handlers/handler_update_graph.go

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ import (
1212
"golang.org/x/sync/semaphore"
1313
)
1414

15-
var updatesSemaphore = semaphore.NewWeighted(int64(256))
16-
17-
func handleUpdate(registry sources.Registry, fn func(source string, body io.Reader) error) http.HandlerFunc {
15+
func handleUpdate(registry sources.Registry, fn func(source string, body io.Reader) error, sem *semaphore.Weighted) http.HandlerFunc {
1816
return func(w http.ResponseWriter, r *http.Request) {
1917
ok, source, err := IsTokenValid(registry, r)
2018
if err != nil {
@@ -28,12 +26,12 @@ func handleUpdate(registry sources.Registry, fn func(source string, body io.Read
2826
}
2927

3028
{
31-
ok = updatesSemaphore.TryAcquire(1)
29+
ok = sem.TryAcquire(1)
3230
if !ok {
3331
ReplyWithTooManyRequests(w)
3432
return
3533
}
36-
defer updatesSemaphore.Release(1)
34+
defer sem.Release(1)
3735

3836
if err = fn(source, r.Body); err != nil {
3937
ReplyWithInternalError(w, err)
@@ -50,7 +48,7 @@ func handleUpdate(registry sources.Registry, fn func(source string, body io.Read
5048
}
5149

5250
// PutSchema upsert an asset into the graph of the data source
53-
func PutSchema(registry sources.Registry, graphUpdater *knowledge.GraphUpdater) http.HandlerFunc {
51+
func PutSchema(registry sources.Registry, graphUpdater *knowledge.GraphUpdater, sem *semaphore.Weighted) http.HandlerFunc {
5452
return handleUpdate(registry, func(source string, body io.Reader) error {
5553
requestBody := client.PutGraphSchemaRequestBody{}
5654
if err := json.NewDecoder(body).Decode(&requestBody); err != nil {
@@ -60,11 +58,11 @@ func PutSchema(registry sources.Registry, graphUpdater *knowledge.GraphUpdater)
6058
// TODO(c.michaud): verify compatibility of the schema with graph updates
6159
graphUpdater.UpdateSchema(source, requestBody.Schema)
6260
return nil
63-
})
61+
}, sem)
6462
}
6563

6664
// PutAsset upsert an asset into the graph of the data source
67-
func PutAsset(registry sources.Registry, graphUpdater *knowledge.GraphUpdater) http.HandlerFunc {
65+
func PutAsset(registry sources.Registry, graphUpdater *knowledge.GraphUpdater, sem *semaphore.Weighted) http.HandlerFunc {
6866
return handleUpdate(registry, func(source string, body io.Reader) error {
6967
requestBody := client.PutGraphAssetRequestBody{}
7068
if err := json.NewDecoder(body).Decode(&requestBody); err != nil {
@@ -74,11 +72,11 @@ func PutAsset(registry sources.Registry, graphUpdater *knowledge.GraphUpdater) h
7472
// TODO(c.michaud): verify compatibility of the schema with graph updates
7573
graphUpdater.UpsertAsset(source, requestBody.Asset)
7674
return nil
77-
})
75+
}, sem)
7876
}
7977

8078
// PutRelation upsert a relation into the graph of the data source
81-
func PutRelation(registry sources.Registry, graphUpdater *knowledge.GraphUpdater) http.HandlerFunc {
79+
func PutRelation(registry sources.Registry, graphUpdater *knowledge.GraphUpdater, sem *semaphore.Weighted) http.HandlerFunc {
8280
return handleUpdate(registry, func(source string, body io.Reader) error {
8381
requestBody := client.PutGraphRelationRequestBody{}
8482
if err := json.NewDecoder(body).Decode(&requestBody); err != nil {
@@ -88,11 +86,11 @@ func PutRelation(registry sources.Registry, graphUpdater *knowledge.GraphUpdater
8886
// TODO(c.michaud): verify compatibility of the schema with graph updates
8987
graphUpdater.UpsertRelation(source, requestBody.Relation)
9088
return nil
91-
})
89+
}, sem)
9290
}
9391

9492
// DeleteAsset delete an asset from the graph of the data source
95-
func DeleteAsset(registry sources.Registry, graphUpdater *knowledge.GraphUpdater) http.HandlerFunc {
93+
func DeleteAsset(registry sources.Registry, graphUpdater *knowledge.GraphUpdater, sem *semaphore.Weighted) http.HandlerFunc {
9694
return handleUpdate(registry, func(source string, body io.Reader) error {
9795
requestBody := client.DeleteGraphAssetRequestBody{}
9896
if err := json.NewDecoder(body).Decode(&requestBody); err != nil {
@@ -102,11 +100,11 @@ func DeleteAsset(registry sources.Registry, graphUpdater *knowledge.GraphUpdater
102100
// TODO(c.michaud): verify compatibility of the schema with graph updates
103101
graphUpdater.RemoveAsset(source, requestBody.Asset)
104102
return nil
105-
})
103+
}, sem)
106104
}
107105

108106
// DeleteRelation upsert a relation into the graph of the data source
109-
func DeleteRelation(registry sources.Registry, graphUpdater *knowledge.GraphUpdater) http.HandlerFunc {
107+
func DeleteRelation(registry sources.Registry, graphUpdater *knowledge.GraphUpdater, sem *semaphore.Weighted) http.HandlerFunc {
110108
return handleUpdate(registry, func(source string, body io.Reader) error {
111109
requestBody := client.DeleteGraphRelationRequestBody{}
112110
if err := json.NewDecoder(body).Decode(&requestBody); err != nil {
@@ -116,5 +114,5 @@ func DeleteRelation(registry sources.Registry, graphUpdater *knowledge.GraphUpda
116114
// TODO(c.michaud): verify compatibility of the schema with graph updates
117115
graphUpdater.RemoveRelation(source, requestBody.Relation)
118116
return nil
119-
})
117+
}, sem)
120118
}

internal/server/server.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/clems4ever/go-graphkb/internal/sources"
1818
"github.com/clems4ever/go-graphkb/internal/utils"
1919
"github.com/prometheus/client_golang/prometheus/promhttp"
20+
"golang.org/x/sync/semaphore"
2021

2122
auth "github.com/abbot/go-http-auth"
2223

@@ -276,7 +277,8 @@ func StartServer(listenInterface string,
276277
database knowledge.GraphDB,
277278
schemaPersistor schema.Persistor,
278279
sourcesRegistry sources.Registry,
279-
queryHistorizer history.Historizer) {
280+
queryHistorizer history.Historizer,
281+
concurrency int64) {
280282

281283
r := mux.NewRouter()
282284

@@ -314,11 +316,13 @@ func StartServer(listenInterface string,
314316

315317
r.HandleFunc("/api/graph/read", getGraphRead(sourcesRegistry, database)).Methods("GET")
316318

317-
r.HandleFunc("/api/graph/schema", handlers.PutSchema(sourcesRegistry, graphUpdater)).Methods("PUT")
318-
r.HandleFunc("/api/graph/asset", handlers.PutAsset(sourcesRegistry, graphUpdater)).Methods("PUT")
319-
r.HandleFunc("/api/graph/relation", handlers.PutRelation(sourcesRegistry, graphUpdater)).Methods("PUT")
320-
r.HandleFunc("/api/graph/asset", handlers.DeleteAsset(sourcesRegistry, graphUpdater)).Methods("DELETE")
321-
r.HandleFunc("/api/graph/relation", handlers.DeleteRelation(sourcesRegistry, graphUpdater)).Methods("DELETE")
319+
sem := semaphore.NewWeighted(concurrency)
320+
321+
r.HandleFunc("/api/graph/schema", handlers.PutSchema(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
322+
r.HandleFunc("/api/graph/asset", handlers.PutAsset(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
323+
r.HandleFunc("/api/graph/relation", handlers.PutRelation(sourcesRegistry, graphUpdater, sem)).Methods("PUT")
324+
r.HandleFunc("/api/graph/asset", handlers.DeleteAsset(sourcesRegistry, graphUpdater, sem)).Methods("DELETE")
325+
r.HandleFunc("/api/graph/relation", handlers.DeleteRelation(sourcesRegistry, graphUpdater, sem)).Methods("DELETE")
322326

323327
r.HandleFunc("/api/query", postQueryHandler).Methods("POST")
324328
r.PathPrefix("/").Handler(http.FileServer(http.Dir("./web/build/")))
@@ -327,12 +331,12 @@ func StartServer(listenInterface string,
327331

328332
var err error
329333
if viper.GetString("server_tls_cert") != "" {
330-
fmt.Printf("Listening on %s with TLS enabled, the connection is secure\n", listenInterface)
334+
fmt.Printf("Listening on %s with TLS enabled, the connection is secure [concurrency=%d]\n", listenInterface, concurrency)
331335
err = http.ListenAndServeTLS(listenInterface, viper.GetString("server_tls_cert"),
332336
viper.GetString("server_tls_key"), r)
333337
} else {
334-
fmt.Printf("[WARNING] Listening on %s with TLS disabled. Use `server_tls_cert` option to setup a certificate\n",
335-
listenInterface)
338+
fmt.Printf("[WARNING] Listening on %s with TLS disabled. Use `server_tls_cert` option to setup a certificate [concurrency=%d]\n",
339+
listenInterface, concurrency)
336340
err = http.ListenAndServe(listenInterface, r)
337341
}
338342
if err != nil {

0 commit comments

Comments
 (0)