@@ -25,24 +25,66 @@ func NewGraphUpdater(graphDB GraphDB, schemaPersistor schema.Persistor) *GraphUp
25
25
return & GraphUpdater {graphDB , schemaPersistor }
26
26
}
27
27
28
- func (sl * GraphUpdater ) doUpdate (updates SourceSubGraphUpdates ) error {
29
- previousSchema , err := sl .schemaPersistor .LoadSchema (context .Background (), updates .Source )
28
+ // Augment the graph of the user with "observed" relation from the source to the each asset
29
+ func (sl * GraphUpdater ) appendObservedRelations (source string , updates * GraphUpdatesBulk ) {
30
+ assetsToAdd := []Asset {Asset {Type : "source" , Key : source }}
31
+ observedRelationsToRemove := []Relation {}
32
+ observedRelationsToAdd := []Relation {}
33
+
34
+ for _ , a := range updates .AssetUpserts {
35
+ observedRelationsToAdd = append (observedRelationsToAdd , Relation {
36
+ Type : "observed" ,
37
+ From : AssetKey (assetsToAdd [0 ]),
38
+ To : AssetKey (a ),
39
+ })
40
+ }
41
+
42
+ for _ , a := range updates .AssetRemovals {
43
+ observedRelationsToRemove = append (observedRelationsToRemove , Relation {
44
+ Type : "observed" ,
45
+ From : AssetKey (assetsToAdd [0 ]),
46
+ To : AssetKey (a ),
47
+ })
48
+ }
49
+
50
+ updates .AssetUpserts = append (updates .AssetUpserts , assetsToAdd ... )
51
+ updates .RelationUpserts = append (updates .RelationUpserts , observedRelationsToAdd ... )
52
+ updates .RelationRemovals = append (updates .RelationRemovals , observedRelationsToRemove ... )
53
+ }
54
+
55
+ func (sl * GraphUpdater ) updateSchema (source string , sg * schema.SchemaGraph ) error {
56
+ for _ , a := range sg .Assets () {
57
+ sg .AddRelation (schema .AssetType ("source" ), "observed" , a )
58
+ }
59
+ sg .AddAsset ("source" )
60
+
61
+ previousSchema , err := sl .schemaPersistor .LoadSchema (context .Background (), source )
30
62
if err != nil {
31
63
fmt .Printf ("[ERROR] Unable to read schema from DB: %v.\n " , err )
32
64
fmt .Println ("[WARNING] The graph has not been updated." )
33
65
return err
34
66
}
35
67
36
- schemaEqual := previousSchema .Equal (updates . Schema )
68
+ schemaEqual := previousSchema .Equal (* sg )
37
69
38
70
if ! schemaEqual {
39
71
fmt .Println ("The schema needs an update" )
40
- if err := sl .schemaPersistor .SaveSchema (context .Background (), updates . Source , updates . Schema ); err != nil {
72
+ if err := sl .schemaPersistor .SaveSchema (context .Background (), source , * sg ); err != nil {
41
73
fmt .Printf ("[ERROR] Unable to write schema in DB: %v.\n " , err )
42
74
fmt .Println ("[WARNING] The graph has not been updated." )
43
75
return err
44
76
}
45
77
}
78
+ return nil
79
+ }
80
+
81
+ func (sl * GraphUpdater ) doUpdate (updates SourceSubGraphUpdates ) error {
82
+ if err := sl .updateSchema (updates .Source , & updates .Schema ); err != nil {
83
+ return err
84
+ }
85
+
86
+ sl .appendObservedRelations (updates .Source , & updates .Updates )
87
+
46
88
if err := sl .graphDB .UpdateGraph (updates .Source , & updates .Updates ); err != nil {
47
89
fmt .Printf ("[ERROR] Unable to write schema in graph DB: %v\n " , err )
48
90
return err
0 commit comments