File tree Expand file tree Collapse file tree 1 file changed +14
-16
lines changed
Expand file tree Collapse file tree 1 file changed +14
-16
lines changed Original file line number Diff line number Diff line change 2323 datediff ,
2424 when ,
2525)
26+ from pyspark .sql .window import Window
2627from scripts .helpers .helpers import (
2728 PARTITION_KEYS ,
2829 add_import_time_columns ,
@@ -980,23 +981,20 @@ def export_dynamic_frame_as_xml_gzip(
980981 case_priorities , accounts4 .tenancy_ref == case_priorities .tenancy_ref2 , "left"
981982 )
982983
983- accounts6 = accounts5 .selectExpr (
984- "AccountReference as AccountReference" ,
985- # "TenureType",
986- # "TenureTypeCode",
987- # "max_date as TenancyStartDate",
988- "TenancyEndDate" ,
989- "LocalAuthority" ,
990- # "HousingOfficerName",
991- "Patch" ,
992- "'Hackney' as Region" ,
993- # "import_date as import_date",
994- # "tenancy_ref as TenReference",
995- # "is_paused_until as BreathingSpaceEndDate",
996- "Case when Deceased=1 then 'Y' else 'N' end as Deceased"
997- # "previousweekbalance"
998- )
984+ w = Window .partitionBy ("AccountReference" ).orderBy (F .col ("TenancyEndDate" ).desc ())
999985
986+ accounts6 = (
987+ accounts5 .withColumn ("rn" , F .row_number ().over (w ))
988+ .filter (F .col ("rn" ) == 1 )
989+ .selectExpr (
990+ "AccountReference" ,
991+ "TenancyEndDate" ,
992+ "LocalAuthority" ,
993+ "Patch" ,
994+ "'Hackney' as Region" ,
995+ "case when Deceased=1 then 'Y' else 'N' end as Deceased" ,
996+ )
997+ )
1000998 accounts7 = accounts6 .filter ("AccountReference is not null" )
1001999
10021000 accounts8 = accounts7 .distinct ()
You can’t perform that action at this time.
0 commit comments