|
| 1 | +import json |
1 | 2 | import time
|
2 | 3 | import jwt
|
3 | 4 | import os
|
|
20 | 21 | TENANT_ID = os.getenv("TENANT_ID")
|
21 | 22 | PLATFORM_MESSAGE_AUTHOR = os.getenv("PLATFORM_MESSAGE_AUTHOR_RECORD_ID")
|
22 | 23 |
|
23 |
| -UPDATE_TOPIC = "/event/Updated_Contacts_From_Pipeline__e" |
| 24 | +UPDATE_TOPIC = "/event/updated_contacts_batched__e" |
| 25 | +BATCH_SIZE = 200 |
24 | 26 |
|
25 | 27 |
|
26 | 28 | def send_pipeline_update_messages(contacts_list):
|
@@ -57,19 +59,35 @@ def send_pipeline_update_messages(contacts_list):
|
57 | 59 | schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id
|
58 | 60 | schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json
|
59 | 61 |
|
60 |
| - for contact_dict in contacts_list: |
61 |
| - contact_dict['CreatedDate'] = int(datetime.now().timestamp()) |
62 |
| - contact_dict['CreatedById'] = PLATFORM_MESSAGE_AUTHOR |
| 62 | + payloads = [] |
| 63 | + while len(contacts_list) > 0: |
| 64 | + if len(contacts_list) > BATCH_SIZE: |
| 65 | + current_batch = contacts_list[:BATCH_SIZE] |
| 66 | + del contacts_list[:BATCH_SIZE] |
| 67 | + else: |
| 68 | + current_batch = contacts_list |
| 69 | + contacts_list = [] |
63 | 70 |
|
| 71 | + root_object = { |
| 72 | + "updatedContactsJson" : current_batch |
| 73 | + } |
| 74 | + |
| 75 | + message = { |
| 76 | + "CreatedById": "0052g000003G926AAC", |
| 77 | + "CreatedDate": int(datetime.now().timestamp()), |
| 78 | + "updated_contacts_json__c": json.dumps(root_object) |
| 79 | + } |
64 | 80 | buf = io.BytesIO()
|
65 | 81 | encoder = avro.io.BinaryEncoder(buf)
|
66 | 82 | writer = avro.io.DatumWriter(avro.schema.parse(schema))
|
67 |
| - writer.write(contact_dict, encoder) |
| 83 | + writer.write(message, encoder) |
68 | 84 | payload = {
|
69 | 85 | "schema_id": schema_id,
|
70 | 86 | "payload": buf.getvalue()
|
71 | 87 | }
|
72 |
| - stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data) |
73 |
| - logger.info('Pipeline update message sent') |
| 88 | + payloads.append(payload) |
| 89 | + |
| 90 | + stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=payloads), metadata=auth_meta_data) |
| 91 | + |
| 92 | + logger.info("%s total pipeline update messages sent", len(payloads)) |
74 | 93 |
|
75 |
| - logger.info("%s total pipeline update messages sent", len(contacts_list)) |
|
0 commit comments