Skip to content

Commit 0150446

Browse files
committed
Separate importers from backend.
Separating importers from the backend allows them to run remotely, independently of the backend serving the UI. A programmatic API is also provided to easily create a new importer in Go. Currently, importers are directly connected to the database, but in the long term the backend should act as a gateway to control DB accesses.
1 parent 1ba79e8 commit 0150446

29 files changed

+521
-561
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ go:
66
script:
77
- go test -v ./...
88
- go build -o go-graphkb cmd/go-graphkb/main.go
9+
- go build -o source-csv cmd/source-csv/main.go

cmd/go-graphkb/main.go

Lines changed: 5 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,17 @@ import (
44
"context"
55
"fmt"
66
"log"
7-
"sort"
8-
"strings"
9-
"sync"
107
"time"
118

9+
"github.com/clems4ever/go-graphkb/internal/database"
1210
"github.com/clems4ever/go-graphkb/internal/knowledge"
1311
"github.com/clems4ever/go-graphkb/internal/server"
14-
"github.com/clems4ever/go-graphkb/internal/sources"
1512
"github.com/spf13/cobra"
1613
"github.com/spf13/viper"
1714
)
1815

19-
// Sources repository of sources
20-
var Sources []sources.Source
21-
2216
// Database the selected database
23-
var Database knowledge.GraphDB
17+
var Database *database.MariaDB
2418

2519
// ConfigPath string
2620
var ConfigPath string
@@ -34,12 +28,6 @@ func main() {
3428
Use: "go-graphkb [opts]",
3529
}
3630

