
Commit b5f1e3c

Ensures only a single execute can run

flow_script requests a job_id and bails out if none is available, because a job is already running.
1 parent 3c56d62 commit b5f1e3c
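
In rough terms, the contract this commit introduces is: ask for a job_id before doing any work, and treat None as "another run is already in progress". The sketch below only illustrates that contract; the _executing set and finish_job helper are stand-ins invented for the example, while the real start_job() in admin_api.py does the check-and-claim against the execution_status table inside a database transaction.

import time

# Stand-in for the execution_status table: holds job_ids currently 'executing'.
_executing = set()

def start_job():
    """Return a fresh job_id, or None if a job is already running."""
    if _executing:
        return None
    job_id = str(int(time.time()))   # epoch seconds, as in the commit
    _executing.add(job_id)
    return job_id

def finish_job(job_id):
    """Illustrative counterpart: release the slot when a run completes."""
    _executing.discard(job_id)

first = start_job()    # gets an id
second = start_job()   # None: first is still running, so this caller bails
print(first, second)
finish_job(first)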

2 files changed: +120 -71 lines changed

src/server/api/admin_api.py

Lines changed: 53 additions & 12 deletions
@@ -1,5 +1,6 @@
 from api.api import admin_api
 import os
+import time
 from datetime import datetime
 import json
 from sqlalchemy.sql import text
@@ -156,19 +157,59 @@ def get_exec_status(job_id):

     return exec_status

+def start_job():
+    """If no running jobs, create a job_id and execution status entry.
+    This ensures only one job runs at a time.
+    If there's a running job, return None. """


+    engine.dispose() # we don't want other process's conn pool

-"""
-@admin_api.route('/api/status', methods=['GET'])
-def checkStatus():
-    with engine.connect() as connection:
-        query = text("SELECT now()")
-        query_result = connection.execute(query)
+    job_id = str(int(time.time()))
+    q = text("""SELECT job_id from execution_status
+                WHERE status = 'executing' """)

-        # Need to iterate over the results proxy
-        results = {}
-        for row in query_result:
-            results = dict(row)
-        return jsonify(results)
-"""
+    i = text("""INSERT INTO execution_status (job_id, stage, status, details)
+                values(:j, :stg, :stat, :det) """)
+    i = i.bindparams(j = job_id,
+                     stg ='initiating',
+                     stat ='executing',
+                     det = '' )
+
+    running_job = None
+
+    with engine.begin() as connection: # BEGIN TRANSACTION
+        q_result = connection.execute(q)
+        if q_result.rowcount == 0:
+            # No running jobs
+            ins_result = connection.execute(i)
+        else:
+            running_job = q_result.fetchone()[0]
+        # COMMIT TRANSACTION
+        #TODO: what would an exception look like here?
+
+
+    if running_job :
+        # There was a running job already
+        current_app.logger.info("Request to start job, but job_id " + str(running_job) + " already executing")
+        return None
+    else:
+        current_app.logger.info("Assigned job_id " + job_id )
+        return job_id
+
+
+
+
+# """
+# @admin_api.route('/api/status', methods=['GET'])
+# def checkStatus():
+#     with engine.connect() as connection:
+#         query = text("SELECT now()")
+#         query_result = connection.execute(query)
+
+#         # Need to iterate over the results proxy
+#         results = {}
+#         for row in query_result:
+#             results = dict(row)
+#         return jsonify(results)
+# """

src/server/pipeline/flow_script.py

Lines changed: 67 additions & 59 deletions
@@ -1,69 +1,77 @@
 import os
-import datetime, time
+
 import pandas as pd
 from flask import current_app
+from api import admin_api
 from pipeline import calssify_new_data, clean_and_load_data, archive_rows, match_data, log_db
 from config import CURRENT_SOURCE_FILES_PATH
 from config import engine
 from models import Base


 def start_flow():
-    file_path_list = os.listdir(CURRENT_SOURCE_FILES_PATH)
-
-    job_id = str(int(time.time()))
-    log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
-
-    if file_path_list:
-        with engine.connect() as connection:
-            Base.metadata.create_all(connection)
-
-            # Get previous version of pdp_contacts table, which is used later to classify new records
-            pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
-            pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
-            pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
-
-            current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
-
-            # Clean the input data and normalize/rename columns
-            # Populate new records in secondary tables (donations, volunteer shifts)
-            # input - existing files in path
-            # output - normalized object of all entries, as well as the input json rows for primary sources
-            log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
-            normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
-
-            # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
-            # (If additional inconsistencies are encountered, may need to enforce the schema of
-            # the contacts loader by initializing it from pdp_contacts.)
-            normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-            normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
-
-            # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
-            log_db.log_exec_status(job_id, 'classify', 'executing', '')
-            rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
-
-            # Archives rows the were updated in the current state of the DB (changes their archived_date to now)
-            archive_rows.archive(connection, rows_classified["updated"])
-
-            # Match new+updated records against previous version of pdp_contacts database, and
-            # write these rows to the database.
-            match_data.start(connection, rows_classified, manual_matches_df, job_id)
-
-            # Copy raw input rows to json fields in pdp_contacts,
-            # using a temporary table to simplify the update code.
-            current_app.logger.info('Saving json of original rows to pdp_contacts')
-            source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
-            # https://www.postgresql.org/docs/8.4/sql-update.html
-            connection.execute('''
-                UPDATE pdp_contacts pdp
-                SET json = to_json(temp.json)
-                FROM _temp_pdp_contacts_loader temp
-                WHERE
-                    pdp.source_type = temp.source_type AND
-                    pdp.source_id = temp.source_id AND
-                    pdp.archived_date IS NULL
-            ''')
-
-    current_app.logger.info('Finished flow script run')
-
-    log_db.log_exec_status(job_id, 'flow', 'complete', '' )
+
+    job_id = admin_api.start_job()
+
+    if (not job_id):
+        current_app.logger.info('Failed to get job_id')
+    else:
+        log_db.log_exec_status(job_id, 'start_flow', 'executing', '')
+
+        file_path_list = os.listdir(CURRENT_SOURCE_FILES_PATH)
+
+
+
+        if file_path_list:
+            with engine.connect() as connection:
+                Base.metadata.create_all(connection)
+
+                # Get previous version of pdp_contacts table, which is used later to classify new records
+                pdp_contacts_df = pd.read_sql_table('pdp_contacts', connection)
+                pdp_contacts_df = pdp_contacts_df[pdp_contacts_df["archived_date"].isnull()]
+                pdp_contacts_df = pdp_contacts_df.drop(columns=['archived_date', 'created_date', '_id', 'matching_id'])
+
+                current_app.logger.info('Loaded {} records from pdp_contacts table'.format(pdp_contacts_df.shape[0]))
+
+                # Clean the input data and normalize/rename columns
+                # Populate new records in secondary tables (donations, volunteer shifts)
+                # input - existing files in path
+                # output - normalized object of all entries, as well as the input json rows for primary sources
+                log_db.log_exec_status(job_id, 'clean_and_load', 'executing', '')
+                normalized_data, source_json, manual_matches_df = clean_and_load_data.start(connection, pdp_contacts_df, file_path_list)
+
+                # Standardize column data types via postgres (e.g. reading a csv column as int vs. str)
+                # (If additional inconsistencies are encountered, may need to enforce the schema of
+                # the contacts loader by initializing it from pdp_contacts.)
+                normalized_data.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                normalized_data = pd.read_sql_table('_temp_pdp_contacts_loader', connection)
+
+                # Classifies rows to old rows that haven't changed, updated rows and new rows - compared to the existing state of the DB
+                log_db.log_exec_status(job_id, 'classify', 'executing', '')
+                rows_classified = calssify_new_data.start(pdp_contacts_df, normalized_data)
+
+                # Archives rows the were updated in the current state of the DB (changes their archived_date to now)
+                archive_rows.archive(connection, rows_classified["updated"])
+
+                # Match new+updated records against previous version of pdp_contacts database, and
+                # write these rows to the database.
+                match_data.start(connection, rows_classified, manual_matches_df, job_id)
+
+                # Copy raw input rows to json fields in pdp_contacts,
+                # using a temporary table to simplify the update code.
+                current_app.logger.info('Saving json of original rows to pdp_contacts')
+                source_json.to_sql('_temp_pdp_contacts_loader', connection, index=False, if_exists='replace')
+                # https://www.postgresql.org/docs/8.4/sql-update.html
+                connection.execute('''
+                    UPDATE pdp_contacts pdp
+                    SET json = to_json(temp.json)
+                    FROM _temp_pdp_contacts_loader temp
+                    WHERE
+                        pdp.source_type = temp.source_type AND
+                        pdp.source_id = temp.source_id AND
+                        pdp.archived_date IS NULL
+                ''')
+
+        current_app.logger.info('Finished flow script run')
+
+        log_db.log_exec_status(job_id, 'flow', 'complete', '' )
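
One small detail carried over from the old flow_script: the job_id that start_job() hands back is simply the Unix time in whole seconds, formatted as a string, so the id doubles as a coarse record of when the run was claimed. A quick illustration (values depend on when it runs):

import time
from datetime import datetime

job_id = str(int(time.time()))                       # e.g. '1612345678'
print(job_id, datetime.fromtimestamp(int(job_id)))   # the id decodes back to the start time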
