Skip to content

Commit 6f09685

Browse files
committed
split up data loading over several sessions to prevent blocked queries
1 parent bf0bf4e commit 6f09685

File tree

6 files changed

+151
-141
lines changed

6 files changed

+151
-141
lines changed

src/server/api/API_ingest/ingest_sources_from_api.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,19 @@
22
import structlog
33
logger = structlog.get_logger()
44

5-
def start(session):
5+
def start():
66
logger.debug("Start Fetching raw data from different API sources")
7+
78
logger.debug(" Fetching Salesforce contacts")
8-
salesforce_contacts.store_contacts_all(session)
9+
salesforce_contacts.store_contacts_all()
910
logger.debug(" Finished fetching Salesforce contacts")
11+
1012
logger.debug(" Fetching Shelterluv people")
11-
shelterluv_people.store_shelterluv_people_all(session)
13+
shelterluv_people.store_shelterluv_people_all()
1214
logger.debug(" Finished fetching Shelterluv people")
15+
1316
logger.debug(" Fetching Shelterluv events")
14-
sle_count = sl_animal_events.store_all_animals_and_events(session)
17+
sle_count = sl_animal_events.store_all_animals_and_events()
1518
logger.debug(" Finished fetching Shelterluv events - %d records" , sle_count)
1619

1720
logger.debug("Finished fetching raw data from different API sources")

src/server/api/API_ingest/salesforce_contacts.py

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,29 +5,38 @@
55
from config import engine
66
from models import SalesForceContacts
77

8-
def store_contacts_all(session):
8+
import structlog
9+
logger = structlog.get_logger()
910

10-
session.execute("TRUNCATE TABLE salesforcecontacts")
def _contact_from_row(row):
    """Build a SalesForceContacts model instance from one Salesforce query record."""
    account = row['Account']
    return SalesForceContacts(
        contact_id=row['Contact_ID_18__c'],
        first_name=row['FirstName'],
        last_name=row['LastName'],
        # Contacts without a parent account come back with Account set to None.
        account_name=account['Name'] if account is not None else None,
        mailing_country=row['MailingCountry'],
        mailing_street=row['MailingStreet'],
        mailing_city=row['MailingCity'],
        mailing_state_province=row['MailingState'],
        mailing_zip_postal_code=row['MailingPostalCode'],
        phone=row['Phone'],
        mobile=row['MobilePhone'],
        # Was `email=['Email']`, which stored the literal list ['Email'].
        email=row['Email'],
    )


def store_contacts_all():
    """Reload the salesforcecontacts table from the Salesforce Contact object.

    Opens a dedicated database session, truncates ``salesforcecontacts``,
    pages through every record returned by the Contact query and stages one
    ``SalesForceContacts`` row per record.  Nothing is committed until every
    page has been staged; the single ``commit()`` at the end covers both the
    truncate and the inserts.

    Salesforce credentials are read from the ``SALESFORCE_DOMAIN``,
    ``SALESFORCE_PW``, ``SALESFORCE_USERNAME``,
    ``SALESFORCE_ORGANIZATION_ID`` and ``SALESFORCE_SECURITY_TOKEN``
    environment variables.
    """
    Session = sessionmaker(engine)
    with Session() as session:
        logger.debug("truncating table salesforcecontacts")
        session.execute("TRUNCATE TABLE salesforcecontacts")

        logger.debug("retrieving the latest salesforce contacts data")
        sf = Salesforce(
            domain=os.getenv('SALESFORCE_DOMAIN'),
            password=os.getenv('SALESFORCE_PW'),
            username=os.getenv('SALESFORCE_USERNAME'),
            organizationId=os.getenv('SALESFORCE_ORGANIZATION_ID'),
            security_token=os.getenv('SALESFORCE_SECURITY_TOKEN'),
        )
        results = sf.query("SELECT Contact_ID_18__c, FirstName, LastName, Contact.Account.Name, MailingCountry, MailingStreet, MailingCity, MailingState, MailingPostalCode, Phone, MobilePhone, Email FROM Contact")

        while True:
            for row in results['records']:
                session.add(_contact_from_row(row))
            if results['done']:
                break
            # nextRecordsUrl is a full URL path, not a bare query locator, so
            # the client has to be told to use it as-is.
            # NOTE(review): assumes `Salesforce` is simple_salesforce.Salesforce.
            results = sf.query_more(results['nextRecordsUrl'], identifier_is_url=True)

        session.commit()
        logger.debug("finished downloading latest salesforce contacts data")
