Skip to content

Commit 9e6ab58

Browse files
authored
Change mailchimp import method (#22)
- load data in the db more often - allow for a longer timeout & retry time
1 parent 6db4ca6 commit 9e6ab58

File tree

2 files changed

+52
-32
lines changed

2 files changed

+52
-32
lines changed

api/paul_api/paul_api/settings.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@
5757
SENTRY_DSN=(str, ""),
5858
SENTRY_ENVIRONMENT=(str, ""),
5959
SENTRY_TRACES_SAMPLE_RATE=(float, 0.0),
60-
BACKGROUND_WORKERS=(int, 3),
6160
ADMIN_SITE_TITLE=(str, "PAUL Admin"),
6261
ADMIN_SITE_HEADER=(str, "PAUL"),
6362
MAILCHIMP_KEY=(str, ""),
63+
# django-q2 settings
64+
BACKGROUND_WORKERS=(int, 3),
65+
WORKER_TIMEOUT=(int, 20 * 60), # All tasks must finish in less than 20 minutes
6466
)
6567
environ.Env.read_env(f"{root}/.env") # reading .env file
6668

@@ -196,10 +198,10 @@
196198
"name": "paul",
197199
"workers": env("BACKGROUND_WORKERS"),
198200
"recycle": 100,
199-
"timeout": 300, # All tasks must finish in less than 5 minutes
200-
"retry": 600, # Retry unfinished tasks after 10 minutes
201+
"timeout": env("WORKER_TIMEOUT"),
202+
"retry": env("WORKER_TIMEOUT") + 120, # Retry unfinished tasks after 2 more minutes
201203
"ack_failures": True,
202-
"max_attempts": 3,
204+
"max_attempts": 2,
203205
"compress": True,
204206
"save_limit": 200,
205207
"queue_limit": 4,

api/paul_api/plugin_mailchimp/utils.py

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,20 @@ def create_mailchimp_tables(audiences_name: str="") -> int:
5454
get_or_create_table(mc_settings.audiences_stats_table_name, 'audiences_stats')
5555
get_or_create_table(mc_settings.audience_segments_table_name, 'audience_segments')
5656
get_or_create_table(mc_settings.audience_tags_table_name, 'audience_tags')
57-
57+
5858
contact_table = get_or_create_table(
5959
audiences_name, 'contact_merge_fields', 'audience_members')
6060
contact_table.table_type = Table.TYPE_CONTACTS
6161
contact_table.save()
62-
62+
6363
get_or_create_table(
6464
mc_settings.segment_members_table_name, 'contact_merge_fields', 'segment_members')
6565

6666
return contact_table.id
6767

6868

6969
def get_or_create_table(table_name: str, *table_rulesets: str) -> Table:
70-
70+
7171
if not len(table_rulesets):
7272
raise ValueError(_("No table rulesets provided"))
7373

@@ -87,7 +87,7 @@ def get_or_create_table(table_name: str, *table_rulesets: str) -> Table:
8787
# Combine all required table definitions into a single one
8888
mappings = [table_fields.TABLE_MAPPING[ruleset] for ruleset in table_rulesets]
8989
table_fields_defs = functools.reduce(operator.ior, mappings, {})
90-
90+
9191
for field_name, field_details in table_fields_defs.items():
9292
column, created = TableColumn.objects.get_or_create(
9393
table=table,
@@ -214,15 +214,15 @@ def retrieve_lists_data(client: MailChimp):
214214
'audience_id': mlist['id'],
215215
'audience_name': mlist['name']
216216
})
217-
217+
218218
for field in audiences_stats_table_fields_defs:
219219
field_def = audiences_stats_table_fields_defs[field]
220220

221221
try:
222222
field_value = get_field_value(field, field_def, mlist)
223223
except KeyError:
224224
continue
225-
225+
226226
if field_def['type'] == 'date':
227227
audience_stats_entry.data[field] = field_value[:10]
228228
else:
@@ -232,10 +232,11 @@ def retrieve_lists_data(client: MailChimp):
232232

233233
# Sync list segments
234234
list_segments = client.lists.segments.all(list_id=mlist['id'], get_all=True)
235-
segment_members_creation_queue = []
236-
segment_members_update_queue = []
237235

238236
for segment in list_segments['segments']:
237+
segment_members_creation_queue = []
238+
segment_members_update_queue = []
239+
239240
audience_segments_exists = Entry.objects.filter(
240241
table=audience_segments_table, data__audience_id=segment['list_id'])
241242
if audience_segments_exists:
@@ -286,15 +287,15 @@ def retrieve_lists_data(client: MailChimp):
286287
})
287288
for field in segment_members_and_contact_fields_defs:
288289
field_def = segment_members_and_contact_fields_defs[field]
289-
290+
290291
try:
291292
field_value = get_field_value(field, field_def, member)
292293
except KeyError:
293294
continue
294295

