-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path anomalo-catalog.py
More file actions
184 lines (155 loc) · 5.9 KB
/
anomalo-catalog.py
File metadata and controls
184 lines (155 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
try:
    import argparse
    import os
    import sys

    import dotenv

    # Load variables from a `.env` file if one exists. The path is relative,
    # so it is resolved against the current working directory.
    if dotenv.load_dotenv(".env", verbose=True):
        print("Loaded environment variables from `.env`")

    # Put this script's own directory first on the import path so sibling
    # modules such as `anomalo_api` resolve no matter where the script is
    # launched from.
    try:
        sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
    except NameError:
        # `__file__` is undefined when the code runs without a backing file
        # (e.g. pasted into an interactive session); fall back to the CWD.
        sys.path.insert(0, os.getcwd())

    from anomalo_api import AnomaloClient
except ImportError as x:
    # Only missing/broken dependencies get the install hint. Any other error
    # raised during bootstrap (reading `.env`, code inside `anomalo_api`)
    # propagates unchanged so its real cause stays visible.
    raise ImportError(
        "Please install required packages with `pip install -r requirements.txt`"
    ) from x
import traceback
from typing import Sequence
from adapters.base_adapter import AnomaloCatalogAdapter
# Registry of catalog adapter classes keyed by class name; its keys are the
# values accepted by the `--catalog` option.
AVAILABLE_ADAPTERS = dict(
    (adapter_cls.__name__, adapter_cls)
    for adapter_cls in AnomaloCatalogAdapter.adapters()
)
def get_arg_parser():
    """Build the command-line parser for the catalog sync script.

    Options cover listing catalogs/organizations, choosing the catalog adapter
    (from ``AVAILABLE_ADAPTERS``), filtering Anomalo data sources, and toggling
    which catalog fields are updated.

    Returns:
        argparse.ArgumentParser: the configured parser.
    """
    # (flag, add_argument keyword arguments), in the order shown by --help.
    option_specs = [
        (
            "--catalogs",
            dict(
                action="store_true",
                dest="list_catalogs",
                help="List available catalog integrations",
            ),
        ),
        (
            "--catalog",
            dict(type=str, choices=AVAILABLE_ADAPTERS.keys(), help="Catalog type"),
        ),
        (
            "--list-anomalo-organizations",
            dict(
                action="store_true",
                dest="list_orgs",
                help="List available Anomalo organizations",
            ),
        ),
        (
            "--anomalo-organization-id",
            dict(
                type=int,
                default=None,
                dest="anomalo_organization_id",
                help="Anomalo organization ID (default: use last organization accessed by API key's user)",
            ),
        ),
        (
            "--warehouse-name",
            dict(
                type=str,
                default=None,
                dest="warehouse_name",
                help="Only sync tables from the named Anomalo data source (aka warehouse)",
            ),
        ),
        (
            "--warehouse-id",
            dict(
                type=int,
                default=None,
                dest="warehouse_id",
                help="Only sync tables from the Anomalo data source (aka warehouse) with this id",
            ),
        ),
        (
            "--update-table-description",
            dict(
                action="store_true",
                dest="update_table_description",
                help="Update the table's description field with Anomalo metadata (default: disabled)",
            ),
        ),
        # The three "--no-..." flags store False, so the feature defaults to on.
        (
            "--no-update-labels",
            dict(
                action="store_false",
                dest="update_labels",
                help="Disable applying labels to monitored assets in the catalog (default: enabled)",
            ),
        ),
        (
            "--no-update-aspect",
            dict(
                action="store_false",
                dest="update_aspect",
                help="Disable updating the Anomalo custom Aspect in the catalog (default: enabled)",
            ),
        ),
        (
            "--no-update-endorsement",
            dict(
                action="store_false",
                dest="update_endorsement",
                help="Disable applying endorsement to monitored assets in the catalog (default: enabled)",
            ),
        ),
        (
            "--force-update-typedefs",
            dict(
                action="store_true",
                dest="force_update_typedefs",
                help="Force re-registration of catalog metadata type definitions (default: disabled)",
            ),
        ),
        (
            "--overwrite-table-comment",
            dict(
                action="store_true",
                dest="overwrite_table_comment",
                help="Overwrite existing table comments entirely instead of only updating the Anomalo section (default: disabled)",
            ),
        ),
    ]

    arg_parser = argparse.ArgumentParser(
        description="Sync Anomalo check metadata with your data catalog."
    )
    for flag, options in option_specs:
        arg_parser.add_argument(flag, **options)
    return arg_parser
def _warehouse_skip_message(args, adapter, wh):
    """Return the log line explaining why data source *wh* is skipped, or None to process it."""
    label = f"`{wh['name']}` ({wh['id']})"
    # Compare against None rather than truthiness so `--warehouse-id 0`
    # still acts as a filter instead of being silently ignored.
    if args.warehouse_name is not None and wh["name"] != args.warehouse_name:
        return f"Skipping {label}: name filter"
    if args.warehouse_id is not None and wh["id"] != args.warehouse_id:
        return f"Skipping {label}: id filter"
    if not adapter.include_warehouse(wh):
        return f"Skipping unsupported data source {label}..."
    return None


def _sync_warehouse(client, adapter, wh):
    """Publish DQ status for every configured table in *wh*; return (updated, failed) counts."""
    print(
        f"Processing configured tables in data source `{wh['name']}` ({wh['id']})..."
    )
    configured_tables = client.get_configured_tables(warehouse_id=wh["id"])
    print(
        f"Publishing DQ status to {len(configured_tables)} configured tables in data source `{wh['name']}` ({wh['id']})..."
    )
    updated = failed = 0
    for table in configured_tables:
        try:
            # The summary fetch is inside the try so one bad table is counted
            # as a failure instead of aborting the whole sync.
            table_summary = client.get_table_summary(table)
            synced = adapter.update_catalog_asset(wh, table_summary)
        except Exception:
            print(traceback.format_exc())
            failed += 1
            continue
        if synced:
            updated += 1
        else:
            failed += 1
    return updated, failed


def main(cli_args: Sequence[str] = None):
    """Sync Anomalo data-quality status for configured tables into a data catalog.

    Args:
        cli_args: Arguments to parse; ``None`` lets argparse read ``sys.argv``.

    Exits via ``sys.exit(0)`` after ``--list-anomalo-organizations`` or
    ``--catalogs``, and via ``sys.exit(3)`` when ``--catalog`` is missing.
    Otherwise syncs every data source that passes the filters and prints a
    summary of updated and failed tables.
    """
    args = get_arg_parser().parse_args(cli_args)

    if args.list_orgs:
        client = AnomaloClient(args.anomalo_organization_id)
        print("Available Anomalo organizations:")
        for org in client.api_client.get_all_organizations():
            print(f" {org['id']:>4}: {org['name']}")
        sys.exit(0)

    # --catalogs and the missing --catalog check need no Anomalo connection,
    # so they run before the client is constructed.
    if args.list_catalogs:
        print(f"Available catalogs: {', '.join(AVAILABLE_ADAPTERS.keys())}")
        sys.exit(0)

    if not args.catalog:
        print(
            "--catalog <catalog_name> argument required; use --catalogs to list available options"
        )
        sys.exit(3)

    client = AnomaloClient(args.anomalo_organization_id)
    adapter = AVAILABLE_ADAPTERS[args.catalog](args)
    adapter.configure()

    print(
        f"Reading warehouse list from Anomalo deployment HOST={client.api_client.host} ORGANIZATION_ID={client.organization_id} ..."
    )
    warehouses = client.get_warehouses()["warehouses"]
    wh_summary = [f"{wh['name']} ({wh['id']})" for wh in warehouses]
    print(f"Found {len(warehouses)} data sources: {wh_summary}")

    updated_table_count = 0
    error_table_count = 0
    for wh in warehouses:
        skip_message = _warehouse_skip_message(args, adapter, wh)
        if skip_message is not None:
            print(skip_message)
            continue
        updated, failed = _sync_warehouse(client, adapter, wh)
        updated_table_count += updated
        error_table_count += failed

    print(
        f"\n\nFINISHED SYNC. Updated {updated_table_count} tables, failed to sync {error_table_count} tables.\n"
    )
if __name__ == "__main__":
    # Executed as a script: passing None makes argparse read sys.argv.
    main(cli_args=None)