Skip to content

Commit 45ebc7f

Browse files
authored
Merge pull request #574 from CodeForPhilly/feat/batched-platform-message
sending batched salesforce platform messages
2 parents 2602e1f + 9a8bccb commit 45ebc7f

File tree

3 files changed

+74
-64
lines changed

3 files changed

+74
-64
lines changed

src/server/api/API_ingest/updated_data.py

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,64 +11,51 @@ def get_updated_contact_data():
1111
Session = sessionmaker(engine)
1212

1313
qry = """ -- Collect latest foster/volunteer dates
14-
with ev_dates as
15-
(select
16-
person_id,
14+
select json_agg (upd) as "cd"
15+
from (
16+
select
17+
sf.source_id as "Id" , -- long salesforce string
18+
sle.person_id as "Person_Id__c", -- short PAWS-local shelterluv id
19+
case
20+
when
21+
(extract(epoch from now())::bigint - foster_out < 365*86400) -- foster out in last year
22+
or (extract(epoch from now())::bigint - foster_return < 365*86400) -- foster return
23+
then 'Active'
24+
else 'Inactive'
25+
end as "Foster_Activity__c",
26+
foster_out as "Foster_Start_Date__c",
27+
foster_return as "Foster_End_Date__c",
28+
vol.first_date "First_volunteer_date__c",
29+
vol.last_date "Last_volunteer_date__c",
30+
vol.hours as "Total_volunteer_hours__c",
31+
vc.source_id::integer as "Volgistics_Id__c"
32+
from (
33+
select source_id, matching_id from pdp_contacts sf
34+
where sf.source_type = 'salesforcecontacts'
35+
) sf
36+
left join pdp_contacts sl on sl.matching_id = sf.matching_id and sl.source_type = 'shelterluvpeople'
37+
left join (
38+
select
39+
person_id,
1740
max(case when event_type=1 then time else null end) * 1000 adopt,
1841
max(case when event_type=2 then time else null end) * 1000 foster_out,
19-
-- max(case when event_type=3 then time else null end) rto,
42+
-- max(case when event_type=3 then time else null end) rto,
2043
max(case when event_type=5 then time else null end) * 1000 foster_return
21-
22-
from
23-
sl_animal_events sla
24-
left join sl_event_types sle on sle.id = sla.event_type
25-
26-
where sle.id in (1,2,5)
27-
group by person_id
28-
order by person_id
29-
)
30-
31-
32-
select json_agg (upd) as "cd" from (
44+
from sl_animal_events
45+
group by person_id
46+
) sle on sle.person_id::text = sl.source_id
47+
left join pdp_contacts vc on vc.matching_id = sf.matching_id and vc.source_type = 'volgistics'
48+
left join (
3349
select
34-
slsf.source_id as "Contact_Record_Id__c" , -- long salesforce string
35-
slp.internal_id as "Person_Id__c" , -- short PAWS-local shelterluv id
36-
37-
--case
38-
-- when
39-
-- (extract(epoch from now())::bigint - foster_out < 365*86400) -- foster out in last year
40-
-- or (extract(epoch from now())::bigint - foster_return < 365*86400) -- foster return
41-
-- then 'Active'
42-
-- else 'Inactive'
43-
--end as "Updated_Recent_Foster_Activity__c",
44-
45-
foster_out as "Updated_Foster_Start_Date__c",
46-
foster_return as "Updated_Foster_End_Date__c",
47-
48-
extract(epoch from min(vs.from_date)) * 1000 as "Updated_First_Volunteer_Date__c",
49-
extract(epoch from max(vs.from_date)) * 1000 as "Updated_Last_Volunteer_Date__c",
50-
sum(vs.hours) as "Updated_Total_Volunteer_Hours__c",
51-
vc.source_id::integer as "Volgistics_Id__c"
52-
53-
from
54-
ev_dates
55-
left join pdp_contacts slc on slc.source_id = person_id::text and slc.source_type = 'shelterluvpeople'
56-
left join pdp_contacts slsf on slsf.matching_id = slc.matching_id and slsf.source_type = 'salesforcecontacts'
57-
left join shelterluvpeople slp on slp.internal_id = person_id::text
58-
left join pdp_contacts vc on vc.matching_id = slc.matching_id and vc.source_type = 'volgistics'
59-
left join volgisticsshifts vs on vs.volg_id::text = vc.source_id
60-
61-
where
62-
slsf.source_id is not null
63-
64-
group by
65-
slsf.source_id,
66-
slp.internal_id,
67-
vc.source_id,
68-
foster_out ,
69-
foster_return
70-
71-
) upd ;
50+
volg_id,
51+
sum(hours) as hours,
52+
extract(epoch from min(from_date)) * 1000 as first_date,
53+
extract(epoch from max(from_date)) * 1000 as last_date
54+
from volgisticsshifts
55+
group by volg_id
56+
) vol on vol.volg_id::text = vc.source_id
57+
where sl.matching_id is not null or vc.matching_id is not null
58+
) upd;
7259
"""
7360

