2
2
import jsonschema
3
3
import os
4
4
import requests
5
- import yaml
6
5
7
6
from eppo_metrics_sync .validation import (
8
7
unique_names ,
13
12
from eppo_metrics_sync .dbt_model_parser import DbtModelParser
14
13
from eppo_metrics_sync .helper import load_yaml
15
14
16
-
17
15
API_ENDPOINT = 'https://eppo.cloud/api/v1/metrics/sync'
18
16
19
17
20
18
class EppoMetricsSync :
21
19
def __init__ (
22
- self ,
23
- directory ,
24
- schema_type = 'eppo' ,
25
- dbt_model_prefix = None
20
+ self ,
21
+ directory ,
22
+ schema_type = 'eppo' ,
23
+ dbt_model_prefix = None ,
24
+ sync_prefix = None
26
25
):
27
26
self .directory = directory
28
27
self .fact_sources = []
29
28
self .metrics = []
30
29
self .validation_errors = []
31
30
self .schema_type = schema_type
32
31
self .dbt_model_prefix = dbt_model_prefix
32
+ self .sync_prefix = sync_prefix
33
33
34
34
# temporary: ideally would pull this from Eppo API
35
35
package_root = os .path .dirname (os .path .abspath (__file__ ))
36
36
schema_path = os .path .join (package_root , 'schema' , 'eppo_metric_schema.json' )
37
37
with open (schema_path ) as schema_file :
38
38
self .schema = json .load (schema_file )
39
39
40
-
41
40
def load_eppo_yaml (self , path ):
42
41
yaml_data = load_yaml (path )
43
42
if 'fact_sources' in yaml_data :
@@ -84,18 +83,25 @@ def read_yaml_files(self):
84
83
self .validation_errors .append (
85
84
f"Schema violation in { yaml_path } : \n { valid .error_message } "
86
85
)
87
-
86
+
88
87
elif self .schema_type == 'dbt-model' :
89
88
self .load_dbt_yaml (yaml_path )
90
-
89
+
91
90
else :
92
91
raise ValueError (f'Unexpected schema_type: { self .schema_type } ' )
93
-
92
+
94
93
if len (self .fact_sources ) == 0 and len (self .metrics ) == 0 :
95
94
raise ValueError (
96
95
'No valid yaml files found. ' + ', ' .join (self .validation_errors )
97
96
)
98
97
98
+ def _add_sync_prefix (self ):
99
+ for source in self .fact_sources :
100
+ source ['name' ] = f"[{ self .sync_prefix } ] { source ['name' ]} "
101
+
102
+ for metric in self .metrics :
103
+ metric ['name' ] = f"[{ self .sync_prefix } ] { metric ['name' ]} "
104
+
99
105
def validate (self ):
100
106
101
107
if len (self .fact_sources ) == 0 and len (self .metrics ) == 0 :
@@ -113,15 +119,23 @@ def validate(self):
113
119
114
120
return True
115
121
122
+ def _determine_sync_tag (self ):
123
+ if self .sync_prefix is not None :
124
+ return self .sync_prefix
125
+
126
+ os .getenv ('EPPO_SYNC_TAG' )
127
+
116
128
def sync (self ):
117
129
self .read_yaml_files ()
130
+ if self .sync_prefix is not None :
131
+ self ._add_sync_prefix ()
118
132
self .validate ()
119
133
120
134
api_key = os .getenv ('EPPO_API_KEY' )
121
135
if not api_key :
122
136
raise Exception ('EPPO_API_KEY not set in environment variables. Please set and try again' )
123
137
124
- sync_tag = os . getenv ( 'EPPO_SYNC_TAG' )
138
+ sync_tag = self . _determine_sync_tag ( )
125
139
if not api_key :
126
140
raise Exception ('EPPO_SYNC_TAG not set in environment variables. Please set and try again' )
127
141
0 commit comments