from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from simple_salesforce import Salesforce
import os
import argparse
from datetime import datetime, timedelta
from salesforceCredentials import username, password, security_token
import time

# Salesforce credentials
sf = Salesforce(username=username, password=password, security_token=security_token)

parser = argparse.ArgumentParser()
parser.add_argument('--auth-type', default='PLAIN')
parser.add_argument('--bootstrap-port', default='9092')
parser.add_argument('--bootstrap-server', default='cell-1.streaming.<OCI Region>.oci.oraclecloud.com')
parser.add_argument('--checkpoint-location', default='oci://<bucketname>@<namespace>/<checkpoint folder>')
parser.add_argument('--encryption', default='SASL_SSL')
parser.add_argument('--ocid')
parser.add_argument('--output-location', default='oci://<bucketname>@<namespace>/<output folder>')
parser.add_argument('--output-mode', default='file')
parser.add_argument('--stream-password', default='<Token>')
parser.add_argument('--raw-stream', default='<Topic name>')
parser.add_argument('--stream-username', default='<tenancy name>/<username>/<streampool OCID>')
args = parser.parse_args()

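# Fall back to environment variables when a value was not supplied on the command line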
if args.bootstrap_server is None:
    args.bootstrap_server = os.environ.get('BOOTSTRAP_SERVER')
if args.raw_stream is None:
    args.raw_stream = os.environ.get('RAW_STREAM')
if args.stream_username is None:
    args.stream_username = os.environ.get('STREAM_USERNAME')
if args.stream_password is None:
    args.stream_password = os.environ.get('STREAM_PASSWORD')

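# Fail fast if any required connection setting is still missing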
assert args.bootstrap_server is not None, "Kafka bootstrap server (--bootstrap-server) name must be set"
assert args.checkpoint_location is not None, "Checkpoint location (--checkpoint-location) must be set"
assert args.output_location is not None, "Output location (--output-location) must be set"
assert args.raw_stream is not None, "Kafka topic (--raw-stream) name must be set"

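# Build the SASL/JAAS configuration for the chosen authentication scheme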
if args.ocid is not None:
    # Resource principal authentication: substitute the stream pool OCID now,
    # because the later format() call only supplies username and password
    jaas_template = 'com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent="streamPoolId:{ocid}";'.format(ocid=args.ocid)
    args.auth_type = 'OCI-RSA-SHA256'
else:
    jaas_template = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="{username}" password="{password}";'

# Fetch Account records updated in Salesforce since the given timestamp
def fetch_updated_records(last_extracted_timestamp):
    # last_extracted_timestamp must be an unquoted SOQL datetime literal, e.g. 2000-01-01T00:00:00Z
    soql_query = f"SELECT Id, Name, LastModifiedDate FROM Account WHERE LastModifiedDate > {last_extracted_timestamp}"
    result = sf.query_all(query=soql_query)
    return result.get('records', [])

# Convert Salesforce records to a PySpark DataFrame
def records_to_dataframe(records):
    spark = SparkSession.builder.appName("SalesforceApp") \
        .getOrCreate()
    #spark.sparkContext.setLogLevel('ERROR')
    # Check if records is not empty and has the expected structure
    if records and isinstance(records, list) and isinstance(records[0], dict):
        # Extract values from each record
        data = [(record.get("Id", ""), record.get("Name", ""), record.get("LastModifiedDate", "")) for record in records]
        schema = ["Id", "Name", "LastModifiedDate"]
        return spark.createDataFrame(data, schema=schema)
    else:
        # If records are not in the expected format, return an empty DataFrame;
        # an explicit DDL schema is needed because types cannot be inferred from an empty list
        return spark.createDataFrame([], schema="Id STRING, Name STRING, LastModifiedDate STRING")


# Main streaming ingestion process
def main():
    # Dummy last extracted timestamp
    last_extracted_timestamp = '2000-01-01T00:00:00Z'
    while True:
        # Fetch updated records from Salesforce
        records = fetch_updated_records(last_extracted_timestamp)
        if records:
            # Convert records to DataFrame
            df = records_to_dataframe(records)

            # Shape the rows for the Kafka sink: Id becomes the message key, Name the value
            df_transformed = df.selectExpr("CAST(Id AS STRING) AS key", "CAST(Name AS STRING) AS value", "CAST(LastModifiedDate AS TIMESTAMP) AS timestamp")
            # Write this batch of records to the Kafka-compatible OCI stream
            df_transformed \
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", '{}:{}'.format(args.bootstrap_server,
                                                                  args.bootstrap_port)) \
                .option("kafka.enable.idempotence", "false") \
                .option("kafka.sasl.jaas.config", jaas_template.format(
                    username=args.stream_username, password=args.stream_password
                )) \
                .option('kafka.sasl.mechanism', args.auth_type) \
                .option('kafka.security.protocol', args.encryption) \
                .option("topic", args.raw_stream) \
                .option("kafka.max.partition.fetch.bytes", str(1024 * 1024)) \
                .option("checkpointLocation", args.checkpoint_location) \
                .mode("append") \
                .save()
            # Update the last extraction timestamp; Salesforce returns values like
            # 2024-01-01T00:00:00.000+0000, so reformat to the documented SOQL literal form
            latest = max(record["LastModifiedDate"] for record in records)
            last_extracted_timestamp = datetime.strptime(latest, "%Y-%m-%dT%H:%M:%S.%f%z").strftime("%Y-%m-%dT%H:%M:%SZ")
        # Sleep for a specified interval before the next iteration
        time.sleep(10)

if __name__ == "__main__":
    # Start the streaming ingestion process
    main()
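
# Example launch (package version and script name are illustrative and will need adjusting):
#   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 \
#       salesforce_to_kafka.py --bootstrap-server <host> --raw-stream <topic> \
#       --stream-username '<tenancy name>/<username>/<streampool OCID>' --stream-password '<Token>'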