Skip to content

Commit 9c60741

Browse files
committed
Add CSV source.
1 parent d071bdb commit 9c60741

File tree

11 files changed

+185
-87
lines changed

11 files changed

+185
-87
lines changed

cmd/go-graphkb/main.go

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,6 @@ var ConfigPath string
2727

2828
func init() {
2929
log.SetFlags(log.LstdFlags | log.Lshortfile)
30-
31-
Sources = []sources.Source{}
32-
33-
for _, s := range Sources {
34-
sources.Registry.Add(s)
35-
g := s.Graph()
36-
37-
for _, a := range g.Assets() {
38-
knowledge.SchemaRegistrySingleton.AddAssetType(a)
39-
}
40-
41-
for _, r := range g.Relations() {
42-
knowledge.SchemaRegistrySingleton.AddRelationType(r.Type)
43-
}
44-
}
4530
}
4631

4732
func main() {
@@ -117,6 +102,27 @@ func onInit() {
117102
viper.GetString("mariadb.password"),
118103
viper.GetString("mariadb.host"),
119104
dbName)
105+
106+
Sources = []sources.Source{
107+
sources.NewCSVSource(),
108+
}
109+
110+
for _, s := range Sources {
111+
sources.Registry.Add(s)
112+
g, err := s.Graph()
113+
114+
if err != nil {
115+
log.Fatal(err)
116+
}
117+
118+
for _, a := range g.Assets() {
119+
knowledge.SchemaRegistrySingleton.AddAssetType(a)
120+
}
121+
122+
for _, r := range g.Relations() {
123+
knowledge.SchemaRegistrySingleton.AddRelationType(r.Type)
124+
}
125+
}
120126
}
121127

