|
1 | 1 | import sys |
2 | | -import boto3 |
3 | | -import io |
4 | | -import zipfile |
5 | | -from pyspark.sql import Window |
6 | | -from pyspark.context import SparkContext |
| 2 | +from datetime import date |
| 3 | + |
| 4 | +import pyspark.sql.functions as F |
7 | 5 | from awsglue.context import GlueContext |
8 | 6 | from awsglue.dynamicframe import DynamicFrame |
9 | | -from awsglue.transforms import * |
10 | | -from awsglue.utils import getResolvedOptions |
11 | 7 | from awsglue.job import Job |
12 | | -from pyspark.sql.functions import * |
13 | | -from pyspark.sql.types import IntegerType, StringType, FloatType |
14 | | -from datetime import date |
15 | | -import pyspark.sql.functions as f |
| 8 | +from awsglue.transforms import DropFields |
| 9 | +from awsglue.utils import getResolvedOptions |
| 10 | +from pyspark.context import SparkContext |
| 11 | +from pyspark.sql.functions import col, current_date, date_sub, lit, to_date |
| 12 | + |
16 | 13 | from scripts.helpers.helpers import ( |
17 | | - move_file, |
18 | | - rename_file, |
19 | | - get_glue_env_var, |
20 | | - get_latest_partitions_optimized, |
21 | | - create_pushdown_predicate, |
22 | | - add_import_time_columns, |
23 | 14 | PARTITION_KEYS, |
24 | | - parse_json_into_dataframe, |
25 | | - table_exists_in_catalog, |
| 15 | + add_import_time_columns, |
26 | 16 | clear_target_folder, |
| 17 | + get_glue_env_var, |
| 18 | + get_latest_partitions_optimized, |
| 19 | + move_file, |
| 20 | + rename_file, |
27 | 21 | ) |
28 | 22 |
|
29 | 23 | # The block below is the actual job. It is ignored when running tests locally. |
|
128 | 122 | "RHB": "Housing Benefit", |
129 | 123 | "RIT": "Internal Transfer", |
130 | 124 | "RML": "MW Loan Payment", |
131 | | - "ROB": "\Opening Balance", |
| 125 | + "ROB": r"\Opening Balance", |
132 | 126 | "RPD": "Prompt Pay. Discount", |
133 | 127 | "RPO": "Postal Order", |
134 | 128 | "RPY": "PayPoint/Post Office", |
|
274 | 268 | "SMS": "Text message sent", |
275 | 269 | "ACB": "Actual Cost Breakdown Sent", |
276 | 270 | "TAA": "TA New Account checks", |
277 | | - "RAP": "Outcome of rent arrears panel", |
| 271 | + "RAP": "Rent Arrears Panel Outcome", |
278 | 272 | "PLA": "Pre legal action visit", |
279 | 273 | "PEO": "Pre eviction contact outcome", |
280 | 274 | "AAD": "Pre notice interview", |
281 | | - "RAP": "Rent Arrears Panel Outcome", |
282 | 275 | "DA4": "Referred to Credit Gee", |
283 | 276 | "RT4": "Returned by Credit Gee", |
284 | 277 | "ZW0": "MW Pre Arrears Completed", |
|
300 | 293 | "RT3": "RETURNED BY LEWIS DEBT AGENCY", |
301 | 294 | "INV": "ACTION ON HOLD", |
302 | 295 | "MHB": "HB INVESTIGATION PENDING", |
303 | | - "RT4": "Returned by Credit Gee", |
304 | 296 | "MW0": "MW Pre Arrears", |
305 | 297 | "MW1": "MW Letter Action 1", |
306 | 298 | "MW2": "MW Letter Action 2", |
|
575 | 567 | transformation_ctx="target_data_to_write", |
576 | 568 | ) |
577 | 569 |
|
578 | | - filename = f"/rent.accounts%s.csv.gz" % today.strftime("%Y%m%d") |
| 570 | + filename = "/rent.accounts%s.csv.gz" % today.strftime("%Y%m%d") |
579 | 571 | rename_file(s3_bucket, "housing/rentsense/gzip/accounts", filename) |
580 | 572 |
|
581 | 573 | # move file to export folder |
582 | | - target_path = f"housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
583 | | - # move_file("dataplatform-stg-refined-zone", "housing/rentsense/gzip/accounts/", target_path, f"rent.accounts%s.csv.gz" % today.strftime("%Y%m%d")) |
| 574 | + target_path = "housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
| 575 | + # move_file("dataplatform-stg-refined-zone", "housing/rentsense/gzip/accounts/", target_path, "rent.accounts%s.csv.gz" % today.strftime("%Y%m%d")) |
584 | 576 | move_file( |
585 | 577 | s3_bucket, |
586 | 578 | "housing/rentsense/gzip/accounts/", |
587 | 579 | target_path, |
588 | | - f"rent.accounts%s.csv.gz" % today.strftime("%Y%m%d"), |
| 580 | + "rent.accounts%s.csv.gz" % today.strftime("%Y%m%d"), |
589 | 581 | ) |
590 | 582 |
|
591 | 583 | # Arrangements |
|
675 | 667 | ) |
676 | 668 |
|
677 | 669 | today = date.today() |
678 | | - filename = f"/rent.arrangements%s.csv.gz" % today.strftime("%Y%m%d") |
| 670 | + filename = "/rent.arrangements%s.csv.gz" % today.strftime("%Y%m%d") |
679 | 671 | rename_file(s3_bucket, "housing/rentsense/gzip/arrangements", filename) |
680 | 672 |
|
681 | 673 | # move file to export folder |
682 | | - target_path = f"housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
| 674 | + target_path = "housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
683 | 675 | move_file( |
684 | 676 | s3_bucket, |
685 | 677 | "housing/rentsense/gzip/arrangements/", |
686 | 678 | target_path, |
687 | | - f"rent.arrangements%s.csv.gz" % today.strftime("%Y%m%d"), |
| 679 | + "rent.arrangements%s.csv.gz" % today.strftime("%Y%m%d"), |
688 | 680 | ) |
689 | 681 |
|
690 | 682 | # Tenants |
|
802 | 794 | transformation_ctx="target_data_to_write", |
803 | 795 | ) |
804 | 796 |
|
805 | | - filename = f"/rent.tenants%s.csv.gz" % today.strftime("%Y%m%d") |
| 797 | + filename = "/rent.tenants%s.csv.gz" % today.strftime("%Y%m%d") |
806 | 798 | rename_file(s3_bucket, "housing/rentsense/gzip/tenants", filename) |
807 | 799 |
|
808 | 800 | # move file to export folder |
809 | | - target_path = f"housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
| 801 | + target_path = "housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
810 | 802 | move_file( |
811 | 803 | s3_bucket, |
812 | 804 | "housing/rentsense/gzip/tenants/", |
813 | 805 | target_path, |
814 | | - f"rent.tenants%s.csv.gz" % today.strftime("%Y%m%d"), |
| 806 | + "rent.tenants%s.csv.gz" % today.strftime("%Y%m%d"), |
815 | 807 | ) |
816 | 808 |
|
817 | 809 | # Balances |
|
877 | 869 | transformation_ctx="target_data_to_write", |
878 | 870 | ) |
879 | 871 |
|
880 | | - filename = f"/rent.balances%s.csv.gz" % today.strftime("%Y%m%d") |
| 872 | + filename = "/rent.balances%s.csv.gz" % today.strftime("%Y%m%d") |
881 | 873 | rename_file(s3_bucket, "housing/rentsense/gzip/balances", filename) |
882 | 874 |
|
883 | 875 | # move file to export folder |
884 | | - target_path = f"housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
| 876 | + target_path = "housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
885 | 877 | move_file( |
886 | 878 | s3_bucket, |
887 | 879 | "housing/rentsense/gzip/balances/", |
888 | 880 | target_path, |
889 | | - f"rent.balances%s.csv.gz" % today.strftime("%Y%m%d"), |
| 881 | + "rent.balances%s.csv.gz" % today.strftime("%Y%m%d"), |
890 | 882 | ) |
891 | 883 |
|
892 | 884 | # Actions |
|
963 | 955 | transformation_ctx="target_data_to_write", |
964 | 956 | ) |
965 | 957 |
|
966 | | - filename = f"/rent.actions%s.csv.gz" % today.strftime("%Y%m%d") |
| 958 | + filename = "/rent.actions%s.csv.gz" % today.strftime("%Y%m%d") |
967 | 959 | rename_file(s3_bucket, "housing/rentsense/gzip/actions", filename) |
968 | 960 |
|
969 | 961 | # move file to export folder |
970 | | - target_path = f"housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
| 962 | + target_path = "housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
971 | 963 | move_file( |
972 | 964 | s3_bucket, |
973 | 965 | "housing/rentsense/gzip/actions/", |
974 | 966 | target_path, |
975 | | - f"rent.actions%s.csv.gz" % today.strftime("%Y%m%d"), |
| 967 | + "rent.actions%s.csv.gz" % today.strftime("%Y%m%d"), |
976 | 968 | ) |
977 | 969 |
|
978 | 970 | # Transactions |
|
1052 | 1044 | transformation_ctx="target_data_to_write", |
1053 | 1045 | ) |
1054 | 1046 |
|
1055 | | - filename = f"/rent.transactions%s.csv.gz" % today.strftime("%Y%m%d") |
| 1047 | + filename = "/rent.transactions%s.csv.gz" % today.strftime("%Y%m%d") |
1056 | 1048 | rename_file(s3_bucket, "housing/rentsense/gzip/transactions", filename) |
1057 | 1049 |
|
1058 | 1050 | # move file to export folder |
1059 | | - target_path = f"housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
| 1051 | + target_path = "housing/rentsense/export/%s/" % today.strftime("%Y%m%d") |
1060 | 1052 | move_file( |
1061 | 1053 | s3_bucket, |
1062 | 1054 | "housing/rentsense/gzip/transactions/", |
1063 | 1055 | target_path, |
1064 | | - f"rent.transactions%s.csv.gz" % today.strftime("%Y%m%d"), |
| 1056 | + "rent.transactions%s.csv.gz" % today.strftime("%Y%m%d"), |
1065 | 1057 | ) |
1066 | 1058 |
|
1067 | 1059 | job.commit() |
0 commit comments