# Copyright (c) 2023 Oracle and/or its affiliates.
#
# The Universal Permissive License (UPL), Version 1.0
#
# Subject to the condition set forth below, permission is hereby granted to any
# person obtaining a copy of this software, associated documentation and/or data
# (collectively the "Software"), free of charge and under any and all copyright
# rights in the Software, and any and all patent rights owned or freely
# licensable by each licensor hereunder covering either (i) the unmodified
# Software as contributed to or provided by such licensor, or (ii) the Larger
# Works (as defined below), to deal in both
#
# (a) the Software, and
# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
# one is included with the Software (each a "Larger Work" to which the Software
# is contributed by such licensors),
# without restriction, including without limitation the rights to copy, create
# derivative works of, display, perform, and distribute the Software and make,
# use, sell, offer for sale, import, export, have made, and have sold the
# Software and the Larger Work(s), and to sublicense the foregoing rights on
# either these or other terms.
#
# This license is subject to the following condition:
# The above copyright notice and either this complete permission notice or at
# a minimum a reference to the UPL must be included in all copies or
# substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from simple_salesforce import Salesforce
import os
import argparse
import time
from salesforceCredentials import username, password, security_token

# Connect to Salesforce with the credentials module (username/password plus
# API security token)
sf = Salesforce(username=username, password=password, security_token=security_token)

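# The salesforceCredentials module is not shown here; a minimal sketch of what
# it is assumed to contain (all values hypothetical):
#
#   # salesforceCredentials.py
#   username = "user@example.com"
#   password = "example-password"
#   security_token = "example-security-token"
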
parser = argparse.ArgumentParser()
# The <...> defaults below are placeholders; replace them or pass real values
# on the command line.
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server', default='cell-1.streaming.<OCI Region>.oci.oraclecloud.com')
parser.add_argument('--checkpoint-location', default='oci://<bucketname>@<namespace>/<checkpoint folder>')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location', default='oci://<bucketname>@<namespace>/<output folder>')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password', default='<Token>')
parser.add_argument('--raw-stream', default='<Topic name>')
parser.add_argument('--stream-username', default='<tenancy name>/<username>/<streampool OCID>')
args = parser.parse_args()
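
# Example invocation (hypothetical values; the script filename and the Kafka
# connector package version must match your setup and Spark build):
#
#   spark-submit \
#       --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 \
#       this_script.py \
#       --bootstrap-server cell-1.streaming.us-ashburn-1.oci.oraclecloud.com \
#       --raw-stream my-topic \
#       --stream-username 'mytenancy/myuser/ocid1.streampool.oc1..exampleid' \
#       --stream-password 'my-auth-token' \
#       --checkpoint-location 'oci://bucket@namespace/checkpoints' \
#       --output-location 'oci://bucket@namespace/output'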

# Fall back to environment variables for anything not supplied; note these
# checks only fire when the corresponding argparse default is None.
if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')

assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"

# Resolve the JAAS configuration up front so that each template is formatted
# with the fields it actually declares: {ocid} for the resource-principal
# login module, {username}/{password} for PLAIN.
if args.ocid is not None:
    jaas_config = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'.format(ocid=args.ocid)
    args.auth_type = 'OCI-RSA-SHA256'
else:
    jaas_config = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'.format(
        username=args.stream_username, password=args.stream_password)

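# For reference, the PLAIN branch above yields a JAAS entry of roughly this
# shape (hypothetical values):
#
#   org.apache.kafka.common.security.plain.PlainLoginModule required
#   username="mytenancy/myuser/ocid1.streampool.oc1..exampleid"
#   password="my-auth-token";
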
# Fetch Account records modified since the given timestamp. SOQL datetime
# literals are unquoted, so the timestamp is interpolated directly.
def fetch_updated_records(last_extracted_timestamp):
    soql_query = f"SELECT Id, Name, LastModifiedDate FROM Account WHERE LastModifiedDate > {last_extracted_timestamp}"
    result = sf.query_all(query=soql_query)
    return result.get('records', [])

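# For reference, simple_salesforce's query_all returns each record as a dict
# of roughly this shape (values hypothetical):
#
#   {"attributes": {"type": "Account",
#                   "url": "/services/data/v57.0/sobjects/Account/001..."},
#    "Id": "001xx000003DGb0AAG",
#    "Name": "Acme",
#    "LastModifiedDate": "2023-05-01T12:34:56.000+0000"}
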
# Convert Salesforce records to a PySpark DataFrame.
def records_to_dataframe(records):
    spark = SparkSession.builder.appName("SalesforceApp") \
        .getOrCreate()
    # An explicit StructType is required: createDataFrame cannot infer column
    # types from an empty list plus a bare list of column names.
    schema = StructType([
        StructField("Id", StringType()),
        StructField("Name", StringType()),
        StructField("LastModifiedDate", StringType()),
    ])
    # Check that records is a non-empty list of dicts before extracting fields
    if records and isinstance(records, list) and isinstance(records[0], dict):
        data = [(record.get("Id", ""), record.get("Name", ""), record.get("LastModifiedDate", "")) for record in records]
        return spark.createDataFrame(data, schema=schema)
    else:
        # Records not in the expected format: return an empty DataFrame
        return spark.createDataFrame([], schema=schema)


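# Quick local sanity check for records_to_dataframe (hypothetical values):
#
#   sample = [{"Id": "001xx000003DGb0AAG", "Name": "Acme",
#              "LastModifiedDate": "2023-05-01T12:34:56.000+0000"}]
#   records_to_dataframe(sample).show(truncate=False)
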
# Main ingestion loop: poll Salesforce for changed Accounts and publish them
# to the Kafka-compatible OCI stream.
def main():
    # Dummy last-extracted timestamp for the first poll
    last_extracted_timestamp = '2000-01-01T00:00:00Z'
    while True:
        # Fetch updated records from Salesforce
        records = fetch_updated_records(last_extracted_timestamp)
        if records:
            # Convert records to DataFrame
            df = records_to_dataframe(records)

            # The Kafka sink publishes the key and value columns; columns
            # outside its sink schema (such as timestamp here) are ignored.
            df_transformed = df.selectExpr("CAST(Id AS STRING) AS key", "CAST(Name AS STRING) AS value", "CAST(LastModifiedDate AS TIMESTAMP) AS timestamp")
            # Batch write to Kafka; idempotent producers are disabled because
            # OCI Streaming's Kafka endpoint does not support them.
            df_transformed \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", '{}:{}'.format(args.bootstrap_server,
                                                                  args.bootstrap_port)) \
                .option("kafka.enable.idempotence", "false") \
                .option("kafka.sasl.jaas.config", jaas_config) \
                .option('kafka.sasl.mechanism', args.auth_type) \
                .option('kafka.security.protocol', args.encryption) \
                .option("topic", args.raw_stream) \
                .option("kafka.max.request.size", str(1024 * 1024)) \
                .option("checkpointLocation", args.checkpoint_location) \
                .mode("append") \
                .save()
            # Advance the watermark to the newest LastModifiedDate seen;
            # ISO-8601 strings in the same zone compare correctly as text.
            last_extracted_timestamp = max([record["LastModifiedDate"] for record in records])
        # Sleep for a specified interval before the next iteration
        time.sleep(10)


if __name__ == "__main__":
    # Start the streaming ingestion process
    main()
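
# To verify end to end, one option is to read the topic back with Spark
# Structured Streaming. A minimal sketch, assuming an active SparkSession
# named spark and the same module-level args/jaas_config as above:
#
#   raw = spark.readStream \
#       .format("kafka") \
#       .option("kafka.bootstrap.servers",
#               '{}:{}'.format(args.bootstrap_server, args.bootstrap_port)) \
#       .option("kafka.sasl.jaas.config", jaas_config) \
#       .option("kafka.sasl.mechanism", args.auth_type) \
#       .option("kafka.security.protocol", args.encryption) \
#       .option("subscribe", args.raw_stream) \
#       .load()
#   raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
#       .writeStream \
#       .format("console") \
#       .option("checkpointLocation", args.checkpoint_location) \
#       .start() \
#       .awaitTermination()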