@@ -87,26 +87,44 @@ CREATE TABLE IF NOT EXISTS relations (
87
87
q , err = m .db .QueryContext (context .Background (), `
88
88
CREATE TABLE IF NOT EXISTS graph_schema (
89
89
id INTEGER AUTO_INCREMENT NOT NULL,
90
- source_name VARCHAR(64) NOT NULL,
90
+ importer VARCHAR(64) NOT NULL,
91
91
graph TEXT NOT NULL,
92
92
timestamp TIMESTAMP,
93
- CONSTRAINT pk_schema PRIMARY KEY (id))` )
93
+
94
+ CONSTRAINT pk_schema PRIMARY KEY (id))` )
95
+ if err != nil {
96
+ return err
97
+ }
98
+ defer q .Close ()
99
+
100
+ // Create the table storing importers tokens
101
+ q , err = m .db .QueryContext (context .Background (), `
102
+ CREATE TABLE IF NOT EXISTS importers (
103
+ id INTEGER AUTO_INCREMENT NOT NULL,
104
+ name VARCHAR(64) NOT NULL,
105
+ auth_token VARCHAR(64) NOT NULL,
106
+
107
+ CONSTRAINT pk_importer PRIMARY KEY (id),
108
+ UNIQUE unique_importer_idx (name, auth_token)
109
+ )` )
94
110
if err != nil {
95
111
return err
96
112
}
97
113
defer q .Close ()
98
114
return nil
99
115
}
100
116
101
- // AssetIDResolver store ID assets in a cache
117
+ // AssetRegistry store ID of assets in a cache
102
118
type AssetRegistry struct {
103
119
cache map [knowledge.AssetKey ]int64
104
120
}
105
121
122
+ // Set id of an asset
106
123
func (ar * AssetRegistry ) Set (a knowledge.AssetKey , idx int64 ) {
107
124
ar .cache [a ] = idx
108
125
}
109
126
127
+ // Get id of an asset
110
128
func (ar * AssetRegistry ) Get (a knowledge.AssetKey ) (int64 , bool ) {
111
129
idx , ok := ar .cache [a ]
112
130
return idx , ok
@@ -286,19 +304,15 @@ func (m *MariaDB) removeRelations(source string, relations []knowledge.Relation)
286
304
DELETE r FROM relations r
287
305
INNER JOIN assets a ON r.from_id = a.id
288
306
INNER JOIN assets b ON r.to_id = b.id
289
- WHERE a.type = ? AND a.value = ? AND b.type = ? AND b.value = ? AND r.type = ?` )
307
+ WHERE a.type = ? AND a.value = ? AND b.type = ? AND b.value = ? AND r.type = ? AND r.source = ? ` )
290
308
if err != nil {
291
309
return 0 , 0 , err
292
310
}
293
311
defer stmt .Close ()
294
312
295
313
for _ , r := range relations {
296
- rel := SourceRelation {
297
- Relation : r ,
298
- Source : source ,
299
- }
300
314
res , err := stmt .ExecContext (context .Background (),
301
- rel .From .Type , rel .From .Key , rel .To .Type , rel .To .Key , rel .Type )
315
+ r .From .Type , r .From .Key , r .To .Type , r .To .Key , r .Type , source )
302
316
if err != nil {
303
317
return 0 , 0 , fmt .Errorf ("Unable to detete relation %v: %v" , r , err )
304
318
}
@@ -508,6 +522,7 @@ func (m *MariaDB) Query(ctx context.Context, query *query.QueryIL) (*knowledge.G
508
522
return res , nil
509
523
}
510
524
525
+ // SaveSchema save the schema graph in database
511
526
func (m * MariaDB ) SaveSchema (ctx context.Context , sourceName string , schema schema.SchemaGraph ) error {
512
527
b , err := json .Marshal (schema )
513
528
if err != nil {
@@ -523,55 +538,60 @@ func (m *MariaDB) SaveSchema(ctx context.Context, sourceName string, schema sche
523
538
return nil
524
539
}
525
540
541
+ // LoadSchema load the schema graph of the source from DB
526
542
func (m * MariaDB ) LoadSchema (ctx context.Context , sourceName string ) (schema.SchemaGraph , error ) {
527
543
row := m .db .QueryRowContext (ctx , "SELECT graph FROM graph_schema WHERE source_name = ? ORDER BY id DESC LIMIT 1" , sourceName )
528
- var rawJson string
529
- if err := row .Scan (& rawJson ); err != nil {
544
+ var rawJSON string
545
+ if err := row .Scan (& rawJSON ); err != nil {
530
546
if err == sql .ErrNoRows {
531
547
return schema .NewSchemaGraph (), nil
532
- } else {
533
- return schema .NewSchemaGraph (), err
534
548
}
549
+ return schema .NewSchemaGraph (), err
535
550
}
536
551
537
552
graph := schema .NewSchemaGraph ()
538
- err := json .Unmarshal ([]byte (rawJson ), & graph )
553
+ err := json .Unmarshal ([]byte (rawJSON ), & graph )
539
554
if err != nil {
540
555
return schema .NewSchemaGraph (), err
541
556
}
542
557
543
558
return graph , nil
544
559
}
545
560
546
- func (m * MariaDB ) ListSources (ctx context.Context ) ([]string , error ) {
547
- rows , err := m .db .QueryContext (ctx , "SELECT DISTINCT source_name FROM graph_schema" )
561
+ // ListImporters list importers with their authentication tokens
562
+ func (m * MariaDB ) ListImporters (ctx context.Context ) (map [string ]string , error ) {
563
+ rows , err := m .db .QueryContext (ctx , "SELECT name, auth_token FROM importers" )
548
564
549
565
if err != nil {
550
566
return nil , fmt .Errorf ("Unable to read sources from database: %v" , err )
551
567
}
552
568
defer rows .Close ()
553
569
554
- sources := make ([ ]string , 0 )
570
+ importers := make (map [ string ]string )
555
571
for rows .Next () {
556
- var source string
557
- if err := rows .Scan (& source ); err != nil {
572
+ var importerName string
573
+ var authToken string
574
+ if err := rows .Scan (& importerName , & authToken ); err != nil {
558
575
return nil , err
559
576
}
560
- sources = append ( sources , source )
577
+ importers [ importerName ] = authToken
561
578
}
562
- return sources , nil
579
+ return importers , nil
563
580
}
564
581
582
+ // MariaDBCursor is a cursor of data retrieved by MariaDB
565
583
type MariaDBCursor struct {
566
584
* sql.Rows
567
585
568
586
Projections []knowledge.Projection
569
587
}
570
588
589
+ // HasMore tells whether there are more data to retrieve from the cursor
571
590
func (mc * MariaDBCursor ) HasMore () bool {
572
591
return mc .Rows .Next ()
573
592
}
574
593
594
+ // Read read one more item from the cursor
575
595
func (mc * MariaDBCursor ) Read (ctx context.Context , doc interface {}) error {
576
596
var err error
577
597
var fArr []string
@@ -650,6 +670,7 @@ func (mc *MariaDBCursor) Read(ctx context.Context, doc interface{}) error {
650
670
return nil
651
671
}
652
672
673
+ // Close the cursor
653
674
func (mc * MariaDBCursor ) Close () error {
654
675
return mc .Rows .Close ()
655
676
}
0 commit comments