122128
func start(cmd *cobra.Command, args []string) {
@@ -179,6 +185,7 @@ func flush(cmd *cobra.Command, args []string) {
179185
if err := Database.FlushAll(); err != nil {
180186
log.Fatal(err)
181187
}
188+
fmt.Println("Successul flush")
182189
}
183190

184191
func getSource(cmd *cobra.Command, args []string) {
@@ -194,7 +201,11 @@ func getSource(cmd *cobra.Command, args []string) {
194201
assets := make(map[string]bool)
195202
relations := make(map[string]bool)
196203
for _, s := range selectedSources {
197-
g := s.Graph()
204+
g, err := s.Graph()
205+
if err != nil {
206+
log.Fatal(err)
207+
}
208+
198209
for _, a := range g.Assets() {
199210
assets[string(a)] = true
200211
}

config.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
port: 8090
22

33
mariadb:
4-
username: root
5-
password: example
4+
username: graphkb
5+
password: password
66
host:
77
database: graphkb
8+
9+
sources:
10+
csv:
11+
schema: ./examples/example-schema.csv
12+
data: ./examples/example-data.csv

docker-compose.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,9 @@ services:
1010
- 3306:3306
1111
environment:
1212
MYSQL_ROOT_PASSWORD: example
13+
MYSQL_DATABASE: graphkb
14+
MYSQL_USER: graphkb
15+
MYSQL_PASSWORD: password
1316
volumes:
1417
- /var/lib/go-graphkb:/var/lib/mysql
1518

examples/example-data.csv

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from_type,from_value,relation_type,to_type,to_value
2+
employee,bob,is_in,team,Team-a
3+
employee,charlie,is_in,team,Team-a
4+
employee,alice,is_in,team,Team-b
5+
employee,harry,is_in,team,Team-b
6+
employee,john,is_in,team,Team-c
7+
employee,james,is_in,team,Team-b
8+
employee,john,work_at,office,P1
9+
employee,alice,work_at,office,P1
10+
employee,bob,work_at,office,B2
11+
employee,charlie,work_at,office,B2
12+
employee,james,work_at,office,B2
13+
employee,harry,work_at,office,B2
14+
office,P1,is_in,country,Paris
15+
office,B2,is_in,country,Barcelona
16+
employee,harry,is_manager_of,employee,alice
17+
employee,harry,is_manager_of,employee,james

examples/example-schema.csv

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from_type,relation_type,to_type
2+
employee,is_in,team
3+
employee,is_manager_of,employee
4+
employee,work_at,office
5+
office,is_in,country

internal/knowledge/fieldbinding.go

Lines changed: 0 additions & 62 deletions
This file was deleted.

internal/knowledge/graph.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func NewGraph() *Graph {
4949
// AddAsset add an asset to the graph
5050
func (g *Graph) AddAsset(assetType AssetType, assetKey string) AssetKey {
5151
if !SchemaRegistrySingleton.AssetExists(assetType) {
52-
log.Fatal(fmt.Errorf("Asset type '%s' does not exist to add asset %s", assetType, assetKey))
52+
log.Fatal(fmt.Errorf("Asset type '%s' does not exist in schema registry. Cannot add asset value '%s'", assetType, assetKey))
5353
}
5454
asset := Asset{Type: assetType, Key: assetKey}
5555
g.assets[asset] = true
@@ -59,7 +59,7 @@ func (g *Graph) AddAsset(assetType AssetType, assetKey string) AssetKey {
5959
// AddRelation add a relation to the graph
6060
func (g *Graph) AddRelation(from AssetKey, relationType RelationKeyType, to AssetKey) Relation {
6161
if !SchemaRegistrySingleton.RelationExists(relationType) {
62-
log.Fatal(fmt.Errorf("Relation type '%s' does not exist", relationType))
62+
log.Fatal(fmt.Errorf("Relation type '%s' does not exist in schema registry", relationType))
6363
}
6464
relation := Relation{
6565
Type: relationType,

internal/server/server.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,12 @@ func getSourceGraph(w http.ResponseWriter, r *http.Request) {
5656
for _, sname := range sourceNames {
5757
if sources.Registry.Exist(sname) {
5858
s := sources.Registry.Get(sname)
59-
sg.Merge(s.Graph())
59+
g, err := s.Graph()
60+
if err != nil {
61+
w.WriteHeader(http.StatusInternalServerError)
62+
return
63+
}
64+
sg.Merge(*g)
6065
}
6166
}
6267
replyWithSourceGraph(w, &sg)

internal/sources/csv_source.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package sources
2+
3+
import (
4+
"encoding/csv"
5+
"fmt"
6+
"io"
7+
"os"
8+
9+
"github.com/clems4ever/go-graphkb/internal/knowledge"
10+
"github.com/spf13/viper"
11+
)
12+
13+
type CSVSource struct {
14+
schemaPath string
15+
dataPath string
16+
}
17+
18+
func NewCSVSource() *CSVSource {
19+
csvSource := new(CSVSource)
20+
csvSource.schemaPath = viper.GetString("sources.csv.schema")
21+
csvSource.dataPath = viper.GetString("sources.csv.data")
22+
return csvSource
23+
}
24+
25+
func (cs *CSVSource) Name() string {
26+
return "csv"
27+
}
28+
29+
func (cs *CSVSource) Graph() (*knowledge.SchemaGraph, error) {
30+
file, err := os.Open(cs.schemaPath)
31+
if err != nil {
32+
return nil, err
33+
}
34+
defer file.Close()
35+
36+
r := csv.NewReader(file)
37+
38+
graph := knowledge.NewSchemaGraph()
39+
40+
header := true
41+
42+
for {
43+
record, err := r.Read()
44+
if err == io.EOF {
45+
break
46+
}
47+
if err != nil {
48+
return nil, fmt.Errorf("Unable to read schema in CSV file: %v", err)
49+
}
50+
51+
// Skip header line
52+
if header {
53+
header = false
54+
continue
55+
}
56+
57+
fromType := graph.AddAsset(record[0])
58+
toType := graph.AddAsset(record[2])
59+
graph.AddRelation(fromType, record[1], toType)
60+
}
61+
return &graph, nil
62+
}
63+
64+
func (cs *CSVSource) Start(emitter *knowledge.GraphEmitter) error {
65+
file, err := os.Open(cs.dataPath)
66+
if err != nil {
67+
return err
68+
}
69+
defer file.Close()
70+
71+
r := csv.NewReader(file)
72+
73+
previousGraph, err := emitter.Read()
74+
if err != nil {
75+
return err
76+
}
77+
78+
tx := emitter.CreateCompleteGraphTransaction(previousGraph)
79+
80+
header := true
81+
82+
for {
83+
record, err := r.Read()
84+
if err == io.EOF {
85+
break
86+
}
87+
if err != nil {
88+
return fmt.Errorf("Unable to read schema in CSV file: %v", err)
89+
}
90+
91+
// Skip header line
92+
if header {
93+
header = false
94+
continue
95+
}
96+
97+
relationType := knowledge.RelationType{
98+
FromType: knowledge.AssetType(record[0]),
99+
ToType: knowledge.AssetType(record[3]),
100+
Type: knowledge.RelationKeyType(record[2]),
101+
}
102+
103+
tx.Relate(record[1], relationType, record[4])
104+
}
105+
tx.Commit()
106+
return nil
107+
}
108+
109+
func (cs *CSVSource) Stop() error {
110+
return nil
111+
}

internal/sources/observable_source.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ func (os *ObservableSource) Name() string {
1818
return os.source.Name()
1919
}
2020

21-
func (os *ObservableSource) Graph() knowledge.SchemaGraph {
22-
sg := os.source.Graph()
21+
func (os *ObservableSource) Graph() (*knowledge.SchemaGraph, error) {
22+
sg, err := os.source.Graph()
23+
if err != nil {
24+
return nil, err
25+
}
2326

2427
source := sg.AddAsset("source")
2528
for _, a := range sg.Assets() {
@@ -28,7 +31,7 @@ func (os *ObservableSource) Graph() knowledge.SchemaGraph {
2831
}
2932
sg.AddRelation(source, "observed", a)
3033
}
31-
return sg
34+
return sg, nil
3235
}
3336

3437
func (os *ObservableSource) Start(e *knowledge.GraphEmitter) error {

0 commit comments

Comments
 (0)