Skip to content

Commit ca3cfed

Browse files
authored
Merge pull request #232 from CodeForPhilly/table_for_file
Uses DB for storing Last Execution stats instead of file
2 parents 9a7a2ec + 4691e4d commit ca3cfed

File tree

6 files changed

+135
-27
lines changed

6 files changed

+135
-27
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""key/value table
2+
3+
Revision ID: 05e0693f8cbb
4+
Revises: 6b8cf99be000
5+
Create Date: 2021-03-18 11:35:43.512082
6+
7+
"""
8+
from alembic import op
9+
import sqlalchemy as sa
10+
11+
12+
# revision identifiers, used by Alembic.
13+
revision = '05e0693f8cbb'
14+
down_revision = '6b8cf99be000'
15+
branch_labels = None
16+
depends_on = None
17+
18+
19+
def upgrade():
20+
op.create_table(
21+
'kv_unique',
22+
sa.Column('_id', sa.Integer, primary_key=True),
23+
sa.Column('keycol', sa.String(50), nullable=False, unique=True),
24+
sa.Column('valcol', sa.String(65536), nullable=True),
25+
)
26+
27+
# op.create_index('kvk_ix', 'kv_unique', ['key'], unique=True)
28+
29+
30+
def downgrade():
31+
pass

src/server/api/admin_api.py

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44
from datetime import datetime
55
import json
66
from sqlalchemy.sql import text
7+
8+
from sqlalchemy.dialects.postgresql import insert
9+
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, exc, select
710
from pipeline import flow_script
811
from config import engine
912
from flask import request, redirect, jsonify, current_app, abort
@@ -16,10 +19,14 @@
1619

1720
ALLOWED_EXTENSIONS = {"csv", "xlsx"}
1821

22+
metadata = MetaData()
1923

2024
def __allowed_file(filename):
2125
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
2226

27+
kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)
28+
29+
2330

2431
# file upload tutorial
2532
@admin_api.route("/api/file", methods=["POST"])
@@ -62,15 +69,32 @@ def execute():
6269
statistics = get_statistics()
6370

6471
last_execution_details = {"executionTime": current_time, "stats": statistics}
72+
last_ex_json = (json.dumps(last_execution_details))
6573

66-
last_execution_file = open(LOGS_PATH + "last_execution.json", "w")
67-
last_execution_file.write(json.dumps(last_execution_details))
68-
last_execution_file.close()
74+
# Write Last Execution stats to DB
75+
# See Alembic Revision ID: 05e0693f8cbb for table definition
76+
with engine.connect() as connection:
77+
ins_stmt = insert(kvt).values( # Postgres-specific insert() supporting ON CONFLICT
78+
keycol = 'last_execution_time',
79+
valcol = last_ex_json,
80+
)
81+
# If key already present in DB, do update instead
82+
upsert = ins_stmt.on_conflict_do_update(
83+
constraint='kv_unique_keycol_key',
84+
set_=dict(valcol=last_ex_json)
85+
)
86+
87+
try:
88+
connection.execute(upsert)
89+
except Exception as e:
90+
current_app.logger.error("Insert/Update failed on Last Execution stats")
91+
current_app.logger.exception(e)
6992

7093
return jsonify(success=True)
7194

7295

7396
def get_statistics():
97+
7498
with engine.connect() as connection:
7599
query_matches = text("SELECT count(*) FROM (SELECT distinct matching_id from pdp_contacts) as a;")
76100
query_total_count = text("SELECT count(*) FROM pdp_contacts;")
@@ -88,26 +112,36 @@ def get_statistics():
88112

89113
@admin_api.route("/api/statistics", methods=["GET"])
90114
def list_statistics():
91-
try:
92-
last_execution_file = open(LOGS_PATH + "last_execution.json", "r")
93-
last_execution_details = json.loads(last_execution_file.read())
94-
last_execution_file.close()
115+
""" Pull Last Execution stats from DB. """
116+
current_app.logger.info("list_statistics() request")
117+
last_execution_details = '{}' # Empty but valid JSON
95118

96-
except (FileNotFoundError):
97-
current_app.logger.error("last_execution.json file was missing")
98-
return abort(500)
119+
try: # See Alembic Revision ID: 05e0693f8cbb for table definition
120+
with engine.connect() as connection:
121+
s = text("select valcol from kv_unique where keycol = 'last_execution_time';")
122+
result = connection.execute(s)
123+
last_execution_details = result.fetchone()[0]
99124

100-
except (json.JSONDecodeError):
101-
current_app.logger.error(
102-
"last_execution.json could not be decoded - possible corruption"
103-
)
104-
return abort(500)
105125

106126
except Exception as e:
107-
current_app.logger.error("Failure reading last_execution.json: ", e)
108-
return abort(500)
127+
current_app.logger.error("Failure reading Last Execution stats from DB")
128+
# return abort(500) # Weird but not worth a 500
129+
130+
return last_execution_details
131+
132+
133+
@admin_api.route("/api/get_execution_status/<int:job_id>", methods=["GET"])
134+
def get_exec_status(job_id):
135+
kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)
136+
with engine.connect() as connection:
137+
s_jobid = 'job-' + str(job_id)
138+
s = text("select valcol from kv_unique where keycol = :j ;")
139+
s = s.bindparams(j=s_jobid)
140+
result = connection.execute(s)
141+
exec_status = result.fetchone()[0]
142+
143+
return exec_status
109144

110-
return jsonify(last_execution_details)
111145

112146

113147
"""

