Skip to content

Commit a5df14d

Browse files
authored
Merge branch 'main' into CE-2642-adding-s3-bucket-for-mwaa-etl-scripts
2 parents 2a117c4 + 2c3a22c commit a5df14d

File tree

3 files changed

+271
-158
lines changed

3 files changed

+271
-158
lines changed

scripts/jobs/parking/parking_permit_denormalised_gds_street_llpg.py

Lines changed: 65 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
1-
from calendar import c
21
import sys
2+
from calendar import c
3+
4+
from awsglue import DynamicFrame
5+
from awsglue.context import GlueContext
6+
from awsglue.job import Job
37
from awsglue.transforms import *
48
from awsglue.utils import getResolvedOptions
59
from pyspark.context import SparkContext
6-
from awsglue.context import GlueContext
7-
from awsglue.job import Job
8-
from awsglue import DynamicFrame
10+
911
from scripts.helpers.helpers import (
10-
get_glue_env_var,
11-
get_latest_partitions,
1212
PARTITION_KEYS,
1313
create_pushdown_predicate,
14+
get_glue_env_var,
15+
get_latest_partitions,
1416
)
1517

18+
1619
def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
1720
for alias, frame in mapping.items():
1821
frame.toDF().createOrReplaceTempView(alias)
@@ -29,20 +32,18 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
2932
environment = get_glue_env_var("environment")
3033

