Skip to content

Commit da408fb

Browse files
authored
Example to call OCI Data Flow from OCI Functions
1 parent 6e44fd2 commit da408fb

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# Copyright (c) 2023 Oracle and/or its affiliates.
2+
#
3+
# The Universal Permissive License (UPL), Version 1.0
4+
#
5+
# Subject to the condition set forth below, permission is hereby granted to any
6+
# person obtaining a copy of this software, associated documentation and/or data
7+
# (collectively the "Software"), free of charge and under any and all copyright
8+
# rights in the Software, and any and all patent rights owned or freely
9+
# licensable by each licensor hereunder covering either (i) the unmodified
10+
# Software as contributed to or provided by such licensor, or (ii) the Larger
11+
# Works (as defined below), to deal in both
12+
#
13+
# (a) the Software, and
14+
# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
15+
# one is included with the Software (each a "Larger Work" to which the Software
16+
# is contributed by such licensors),
17+
# without restriction, including without limitation the rights to copy, create
18+
# derivative works of, display, perform, and distribute the Software and make,
19+
# use, sell, offer for sale, import, export, have made, and have sold the
20+
# Software and the Larger Work(s), and to sublicense the foregoing rights on
21+
# either these or other terms.
22+
#
23+
# This license is subject to the following condition:
24+
# The above copyright notice and either this complete permission notice or at
25+
# a minimum a reference to the UPL must be included in all copies or
26+
# substantial portions of the Software.
27+
#
28+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
29+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
30+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
31+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
32+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
33+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
34+
# SOFTWARE.
35+
import io
36+
import json
37+
import logging
38+
import oci
39+
import os
40+
import requests
41+
import sys
42+
43+
from fdk import response
44+
from oci.signer import Signer
45+
46+
def handler(ctx, data: io.BytesIO=None):
47+
# Really doesn't belong here, unsure how to enable logging remotely otherwise.
48+
logging.basicConfig(level=logging.INFO)
49+
50+
retval = dict()
51+
# For extra debugging, uncomment this. Populate the initial return value.
52+
# retval = dict(os.environ)
53+
54+
# Get an appropriate signer, automatically.
55+
logging.info('Using resource principal for private key')
56+
signer = oci.auth.signers.get_resource_principals_signer()
57+
58+
# Ensure we got valid JSON input and all fields accounted for.
59+
try:
60+
body = json.loads(data.getvalue())
61+
body = body['data']
62+
for item in ['resourceName']:
63+
if item not in body:
64+
retval['error'] = 'Missing mandatory field ' + item
65+
return response.Response(
66+
ctx,
67+
response_data=json.dumps(retval),
68+
headers={"Content-Type": "application/json"})
69+
70+
applicationId = body.get('applicationId','<OCI ID application data flow>')
71+
compartmentId = body.get('compartmentId','<Compartment ID>')
72+
displayName = body.get('displayName','MaterialInventory')
73+
#driverShape = body.get('driverShape','VM.Standard2.1')
74+
#executorShape = body.get('executorShape','VM.Standard2.1')
75+
numExecutors = body.get('numExecutors',1)
76+
pool_id = '<Pool ID>'
77+
region = body.get('region', '<Regios>')
78+
79+
resourceName = body.get('resourceName')
80+
81+
if 'parameters' not in body:
82+
parameters = dict()
83+
else:
84+
parameters = body.get('parameters')
85+
parameters['input-path'] = 'oci://<bucket name>@<namespace bucket>/{}'.format(resourceName)
86+
logging.info(parameters['input-path'])
87+
88+
except (Exception, ValueError) as ex:
89+
retval['error'] = str(ex)
90+
return response.Response(
91+
ctx,
92+
response_data=json.dumps(retval),
93+
headers={"Content-Type": "application/json"})
94+
95+
# Call Data Flow.
96+
dataflow_root = 'https://dataflow.{}.oci.oraclecloud.com/20200129'.format(region)
97+
dataflow_runs_endpoint = dataflow_root + '/runs'
98+
run_payload = dict(
99+
compartmentId=compartmentId,
100+
applicationId=applicationId,
101+
displayName=displayName,
102+
applicationSettings=dict(
103+
#driverShape=driverShape,
104+
#executorShape=executorShape,
105+
numExecutors=numExecutors,
106+
pool_id=pool_id,
107+
arguments=[
108+
dict(name=key, value=value) for key, value in parameters.items()
109+
]
110+
),
111+
)
112+
retval['run_payload'] = run_payload
113+
try:
114+
result = requests.post(
115+
dataflow_runs_endpoint,
116+
json=run_payload,
117+
auth=signer)
118+
result_obj = json.loads(result.text)
119+
if 'id' not in result_obj:
120+
retval['error'] = result.text
121+
else:
122+
runid = result_obj['id']
123+
retval['runid'] = result_obj['id']
124+
except Exception as ex:
125+
retval['error'] = str(ex)
126+
return response.Response(ctx,
127+
response_data=json.dumps(retval),
128+
headers={"Content-Type": "application/json"})
129+
130+
if __name__ == '__main__':
131+
from fdk import context
132+
ctx = context.InvokeContext(None, None, None)
133+
134+
# Read stdin and turn it into BytesIO
135+
input = io.BytesIO(sys.stdin.read().encode())
136+
retval = handler(ctx, input)
137+
print(retval.body())

0 commit comments

Comments
 (0)