|
1 | | -import sys |
2 | | -from awsglue.transforms import * |
3 | | -from awsglue.utils import getResolvedOptions |
4 | | -from pyspark.context import SparkContext |
5 | | -from awsglue.context import GlueContext |
6 | | -from awsglue.job import Job |
7 | | -from awsglue import DynamicFrame |
| 1 | +""" |
| 2 | +Only need to change the table name and the query prototyped on the Athena UI |
| 3 | +by replacing table_name and query_on_athena |
| 4 | +Note: python file name should be the same as the table name |
| 5 | +""" |
8 | 6 |
|
| 7 | +from scripts.helpers.athena_helpers import create_update_table_with_partition |
9 | 8 | from scripts.helpers.helpers import get_glue_env_var |
10 | | -environment = get_glue_env_var("environment") |
11 | | - |
12 | | - |
13 | | -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: |
14 | | - for alias, frame in mapping.items(): |
15 | | - frame.toDF().createOrReplaceTempView(alias) |
16 | | - result = spark.sql(query) |
17 | | - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) |
18 | | - |
19 | | - |
20 | | -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) |
21 | | -sc = SparkContext() |
22 | | -glueContext = GlueContext(sc) |
23 | | -spark = glueContext.spark_session |
24 | | -job = Job(glueContext) |
25 | | -job.init(args["JOB_NAME"], args) |
26 | 9 |
|
27 | | -# Script generated for node Amazon S3 |
28 | | -AmazonS3_node1625732651466 = glueContext.create_dynamic_frame.from_catalog( |
29 | | - database="dataplatform-" + environment + "-liberator-refined-zone", |
30 | | - table_name="pcnfoidetails_pcn_foi_full", |
31 | | - transformation_ctx="AmazonS3_node1625732651466", |
32 | | -) |
| 10 | +environment = get_glue_env_var("environment") |
33 | 11 |
|
34 | | -# Script generated for node Amazon S3 |
35 | | -AmazonS3_node1646229922398 = glueContext.create_dynamic_frame.from_catalog( |
36 | | - database="dataplatform-" + environment + "-liberator-raw-zone", |
37 | | - table_name="liberator_pcn_tickets", |
38 | | - transformation_ctx="AmazonS3_node1646229922398", |
39 | | -) |
| 12 | +# The target table in liberator refined zone |
| 13 | +table_name = "Parking_Foreign_VRM_PCNs" |
40 | 14 |
|
41 | | -# Script generated for node ApplyMapping |
42 | | -SqlQuery0 = """ |
| 15 | +# The exact same query prototyped in pre-prod(stg) or prod Athena |
| 16 | +query_on_athena = """ |
43 | 17 | /************************************************************************************************************************* |
44 | | -Parking_Foreign_VRM_PCNs |
| 18 | + Parking_Foreign_VRM_PCNs |
45 | 19 |
|
46 | | -This SQL creates the list of VRMs against the current valid Permits |
| 20 | + This SQL creates the list of VRMs against the current valid Permits |
47 | 21 |
|
48 | | -02/03/2022 - Create Query |
49 | | -*************************************************************************************************************************/ |
50 | | -/* Collect the Foreign PCNs from the raw PCN data */ |
| 22 | + 02/03/2022 - Create Query |
| 23 | + *************************************************************************************************************************/ |
51 | 24 | With PCN_Raw as ( |
52 | | -select * from liberator_pcn_tickets |
53 | | -Where Import_Date = (Select MAX(Import_Date) |
54 | | - from liberator_pcn_tickets) and lower(foreignvehiclecountry) = 'y') |
55 | | -
|
56 | | -SELECT |
57 | | - PCN, cast(pcnissuedatetime as timestamp) as pcnissuedatetime, pcn_canx_date, A.cancellationgroup, |
58 | | - A.cancellationreason, street_location, |
59 | | - whereonlocation, zone, usrn, A.contraventioncode, debttype, A.vrm, vehiclemake, vehiclemodel, |
60 | | - vehiclecolour, ceo, isremoval, A.progressionstage, lib_payment_received as payment_received, |
61 | | - whenpaid as PaymentDate, noderef, replace(replace(ticketnotes, '\r',''), '\n','') as ticketnotes, |
62 | | - |
63 | | - current_timestamp() as ImportDateTime, |
64 | | - |
65 | | - replace(cast(current_date() as string),'-','') as import_date, |
66 | | - |
67 | | - -- Add the Import date |
68 | | - cast(Year(current_date) as string) as import_year, |
69 | | - cast(month(current_date) as string) as import_month, |
70 | | - cast(day(current_date) as string) as import_day |
71 | | - |
72 | | - |
73 | | - FROM pcnfoidetails_pcn_foi_full as A |
74 | | - INNER JOIN PCN_Raw as B ON A.pcn = B.ticketserialnumber |
75 | | - Where ImportDateTime = (Select MAX(ImportDateTime) from pcnfoidetails_pcn_foi_full) and warningflag = 0 |
76 | | - Order By pcnissuedatetime |
77 | | -""" |
78 | | -ApplyMapping_node2 = sparkSqlQuery( |
79 | | - glueContext, |
80 | | - query=SqlQuery0, |
81 | | - mapping={ |
82 | | - "pcnfoidetails_pcn_foi_full": AmazonS3_node1625732651466, |
83 | | - "liberator_pcn_tickets": AmazonS3_node1646229922398, |
84 | | - }, |
85 | | - transformation_ctx="ApplyMapping_node2", |
| 25 | + select * |
| 26 | + from "dataplatform-prod-liberator-raw-zone".liberator_pcn_tickets |
| 27 | + Where Import_Date = ( |
| 28 | + Select MAX(Import_Date) |
| 29 | + from "dataplatform-prod-liberator-raw-zone".liberator_pcn_tickets |
| 30 | + ) |
| 31 | + and lower(foreignvehiclecountry) = 'y' |
86 | 32 | ) |
| 33 | +SELECT PCN, |
| 34 | + cast(pcnissuedatetime as timestamp) as pcnissuedatetime, |
| 35 | + pcn_canx_date, |
| 36 | + A.cancellationgroup, |
| 37 | + A.cancellationreason, |
| 38 | + street_location, |
| 39 | + whereonlocation, |
| 40 | + zone, |
| 41 | + usrn, |
| 42 | + A.contraventioncode, |
| 43 | + debttype, |
| 44 | + A.vrm, |
| 45 | + vehiclemake, |
| 46 | + vehiclemodel, |
| 47 | + vehiclecolour, |
| 48 | + ceo, |
| 49 | + isremoval, |
| 50 | + A.progressionstage, |
| 51 | + lib_payment_received as payment_received, |
| 52 | + whenpaid as PaymentDate, |
| 53 | + noderef, |
| 54 | + replace(replace(ticketnotes, '\r', ''), '\n', '') as ticketnotes, |
| 55 | + cast(current_timestamp as timestamp) as ImportDateTime, |
| 56 | + format_datetime(current_date, 'yyyy') AS import_year, |
| 57 | + format_datetime(current_date, 'MM') AS import_month, |
| 58 | + format_datetime(current_date, 'dd') AS import_day, |
| 59 | + format_datetime(current_date, 'yyyyMMdd') AS import_date |
| 60 | +FROM "dataplatform-prod-liberator-refined-zone".pcnfoidetails_pcn_foi_full as A |
| 61 | + INNER JOIN PCN_Raw as B ON A.pcn = B.ticketserialnumber |
| 62 | +Where ImportDateTime = ( |
| 63 | + Select MAX(ImportDateTime) |
| 64 | + from "dataplatform-prod-liberator-refined-zone".pcnfoidetails_pcn_foi_full |
| 65 | + ) |
| 66 | + and warningflag = 0 |
| 67 | +Order By pcnissuedatetime; |
| 68 | +""" |
87 | 69 |
|
88 | | -# Script generated for node S3 bucket |
89 | | -S3bucket_node3 = glueContext.getSink( |
90 | | - path="s3://dataplatform-" + environment + "-refined-zone/parking/liberator/Parking_Foreign_VRM_PCNs/", |
91 | | - connection_type="s3", |
92 | | - updateBehavior="UPDATE_IN_DATABASE", |
93 | | - partitionKeys=["import_year", "import_month", "import_day"], |
94 | | - enableUpdateCatalog=True, |
95 | | - transformation_ctx="S3bucket_node3", |
96 | | -) |
97 | | -S3bucket_node3.setCatalogInfo( |
98 | | - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", |
99 | | - catalogTableName="Parking_Foreign_VRM_PCNs", |
| 70 | +create_update_table_with_partition( |
| 71 | + environment=environment, query_on_athena=query_on_athena, table_name=table_name |
100 | 72 | ) |
101 | | -S3bucket_node3.setFormat("glueparquet") |
102 | | -S3bucket_node3.writeFrame(ApplyMapping_node2) |
103 | | -job.commit() |
0 commit comments