295296
if field_def['type'] == 'enum':
296297
table_column = TableColumn.objects.get(table=segment_members_table, name=field)
297-
298+
298299
if not table_column.choices:
299300
table_column.choices = []
300301
if is_list_field(field_def):
@@ -318,16 +319,25 @@ def retrieve_lists_data(client: MailChimp):
318319
segment_members_update_queue.append(segment_members_entry)
319320
else:
320321
segment_members_creation_queue.append(segment_members_entry)
321-
322-
Entry.objects.bulk_create(segment_members_creation_queue, batch_size=50)
323-
Entry.objects.bulk_update(segment_members_update_queue, ["data"], batch_size=50)
322+
323+
segment_threshold = 1000
324+
if len(segment_members_creation_queue) > segment_threshold:
325+
Entry.objects.bulk_create(segment_members_creation_queue, batch_size=50)
326+
segment_members_creation_queue = []
327+
if len(segment_members_update_queue) > segment_threshold:
328+
Entry.objects.bulk_update(segment_members_update_queue, ["data"], batch_size=50)
329+
segment_members_update_queue = []
330+
331+
Entry.objects.bulk_create(segment_members_creation_queue, batch_size=50)
332+
Entry.objects.bulk_update(segment_members_update_queue, ["data"], batch_size=50)
324333

325334
# # Sync list members
326335
list_members = client.lists.members.all(list_id=mlist['id'], get_all=True)
327-
list_members_creation_queue = []
328-
list_members_update_queue = []
329336

330337
for member in list_members['members']:
338+
list_members_creation_queue = []
339+
list_members_update_queue = []
340+
331341
member['audience_name'] = mlist['name']
332342
audience_members_exists = Entry.objects.filter(
333343
table=audience_members_table, data__id=member['id'], data__audience_id=mlist['id'])
@@ -354,7 +364,7 @@ def retrieve_lists_data(client: MailChimp):
354364
except Exception as e:
355365
print(e)
356366
raise e
357-
367+
358368
if field_def['type'] == 'enum':
359369
table_column = TableColumn.objects.get(table=audience_members_table, name=field)
360370
if not table_column.choices:
@@ -382,14 +392,22 @@ def retrieve_lists_data(client: MailChimp):
382392
audience_members_entry.data[field] = field_value[:10]
383393
else:
384394
audience_members_entry.data[field] = field_value
385-
395+
386396
if audience_members_exists:
387397
list_members_update_queue.append(audience_members_entry)
388398
else:
389399
list_members_creation_queue.append(audience_members_entry)
390-
391-
Entry.objects.bulk_create(list_members_creation_queue, batch_size=50)
392-
Entry.objects.bulk_update(list_members_update_queue, ["data"], batch_size=50)
400+
401+
members_threshold = 1000
402+
if len(list_members_creation_queue) > members_threshold:
403+
Entry.objects.bulk_create(list_members_creation_queue, batch_size=50)
404+
list_members_creation_queue = []
405+
if len(list_members_update_queue) > members_threshold:
406+
Entry.objects.bulk_update(list_members_update_queue, ["data"], batch_size=50)
407+
list_members_update_queue = []
408+
409+
Entry.objects.bulk_create(list_members_creation_queue, batch_size=50)
410+
Entry.objects.bulk_update(list_members_update_queue, ["data"], batch_size=50)
393411

394412
return success, stats
395413

@@ -441,7 +459,7 @@ def get_emails_from_filtered_view(token, filtered_view):
441459
while continue_request: # TODO: get rid of web request
442460
url = 'http://{}/api/filters/{}/entries/?page={}'.format(
443461
settings.ALLOWED_HOSTS[0],
444-
filtered_view.pk,
462+
filtered_view.pk,
445463
page
446464
)
447465
r = requests.get(url, headers=headers).json()
@@ -484,7 +502,7 @@ def update_table_fields(table: Table, column_model: TableColumn, field_defs: dic
484502
"""
485503
Update table fields with the new field defs
486504
"""
487-
505+
488506
total_updates = 0
489507
for field_name, field_details in field_defs.items():
490508
# First, check if a column with the current name already exists
@@ -503,14 +521,14 @@ def update_table_fields(table: Table, column_model: TableColumn, field_defs: dic
503521
# If the column does not exist with either the current name or the old name, create it
504522
if not column:
505523
column = column_model(table=table)
506-
524+
507525
# Update the column details
508526
column.name = field_name
509527
column.display_name = field_details['display_name']
510528
column.field_type = field_details['type']
511529
column.save()
512530
total_updates += 1
513-
531+
514532
return total_updates
515533

516534

@@ -519,17 +537,17 @@ def update_entry_data_keys(table: Table, entry_model: Entry, field_defs: dict) -
519537
Update table entry data keys from the old key value to the new key value,
520538
if the new key doesn't already exist
521539
"""
522-
540+
523541
total_updates = 0
524542
entry_update_queue = []
525543
entries = entry_model.objects.filter(table=table)
526-
544+
527545
key_mapping = {}
528546
for field_name, field_details in field_defs.items():
529547
old_key = field_details.get('old_key')
530548
if old_key:
531549
key_mapping[old_key] = field_name
532-
550+
533551
# Rename old keys for each table entry
534552
for entry in entries:
535553
if not entry.data:
@@ -547,4 +565,4 @@ def update_entry_data_keys(table: Table, entry_model: Entry, field_defs: dict) -
547565
total_updates += 1
548566

549567
entry_model.objects.bulk_update(entry_update_queue, ["data"], batch_size=50)
550-
return total_updates
568+
return total_updates

0 commit comments

Comments
 (0)