Lines changed: 69 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,7 @@
1-
from api.api import common_api
2-
from config import engine
3-
from flask import jsonify, current_app
4-
from sqlalchemy.sql import text
5-
import requests
6-
import time
7-
from datetime import datetime
8-
9-
from sqlalchemy.dialects.postgresql import insert
101
from sqlalchemy import Table, MetaData
11-
from pipeline import flow_script
2+
from sqlalchemy.orm import sessionmaker
3+
124
from config import engine
13-
from flask import request, redirect, jsonify, current_app
14-
from api.file_uploader import validate_and_arrange_upload
15-
from sqlalchemy.orm import Session, sessionmaker
165

176
import structlog
187
logger = structlog.get_logger()
@@ -68,80 +57,84 @@ def truncate_animals():
6857
return 0
6958

7059

def truncate_events():
    """Remove every row from the sl_animal_events table.

    The statement runs in its own short-lived session and is committed
    before returning, so the truncate does not stay open inside a caller's
    longer transaction.

    Returns:
        Always 0.
    """
    # The original reflected the table into an unused `Table` object, which
    # cost a schema round trip for nothing; the raw statement is all we need.
    Session = sessionmaker(engine)
    with Session() as session:
        session.execute("TRUNCATE table sl_animal_events;")
        session.commit()

    return 0
8173

def _associated_id(rec, record_type):
    """Return the "Id" of the first AssociatedRecords entry whose Type is *record_type*."""
    match = next(
        (assoc for assoc in rec["AssociatedRecords"] if assoc["Type"] == record_type),
        None,
    )
    if match is None:
        # A bare next() would leak StopIteration, which is easy to misread
        # (and turns into RuntimeError if it ever escapes inside a generator).
        raise ValueError(
            "event record has no associated {} record: {!r}".format(record_type, rec)
        )
    return match["Id"]


def _event_rows(event_list):
    """Translate Shelterluv event records into sl_animal_events row dicts."""
    # TODO: Pull from DB - inserted in db_setup/base_users.py/populate_sl_event_types()
    #   sl_event_types rows: (1, 'Outcome.Adoption'), (2, 'Outcome.Foster'),
    #   (3, 'Outcome.ReturnToOwner'), (4, 'Intake.AdoptionReturn'),
    #   (5, 'Intake.FosterReturn')
    event_map = {
        "Outcome.Adoption": 1,
        "Outcome.Foster": 2,
        "Outcome.ReturnToOwner": 3,
        "Intake.AdoptionReturn": 4,
        "Intake.FosterReturn": 5,
    }

    # Event record fields used: AssociatedRecords[Type = Person]["Id"],
    #                           AssociatedRecords[Type = Animal]["Id"],
    #                           "Type", "Time"
    # Columns in db:            id, person_id, animal_id, event_type, time
    return [
        {
            "person_id": _associated_id(rec, "Person"),
            "animal_id": _associated_id(rec, "Animal"),
            "event_type": event_map[rec["Type"]],
            "time": rec["Time"],
        }
        for rec in event_list
    ]


def insert_events(event_list):
    """Insert event records into sl_animal_events table and return row count.

    The table is always emptied first ("clean insert").  Row dicts are built
    *before* the truncate so that a malformed record is rejected while the
    previous table contents are still intact.

    Args:
        event_list: iterable of Shelterluv event dicts, each with "Type",
            "Time" and an "AssociatedRecords" list containing a "Person" and
            an "Animal" entry.

    Returns:
        Number of rows inserted (0 when *event_list* is empty).

    Raises:
        KeyError: a record's "Type" is not a known event type, or a required
            key is missing.
        ValueError: a record has no associated Person or Animal entry.
    """
    ins_list = _event_rows(event_list)  # Create a list of per-row dicts

    # Always a clean insert
    truncate_events()

    if not ins_list:
        # Nothing to write; skip issuing an INSERT with no rows.
        logger.debug("no events to insert")
        return 0

    Session = sessionmaker(engine)
    with Session() as session:
        metadata = MetaData()
        sla = Table("sl_animal_events", metadata, autoload=True, autoload_with=engine)

        # TODO: Wrap with try/catch
        ret = session.execute(sla.insert(ins_list))
        # Read the count while the session (and its cursor) is still open.
        inserted = ret.rowcount
        session.commit()
        logger.debug("finished inserting events")

    return inserted
147140

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import requests, os
22
from models import ShelterluvPeople
3+
from config import engine
4+
from sqlalchemy.orm import sessionmaker
35
import structlog
46
logger = structlog.get_logger()
57

@@ -20,6 +22,7 @@
2022

