-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathupdate_data_for_amp_review.py
More file actions
196 lines (173 loc) · 7.34 KB
/
update_data_for_amp_review.py
File metadata and controls
196 lines (173 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Append NEW [NMAquifer_Table.Field, PointID, Error] rows to AMP_review!A:C
without overwriting existing content. Skips duplicates already present.
- Parses transfer metrics blocks under "PointID|Table|Field|Error"
- Extra error columns are appended to Error, then cleaned/normalized
- Only writes to columns A..C using Sheets 'append' API
"""
from pathlib import Path
import sys
import re
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from datetime import datetime, timezone
# Write last-run timestamp here (pick a cell outside your data table)
UPDATED_CELL = "A1" # change to e.g. "H1" or "AA1" if A1 is your header row
# Prefix prepended to the timestamp written into UPDATED_CELL
UPDATED_PREFIX = "TRANSFER METRICS UPDATED:"
# ======= CONFIG — EDIT THESE =======
# Path to the Google service-account JSON key used for Sheets auth
SERVICE_ACCOUNT_FILE = "transfermetrics_service_account.json"
# Target spreadsheet (the ID segment from its URL)
SPREADSHEET_ID = "1iQzeKqRWHIKbnNptH_wRQEpJ_pt1rI00ax9d5BhDAhU"
# Tab (sheet) name within the spreadsheet; created if missing
SHEET_NAME = "AMP_review"
# Input transfer-metrics file to parse for new review rows
TRANSFER_METRICS_PATH = r"transfer_metrics_logs/transfer_metrics_metrics_2026-01-27T01_15_56.csv"
# ===================================
# Canonical (lowercased) form of the block header line in the metrics file
HEADER_CANON = "pointid|table|field|error"
def get_sheets_service(sa_path: str):
    """Build an authenticated Google Sheets API client from a service-account key file."""
    creds = Credentials.from_service_account_file(
        sa_path,
        scopes=["https://www.googleapis.com/auth/spreadsheets"],
    )
    return build("sheets", "v4", credentials=creds)
def canon(s: str) -> str:
    """Return *s* trimmed and lowercased; falsy input (None, "") yields ""."""
    if not s:
        return ""
    return s.strip().lower()
# ---------- Error text normalization ----------
# Private patterns used only by clean_error() below.
_row_id_re = re.compile(r"\brow\.id\s*=\s*\d+,\s*", re.IGNORECASE)
_sensor_type_re = re.compile(
    r"key\s+error\s+adding\s+sensor_type\s*:\s*(?P<stype>[^,|]+?)\s*error\s*:\s*'?(?P=stype)'?",
    re.IGNORECASE,
)
_org_missing_re = re.compile(
    r'key\s*\(organization\)\s*=\s*\((?P<org>[^)]+)\)\s*is\s*not\s*present\s*in\s*table\s*"?"?lexicon_term"?"?\.',
    re.IGNORECASE,
)
_value_error_prefix_re = re.compile(r"^\s*value\s*error\s*[,:\-]\s*", re.IGNORECASE)


def clean_error(msg: str) -> str:
    """
    Normalize a raw transfer-metrics error message.

    Applies, in order: strip "row.id = N," noise, collapse verbose
    sensor_type key errors to "Invalid sensor_type: X", collapse missing
    lexicon_term organization errors to "Invalid organization: X", drop a
    leading "value error" prefix, remove straight/smart double quotes, and
    tidy whitespace plus trailing separators. Falsy input is returned as-is.
    """
    if not msg:
        return msg
    text = _row_id_re.sub("", msg)
    sensor_match = _sensor_type_re.search(text)
    if sensor_match is not None:
        stype = sensor_match.group("stype").strip()
        text = _sensor_type_re.sub(f"Invalid sensor_type: {stype}", text)
    org_match = _org_missing_re.search(text)
    if org_match is not None:
        org = org_match.group("org").strip()
        text = _org_missing_re.sub(f"Invalid organization: {org}", text)
    text = _value_error_prefix_re.sub("", text)
    # remove straight & smart double quotes
    for quote in ('"', '“', '”'):
        text = text.replace(quote, '')
    # tidy whitespace / trailing punctuation
    text = re.sub(r"\s{2,}", " ", text).strip()
    return re.sub(r"[\s\|,]+$", "", text)
def parse_amp_rows(path: Path):
    """
    Parse the transfer-metrics file into AMP review rows.

    Scans *path* for header-delimited blocks ("PointID|Table|Field|Error")
    and returns a list of 3-element rows: [NMAquifer_Table.Field, PointID,
    Error]. Columns past the fourth are folded into the Error text, which is
    then normalized via clean_error(). Lines with fewer than four pipe-
    separated parts are skipped. Exits the process if *path* does not exist.
    """
    if not path.exists():
        sys.exit(f"File not found: {path}")
    with open(path, "r", encoding="utf-8-sig", errors="replace") as f:
        lines = [ln.rstrip("\n") for ln in f]
    rows_out = []
    n, i = len(lines), 0
    while i < n:
        if canon(lines[i].strip()) != HEADER_CANON:
            i += 1
            continue
        # Found a header; consume this block's data rows.
        i += 1
        while i < n:
            raw = lines[i]
            s = raw.strip()
            # Blank line or a new header terminates the block.
            if not s or canon(s) == HEADER_CANON:
                break
            parts = [p.strip() for p in raw.split("|")]
            if len(parts) >= 4:
                point_id, table, field, error = parts[0], parts[1], parts[2], parts[3]
                extra = " | ".join(parts[4:]).strip() if len(parts) > 4 else ""
                combined_error = f"{error} | {extra}" if extra else error
                combined_error = clean_error(combined_error)
                nm_tf = f"{table}.{field}" if field else table
                rows_out.append([nm_tf, point_id, combined_error])
            i += 1
        # BUGFIX: only step past the terminator when it was a blank line.
        # The original unconditionally advanced here, so a header line that
        # immediately followed a block (back-to-back blocks with no blank
        # separator) was skipped and that next block's rows were silently lost.
        if i < n and canon(lines[i].strip()) != HEADER_CANON:
            i += 1
    return rows_out
def ensure_tab(service, spreadsheet_id: str, tab_name: str):
    """Create the sheet/tab *tab_name* in the spreadsheet if it is not already present."""
    meta = service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
    existing_titles = {s["properties"]["title"] for s in meta.get("sheets", [])}
    if tab_name in existing_titles:
        return
    add_request = {"addSheet": {"properties": {"title": tab_name}}}
    service.spreadsheets().batchUpdate(
        spreadsheetId=spreadsheet_id,
        body={"requests": [add_request]},
    ).execute()
def load_existing_set(service, spreadsheet_id: str, tab_name: str):
    """
    Fetch columns A:C of *tab_name* and return (set of (a, b, c) tuples, row count).

    The set feeds duplicate detection; a leading row matching the canonical
    header names is excluded from it. Fully blank rows are ignored. Short
    rows are padded with empty strings so every tuple has three elements.
    """
    resp = service.spreadsheets().values().get(
        spreadsheetId=spreadsheet_id,
        range=f"'{tab_name}'!A1:C"
    ).execute()
    values = resp.get("values", [])
    if not values:
        return set(), 0
    # skip header if present
    header = ["nmaquifer_table.field", "pointid", "error"]
    first_is_header = [cell.strip().lower() for cell in values[0]] == header
    existing = set()
    for row in (values[1:] if first_is_header else values):
        padded = tuple(row[j].strip() if j < len(row) else "" for j in range(3))
        if any(padded):
            existing.add(padded)
    return existing, len(values)
def main():
    """Append new [NMAquifer_Table.Field, PointID, Error] rows to the review sheet."""
    service = get_sheets_service(SERVICE_ACCOUNT_FILE)
    ensure_tab(service, SPREADSHEET_ID, SHEET_NAME)

    # Parse candidate rows from the transfer-metrics file, dropping
    # duplicates within the incoming batch itself.
    seen_batch = set()
    unique_new = []
    for row in parse_amp_rows(Path(TRANSFER_METRICS_PATH)):
        key = (row[0].strip(), row[1].strip(), row[2].strip())
        if key in seen_batch:
            continue
        seen_batch.add(key)
        unique_new.append(row)

    # Drop anything already present in the sheet's A:C columns.
    existing_set, current_height = load_existing_set(service, SPREADSHEET_ID, SHEET_NAME)
    to_append = [
        row for row in unique_new
        if (row[0].strip(), row[1].strip(), row[2].strip()) not in existing_set
    ]
    if not to_append:
        print("[info] No new rows to append. Nothing changed.")
        return

    # Prepend the header row only when the sheet holds no data yet
    # (completely empty, or a single row that yielded no data tuples).
    need_header = (current_height == 0) or (current_height == 1 and len(existing_set) == 0)
    header_rows = [["NMAquifer_Table.Field", "PointID", "Error"]] if need_header else []
    values = header_rows + to_append

    # Append to A:C only so other columns are never overwritten.
    service.spreadsheets().values().append(
        spreadsheetId=SPREADSHEET_ID,
        range=f"'{SHEET_NAME}'!A:C",
        valueInputOption="RAW",
        insertDataOption="INSERT_ROWS",
        body={"values": values}
    ).execute()

    # Record the last-run timestamp in a single, non-intrusive cell.
    ts = datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S %Z")
    service.spreadsheets().values().update(
        spreadsheetId=SPREADSHEET_ID,
        range=f"'{SHEET_NAME}'!{UPDATED_CELL}",
        valueInputOption="RAW",
        body={"values": [[f"{UPDATED_PREFIX} {ts}"]]}
    ).execute()
    print(f"[done] Appended {len(to_append)} new row(s) to {SHEET_NAME}!A:C (kept existing content).")
if __name__ == "__main__":
main()