-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
98 lines (78 loc) · 2.7 KB
/
main.py
File metadata and controls
98 lines (78 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "datablob",
# "pyodbc",
# "pyodbc-extras",
# "simple-env",
# "tzdata",
# ]
# ///
import datablob
import pyodbc
import pyodbc_extras
import simple_env as se
from zoneinfo import ZoneInfo
def _require_env(name):
    """Return the setting *name* read through simple_env, or fail loudly.

    Raises:
        Exception: if ``se.get(name)`` yields a falsy value (presumably
            meaning the variable is unset or empty).
    """
    value = se.get(name)
    if not value:
        raise Exception(f"[dataops-gfi-rtesum] missing {name}")
    return value


# Time zone stamped onto each row's "tday" value before upload. Validated
# like every other setting so that a missing variable produces the same
# explicit "missing ..." error instead of a failure inside ZoneInfo.
timezone = ZoneInfo(_require_env("GFI_RTESUM_TIMEZONE"))

# Destination of the exported dataset.
AWS_BUCKET_NAME = _require_env("AWS_BUCKET_NAME")
AWS_BUCKET_PATH = _require_env("AWS_BUCKET_PATH")

# Database connection settings.
DB_SERVER = _require_env("DB_SERVER")
DB_DATABASE = _require_env("DB_DATABASE")
DB_USER = _require_env("DB_USER")
DB_PASSWORD = _require_env("DB_PASSWORD")
# NOTE(review): this is a truthiness check; if simple_env coerces a value such
# as "false" into a falsy Python object it would be reported as missing -
# confirm against the simple_env documentation.
DB_TRUST_SERVER_CERTIFICATE = _require_env("DB_TRUST_SERVER_CERTIFICATE")
# ODBC driver names to try, most preferred first.
_PREFERRED_DRIVERS = (
    "ODBC Driver 17 for SQL Server",
    "SQL Server",
    "ODBC Driver 11 for SQL Server",
)

available_drivers = pyodbc.drivers()
print("available_drivers:", available_drivers)

# Without any installed driver there is nothing to fall back to; fail with a
# clear message rather than an IndexError from the fallback below.
if not available_drivers:
    raise Exception("[dataops-gfi-rtesum] no ODBC drivers available")

# Take the first preferred driver that is installed; otherwise fall back to
# whichever driver is listed first.
driver = next(
    (name for name in _PREFERRED_DRIVERS if name in available_drivers),
    available_drivers[0],
)
print(f'[dataops-gfi-rtesum] chose driver "{driver}"')
# Connection string is built as semicolon-separated KEY=VALUE pairs.
# NOTE(review): values are inserted verbatim, so a password containing ";" or
# braces would need ODBC brace-quoting - confirm whether that can occur.
params = {
    "DRIVER": "{" + driver + "}",
    "SERVER": DB_SERVER,
    "DATABASE": DB_DATABASE,
    "UID": DB_USER,
    "PWD": DB_PASSWORD,
    "ENCRYPT": "yes",
    "TrustServerCertificate": DB_TRUST_SERVER_CERTIFICATE,
}
connection_string = ";".join("=".join(pair) for pair in params.items())

cnxn = pyodbc.connect(connection_string)
print("[dataops-gfi-rtesum] connected to database")
try:
    cursor = cnxn.cursor()
    print("[dataops-gfi-rtesum] created database cursor")
    try:
        # Read the whole "rtesum" table in one call.
        table = pyodbc_extras.dump_table(cursor, "rtesum")
    finally:
        # Release the cursor even if the dump fails.
        cursor.close()
finally:
    # The database is not needed after the dump; close the connection
    # explicitly instead of leaving it open until interpreter exit.
    # Assumes dump_table returns fully materialised rows (they are mutated
    # and passed to len() later) - TODO confirm.
    cnxn.close()

column_names = table["column_names"]
rows = table["rows"]

# Convert values in place for upload: "tday" gets the configured time zone
# attached and becomes an ISO 8601 string; "curr_r" and "uncl_r" become
# floats via their string form.
for row in rows:
    row["tday"] = row["tday"].replace(tzinfo=timezone).isoformat()
    row["curr_r"] = float(str(row["curr_r"]))
    row["uncl_r"] = float(str(row["uncl_r"]))
# Data store client bound to the configured bucket name and path.
client = datablob.DataBlobClient(
    bucket_name=AWS_BUCKET_NAME,
    bucket_path=AWS_BUCKET_PATH,
)

# Send the converted rows as dataset "gfi_rtesum", version "1".
client.update_dataset(
    name="gfi_rtesum",
    version="1",
    data=rows,
    column_names=column_names,
)

# Report how many rows were sent.
print(f"[dataops-gfi-rtesum] updated {len(rows)} rows")