Skip to content

Commit 58ddc0e

Browse files
authored
Merge pull request #629 from oracle-devrel/eloi-opensource
OCI Data Flow - Example to load data into ADW
2 parents ff70af6 + c826c10 commit 58ddc0e

File tree

2 files changed

+280
-0
lines changed

2 files changed

+280
-0
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
# Copyright (c) 2023 Oracle and/or its affiliates.
2+
#
3+
# The Universal Permissive License (UPL), Version 1.0
4+
#
5+
# Subject to the condition set forth below, permission is hereby granted to any
6+
# person obtaining a copy of this software, associated documentation and/or data
7+
# (collectively the "Software"), free of charge and under any and all copyright
8+
# rights in the Software, and any and all patent rights owned or freely
9+
# licensable by each licensor hereunder covering either (i) the unmodified
10+
# Software as contributed to or provided by such licensor, or (ii) the Larger
11+
# Works (as defined below), to deal in both
12+
#
13+
# (a) the Software, and
14+
# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
15+
# one is included with the Software (each a "Larger Work" to which the Software
16+
# is contributed by such licensors),
17+
# without restriction, including without limitation the rights to copy, create
18+
# derivative works of, display, perform, and distribute the Software and make,
19+
# use, sell, offer for sale, import, export, have made, and have sold the
20+
# Software and the Larger Work(s), and to sublicense the foregoing rights on
21+
# either these or other terms.
22+
#
23+
# This license is subject to the following condition:
24+
# The above copyright notice and either this complete permission notice or at
25+
# a minimum a reference to the UPL must be included in all copies or
26+
# substantial portions of the Software.
27+
#
28+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
29+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
30+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
31+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
32+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
33+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
34+
# SOFTWARE.
35+
36+
import argparse
37+
import os
38+
39+
from pyspark import SparkConf
40+
from pyspark.sql import SparkSession, SQLContext
41+
from pyspark.sql.functions import to_date
42+
from pyspark.sql.functions import explode
43+
44+
45+
def main():
46+
parser = argparse.ArgumentParser()
47+
parser.add_argument("inputpath")
48+
#parser.add_argument("--output-path", required=True)
49+
args = parser.parse_args()
50+
51+
# Set up Spark.
52+
spark = get_dataflow_spark_session()
53+
sql_context = SQLContext(spark)
54+
55+
# Load our data.
56+
df = (
57+
spark.read.format("csv")
58+
#.schema(schema)
59+
.option("inferSchema", "true")
60+
.option("header","true")
61+
.option("multiLine", "false")
62+
.option("delimiter",";")
63+
.option("dateFormat","dd.MM.yyyy")
64+
.load(args.inputpath)
65+
.cache())
66+
# cache the dataset to increase computing speed
67+
68+
df = df.withColumn("DATE_KEY",to_date(df["DATE_KEY"],"dd.MM.yyyy"))
69+
70+
df.write.format("oracle") \
71+
.option("adbId","<autonomous database ocid>") \
72+
.option("dbtable", "<table name>") \
73+
.option("user", "<username>")\
74+
.option("password", "<password>")\
75+
.option("truncate", "true")\
76+
.mode("overwrite")\
77+
.save()
78+
79+
# Show on the console that something happened.
80+
print("Successfully saved {} rows to ADW.".format(df.count()))
81+
82+
def get_dataflow_spark_session(
83+
app_name="DataFlow", file_location=None, profile_name=None, spark_config={}
84+
):
85+
"""
86+
Get a Spark session in a way that supports running locally or in Data Flow.
87+
"""
88+
if in_dataflow():
89+
spark_builder = SparkSession.builder.appName(app_name)
90+
else:
91+
# Import OCI.
92+
try:
93+
import oci
94+
except:
95+
raise Exception(
96+
"You need to install the OCI python library to test locally"
97+
)
98+
99+
# Use defaults for anything unset.
100+
if file_location is None:
101+
file_location = oci.config.DEFAULT_LOCATION
102+
if profile_name is None:
103+
profile_name = oci.config.DEFAULT_PROFILE
104+
105+
# Load the config file.
106+
try:
107+
oci_config = oci.config.from_file(
108+
file_location=file_location, profile_name=profile_name
109+
)
110+
except Exception as e:
111+
print("You need to set up your OCI config properly to run locally")
112+
raise e
113+
conf = SparkConf()
114+
conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
115+
conf.set("fs.oci.client.auth.userId", oci_config["user"])
116+
conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
117+
conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
118+
conf.set(
119+
"fs.oci.client.hostname",
120+
"https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
121+
)
122+
spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)
123+
124+
# Add in extra configuration.
125+
for key, val in spark_config.items():
126+
spark_builder.config(key, val)
127+
128+
# Create the Spark session.
129+
session = spark_builder.getOrCreate()
130+
return session
131+
132+
133+
def in_dataflow():
134+
"""
135+
Determine if we are running in OCI Data Flow by checking the environment.
136+
"""
137+
if os.environ.get("HOME") == "/home/dataflow":
138+
return True
139+
return False
140+
141+
142+
if __name__ == "__main__":
143+
main()
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# Copyright (c) 2023 Oracle and/or its affiliates.
2+
#
3+
# The Universal Permissive License (UPL), Version 1.0
4+
#
5+
# Subject to the condition set forth below, permission is hereby granted to any
6+
# person obtaining a copy of this software, associated documentation and/or data
7+
# (collectively the "Software"), free of charge and under any and all copyright
8+
# rights in the Software, and any and all patent rights owned or freely
9+
# licensable by each licensor hereunder covering either (i) the unmodified
10+
# Software as contributed to or provided by such licensor, or (ii) the Larger
11+
# Works (as defined below), to deal in both
12+
#
13+
# (a) the Software, and
14+
# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
15+
# one is included with the Software (each a "Larger Work" to which the Software
16+
# is contributed by such licensors),
17+
# without restriction, including without limitation the rights to copy, create
18+
# derivative works of, display, perform, and distribute the Software and make,
19+
# use, sell, offer for sale, import, export, have made, and have sold the
20+
# Software and the Larger Work(s), and to sublicense the foregoing rights on
21+
# either these or other terms.
22+
#
23+
# This license is subject to the following condition:
24+
# The above copyright notice and either this complete permission notice or at
25+
# a minimum a reference to the UPL must be included in all copies or
26+
# substantial portions of the Software.
27+
#
28+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
29+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
30+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
31+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
32+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
33+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
34+
# SOFTWARE.
35+
import io
36+
import json
37+
import logging
38+
import oci
39+
import os
40+
import requests
41+
import sys
42+
43+
from fdk import response
44+
from oci.signer import Signer
45+
46+
def handler(ctx, data: io.BytesIO=None):
47+
# Really doesn't belong here, unsure how to enable logging remotely otherwise.
48+
logging.basicConfig(level=logging.INFO)
49+
50+
retval = dict()
51+
# For extra debugging, uncomment this. Populate the initial return value.
52+
# retval = dict(os.environ)
53+
54+
# Get an appropriate signer, automatically.
55+
logging.info('Using resource principal for private key')
56+
signer = oci.auth.signers.get_resource_principals_signer()
57+
58+
# Ensure we got valid JSON input and all fields accounted for.
59+
try:
60+
body = json.loads(data.getvalue())
61+
body = body['data']
62+
for item in ['resourceName']:
63+
if item not in body:
64+
retval['error'] = 'Missing mandatory field ' + item
65+
return response.Response(
66+
ctx,
67+
response_data=json.dumps(retval),
68+
headers={"Content-Type": "application/json"})
69+
70+
applicationId = body.get('applicationId',' <OCI ID application data flow>')
71+
compartmentId = body.get('compartmentId','<Compartment ID>')
72+
displayName = body.get('displayName','MaterialInventory')
73+
#driverShape = body.get('driverShape','VM.Standard2.1')
74+
#executorShape = body.get('executorShape','VM.Standard2.1')
75+
numExecutors = body.get('numExecutors',1)
76+
pool_id = '<Pool ID>'
77+
region = body.get('region', '<Regios>')
78+
79+
resourceName = body.get('resourceName')
80+
81+
if 'parameters' not in body:
82+
parameters = dict()
83+
else:
84+
parameters = body.get('parameters')
85+
parameters['input-path'] = 'oci://<bucket name >@<namespace bucket>/{}'.format(resourceName)
86+
logging.info(parameters['input-path'])
87+
88+
except (Exception, ValueError) as ex:
89+
retval['error'] = str(ex)
90+
return response.Response(
91+
ctx,
92+
response_data=json.dumps(retval),
93+
headers={"Content-Type": "application/json"})
94+
95+
# Call Data Flow.
96+
dataflow_root = 'https://dataflow.{}.oci.oraclecloud.com/20200129'.format(region)
97+
dataflow_runs_endpoint = dataflow_root + '/runs'
98+
run_payload = dict(
99+
compartmentId=compartmentId,
100+
applicationId=applicationId,
101+
displayName=displayName,
102+
applicationSettings=dict(
103+
#driverShape=driverShape,
104+
#executorShape=executorShape,
105+
numExecutors=numExecutors,
106+
pool_id=pool_id,
107+
arguments=[
108+
dict(name=key, value=value) for key, value in parameters.items()
109+
]
110+
),
111+
)
112+
retval['run_payload'] = run_payload
113+
try:
114+
result = requests.post(
115+
dataflow_runs_endpoint,
116+
json=run_payload,
117+
auth=signer)
118+
result_obj = json.loads(result.text)
119+
if 'id' not in result_obj:
120+
retval['error'] = result.text
121+
else:
122+
runid = result_obj['id']
123+
retval['runid'] = result_obj['id']
124+
except Exception as ex:
125+
retval['error'] = str(ex)
126+
return response.Response(ctx,
127+
response_data=json.dumps(retval),
128+
headers={"Content-Type": "application/json"})
129+
130+
if __name__ == '__main__':
131+
from fdk import context
132+
ctx = context.InvokeContext(None, None, None)
133+
134+
# Read stdin and turn it into BytesIO
135+
input = io.BytesIO(sys.stdin.read().encode())
136+
retval = handler(ctx, input)
137+
print(retval.body())

0 commit comments

Comments
 (0)