1
+ # Copyright (c) 2023 Oracle and/or its affiliates.
2
+ #
3
+ # The Universal Permissive License (UPL), Version 1.0
4
+ #
5
+ # Subject to the condition set forth below, permission is hereby granted to any
6
+ # person obtaining a copy of this software, associated documentation and/or data
7
+ # (collectively the "Software"), free of charge and under any and all copyright
8
+ # rights in the Software, and any and all patent rights owned or freely
9
+ # licensable by each licensor hereunder covering either (i) the unmodified
10
+ # Software as contributed to or provided by such licensor, or (ii) the Larger
11
+ # Works (as defined below), to deal in both
12
+ #
13
+ # (a) the Software, and
14
+ # (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if
15
+ # one is included with the Software (each a "Larger Work" to which the Software
16
+ # is contributed by such licensors),
17
+ # without restriction, including without limitation the rights to copy, create
18
+ # derivative works of, display, perform, and distribute the Software and make,
19
+ # use, sell, offer for sale, import, export, have made, and have sold the
20
+ # Software and the Larger Work(s), and to sublicense the foregoing rights on
21
+ # either these or other terms.
22
+ #
23
+ # This license is subject to the following condition:
24
+ # The above copyright notice and either this complete permission notice or at
25
+ # a minimum a reference to the UPL must be included in all copies or
26
+ # substantial portions of the Software.
27
+ #
28
+ # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
29
+ # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
30
+ # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
31
+ # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
32
+ # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
33
+ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
34
+ # SOFTWARE.
35
+
36
+ import argparse
37
+ import os
38
+
39
+ from pyspark import SparkConf
40
+ from pyspark .sql import SparkSession , SQLContext
41
+ from pyspark .sql .functions import to_date
42
+ from pyspark .sql .functions import explode
43
+
44
+
45
def main():
    """Load a semicolon-delimited CSV and write it to an ADW table.

    Reads the CSV from the path given on the command line, parses the
    DATE_KEY column into a date, then writes the result to an Autonomous
    Data Warehouse table using the Spark "oracle" datasource.

    Command-line arguments:
        inputpath: path (local or object storage URI) of the CSV to load.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("inputpath")
    args = parser.parse_args()

    # Set up Spark (works both locally and inside OCI Data Flow).
    spark = get_dataflow_spark_session()

    # Load our data. cache() so the count() after the write below does not
    # re-read and re-parse the source file.
    df = (
        spark.read.format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("multiLine", "false")
        .option("delimiter", ";")
        .option("dateFormat", "dd.MM.yyyy")
        .load(args.inputpath)
        .cache()
    )

    # DATE_KEY is inferred as a string; convert it to a proper date column.
    df = df.withColumn("DATE_KEY", to_date(df["DATE_KEY"], "dd.MM.yyyy"))

    # Write to ADW. Replace the placeholders with real connection details.
    # NOTE(review): credentials belong in a secrets store or Data Flow
    # configuration, not hard-coded in source.
    df.write.format("oracle") \
        .option("adbId", "<autonomous database ocid>") \
        .option("dbtable", "<table name>") \
        .option("user", "<username>") \
        .option("password", "<password>") \
        .option("truncate", "true") \
        .mode("overwrite") \
        .save()

    # Show on the console that something happened.
    print("Successfully saved {} rows to ADW.".format(df.count()))
81
+
82
def get_dataflow_spark_session(
    app_name="DataFlow", file_location=None, profile_name=None, spark_config=None
):
    """
    Get a Spark session in a way that supports running locally or in Data Flow.

    Inside Data Flow the platform supplies the object-storage credentials, so
    only the application name is set. Locally, the OCI config file is loaded
    and its credentials are wired into the ``fs.oci.client.*`` Spark settings
    so the session can reach object storage.

    :param app_name: Spark application name.
    :param file_location: OCI config file path; defaults to the SDK default.
    :param profile_name: OCI config profile; defaults to the SDK default.
    :param spark_config: optional mapping of extra Spark config key/values.
    :return: a ready-to-use SparkSession.
    :raises Exception: locally, when the OCI library or config is missing/bad.
    """
    # Avoid a shared mutable default argument.
    if spark_config is None:
        spark_config = {}

    if in_dataflow():
        spark_builder = SparkSession.builder.appName(app_name)
    else:
        # Import OCI only for local runs; Data Flow does not need it.
        try:
            import oci
        except ImportError:
            raise Exception(
                "You need to install the OCI python library to test locally"
            )

        # Use defaults for anything unset.
        if file_location is None:
            file_location = oci.config.DEFAULT_LOCATION
        if profile_name is None:
            profile_name = oci.config.DEFAULT_PROFILE

        # Load the config file.
        try:
            oci_config = oci.config.from_file(
                file_location=file_location, profile_name=profile_name
            )
        except Exception as e:
            print("You need to set up your OCI config properly to run locally")
            raise e

        # Feed the OCI credentials to the HDFS connector for object storage.
        conf = SparkConf()
        conf.set("fs.oci.client.auth.tenantId", oci_config["tenancy"])
        conf.set("fs.oci.client.auth.userId", oci_config["user"])
        conf.set("fs.oci.client.auth.fingerprint", oci_config["fingerprint"])
        conf.set("fs.oci.client.auth.pemfilepath", oci_config["key_file"])
        conf.set(
            "fs.oci.client.hostname",
            "https://objectstorage.{0}.oraclecloud.com".format(oci_config["region"]),
        )
        spark_builder = SparkSession.builder.appName(app_name).config(conf=conf)

    # Add in extra configuration.
    for key, val in spark_config.items():
        spark_builder.config(key, val)

    # Create the Spark session.
    session = spark_builder.getOrCreate()
    return session
131
+
132
+
133
def in_dataflow():
    """
    Determine if we are running in OCI Data Flow by checking the environment.

    Data Flow executors run with HOME set to /home/dataflow; any other value
    (or an unset HOME) is treated as a local run.
    """
    return os.environ.get("HOME") == "/home/dataflow"
140
+
141
+
142
+ if __name__ == "__main__" :
143
+ main ()
0 commit comments