|
| 1 | +import time |
| 2 | +import jwt |
| 3 | +import os |
| 4 | +import requests |
| 5 | +import certifi |
| 6 | +import grpc |
| 7 | +import pub_sub.stubs.pubsub_api_pb2_grpc as pb2_grpc |
| 8 | +import pub_sub.stubs.pubsub_api_pb2 as pb2 |
| 9 | +import avro.io |
| 10 | +import io |
| 11 | +import structlog |
| 12 | + |
| 13 | +logger = structlog.get_logger() |
| 14 | + |
| 15 | +# todo: also read these from dict module |
| 16 | +ISSUER = os.getenv("SALESFORCE_CONSUMER_KEY") |
| 17 | +DOMAIN = os.getenv("SALESFORCE_DOMAIN") |
| 18 | +SUBJECT = os.getenv("SALESFORCE_USERNAME") |
| 19 | +INSTANCE_URL = os.getenv("INSTANCE_URL") |
| 20 | +TENANT_ID = os.getenv("TENANT_ID") |
| 21 | + |
| 22 | +UPDATE_TOPIC = "/event/Updated_Contacts_From_Pipeline__e" |
| 23 | + |
| 24 | + |
| 25 | +def pipeline_update_message(message_dict): |
| 26 | + # todo: look for certificate using both local and container pathing |
| 27 | + pem_file = 'bin/connected-app-secrets.pem' |
| 28 | + with open(pem_file) as fd: |
| 29 | + private_key = fd.read() |
| 30 | + logger.info('Loaded PEM certificate') |
| 31 | + |
| 32 | + claim = { |
| 33 | + 'iss': ISSUER, |
| 34 | + 'exp': int(time.time()) + 300, |
| 35 | + 'aud': 'https://{}.salesforce.com'.format(DOMAIN), |
| 36 | + 'sub': SUBJECT, |
| 37 | + } |
| 38 | + assertion = jwt.encode(claim, private_key, algorithm='RS256', headers={'alg': 'RS256'}) |
| 39 | + logger.info('Generated JWT') |
| 40 | + |
| 41 | + r = requests.post('https://{}.salesforce.com/services/oauth2/token'.format(DOMAIN), data={ |
| 42 | + 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', |
| 43 | + 'assertion': assertion, |
| 44 | + }) |
| 45 | + logger.info(r.json()) |
| 46 | + access_token = r.json()['access_token'] |
| 47 | + logger.info('Made OAuth call to get access token') |
| 48 | + |
| 49 | + with open(certifi.where(), 'rb') as f: |
| 50 | + creds = grpc.ssl_channel_credentials(f.read()) |
| 51 | + with grpc.secure_channel('api.pubsub.salesforce.com:7443', creds) as channel: |
| 52 | + auth_meta_data = (('accesstoken', access_token), |
| 53 | + ('instanceurl', INSTANCE_URL), |
| 54 | + ('tenantid', TENANT_ID)) |
| 55 | + stub = pb2_grpc.PubSubStub(channel) |
| 56 | + schema_id = stub.GetTopic(pb2.TopicRequest(topic_name=UPDATE_TOPIC), metadata=auth_meta_data).schema_id |
| 57 | + schema = stub.GetSchema(pb2.SchemaRequest(schema_id=schema_id), metadata=auth_meta_data).schema_json |
| 58 | + buf = io.BytesIO() |
| 59 | + encoder = avro.io.BinaryEncoder(buf) |
| 60 | + writer = avro.io.DatumWriter(avro.schema.parse(schema)) |
| 61 | + writer.write(message_dict, encoder) |
| 62 | + payload = { |
| 63 | + "schema_id": schema_id, |
| 64 | + "payload": buf.getvalue() |
| 65 | + } |
| 66 | + stub.Publish(pb2.PublishRequest(topic_name=UPDATE_TOPIC, events=[payload]), metadata=auth_meta_data) |
| 67 | + logger.info('Pipeline update message sent') |
0 commit comments