Skip to content

Commit e66b62f

Browse files
authored
Merge branch 'main' into di-543-govnotify-ingestion-for-customer-services
2 parents 76a4f8b + 257f5ee commit e66b62f

File tree

13 files changed

+317
-28
lines changed

13 files changed

+317
-28
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
numpy==2.2.3
1+
numpy==2.2.4
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
gspread==6.2.0
22
oauth2client==4.1.3
3-
google-api-python-client==2.163.0
3+
google-api-python-client==2.166.0
44
yagmail==0.15.293
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
s3fs==2025.3.0
1+
s3fs==2025.3.2

scripts/jobs/parking/parking_permit_street_cpz_stress_mc.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
11/02/2025 - add totals for following permit types
2525
permit_type, All Zone, Doctor, Dispensation, Health and Social Care
2626
Leisure centre permit, Community Support Permit
27+
01/04/2025 - add check for is_motorcycle is NULL
2728
*********************************************************************************/
2829
With parking_permit_denormalised_data as (
2930
SELECT *
@@ -155,8 +156,14 @@
155156
street_name
156157
),
157158
/*** get the VRM details to obtain the m/c flag ***/
159+
/*** 01-04-2025 - tidy the code ***/
158160
MC_Permit_Flag as (
159-
SELECT *
161+
SELECT permit_reference, vrm, make, model, fuel, engine_capactiy, co2_emission, colour,
162+
foreign, door_plan, lpg_conversion,
163+
CASE
164+
When length(ltrim(rtrim(is_motorcycle))) = 0 Then 'N'
165+
ELSE ltrim(rtrim(is_motorcycle))
166+
END as is_motorcycle
160167
from "dataplatform-prod-liberator-raw-zone".liberator_permit_vrm_480
161168
WHERE import_date = format_datetime(current_date, 'yyyyMMdd')
162169
),
@@ -193,14 +200,14 @@
193200
AND Permit_type not IN ('Suspension')
194201
AND latest_permit_status not IN ('Cancelled', 'Rejected', 'RENEW_REJECTED')
195202
),
203+
/*** 01-04-2025 - add check for is_motorcycle is NULL ***/
196204
Permits_summ as (
197205
SELECT permit_type,
198206
cpz,
199207
address2 as Street,
200208
cast(count(*) as int) as CurrentPermitTotal
201209
FROM Permits
202-
WHERE r1 = 1
203-
AND is_motorcycle != 'Y'
210+
WHERE r1 = 1 AND is_motorcycle is NULL OR rtrim(ltrim(is_motorcycle)) = 'N'
204211
GROUP BY permit_type,
205212
cpz,
206213
address2

scripts/jobs/planning/tascomi_create_daily_snapshot.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,29 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
8282
return increment_df
8383

8484

def purge_today_partition(
    glueContext: GlueContext, target_destination: str, retentionPeriod: int = 0
) -> str:
    """
    Purge (delete) only today's partition under the given target destination.

    Parameters:
        glueContext: GlueContext instance used to issue the S3 purge.
        target_destination: Base S3 path (e.g., "s3://your-bucket/path").
        retentionPeriod: Retention period in hours (default 0, meaning delete
            all files immediately).

    Returns:
        partition_path: The S3 partition path that was purged.
    """
    # NOTE(review): uses the local clock — assumes the job host's timezone
    # matches the one the snapshot partitions are written in; confirm.
    now = datetime.now()
    snapshot_year = str(now.year)
    snapshot_month = str(now.month).zfill(2)
    snapshot_day = str(now.day).zfill(2)
    snapshot_date = snapshot_year + snapshot_month + snapshot_day

    partition_path = f"{target_destination}/snapshot_year={snapshot_year}/snapshot_month={snapshot_month}/snapshot_day={snapshot_day}/snapshot_date={snapshot_date}"

    # purge_s3_path removes every object under this prefix older than
    # retentionPeriod hours (0 = delete everything now).
    glueContext.purge_s3_path(partition_path, {"retentionPeriod": retentionPeriod})

    # Fix: the docstring promised partition_path but the original body never
    # returned it (and was annotated -> None). Returning it is backward
    # compatible — existing callers ignore the result.
    return partition_path
85108
# dict containing parameters for DQ checks
86109
dq_params = {
87110
"appeals": {"unique": ["id"]},
@@ -189,7 +212,7 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
189212
# snapshot table in glue catalogue
190213
else:
191214
pushDownPredicate = create_pushdown_predicate(
192-
partitionDateColumn="snapshot_date", daysBuffer=3
215+
partitionDateColumn="snapshot_date", daysBuffer=30
193216
)
194217
# load latest snpashot
195218
snapshot_ddf = glueContext.create_dynamic_frame.from_catalog(
@@ -318,6 +341,9 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
318341
snapshot_df, glueContext, "resultDataFrame"
319342
)
320343
target_destination = s3_bucket_target + table_name
344+
345+
# Clean up today's partition before writing
346+
purge_today_partition(glueContext, target_destination)
321347
parquetData = glueContext.write_dynamic_frame.from_options(
322348
frame=resultDataFrame,
323349
connection_type="s3",
@@ -331,5 +357,6 @@ def loadIncrementsSinceDate(increment_table_name, name_space, date):
331357
finally:
332358
if len(dq_errors) > 0:
333359
logger.error(f"DQ Errors: {dq_errors}")
360+
raise Exception(f"Data quality check failed: {'; '.join(dq_errors)}")
334361
spark.sparkContext._gateway.close()
335362
spark.stop()

0 commit comments

Comments
 (0)