7461
with Session() as session:

src/server/pub_sub/salesforce_message_publisher.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import time
23
import jwt
34
import os
@@ -16,12 +17,12 @@
1617
ISSUER = os.getenv("SALESFORCE_CONSUMER_KEY")
1718
DOMAIN = os.getenv("SALESFORCE_DOMAIN")
1819
SUBJECT = os.getenv("SALESFORCE_USERNAME")
20+
CREATOR_CONTACT_ID = os.getenv("CREATOR_CONTACT_ID")
1921
INSTANCE_URL = os.getenv("INSTANCE_URL")
2022
TENANT_ID = os.getenv("TENANT_ID")
21-
PLATFORM_MESSAGE_AUTHOR = os.getenv("PLATFORM_MESSAGE_AUTHOR_RECORD_ID")
22-
23-
UPDATE_TOPIC = "/event/Updated_Contacts_From_Pipeline__e"
23+
BATCH_SIZE = os.getenv("BATCH_SIZE", 400)
2424

25+
UPDATE_TOPIC = "/event/updated_contacts_batched__e"
2526

2627
def send_pipeline_update_messages(contacts_list):
2728
pem_file = 'bin/connected-app-secrets.pem'
@@ -57,19 +58,34 @@ def send_pipeline_update_messages(contacts_list):
5758
schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
5859
schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
5960

60-
for contact_dict in contacts_list:
61-
contact_dict['CreatedDate'] = int(datetime.now().timestamp())
62-
contact_dict['CreatedById'] = PLATFORM_MESSAGE_AUTHOR
61+
payloads = []
62+
while len(contacts_list) > 0:
63+
if len(contacts_list) > BATCH_SIZE:
64+
current_batch = contacts_list[:BATCH_SIZE]
65+
del contacts_list[:BATCH_SIZE]
66+
else:
67+
current_batch = contacts_list
68+
contacts_list = []
6369

70+
root_object = {
71+
"updatedContactsJson" : current_batch
72+
}
73+
message = {
74+
"CreatedById": CREATOR_CONTACT_ID,
75+
"CreatedDate": int(datetime.now().timestamp()),
76+
"updated_contacts_json__c": json.dumps(root_object)
77+
}
6478
buf = io.BytesIO()
6579
encoder = avro.io.BinaryEncoder(buf)
6680
writer = avro.io.DatumWriter(avro.schema.parse(schema))
67-
writer.write(contact_dict, encoder)
81+
writer.write(message, encoder)
6882
payload = {
6983
"schema_id": schema_id,
7084
"payload": buf.getvalue()
7185
}
72-
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data)
73-
logger.info('Pipeline update message sent')
86+
payloads.append(payload)
87+
88+
stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=payloads), metadata=auth_meta_data)
89+
90+
logger.info("%s total pipeline update messages sent", len(payloads))
7491

75-
logger.info("%s total pipeline update messages sent", len(contacts_list))

src/server/secrets_dict.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,10 @@
66
BASEEDITOR_PW="editorpw"
77
BASEADMIN_PW="basepw"
88
DROPBOX_APP="DBAPPPW"
9+
10+
SALESFORCE_USERNAME=''
11+
SALESFORCE_CONSUMER_KEY=''
12+
SALESFORCE_DOMAIN=''
13+
TENANT_ID=''
14+
INSTANCE_URL=''
15+
CREATOR_CONTACT_ID=''

0 commit comments

Comments
 (0)