2123

2224
TEST_MODE=os.getenv("TEST_MODE") # if not present, has value None
25+
LIMIT = 100
2326
#################################
2427
# This script is used to fetch data from shelterluv API.
2528
# Please be mindful of your usage.
@@ -34,37 +37,40 @@
3437

3538
''' Iterate over all Shelterluv people and store them in the shelterluvpeople database table
3639
We fetch 100 items in each request, since that is the limit based on our research '''
def store_shelterluv_people_all():
    """Reload the shelterluvpeople table from the Shelterluv people API.

    Opens a dedicated database session, truncates ``shelterluvpeople`` and
    then pages through the API ``LIMIT`` records at a time, staging one
    ``ShelterluvPeople`` row per person.  The truncate and all inserts are
    committed together once the last page has been read.

    When ``TEST_MODE`` is set, paging stops once the offset reaches 1000.

    Raises:
        requests.HTTPError: the API answered with an error status.
        requests.Timeout: the API did not respond in time.
    """
    offset = 0
    has_more = True
    Session = sessionmaker(engine)

    with Session() as session:
        logger.debug("Truncating table shelterluvpeople")
        session.execute("TRUNCATE TABLE shelterluvpeople")

        logger.debug("Start getting shelterluv contacts from people table")

        while has_more:
            # NOTE(review): the API key is sent over plain http here; confirm
            # whether the endpoint can be switched to https.
            r = requests.get(
                "http://shelterluv.com/api/v1/people",
                params={"limit": LIMIT, "offset": offset},
                headers={"x-api-key": SHELTERLUV_SECRET_TOKEN},
                # Without a timeout a stalled connection would hang the ingest
                # forever while the session is still open.
                timeout=60,
            )
            # Fail with a clear HTTP error rather than a KeyError on "people"
            # when the API returns an error payload.
            r.raise_for_status()
            response = r.json()

            for person in response["people"]:
                # todo: Does this need more "null checks"?
                session.add(ShelterluvPeople(
                    firstname=person["Firstname"],
                    lastname=person["Lastname"],
                    id=person.get("ID"),
                    internal_id=person["Internal-ID"],
                    associated=person["Associated"],
                    street=person["Street"],
                    apartment=person["Apartment"],
                    city=person["City"],
                    state=person["State"],
                    zip=person["Zip"],
                    email=person["Email"],
                    phone=person["Phone"],
                    animal_ids=person["Animal_ids"],
                ))

            offset += LIMIT
            has_more = response["has_more"]
            if TEST_MODE:
                # Keep test runs short: stop after the first 1000 records.
                has_more = has_more and offset < 1000

        session.commit()

    logger.debug("Finish getting shelterluv contacts from people table")
7076

src/server/api/API_ingest/sl_animal_events.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1-
import os, time, json
1+
import json
2+
import os
23
import posixpath as path
34

45
import structlog
6+
57
logger = structlog.get_logger()
68

79
import requests
@@ -165,7 +167,7 @@ def slae_test():
165167
count = shelterluv_db.insert_events(b)
166168
return count
167169

168-
def store_all_animals_and_events(session):
170+
def store_all_animals_and_events():
169171
total_count = get_event_count()
170172
logger.debug("Total events: %d", total_count)
171173

@@ -175,7 +177,7 @@ def store_all_animals_and_events(session):
175177
# f = filter_events(b)
176178
# print(f)
177179

178-
count = shelterluv_db.insert_events(session, b)
180+
count = shelterluv_db.insert_events(b)
179181
return count
180182

181183

src/server/api/internal_api.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
from api.api import internal_api
2-
from config import engine
3-
from flask import jsonify, current_app
41
from datetime import datetime
2+
3+
import structlog
4+
from flask import jsonify
5+
56
from api.API_ingest import ingest_sources_from_api
7+
from api.api import internal_api
68
from rfm_funcs.create_scores import create_scores
7-
from sqlalchemy.orm import sessionmaker
89

9-
import structlog
1010
logger = structlog.get_logger()
1111

1212
### Internal API endpoints can only be accessed from inside the cluster;
@@ -29,10 +29,7 @@ def user_test2():
2929
@internal_api.route("/api/internal/ingestRawData", methods=["GET"])
3030
def ingest_raw_data():
3131
try:
32-
Session = sessionmaker(engine)
33-
with Session() as session:
34-
ingest_sources_from_api.start(session)
35-
session.commit()
32+
ingest_sources_from_api.start()
3633
except Exception as e:
3734
logger.error(e)
3835

0 commit comments

Comments
 (0)