
Commit 53d9f90

Merge pull request #332 from CodeForPhilly/313-long-exec
313 long exec
2 parents c3a1bb9 + 853fc43 commit 53d9f90

File tree

11 files changed: +254 / -95 lines


documentation/documentation-images/exec_status.svg

Lines changed: 3 additions & 0 deletions

src/server/alembic/versions/05e0693f8cbb_key_value_table.py

Lines changed: 1 addition & 1 deletion
@@ -28,4 +28,4 @@ def upgrade():
 
 
 def downgrade():
-    pass
+    op.drop_table('kv_unique')

src/server/alembic/versions/36c4ecbfd11a_add_pdp_users_full_name.py

Lines changed: 1 addition & 1 deletion
@@ -21,4 +21,4 @@ def upgrade():
 
 
 def downgrade():
-    pass
+    op.drop_column("pdp_users", "full_name")

src/server/alembic/versions/6b8cf99be000_add_user_journal_table.py

Lines changed: 1 addition & 1 deletion
@@ -29,4 +29,4 @@ def upgrade():
 
 
 def downgrade():
-    pass
+    op.drop_table('pdp_user_journal')
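
These three downgrade() fixes mean a downgrade now actually reverses the corresponding upgrade instead of silently passing. As a minimal sketch of exercising them through Alembic's Python command API (the alembic.ini path below is an assumption about the local checkout; the usual CLI equivalents are `alembic upgrade head` and `alembic downgrade -1`):

# Sketch only: drives the migrations above via Alembic's command API.
# The location of alembic.ini is a placeholder, not taken from this commit.
from alembic import command
from alembic.config import Config

cfg = Config("src/server/alembic.ini")   # hypothetical path to the project's alembic.ini
command.upgrade(cfg, "head")             # applies all revisions, including bfb1262d3195 below
command.downgrade(cfg, "-1")             # with these fixes, downgrade really drops the objects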
src/server/alembic/versions/ (new migration file, revision bfb1262d3195)

Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
+"""create execution status table
+
+Revision ID: bfb1262d3195
+Revises: 05e0693f8cbb
+Create Date: 2021-05-28 16:12:40.561829
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.sql.sqltypes import Integer
+from sqlalchemy.sql import func
+
+# revision identifiers, used by Alembic.
+revision = 'bfb1262d3195'
+down_revision = '05e0693f8cbb'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    op.create_table(
+        "execution_status",
+        sa.Column("_id", sa.Integer, primary_key=True),
+        sa.Column("job_id", sa.Integer, nullable=False),
+        sa.Column("stage", sa.String(32), nullable=False),
+        sa.Column("status", sa.String(32), nullable=False),
+        sa.Column("update_stamp", sa.DateTime, nullable=False, server_default=func.now()),
+        sa.Column("details", sa.String(128), nullable=False)
+    )
+
+    op.execute("""CREATE FUNCTION last_upd_trig() RETURNS trigger
+                  LANGUAGE plpgsql AS
+                  $$BEGIN
+                      NEW.update_stamp := current_timestamp;
+                      RETURN NEW;
+                  END;$$;""")
+
+    op.execute("""CREATE TRIGGER last_upd_trigger
+                  BEFORE INSERT OR UPDATE ON execution_status
+                  FOR EACH ROW
+                  EXECUTE PROCEDURE last_upd_trig();"""
+    )  # Postgres-specific, obviously
+
+    op.create_unique_constraint("uq_job_id", "execution_status", ["job_id"])
+
+
+def downgrade():
+    op.drop_table("execution_status")
+    op.execute("DROP FUNCTION last_upd_trig()")

src/server/api/.optic/.gitignore

Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+
+captures/

src/server/api/admin_api.py

Lines changed: 95 additions & 25 deletions
@@ -1,5 +1,6 @@
 from api.api import admin_api
 import os
+import time
 from datetime import datetime
 import json
 from sqlalchemy.sql import text
@@ -58,8 +59,11 @@ def list_current_files():
 @jwt_ops.admin_required
 def execute():
     current_app.logger.info("Execute flow")
-    flow_script.start_flow()
+    job_outcome = flow_script.start_flow()  # 'busy', 'completed', or 'nothing to do'
+    current_app.logger.info("Job outcome: " + str(job_outcome))
 
+
+    # -------- Skip update if 'busy' or 'nothing to do' as nothing changed ? ------
     current_time = datetime.now().ctime()
     statistics = get_statistics()
 
@@ -87,8 +91,19 @@ def execute():
     except Exception as e:
         current_app.logger.error("Insert/Update failed on Last Execution stats")
         current_app.logger.exception(e)
+    # -------------------------------------------------------------------------------
+
+    if job_outcome == 'busy':
+        return jsonify({'outcome': 'Already analyzing'}), 503
+
+    elif job_outcome == 'nothing to do':
+        return jsonify({'outcome': 'No uploaded files to process'}), 200
+
+    elif job_outcome == 'completed':
+        return jsonify({'outcome': 'Analysis completed'}), 200
 
-    return jsonify(success=True)
+    else:
+        return jsonify({'outcome': 'Unknown status: ' + str(job_outcome)}), 200
 
 
 def get_statistics():
@@ -134,41 +149,96 @@ def list_statistics():
     return last_execution_details
 
 
-@admin_api.route("/api/get_execution_status/<int:job_id>", methods=["GET"])
+@admin_api.route("/api/get_execution_status", methods=["GET"])
 @jwt_ops.admin_required
-def get_exec_status(job_id):
-    """ Get the execution status record from the DB for the specified job_id """
+def get_exec_status():
+    """ Get the execution status record from the DB for a running job, if present """
 
 
    engine.dispose()  # we don't want other process's conn pool
 
    with engine.connect() as connection:
+        q = text("""SELECT job_id, stage, status, details, update_stamp
+                    FROM execution_status
+                    WHERE status = 'executing' """)
+        result = connection.execute(q)
 
-        s_jobid = 'job-' + str(job_id)
-        s = text("select valcol from kv_unique where keycol = :j ;")
-        s = s.bindparams(j=s_jobid)
-        result = connection.execute(s)
        if result.rowcount > 0:
-            exec_status = result.fetchone()[0]
+            running_job = result.fetchone()
+            return jsonify(dict(zip(result.keys(), running_job)))
        else:
-            current_app.logger.warning("0 results for exec status query")
-            exec_status = '{}'
+            return jsonify('')
 
-    return exec_status
+@admin_api.route("/api/job_in_progress", methods=["GET"])
+@jwt_ops.admin_required
+def is_job_in_progresss():
+    """ Return True if there's a running execute, False if not. """
 
+    engine.dispose()  # we don't want other process's conn pool
 
+    with engine.connect() as connection:
+        q = text("""SELECT job_id from execution_status WHERE status = 'executing' """)
+        result = connection.execute(q)
 
+        if result.rowcount > 0:
+            return jsonify(True)
+        else:
+            return jsonify(False)
 
-"""
-@admin_api.route('/api/status', methods=['GET'])
-def checkStatus():
-    with engine.connect() as connection:
-        query = text("SELECT now()")
-        query_result = connection.execute(query)
 
-        # Need to iterate over the results proxy
-        results = {}
-        for row in query_result:
-            results = dict(row)
-        return jsonify(results)
-"""
+def start_job():
+    """ If no running jobs, create a job_id and execution status entry.
+        This ensures only one job runs at a time.
+        If there's a running job, return None. """
+
+    engine.dispose()  # we don't want other process's conn pool
+
+    job_id = str(int(time.time()))
+    q = text("""SELECT job_id from execution_status
+                WHERE status = 'executing' """)
+
+    i = text("""INSERT INTO execution_status (job_id, stage, status, details)
+                values(:j, :stg, :stat, :det) """)
+    i = i.bindparams(j=job_id,
+                     stg='initiating',
+                     stat='executing',
+                     det='')
+
+    running_job = None
+
+    with engine.begin() as connection:  # BEGIN TRANSACTION
+        q_result = connection.execute(q)
+        if q_result.rowcount == 0:
+            # No running jobs
+            ins_result = connection.execute(i)
+        else:
+            running_job = q_result.fetchone()[0]
+        # COMMIT TRANSACTION
+        # TODO: what would an exception look like here?
+
+
+    if running_job:
+        # There was a running job already
+        current_app.logger.info("Request to start job, but job_id " + str(running_job) + " already executing")
+        return None
+    else:
+        current_app.logger.info("Assigned job_id " + job_id)
+        return job_id
+
+
+
+
+# """
+# @admin_api.route('/api/status', methods=['GET'])
+# def checkStatus():
+#     with engine.connect() as connection:
#         query = text("SELECT now()")
#         query_result = connection.execute(query)
#
#         # Need to iterate over the results proxy
#         results = {}
#         for row in query_result:
#             results = dict(row)
#         return jsonify(results)
# """

src/server/pipeline/clean_and_load_data.py

Lines changed: 1 addition & 1 deletion
@@ -8,7 +8,7 @@
 from flask import current_app
 import sqlalchemy
 from config import CURRENT_SOURCE_FILES_PATH
-
+from pipeline import log_db
 
 def start(connection, pdp_contacts_df, file_path_list):
     result = pd.DataFrame(columns=pdp_contacts_df.columns)

src/server/pipeline/flow_script.py

Lines changed: 74 additions & 50 deletions
@@ -1,62 +1,86 @@
 import os
+
 import pandas as pd
 from flask import current_app
-from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data
+from api import admin_api
+from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data, log_db
 from config import CURRENT_SOURCE_FILES_PATH
 from config import engine
 from models import Base
 
 
 def start_flow():
-    file_path_list = os.listdir(CURRENT_SOURCE_FILES_PATH)
-
-    if file_path_list:
-        with engine.connect() as connection:
-            Base.metadata.create_all(connection)
-
-            # Get previous version of pdp_contacts table, which is used later to classify new records
-            pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
-            pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
-            pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
-
-            current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
-
-            # Clean the input data and normalize/rename columns
-            # Populate new records in secondary tables (donations, volunteer shifts)
-            # input - existing files in path
-            # output - normalized object of all entries, as well as the input json rows for primary sources
-            normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
-
-            # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
-            # (If additional inconsistencies are encountered, may need to enforce the schema of
-            # the contacts loader by initializing it from pdp_contacts.)
-            normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-            normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
-
-            # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
-            rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
-
-            # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
-            archive_rows.archive(connection, rows_classified["updated"])
-
-            # Match new+updated records against previous version of pdp_contacts database, and
-            # write these rows to the database.
-            match_data.start(connection, rows_classified, manual_matches_df)
-
-            # Copy raw input rows to json fields in pdp_contacts,
-            # using a temporary table to simplify the update code.
-            current_app.logger.info('Saving json of original rows to pdp_contacts')
-            source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-            # https://www.postgresql.org/docs/8.4/sql-update.html
-            connection.execute('''
-                UPDATE pdp_contacts pdp
-                SET json = to_json(temp.json)
-                FROM _temp_pdp_contacts_loader temp
-                WHERE
-                    pdp.source_type = temp.source_type AND
-                    pdp.source_id = temp.source_id AND
-                    pdp.archived_date IS NULL
-            ''')
+
+    job_id = admin_api.start_job()
+
+    if (not job_id):
+        current_app.logger.info('Failed to get job_id')
+        job_outcome = 'busy'
+
+    else:
+        log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
+
+        file_path_list = os.listdir(CURRENT_SOURCE_FILES_PATH)
+
+
+        if file_path_list:
+            with engine.connect() as connection:
+                Base.metadata.create_all(connection)
+
+                # Get previous version of pdp_contacts table, which is used later to classify new records
+                pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
+                pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
+                pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
+
+                current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
+
+                # Clean the input data and normalize/rename columns
+                # Populate new records in secondary tables (donations, volunteer shifts)
+                # input - existing files in path
+                # output - normalized object of all entries, as well as the input json rows for primary sources
+                log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
+                normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
+
+                # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
+                # (If additional inconsistencies are encountered, may need to enforce the schema of
+                # the contacts loader by initializing it from pdp_contacts.)
+                normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
+
+                # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
+                log_db.log_exec_status(job_id, 'classify', 'executing', '')
+                rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
+
+                # Archives rows that were updated in the current state of the DB (changes their archived_date to now)
+                archive_rows.archive(connection, rows_classified["updated"])
+
+                # Match new+updated records against previous version of pdp_contacts database, and
+                # write these rows to the database.
+                match_data.start(connection, rows_classified, manual_matches_df, job_id)
+
+                # Copy raw input rows to json fields in pdp_contacts,
+                # using a temporary table to simplify the update code.
+                current_app.logger.info('Saving json of original rows to pdp_contacts')
+                source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                # https://www.postgresql.org/docs/8.4/sql-update.html
+                connection.execute('''
+                    UPDATE pdp_contacts pdp
+                    SET json = to_json(temp.json)
+                    FROM _temp_pdp_contacts_loader temp
+                    WHERE
+                        pdp.source_type = temp.source_type AND
+                        pdp.source_id = temp.source_id AND
+                        pdp.archived_date IS NULL
+                ''')
 
             current_app.logger.info('Finished flow script run')
+            job_outcome = 'completed'
+
+        else:  # No files in list
+            current_app.logger.info('No files to process')
+            job_outcome = 'nothing to do'
+
+        log_db.log_exec_status(job_id, 'flow', 'complete', '')
 
+    return job_outcome
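
The single-job guarantee behind the 'busy' outcome comes from start_job() in admin_api.py: within one transaction it looks for an 'executing' row in execution_status and only inserts a new one if none exists. Below is a standalone illustration of that locking idea, not the project's code: it uses an in-memory SQLite database and a trimmed-down table, whereas the commit targets Postgres.

# Standalone sketch of the "one 'executing' row acts as the lock" pattern.
import time
import sqlalchemy as sa
from sqlalchemy.sql import text

engine = sa.create_engine("sqlite://")   # in-memory DB for the demo only

with engine.begin() as conn:
    conn.execute(text("""CREATE TABLE execution_status (
        _id INTEGER PRIMARY KEY,
        job_id INTEGER NOT NULL UNIQUE,
        stage TEXT NOT NULL,
        status TEXT NOT NULL,
        details TEXT NOT NULL)"""))

def start_job_demo():
    """Return a new job_id, or None if a job is already 'executing'."""
    job_id = str(int(time.time()))
    with engine.begin() as conn:   # single transaction, as in start_job()
        running = conn.execute(text(
            "SELECT job_id FROM execution_status WHERE status = 'executing'")).fetchone()
        if running:
            return None
        conn.execute(text(
            "INSERT INTO execution_status (job_id, stage, status, details) "
            "VALUES (:j, 'initiating', 'executing', '')"), {"j": job_id})
    return job_id

first = start_job_demo()
second = start_job_demo()   # refused: the first job is still 'executing'
print(first, second)        # e.g. '1622230360' None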
