|
1 | | -import sys |
2 | | -from calendar import c |
3 | | - |
4 | | -from awsglue import DynamicFrame |
5 | | -from awsglue.context import GlueContext |
6 | | -from awsglue.job import Job |
7 | | -from awsglue.transforms import * |
8 | | -from awsglue.utils import getResolvedOptions |
9 | | -from pyspark.context import SparkContext |
10 | | - |
11 | | -from scripts.helpers.helpers import ( |
12 | | - PARTITION_KEYS, |
13 | | - create_pushdown_predicate, |
14 | | - get_glue_env_var, |
15 | | - get_latest_partitions, |
16 | | -) |
17 | | - |
18 | | - |
19 | | -def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame: |
20 | | - for alias, frame in mapping.items(): |
21 | | - frame.toDF().createOrReplaceTempView(alias) |
22 | | - result = spark.sql(query) |
23 | | - return DynamicFrame.fromDF(result, glueContext, transformation_ctx) |
| 1 | +""" |
| 2 | +Only the table name and the query prototyped in the Athena UI need to change: |
| 3 | +replace table_name and query_on_athena below. |
| 4 | +""" |
24 | 5 |
|
| 6 | +from scripts.helpers.athena_helpers import create_update_table_with_partition |
| 7 | +from scripts.helpers.helpers import get_glue_env_var |
25 | 8 |
|
26 | | -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) |
27 | | -sc = SparkContext() |
28 | | -glueContext = GlueContext(sc) |
29 | | -spark = glueContext.spark_session |
30 | | -job = Job(glueContext) |
31 | | -job.init(args["JOB_NAME"], args) |
32 | 9 | environment = get_glue_env_var("environment") |
33 | 10 |
|
34 | | -# Script generated for node S3 bucket - refined - parking_permit_denormalised_data |
35 | | -S3bucketrefinedparking_permit_denormalised_data_node1 = glueContext.create_dynamic_frame.from_catalog( |
36 | | - database="dataplatform-" + environment + "-liberator-refined-zone", |
37 | | - table_name="parking_permit_denormalised_data", |
38 | | - transformation_ctx="S3bucketrefinedparking_permit_denormalised_data_node1", |
39 | | - # temporarily removed while table partitions are fixed |
40 | | - # push_down_predicate=create_pushdown_predicate("import_date", 7), |
41 | | -) |
42 | | - |
43 | | -# Script generated for node Amazon S3 - raw - liberator_permit_llpg |
44 | | -AmazonS3rawliberator_permit_llpg_node1657535904691 = ( |
45 | | - glueContext.create_dynamic_frame.from_catalog( |
46 | | - database="dataplatform-" + environment + "-liberator-raw-zone", |
47 | | - table_name="liberator_permit_llpg", |
48 | | - transformation_ctx="AmazonS3rawliberator_permit_llpg_node1657535904691", |
49 | | - push_down_predicate=create_pushdown_predicate("import_date", 7), |
50 | | - ) |
51 | | -) |
52 | | - |
53 | | -# Script generated for node Amazon S3 - unrestricted_address_api_dbo_hackney_address |
54 | | -AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004 = glueContext.create_dynamic_frame.from_catalog( |
55 | | - database="dataplatform-" + environment + "-raw-zone-unrestricted-address-api", |
56 | | - table_name="unrestricted_address_api_dbo_hackney_address", |
57 | | - transformation_ctx="AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004", |
58 | | - push_down_predicate=create_pushdown_predicate("import_date", 30), |
59 | | -) |
60 | | - |
61 | | -# Script generated for node Amazon S3 - parking raw - ltn_london_fields |
62 | | -AmazonS3parkingrawltn_london_fields_node1657536241729 = ( |
63 | | - glueContext.create_dynamic_frame.from_catalog( |
64 | | - database="parking-raw-zone", |
65 | | - table_name="ltn_london_fields", |
66 | | - transformation_ctx="AmazonS3parkingrawltn_london_fields_node1657536241729", |
67 | | - ) |
68 | | -) |
| 11 | +# The target table in the liberator refined zone |
| 12 | +table_name = "parking_permit_denormalised_gds_street_llpg" |
69 | 13 |
|
70 | | -# Script generated for node Apply Mapping - New |
71 | | -SqlQuery0 = """ |
72 | | -/*08/04/2022 - added ,case when latest_permit_status in('Approved','Renewed','Created','ORDER_APPROVED','PENDING_VRM_CHANGE','RENEW_EVID','PENDING_ADDR_CHANGE') and live_permit_flag = 1 then 1 else 0 end as live_flag */ |
| 14 | +# The exact query as prototyped in pre-prod (stg) or prod Athena |
| 15 | +query_on_athena = """ |
| 16 | +-- Updated 2025-01-29 "unrestricted-raw-zone"."geolive_llpg_llpg_address" replaces "dataplatform-prod-raw-zone-unrestricted-address-api"."unrestricted_address_api_dbo_hackney_address" |
73 | 17 | with street as (select |
74 | 18 | UPRN as SR_UPRN, |
75 | 19 | ADDRESS1 as SR_ADDRESS1, |
@@ -136,18 +80,47 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra |
136 | 80 | ,import_month |
137 | 81 | ,import_day |
138 | 82 | ,import_date |
139 | | -FROM liberator_permit_llpg |
140 | | -where (ADDRESS1 like 'Street Record' or ADDRESS1 like 'STREET RECORD') and liberator_permit_llpg.import_date = (SELECT MAX(liberator_permit_llpg.import_date) FROM liberator_permit_llpg) |
| 83 | +FROM "dataplatform-prod-liberator-raw-zone".liberator_permit_llpg |
| 84 | +where (ADDRESS1 like 'Street Record' or ADDRESS1 like 'STREET RECORD') and liberator_permit_llpg.import_date = (SELECT MAX(liberator_permit_llpg.import_date) FROM "dataplatform-prod-liberator-raw-zone".liberator_permit_llpg) |
141 | 85 | ) |
142 | 86 | , llpg as ( |
143 | | - SELECT * FROM unrestricted_address_api_dbo_hackney_address where unrestricted_address_api_dbo_hackney_address.import_date = (SELECT max(unrestricted_address_api_dbo_hackney_address.import_date) FROM unrestricted_address_api_dbo_hackney_address) and lpi_logical_status like 'Approved Preferred' |
| 87 | + SELECT |
| 88 | + -- conversion of new column names and data-types to support the original query... |
| 89 | + uprn, -- bigint -- Confirmed: was previously converted from double to bigint to prevent subsequent varchar casts being broken. |
| 90 | + usrn, -- int |
| 91 | + street_description, -- string |
| 92 | + isparent AS property_shell, -- boolean |
| 93 | + blpu_class, -- string |
| 94 | + usage_primary, -- string |
| 95 | + usage_description, -- string |
| 96 | + planning_use_class, -- string |
| 97 | + CAST(longitude AS DOUBLE) AS longitude, -- converts string to double |
| 98 | + CAST(latitude AS DOUBLE) AS latitude, -- converts string to double |
| 99 | + lpi_logical_status_code, -- int BS7666 code |
| 100 | + lpi_logical_status, --string logical description as per previous API version |
| 101 | + import_date -- string |
| 102 | + FROM "unrestricted-raw-zone"."geolive_llpg_llpg_address" |
| 103 | + -- replaces "dataplatform-prod-raw-zone-unrestricted-address-api"."unrestricted_address_api_dbo_hackney_address" |
| 104 | + where import_date = (SELECT max(import_date) FROM "unrestricted-raw-zone"."geolive_llpg_llpg_address") |
| 105 | + and lpi_logical_status_code = 1 -- like 'Approved Preferred' |
144 | 106 | ) |
145 | | -SELECT street.usrn as sr_usrn, SR_ADDRESS1, SR_ADDRESS2, llpg.street_description, SR_WARD_CODE, SR_ward_name, llpg.property_shell, llpg.blpu_class, llpg.usage_primary, llpg.usage_description |
146 | | -,concat(cast(street.usrn as string),' - ', llpg.street_description) as street, concat(cast(llpg.uprn as string),' - ',llpg.usage_description) as add_type , concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class, |
| 107 | +SELECT |
| 108 | + street.usrn as sr_usrn, |
| 109 | + SR_ADDRESS1, SR_ADDRESS2, |
| 110 | + llpg.street_description, |
| 111 | + SR_WARD_CODE, |
| 112 | + SR_ward_name, |
| 113 | + llpg.property_shell, |
| 114 | + llpg.blpu_class, |
| 115 | + llpg.usage_primary, |
| 116 | + llpg.usage_description, |
| 117 | + concat(cast(street.usrn as varchar),' - ', llpg.street_description) as street, |
| 118 | + concat(cast(llpg.uprn as varchar),' - ',llpg.usage_description) as add_type, |
| 119 | + concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class, |
147 | 120 |
|
148 | 121 | case when cpz !='' and cpz_name != '' then concat(cpz,' - ', cpz_name) |
149 | | -when cpz !='' and cpz_name = '' then concat(cpz) |
150 | | -when cpz ='' and cpz_name != '' then concat(cpz_name) |
| 122 | +when cpz !='' and cpz_name = '' then concat(cpz,'-') |
| 123 | +when cpz ='' and cpz_name != '' then concat(cpz_name,'-') |
151 | 124 | else 'NONE' |
152 | 125 | end as zone_name, |
153 | 126 | case |
@@ -219,49 +192,21 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra |
219 | 192 | , Case when ltn_london_fields.uprn !='' then 'LTN London Fields' else 'NOT LTN London Fields' end as flag_name_ltn_london_fields |
220 | 193 |
|
221 | 194 |
|
222 | | -, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM parking_permit_denormalised_data |
| 195 | +, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM "dataplatform-prod-liberator-refined-zone".parking_permit_denormalised_data |
223 | 196 |
|
224 | | -left join llpg on cast(llpg.uprn as string) = cast(parking_permit_denormalised_data.uprn as string) /*and |
225 | | -cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as string |
| 197 | +left join llpg on cast(llpg.uprn as varchar) = cast(parking_permit_denormalised_data.uprn as varchar) /*and |
| 198 | +cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as varchar |
226 | 199 | ) = llpg.import_date*/ |
227 | 200 |
|
228 | | -left join street on cast(street.usrn as string) = cast(llpg.usrn as string) |
| 201 | +left join street on cast(street.usrn as varchar) = cast(llpg.usrn as varchar) |
229 | 202 |
|
230 | | -left join ltn_london_fields on ltn_london_fields.uprn = parking_permit_denormalised_data.uprn |
231 | | -
|
232 | | -
|
233 | | -where parking_permit_denormalised_data.import_date = (SELECT max(parking_permit_denormalised_data.import_date) FROM parking_permit_denormalised_data) |
| 203 | +left join "parking-raw-zone".ltn_london_fields on ltn_london_fields.uprn = parking_permit_denormalised_data.uprn |
234 | 204 |
|
235 | 205 |
|
| 206 | +where parking_permit_denormalised_data.import_date = (SELECT max(parking_permit_denormalised_data.import_date) FROM "dataplatform-prod-liberator-refined-zone".parking_permit_denormalised_data) |
| 207 | +; |
236 | 208 | """ |
237 | | -ApplyMappingNew_node1657710041310 = sparkSqlQuery( |
238 | | - glueContext, |
239 | | - query=SqlQuery0, |
240 | | - mapping={ |
241 | | - "parking_permit_denormalised_data": S3bucketrefinedparking_permit_denormalised_data_node1, |
242 | | - "liberator_permit_llpg": AmazonS3rawliberator_permit_llpg_node1657535904691, |
243 | | - "unrestricted_address_api_dbo_hackney_address": AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004, |
244 | | - "ltn_london_fields": AmazonS3parkingrawltn_london_fields_node1657536241729, |
245 | | - }, |
246 | | - transformation_ctx="ApplyMappingNew_node1657710041310", |
247 | | -) |
248 | 209 |
|
249 | | -# Script generated for node Amazon S3 |
250 | | -AmazonS3_node1657709892303 = glueContext.getSink( |
251 | | - path="s3://dataplatform-" |
252 | | - + environment |
253 | | - + "-refined-zone/parking/liberator/parking_permit_denormalised_gds_street_llpg/", |
254 | | - connection_type="s3", |
255 | | - updateBehavior="UPDATE_IN_DATABASE", |
256 | | - partitionKeys=["import_year", "import_month", "import_day", "import_date"], |
257 | | - compression="snappy", |
258 | | - enableUpdateCatalog=True, |
259 | | - transformation_ctx="AmazonS3_node1657709892303", |
260 | | -) |
261 | | -AmazonS3_node1657709892303.setCatalogInfo( |
262 | | - catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone", |
263 | | - catalogTableName="parking_permit_denormalised_gds_street_llpg", |
| 210 | +create_update_table_with_partition( |
| 211 | + environment=environment, query_on_athena=query_on_athena, table_name=table_name |
264 | 212 | ) |
265 | | -AmazonS3_node1657709892303.setFormat("glueparquet") |
266 | | -AmazonS3_node1657709892303.writeFrame(ApplyMappingNew_node1657710041310) |
267 | | -job.commit() |
0 commit comments