37-
startCmd := &cobra.Command{
38-
Use: "start",
39-
Run: start,
40-
Args: cobra.MaximumNArgs(1),
41-
}
42-
4331
listenCmd := &cobra.Command{
4432
Use: "listen",
4533
Run: listen,
@@ -61,12 +49,6 @@ func main() {
6149
Args: cobra.ExactArgs(1),
6250
}
6351

64-
sourceCmd := &cobra.Command{
65-
Use: "source [source]",
66-
Run: getSource,
67-
Args: cobra.ExactArgs(1),
68-
}
69-
7052
queryCmd := &cobra.Command{
7153
Use: "query [query]",
7254
Run: queryFunc,
@@ -77,7 +59,7 @@ func main() {
7759

7860
cobra.OnInitialize(onInit)
7961

80-
rootCmd.AddCommand(startCmd, cleanCmd, sourceCmd, listenCmd, countCmd, readCmd, queryCmd)
62+
rootCmd.AddCommand(cleanCmd, listenCmd, countCmd, readCmd, queryCmd)
8163
if err := rootCmd.Execute(); err != nil {
8264
log.Fatal(err)
8365
}
@@ -97,75 +79,11 @@ func onInit() {
9779
if dbName == "" {
9880
log.Fatal("Please provide database_name option in your configuration file")
9981
}
100-
Database = knowledge.NewMariaDB(
82+
Database = database.NewMariaDB(
10183
viper.GetString("mariadb.username"),
10284
viper.GetString("mariadb.password"),
10385
viper.GetString("mariadb.host"),
10486
dbName)
105-
106-
Sources = []sources.Source{
107-
sources.NewCSVSource(),
108-
}
109-
110-
for _, s := range Sources {
111-
sources.Registry.Add(s)
112-
g, err := s.Graph()
113-
114-
if err != nil {
115-
log.Fatal(err)
116-
}
117-
118-
for _, a := range g.Assets() {
119-
knowledge.SchemaRegistrySingleton.AddAssetType(a)
120-
}
121-
122-
for _, r := range g.Relations() {
123-
knowledge.SchemaRegistrySingleton.AddRelationType(r.Type)
124-
}
125-
}
126-
}
127-
128-
func start(cmd *cobra.Command, args []string) {
129-
var wg sync.WaitGroup
130-
wg.Add(1)
131-
132-
eventBus := make(chan knowledge.SourceSubGraphUpdates)
133-
listener := knowledge.NewSourceListener(Database)
134-
135-
if err := Database.InitializeSchema(); err != nil {
136-
log.Fatal(err)
137-
}
138-
139-
// Start kb listening
140-
go listener.Listen(eventBus)
141-
142-
var selectedSources []sources.Source
143-
144-
// if argument is provided, we select the source
145-
if len(args) == 1 {
146-
for _, s := range Sources {
147-
if s.Name() == args[0] {
148-
selectedSources = []sources.Source{s}
149-
break
150-
}
151-
}
152-
if len(selectedSources) == 0 {
153-
log.Fatal(fmt.Sprintf("Unable to find source with name %s", args[0]))
154-
}
155-
156-
} else {
157-
selectedSources = Sources
158-
}
159-
160-
for _, source := range selectedSources {
161-
emitter := knowledge.NewGraphEmitter(source.Name(), eventBus, Database)
162-
err := source.Start(emitter)
163-
if err != nil {
164-
fmt.Printf("[ERROR] %s\n", err)
165-
}
166-
}
167-
168-
wg.Wait()
16987
}
17088

17189
func count(cmd *cobra.Command, args []string) {
@@ -188,53 +106,8 @@ func flush(cmd *cobra.Command, args []string) {
188106
fmt.Println("Successul flush")
189107
}
190108

191-
func getSource(cmd *cobra.Command, args []string) {
192-
sourceName := args[0]
193-
194-
selectedSources := []sources.Source{}
195-
for _, s := range Sources {
196-
if s.Name() == sourceName {
197-
selectedSources = append(selectedSources, s)
198-
}
199-
}
200-
201-
assets := make(map[string]bool)
202-
relations := make(map[string]bool)
203-
for _, s := range selectedSources {
204-
g, err := s.Graph()
205-
if err != nil {
206-
log.Fatal(err)
207-
}
208-
209-
for _, a := range g.Assets() {
210-
assets[string(a)] = true
211-
}
212-
213-
for _, r := range g.Relations() {
214-
t := fmt.Sprintf("%s_%s_%s", r.FromType, r.Type, r.ToType)
215-
relations[t] = true
216-
}
217-
}
218-
219-
assetsSlice := make([]string, 0)
220-
relationsSlice := make([]string, 0)
221-
for a := range assets {
222-
assetsSlice = append(assetsSlice, fmt.Sprintf("\t%s", a))
223-
}
224-
for r := range relations {
225-
relationsSlice = append(relationsSlice, fmt.Sprintf("\t%s", r))
226-
}
227-
228-
sort.Strings(assetsSlice)
229-
sort.Strings(relationsSlice)
230-
231-
fmt.Printf("assets -> \n%s\nrelations -> \n%s\n",
232-
strings.Join(assetsSlice, "\n"),
233-
strings.Join(relationsSlice, "\n"))
234-
}
235-
236109
func listen(cmd *cobra.Command, args []string) {
237-
server.StartServer(Database)
110+
server.StartServer(Database, Database)
238111
}
239112

240113
func read(cmd *cobra.Command, args []string) {

cmd/source-csv/main.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/clems4ever/go-graphkb/importer"
8+
"github.com/clems4ever/go-graphkb/internal/sources"
9+
"github.com/spf13/cobra"
10+
"github.com/spf13/viper"
11+
)
12+
13+
// ConfigPath string
14+
var ConfigPath string
15+
16+
// SourceName string
17+
var SourceName string
18+
19+
func onInit() {
20+
viper.SetConfigFile(ConfigPath)
21+
viper.SetConfigType("yaml")
22+
23+
if err := viper.ReadInConfig(); err != nil {
24+
panic(fmt.Errorf("Cannot read configuration file from %s", ConfigPath))
25+
}
26+
27+
fmt.Println("Using config file:", viper.ConfigFileUsed())
28+
}
29+
30+
func main() {
31+
log.SetFlags(log.LstdFlags | log.Lshortfile)
32+
33+
cobra.OnInitialize(onInit)
34+
35+
rootCmd := &cobra.Command{
36+
Use: "source-csv [opts]",
37+
Run: func(cmd *cobra.Command, args []string) {
38+
if err := importer.Start(SourceName, sources.NewCSVSource(), nil); err != nil {
39+
log.Fatal(err)
40+
}
41+
},
42+
}
43+
44+
rootCmd.PersistentFlags().StringVar(&ConfigPath, "config", "config.yml", "Provide the path to the configuration file (required)")
45+
rootCmd.PersistentFlags().StringVar(&SourceName, "source-name", "", "Provide a unique source name")
46+
47+
if err := cobra.MarkFlagRequired(rootCmd.PersistentFlags(), "source-name"); err != nil {
48+
log.Fatal(err)
49+
}
50+
51+
if err := rootCmd.Execute(); err != nil {
52+
log.Fatal(err)
53+
}
54+
}

config.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,4 @@ mariadb:
88

99
sources:
1010
csv:
11-
schema: ./examples/example-schema.csv
1211
data: ./examples/example-data.csv

examples/example-schema.csv

Lines changed: 0 additions & 5 deletions
This file was deleted.

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/abbot/go-http-auth v0.4.0
77
github.com/antlr/antlr4 v0.0.0-20200103163232-691acdc23f1f
88
github.com/cheggaaa/pb v2.0.7+incompatible
9+
github.com/deckarep/golang-set v1.7.1
910
github.com/go-sql-driver/mysql v1.5.0
1011
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
1112
github.com/gorilla/mux v1.7.3

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
2929
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3030
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
3131
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
32+
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
33+
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
3234
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
3335
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
3436
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=

importer/importer.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package importer
2+
3+
import (
4+
"fmt"
5+
"log"
6+
7+
"github.com/clems4ever/go-graphkb/internal/database"
8+
"github.com/clems4ever/go-graphkb/internal/knowledge"
9+
"github.com/clems4ever/go-graphkb/internal/sources"
10+
"github.com/spf13/viper"
11+
)
12+
13+
// ImporterOptions holds the optional settings accepted by Start. A nil
// *ImporterOptions is a valid argument to Start.
type ImporterOptions struct {
	// CacheGraph is not read by Start in the code visible in this file.
	// NOTE(review): presumably meant to enable caching of the source graph
	// between runs; confirm the intended semantics before relying on it.
	CacheGraph bool
}
16+
17+
func Start(name string, source sources.Source, options *ImporterOptions) error {
18+
dbName := viper.GetString("mariadb.database")
19+
if dbName == "" {
20+
return fmt.Errorf("Please provide database_name option in your configuration file")
21+
}
22+
mariaDatabase := database.NewMariaDB(
23+
viper.GetString("mariadb.username"),
24+
viper.GetString("mariadb.password"),
25+
viper.GetString("mariadb.host"),
26+
dbName)
27+
28+
observableSource := sources.NewObservableSource(source)
29+
30+
eventBus := make(chan knowledge.SourceSubGraphUpdates)
31+
listener := knowledge.NewSourceListener(mariaDatabase, mariaDatabase)
32+
33+
closeC := listener.Listen(eventBus)
34+
35+
if err := mariaDatabase.InitializeSchema(); err != nil {
36+
log.Fatal(err)
37+
}
38+
39+
emitter := knowledge.NewGraphEmitter(name, eventBus, mariaDatabase)
40+
if err := observableSource.Start(emitter); err != nil {
41+
return err
42+
}
43+
44+
close(eventBus)
45+
<-closeC
46+
47+
return nil
48+
}

0 commit comments

Comments
 (0)