src/server/app.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
app.secret_key = APP_SECRET_KEY
1717
app.config["MAX_CONTENT_LENGTH"] = 500 * 1024 * 1024 # 500 Megs
1818
app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 0
19+
1920
from api.admin_api import admin_api
2021
from api.common_api import common_api
2122
from api.user_api import user_api
@@ -24,8 +25,8 @@
2425
app.register_blueprint(common_api)
2526
app.register_blueprint(user_api)
2627

27-
app.logger.setLevel('INFO') # By default, Docker appears to set at INFO but VSCode at WARNING
2828

29+
app.logger.setLevel('INFO') # By default, Docker appears to set at INFO but VSCode at WARNING
2930

3031
# init_db_schema.start(connection)
3132

src/server/config.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -76,10 +76,3 @@
7676
os.makedirs(CURRENT_SOURCE_FILES_PATH, exist_ok=True)
7777
os.makedirs(REPORT_PATH, exist_ok=True)
7878
os.makedirs(ZIPPED_FILES, exist_ok=True)
79-
80-
if not (os.path.exists(LOGS_PATH + "last_execution.json")):
81-
f = open(
82-
LOGS_PATH + "last_execution.json", "w"
83-
) # Prevent 500 error from /api/statistics
84-
f.write("{}")
85-
f.close()

src/server/pipeline/log_db.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from datetime import datetime
2+
import json
3+
from sqlalchemy.sql import text
4+
from flask import current_app
5+
6+
from sqlalchemy.dialects.postgresql import insert
7+
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey, exc, select
8+
9+
from config import engine
10+
11+
12+
metadata = MetaData()
13+
14+
kvt = Table("kv_unique", metadata, autoload=True, autoload_with=engine)
15+
16+
17+
18+
def log_exec_status(job_id: str, job_status: dict):
    """Upsert execution status for a job into the kv_unique table.

    Stores job_status (JSON-encoded) under key 'job-<job_id>'.
    See Alembic Revision ID: 05e0693f8cbb for the table definition.
    Failures are logged, never raised -- status logging is best-effort
    and must not abort the pipeline run.
    """
    status_json = json.dumps(job_status)  # encode once; reused by insert and update

    with engine.connect() as connection:
        # Postgres-specific insert() supporting ON CONFLICT
        ins_stmt = insert(kvt).values(
            keycol='job-' + job_id,
            valcol=status_json,
        )
        # If key already present in DB, do update instead
        upsert = ins_stmt.on_conflict_do_update(
            constraint='kv_unique_keycol_key',
            set_=dict(valcol=status_json),
        )

        try:
            connection.execute(upsert)
        except Exception as e:
            current_app.logger.error("Insert/Update failed Execution status")
            current_app.logger.exception(e)
39+
40+

src/server/pipeline/match_data.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import datetime
1+
import datetime, time
22
import pandas as pd
33
import numpy as np
44

55
from flask import current_app
6+
from pipeline import log_db
7+
68

79

810
def start(connection, added_or_updated_rows):
@@ -12,6 +14,10 @@ def start(connection, added_or_updated_rows):
1214
current_app.logger.info('Start record matching')
1315
# Will need to consider updating the existing row contents (filter by active), deactivate,
1416
# try to match, and merge previous matching groups if applicable
17+
18+
job_id = str(int(time.time()))
19+
log_db.log_exec_status(job_id,{'status': 'starting', 'at_row': 0, 'of_rows':0})
20+
current_app.logger.info("Running execute job ID " + job_id)
1521
items_to_update = pd.concat([added_or_updated_rows["new"], added_or_updated_rows["updated"]], ignore_index=True)
1622
pdp_contacts = pd.read_sql_table('pdp_contacts', connection)
1723

@@ -32,6 +38,7 @@ def start(connection, added_or_updated_rows):
3238
current_app.logger.info("- Matching rows {}-{} of {}".format(
3339
row_num+1, min(len(rows), row_num+row_print_freq), len(rows))
3440
)
41+
log_db.log_exec_status(job_id,{'status': 'executing', 'at_row': row_num+1, 'of_rows':len(rows)})
3542

3643
# Exact matches based on specified columns
3744
row_matches = pdp_contacts[
@@ -59,3 +66,5 @@ def start(connection, added_or_updated_rows):
5966
current_app.logger.info("- Writing data to pdp_contacts table")
6067
items_to_update.to_sql('pdp_contacts', connection, index=False, if_exists='append')
6168
current_app.logger.info("- Finished load to pdp_contacts table")
69+
70+
log_db.log_exec_status(job_id,{'status': 'complete', 'at_row': len(rows), 'of_rows':len(rows)})

0 commit comments

Comments
 (0)