|
12 | 12 | from awsglue.utils import getResolvedOptions |
13 | 13 | from pyspark.context import SparkContext |
14 | 14 | from pyspark.sql.functions import ( |
15 | | - col, |
| 15 | + col, |
16 | 16 | current_date, |
17 | 17 | date_format, |
18 | 18 | date_sub, |
@@ -525,7 +525,8 @@ def export_dynamic_frame_as_xml_gzip(dynamic_frame: DynamicFrame, s3_bucket: str |
525 | 525 | # patch = patch.distinct() |
526 | 526 |
|
527 | 527 | # data loads - takes all records that have an end date and secure tenancies; intro and mesne tenancies are added afterwards, as some tenancy ids have both |
528 | | - accounts = df2.filter("endoftenuredate is not NULL and paymentreference<>'' and paymentreference<>'Blank - not in use'") |
| 528 | + accounts = df2.filter( |
| 529 | + "endoftenuredate is not NULL and paymentreference<>'' and paymentreference<>'Blank - not in use'") |
529 | 530 | accounts_s = accounts.where(col("description").isin({"Secure", "Non-Secure", "Private Garage", "Tenant Garage"})) |
530 | 531 | accounts_int = accounts.where(col("description").isin({"Introductory", "Mense Profit Ac"})) |
531 | 532 | accounts_int = accounts_int.join(accounts_s, accounts_int.paymentreference == accounts_s.paymentreference, |
@@ -570,9 +571,9 @@ def export_dynamic_frame_as_xml_gzip(dynamic_frame: DynamicFrame, s3_bucket: str |
570 | 571 |
|
571 | 572 | # Arrangements |
572 | 573 |
|
573 | | - ten = accounts2.select('uh_ten_ref', 'paymentreference')\ |
| 574 | + ten = accounts2.select('uh_ten_ref', 'paymentreference') \ |
574 | 575 | .filter(col("previousweekbalance") > 0.00) |
575 | | - |
| 576 | + |
576 | 577 | arr = df5.join(ten, df5.tenancy_ref == ten.uh_ten_ref, "inner") |
577 | 578 | arr = arr.distinct() |
578 | 579 | arr = arr.where(col("current_state").isin({"live", "breached"})) |
@@ -802,14 +803,19 @@ def export_dynamic_frame_as_xml_gzip(dynamic_frame: DynamicFrame, s3_bucket: str |
802 | 803 |
|
803 | 804 | accounts2 = accounts2.withColumn('days_since_termination', datediff(current_date(), F.col('TenancyEndDate'))) |
804 | 805 |
|
805 | | - accounts3 = accounts2.withColumn('Patch',when(accounts2['days_since_termination'] < 7, 'New Terminations') |
806 | | - .when((accounts2['days_since_termination'] >= 7) & (accounts2['days_since_termination'] <= 31), '0-1 month') |
807 | | - .when((accounts2['days_since_termination'] > 31) & (accounts2['days_since_termination'] <= 92), '2-3 months') |
808 | | - .when((accounts2['days_since_termination'] > 92) & (accounts2['days_since_termination'] <= 182), '3-6 months') |
809 | | - .when((accounts2['days_since_termination'] > 182) & (accounts2['days_since_termination'] <= 365), '6-12 months') |
810 | | - .when((accounts2['days_since_termination'] > 365) & (accounts2['days_since_termination'] <= 2191), '1 year - 6 years') |
811 | | - .when(accounts2['days_since_termination'] > 2191, '6 years+') |
812 | | - .otherwise('Unknown')) |
| 806 | + accounts3 = accounts2.withColumn('Patch', when(accounts2['days_since_termination'] < 7, 'New Terminations') |
| 807 | + .when( |
| 808 | + (accounts2['days_since_termination'] >= 7) & (accounts2['days_since_termination'] <= 31), '0-1 month') |
| 809 | + .when( |
| 810 | + (accounts2['days_since_termination'] > 31) & (accounts2['days_since_termination'] <= 92), '2-3 months') |
| 811 | + .when( |
| 812 | + (accounts2['days_since_termination'] > 92) & (accounts2['days_since_termination'] <= 182), '3-6 months') |
| 813 | + .when( |
| 814 | + (accounts2['days_since_termination'] > 182) & (accounts2['days_since_termination'] <= 365), '6-12 months') |
| 815 | + .when( |
| 816 | + (accounts2['days_since_termination'] > 365) & (accounts2['days_since_termination'] <= 2191), '1 year - 6 years') |
| 817 | + .when(accounts2['days_since_termination'] > 2191, '6 years+') |
| 818 | + .otherwise('Unknown')) |
813 | 819 |
|
814 | 820 | accounts3 = accounts3.selectExpr("paymentreference as AccountReference", |
815 | 821 | "description as TenureType", |
|
0 commit comments