3134
# Script generated for node S3 bucket - refined - parking_permit_denormalised_data
32-
S3bucketrefinedparking_permit_denormalised_data_node1 = (
33-
glueContext.create_dynamic_frame.from_catalog(
34-
database="dataplatform-"+environment+"-liberator-refined-zone",
35-
table_name="parking_permit_denormalised_data",
36-
transformation_ctx="S3bucketrefinedparking_permit_denormalised_data_node1",
37-
#teporarily removed while table partitions are fixed
38-
#push_down_predicate=create_pushdown_predicate("import_date", 7),
39-
)
35+
S3bucketrefinedparking_permit_denormalised_data_node1 = glueContext.create_dynamic_frame.from_catalog(
36+
database="dataplatform-" + environment + "-liberator-refined-zone",
37+
table_name="parking_permit_denormalised_data",
38+
transformation_ctx="S3bucketrefinedparking_permit_denormalised_data_node1",
39+
# teporarily removed while table partitions are fixed
40+
# push_down_predicate=create_pushdown_predicate("import_date", 7),
4041
)
4142

4243
# Script generated for node Amazon S3 - raw - liberator_permit_llpg
4344
AmazonS3rawliberator_permit_llpg_node1657535904691 = (
4445
glueContext.create_dynamic_frame.from_catalog(
45-
database="dataplatform-"+environment+"-liberator-raw-zone",
46+
database="dataplatform-" + environment + "-liberator-raw-zone",
4647
table_name="liberator_permit_llpg",
4748
transformation_ctx="AmazonS3rawliberator_permit_llpg_node1657535904691",
4849
push_down_predicate=create_pushdown_predicate("import_date", 7),
@@ -51,10 +52,10 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
5152

5253
# Script generated for node Amazon S3 - unrestricted_address_api_dbo_hackney_address
5354
AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004 = glueContext.create_dynamic_frame.from_catalog(
54-
database="dataplatform-"+environment+"-raw-zone-unrestricted-address-api",
55+
database="dataplatform-" + environment + "-raw-zone-unrestricted-address-api",
5556
table_name="unrestricted_address_api_dbo_hackney_address",
5657
transformation_ctx="AmazonS3unrestricted_address_api_dbo_hackney_address_node1657535910004",
57-
push_down_predicate=create_pushdown_predicate("import_date", 7),
58+
push_down_predicate=create_pushdown_predicate("import_date", 30),
5859
)
5960

6061
# Script generated for node Amazon S3 - parking raw - ltn_london_fields
@@ -69,7 +70,7 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
6970
# Script generated for node Apply Mapping - New
7071
SqlQuery0 = """
7172
/*08/04/2022 - added ,case when latest_permit_status in('Approved','Renewed','Created','ORDER_APPROVED','PENDING_VRM_CHANGE','RENEW_EVID','PENDING_ADDR_CHANGE') and live_permit_flag = 1 then 1 else 0 end as live_flag */
72-
with street as (select
73+
with street as (select
7374
UPRN as SR_UPRN,
7475
ADDRESS1 as SR_ADDRESS1,
7576
ADDRESS2 as SR_ADDRESS2,
@@ -96,27 +97,27 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
9697
STREET_END_X as SR_STREET_END_X,
9798
STREET_END_Y as SR_STREET_END_Y,
9899
WARD_CODE as SR_WARD_CODE,
99-
if(WARD_CODE = 'E05009367', 'BROWNSWOOD WARD',
100-
if(WARD_CODE = 'E05009368', 'CAZENOVE WARD',
101-
if(WARD_CODE = 'E05009369', 'CLISSOLD WARD',
102-
if(WARD_CODE = 'E05009370', 'DALSTON WARD',
103-
if(WARD_CODE = 'E05009371', 'DE BEAUVOIR WARD',
104-
if(WARD_CODE = 'E05009372', 'HACKNEY CENTRAL WARD',
105-
if(WARD_CODE = 'E05009373', 'HACKNEY DOWNS WARD',
106-
if(WARD_CODE = 'E05009374', 'HACKNEY WICK WARD',
107-
if(WARD_CODE = 'E05009375', 'HAGGERSTON WARD',
108-
if(WARD_CODE = 'E05009376', 'HOMERTON WARD',
109-
if(WARD_CODE = 'E05009377', 'HOXTON EAST AND SHOREDITCH WARD',
110-
if(WARD_CODE = 'E05009378', 'HOXTON WEST WARD',
111-
if(WARD_CODE = 'E05009379', 'KINGS PARK WARD',
112-
if(WARD_CODE = 'E05009380', 'LEA BRIDGE WARD',
113-
if(WARD_CODE = 'E05009381', 'LONDON FIELDS WARD',
114-
if(WARD_CODE = 'E05009382', 'SHACKLEWELL WARD',
115-
if(WARD_CODE = 'E05009383', 'SPRINGFIELD WARD',
116-
if(WARD_CODE = 'E05009384', 'STAMFORD HILL WEST',
117-
if(WARD_CODE = 'E05009385', 'STOKE NEWINGTON WARD',
118-
if(WARD_CODE = 'E05009386', 'VICTORIA WARD',
119-
if(WARD_CODE = 'E05009387', 'WOODBERRY DOWN WARD',WARD_CODE))))))))))))))))))))) as SR_ward_name,
100+
if(WARD_CODE = 'E05009367', 'BROWNSWOOD WARD',
101+
if(WARD_CODE = 'E05009368', 'CAZENOVE WARD',
102+
if(WARD_CODE = 'E05009369', 'CLISSOLD WARD',
103+
if(WARD_CODE = 'E05009370', 'DALSTON WARD',
104+
if(WARD_CODE = 'E05009371', 'DE BEAUVOIR WARD',
105+
if(WARD_CODE = 'E05009372', 'HACKNEY CENTRAL WARD',
106+
if(WARD_CODE = 'E05009373', 'HACKNEY DOWNS WARD',
107+
if(WARD_CODE = 'E05009374', 'HACKNEY WICK WARD',
108+
if(WARD_CODE = 'E05009375', 'HAGGERSTON WARD',
109+
if(WARD_CODE = 'E05009376', 'HOMERTON WARD',
110+
if(WARD_CODE = 'E05009377', 'HOXTON EAST AND SHOREDITCH WARD',
111+
if(WARD_CODE = 'E05009378', 'HOXTON WEST WARD',
112+
if(WARD_CODE = 'E05009379', 'KINGS PARK WARD',
113+
if(WARD_CODE = 'E05009380', 'LEA BRIDGE WARD',
114+
if(WARD_CODE = 'E05009381', 'LONDON FIELDS WARD',
115+
if(WARD_CODE = 'E05009382', 'SHACKLEWELL WARD',
116+
if(WARD_CODE = 'E05009383', 'SPRINGFIELD WARD',
117+
if(WARD_CODE = 'E05009384', 'STAMFORD HILL WEST',
118+
if(WARD_CODE = 'E05009385', 'STOKE NEWINGTON WARD',
119+
if(WARD_CODE = 'E05009386', 'VICTORIA WARD',
120+
if(WARD_CODE = 'E05009387', 'WOODBERRY DOWN WARD',WARD_CODE))))))))))))))))))))) as SR_ward_name,
120121
PARISH_CODE as SR_PARISH_CODE,
121122
PARENT_UPRN as SR_PARENT_UPRN,
122123
PAO_START as SR_PAO_START,
@@ -142,14 +143,14 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
142143
SELECT * FROM unrestricted_address_api_dbo_hackney_address where unrestricted_address_api_dbo_hackney_address.import_date = (SELECT max(unrestricted_address_api_dbo_hackney_address.import_date) FROM unrestricted_address_api_dbo_hackney_address) and lpi_logical_status like 'Approved Preferred'
143144
)
144145
SELECT street.usrn as sr_usrn, SR_ADDRESS1, SR_ADDRESS2, llpg.street_description, SR_WARD_CODE, SR_ward_name, llpg.property_shell, llpg.blpu_class, llpg.usage_primary, llpg.usage_description
145-
,concat(cast(street.usrn as string),' - ', llpg.street_description) as street, concat(cast(llpg.uprn as string),' - ',llpg.usage_description) as add_type , concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class,
146+
,concat(cast(street.usrn as string),' - ', llpg.street_description) as street, concat(cast(llpg.uprn as string),' - ',llpg.usage_description) as add_type , concat(llpg.blpu_class,' - ',llpg.planning_use_class ) as add_class,
146147
147148
case when cpz !='' and cpz_name != '' then concat(cpz,' - ', cpz_name)
148149
when cpz !='' and cpz_name = '' then concat(cpz)
149150
when cpz ='' and cpz_name != '' then concat(cpz_name)
150151
else 'NONE'
151-
end as zone_name,
152-
case
152+
end as zone_name,
153+
case
153154
154155
when address_line_2 ='' and business_name ='' and hasc_organisation_name ='' and doctors_surgery_name ='' then concat(permit_reference,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
155156
when business_name !='' and hasc_organisation_name !='' and address_line_2 ='' then concat(permit_reference,' - ',business_name,' - ',hasc_organisation_name,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
@@ -167,37 +168,37 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
167168
168169
169170
when business_name !='' then concat(permit_reference,' - ',business_name,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
170-
when hasc_organisation_name !='' then concat(permit_reference,' - ',hasc_organisation_name,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
171+
when hasc_organisation_name !='' then concat(permit_reference,' - ',hasc_organisation_name,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
171172
when doctors_surgery_name !='' then concat(permit_reference,' - ',doctors_surgery_name,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
172173
173-
else concat(permit_reference,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
174+
else concat(permit_reference,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode,' - ',email_address_of_applicant)
174175
end as permit_summary
175176
176177
177-
,case
178+
,case
178179
when address_line_2 ='' then concat(permit_reference,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode)
179-
when address_line_3 ='' then concat(permit_reference,' - ',address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
180-
else concat(permit_reference,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
180+
when address_line_3 ='' then concat(permit_reference,' - ',address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
181+
else concat(permit_reference,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
181182
end as permit_full_address
182-
,case
183+
,case
183184
when address_line_2 ='' then concat(address_line_1,', ',parking_permit_denormalised_data.postcode)
184-
when address_line_3 ='' then concat(address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
185-
else concat(address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
185+
when address_line_3 ='' then concat(address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
186+
else concat(address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
186187
end as full_address
187188
188-
,case
189-
when address_line_2 ='' then concat(permit_reference,' - ',usage_primary,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode)
190-
when address_line_3 ='' then concat(permit_reference,' - ',usage_primary,' - ',address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
191-
else concat(permit_reference,' - ',usage_primary,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
189+
,case
190+
when address_line_2 ='' then concat(permit_reference,' - ',usage_primary,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode)
191+
when address_line_3 ='' then concat(permit_reference,' - ',usage_primary,' - ',address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
192+
else concat(permit_reference,' - ',usage_primary,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
192193
end as permit_full_address_type
193194
194-
,case
195-
when address_line_2 ='' then concat(usage_primary,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode)
196-
when address_line_3 ='' then concat(usage_primary,' - ',address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
197-
else concat(usage_primary,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
195+
,case
196+
when address_line_2 ='' then concat(usage_primary,' - ',address_line_1,', ',parking_permit_denormalised_data.postcode)
197+
when address_line_3 ='' then concat(usage_primary,' - ',address_line_1,', ',address_line_2,', ',parking_permit_denormalised_data.postcode)
198+
else concat(usage_primary,' - ',address_line_1,', ',address_line_2,', ',address_line_3,', ',parking_permit_denormalised_data.postcode)
198199
end as full_address_type
199200
200-
,case when latest_permit_status in('Approved','Renewed','Created','ORDER_APPROVED','PENDING_VRM_CHANGE','RENEW_EVID','PENDING_ADDR_CHANGE') and live_permit_flag = 1 then 1 else 0 end as live_flag
201+
,case when latest_permit_status in('Approved','Renewed','Created','ORDER_APPROVED','PENDING_VRM_CHANGE','RENEW_EVID','PENDING_ADDR_CHANGE') and live_permit_flag = 1 then 1 else 0 end as live_flag
201202
202203
,Case when live_permit_flag = 1 and blue_badge_number !='' and permit_type like 'Estate Resident' and (amount like '0.00' or amount like '0.000' or amount like '') then 1 else 0 end as flag_lp_est_bb_zero
203204
,Case when live_permit_flag = 1 and blue_badge_number !='' and permit_type != 'Estate Resident' then 1 else 0 end as flag_lp_bb_onst
@@ -220,7 +221,7 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
220221
221222
, parking_permit_denormalised_data.*, llpg.planning_use_class, llpg.longitude, llpg.latitude FROM parking_permit_denormalised_data
222223
223-
left join llpg on cast(llpg.uprn as string) = cast(parking_permit_denormalised_data.uprn as string) /*and
224+
left join llpg on cast(llpg.uprn as string) = cast(parking_permit_denormalised_data.uprn as string) /*and
224225
cast(concat(parking_permit_denormalised_data.import_year,parking_permit_denormalised_data.import_month,parking_permit_denormalised_data.import_day) as string
225226
) = llpg.import_date*/
226227
@@ -247,7 +248,9 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
247248

248249
# Script generated for node Amazon S3
249250
AmazonS3_node1657709892303 = glueContext.getSink(
250-
path="s3://dataplatform-"+environment+"-refined-zone/parking/liberator/parking_permit_denormalised_gds_street_llpg/",
251+
path="s3://dataplatform-"
252+
+ environment
253+
+ "-refined-zone/parking/liberator/parking_permit_denormalised_gds_street_llpg/",
251254
connection_type="s3",
252255
updateBehavior="UPDATE_IN_DATABASE",
253256
partitionKeys=["import_year", "import_month", "import_day", "import_date"],
@@ -256,7 +259,7 @@ def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFra
256259
transformation_ctx="AmazonS3_node1657709892303",
257260
)
258261
AmazonS3_node1657709892303.setCatalogInfo(
259-
catalogDatabase="dataplatform-"+environment+"-liberator-refined-zone",
262+
catalogDatabase="dataplatform-" + environment + "-liberator-refined-zone",
260263
catalogTableName="parking_permit_denormalised_gds_street_llpg",
261264
)
262265
AmazonS3_node1657709892303.setFormat("glueparquet")

0 commit comments

Comments
 (0)