Skip to content

Commit 9d58f84

Browse files
committed
Merge branch 'table_for_file' of https://github.com/CodeForPhilly/paws-data-pipeline into execute-adjustments
� Conflicts: � src/server/api/admin_api.py
2 parents a350cf3 + 644e2e7 commit 9d58f84

File tree

2 files changed

+75
-40
lines changed

2 files changed

+75
-40
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""key/value table
2+
3+
Revision ID: 05e0693f8cbb
4+
Revises: 6b8cf99be000
5+
Create Date: 2021-03-18 11:35:43.512082
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '05e0693f8cbb'
14+
down_revision = '6b8cf99be000'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
op.create_table(
21+
'kv_unique',
22+
sa.Column('_id', sa.Integer, primary_key=True),
23+
sa.Column('keycol', sa.String(50), nullable=False, unique=True),
24+
sa.Column('valcol', sa.String(65536), nullable=True),
25+
)
26+
27+
# op.create_index('kvk_ix', 'kv_unique', ['key'], unique=True)
28+
29+
30+
def downgrade():
31+
pass

src/server/api/admin_api.py

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
from api.api import admin_api
2+
import shutil
23
import os
34
from datetime import datetime
45
import json
56
from sqlalchemy.sql import text
7+
8+
from sqlalchemy.dialects.postgresql import insert
9+
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, exc, select
610
from pipeline import flow_script
711
from config import engine
812
from flask import request, redirect, jsonify, current_app, abort
@@ -15,10 +19,14 @@
1519

1620
ALLOWED_EXTENSIONS = {"csv", "xlsx"}
1721

22+
metadata = MetaData()
1823

1924
def __allowed_file(filename):
2025
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
2126

27+
kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)
28+
29+
2230

2331
# file upload tutorial
2432
@admin_api.route("/api/file", methods=["POST"])
@@ -36,7 +44,7 @@ def uploadCSV():
3644
finally:
3745
file.close()
3846

39-
return jsonify({"success": "uploaded file"});
47+
return redirect("/")
4048

4149

4250
@admin_api.route("/api/listCurrentFiles", methods=["GET"])
@@ -55,38 +63,38 @@ def list_current_files():
5563
@admin_api.route("/api/execute", methods=["GET"])
5664
def execute():
5765
current_app.logger.info("Execute flow")
66+
flow_script.start_flow()
5867

59-
try:
60-
last_execution_file = open(LOGS_PATH + "last_execution.json", "r")
61-
last_execution_details = json.loads(last_execution_file.read())
62-
63-
if last_execution_details != "Running":
64-
last_execution_file = open(LOGS_PATH + "last_execution.json", "w")
65-
last_execution_file.write(json.dumps("Running"))
66-
last_execution_file.close()
67-
68-
flow_script.start_flow()
69-
70-
statistics = get_statistics()
68+
current_time = datetime.now().ctime()
69+
statistics = get_statistics()
7170

72-
last_execution_details = {"stats": statistics}
71+
last_execution_details = {"executionTime": current_time, "stats": statistics}
72+
last_ex_json = (json.dumps(last_execution_details))
7373

74-
except Exception as e:
75-
last_execution_details = {"stats": {"Execution Error": str(e)}}
76-
return abort(500)
77-
78-
finally:
79-
current_time = datetime.now().ctime()
80-
81-
last_execution_details["executionTime"] = current_time
82-
last_execution_file = open(LOGS_PATH + "last_execution.json", "w")
83-
last_execution_file.write(json.dumps(last_execution_details))
84-
last_execution_file.close()
74+
# Write Last Execution stats to DB
75+
# See Alembic Revision ID: 05e0693f8cbb for table definition
76+
with engine.connect() as connection:
77+
ins_stmt = insert(kvt).values( # Postgres-specific insert() supporting ON CONFLICT
78+
keycol = 'last_execution_time',
79+
valcol = last_ex_json,
80+
)
81+
# If key already present in DB, do update instead
82+
upsert = ins_stmt.on_conflict_do_update(
83+
constraint='kv_unique_keycol_key',
84+
set_=dict(valcol=last_ex_json)
85+
)
86+
87+
try:
88+
connection.execute(upsert)
89+
except Exception as e:
90+
current_app.logger.error("Insert/Update failed on Last Execution stats")
91+
current_app.logger.exception(e)
8592

8693
return jsonify(success=True)
8794

8895

8996
def get_statistics():
97+
9098
with engine.connect() as connection:
9199
query_matches = text("SELECT count(*) FROM (SELECT distinct matching_id from pdp_contacts) as a;")
92100
query_total_count = text("SELECT count(*) FROM pdp_contacts;")
@@ -104,26 +112,22 @@ def get_statistics():
104112

105113
@admin_api.route("/api/statistics", methods=["GET"])
106114
def list_statistics():
107-
try:
108-
last_execution_file = open(LOGS_PATH + "last_execution.json", "r")
109-
last_execution_details = json.loads(last_execution_file.read())
110-
last_execution_file.close()
115+
""" Pull Last Execution stats from DB. """
116+
current_app.logger.info("list_statistics() request")
117+
last_execution_details = '{}' # Empty but valid JSON
111118

112-
except (FileNotFoundError):
113-
current_app.logger.error("last_execution.json file was missing")
114-
return abort(500)
119+
try: # See Alembic Revision ID: 05e0693f8cbb for table definition
120+
with engine.connect() as connection:
121+
s = text("select valcol from kv_unique where keycol = 'last_execution_time';")
122+
result = connection.execute(s)
123+
last_execution_details = result.fetchone()[0]
115124

116-
except (json.JSONDecodeError):
117-
current_app.logger.error(
118-
"last_execution.json could not be decoded - possible corruption"
119-
)
120-
return abort(500)
121125

122126
except Exception as e:
123-
current_app.logger.error("Failure reading last_execution.json: ", e)
124-
return abort(500)
127+
current_app.logger.error("Failure reading Last Execution stats from DB")
128+
# return abort(500) # Weird but not worth a 500
125129

126-
return jsonify(last_execution_details)
130+
return last_execution_details
127131

128132

129133
"""

0 commit comments

Comments
 (0)