Commit 8f85488

Add-a-predicate-to-tascomi_officers_trusted (#2233)
* add predicate to tascomi_officers_trusted
* fix to make linter happy
1 parent f2e4a49 commit 8f85488

File tree: 1 file changed (117 additions, 83 deletions)

@@ -1,155 +1,189 @@
 import sys
+
 import boto3
-from awsglue.transforms import *
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
+import pyspark.sql.functions as F
 from awsglue.context import GlueContext
-from awsglue.job import Job
 from awsglue.dynamicframe import DynamicFrame
-from pyspark.sql.functions import *
-import pyspark.sql.functions as F
-from scripts.helpers.helpers import get_glue_env_var, get_latest_partitions, create_pushdown_predicate, add_import_time_columns, PARTITION_KEYS
+from awsglue.job import Job
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
+from pyspark.sql.functions import col, concat, lit, max, trim
+
+from scripts.helpers.helpers import create_pushdown_predicate, get_glue_env_var
+
 
 # Function to ensure we only return the lates snapshot
 def get_latest_snapshot(df):
-    df = df.where(col('snapshot_date') == df.select(max('snapshot_date')).first()[0])
-    return df
-# Creates a function that removes any columns that are entirely null values - useful for large tables
+    df = df.where(col("snapshot_date") == df.select(max("snapshot_date")).first()[0])
+    return df
 
+
+# Creates a function that removes any columns that are entirely null values - useful for large tables
 def drop_null_columns(df):
-
     _df_length = df.count()
-    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
+    null_counts = (
+        df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
+        .collect()[0]
+        .asDict()
+    )
     to_drop = [k for k, v in null_counts.items() if v >= _df_length]
     df = df.drop(*to_drop)
-
+
     return df
-
-#function to clear target
+
+
+# function to clear target
 def clear_target_folder(s3_bucket_target):
-    s3 = boto3.resource('s3')
-    folderString = s3_bucket_target.replace('s3://', '')
-    bucketName = folderString.split('/')[0]
-    prefix = folderString.replace(bucketName+'/', '')+'/'
+    s3 = boto3.resource("s3")
+    folderString = s3_bucket_target.replace("s3://", "")
+    bucketName = folderString.split("/")[0]
+    prefix = folderString.replace(bucketName + "/", "") + "/"
     bucket = s3.Bucket(bucketName)
     bucket.objects.filter(Prefix=prefix).delete()
     return
-
+
 
 if __name__ == "__main__":
-
+
     # read job parameters
-    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
-    source_catalog_table = get_glue_env_var('source_catalog_table','')
-    source_catalog_table2 = get_glue_env_var('source_catalog_table2','')
-    source_catalog_table3 = get_glue_env_var('source_catalog_table3','')
-    source_catalog_database = get_glue_env_var('source_catalog_database', '')
-    s3_bucket_target = get_glue_env_var('s3_bucket_target', '')
-
+    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
+    source_catalog_table = get_glue_env_var("source_catalog_table", "")
+    source_catalog_table2 = get_glue_env_var("source_catalog_table2", "")
+    source_catalog_table3 = get_glue_env_var("source_catalog_table3", "")
+    source_catalog_database = get_glue_env_var("source_catalog_database", "")
+    s3_bucket_target = get_glue_env_var("s3_bucket_target", "")
+    days_to_load = 30
+
     # start the Spark session and the logger
-    glueContext = GlueContext(SparkContext.getOrCreate())
+    glueContext = GlueContext(SparkContext.getOrCreate())
     logger = glueContext.get_logger()
     job = Job(glueContext)
-    job.init(args['JOB_NAME'], args)
+    job.init(args["JOB_NAME"], args)
 
-    logger.info(f'The job is starting. The source table is {source_catalog_database}.{source_catalog_table}')
+    logger.info(
+        f"The job is starting. The source table is {source_catalog_database}.{source_catalog_table}"
+    )
+
+    # Create a predicate to filter data for the last days_to_load days
+    predicate = create_pushdown_predicate("snapshot_date", days_to_load)
+    logger.info(
+        f"Loading data with predicate: {predicate} to filter for last {days_to_load} days"
+    )
 
     # Load data from glue catalog
     data_source = glueContext.create_dynamic_frame.from_catalog(
-        name_space = source_catalog_database,
-        table_name = source_catalog_table
+        name_space=source_catalog_database,
+        table_name=source_catalog_table,
+        push_down_predicate=predicate,
     )
     data_source2 = glueContext.create_dynamic_frame.from_catalog(
         name_space=source_catalog_database,
-        table_name=source_catalog_table2
+        table_name=source_catalog_table2,
+        push_down_predicate=predicate,
     )
-
+
     data_source3 = glueContext.create_dynamic_frame.from_catalog(
         name_space=source_catalog_database,
-        table_name=source_catalog_table3
+        table_name=source_catalog_table3,
+        push_down_predicate=predicate,
     )
 
-    # Load Officers Table
+    # Load Officers Table
 
     # convert to a data frame
    df = data_source.toDF()
-
+
     # Rename columns
-    df = df.withColumnRenamed("id", "officer_id") \
-        .withColumnRenamed("forename", "officer_forename") \
-        .withColumnRenamed("surname", "officer_surname")
-
+    df = (
+        df.withColumnRenamed("id", "officer_id")
+        .withColumnRenamed("forename", "officer_forename")
+        .withColumnRenamed("surname", "officer_surname")
+    )
+
     # Specify Columns to Keep
-    df = df.select('officer_id',
-                   "officer_forename",
-                   "officer_surname",
-                   'username',
-                   'email',
-                   'mobile',
-                   'phone',
-                   'job_title',
-                   'import_date',
-                   'import_day',
-                   'import_month',
-                   'import_year',
-                   'snapshot_date',
-                   'snapshot_year',
-                   'snapshot_month',
-                   'snapshot_day')
+    df = df.select(
+        "officer_id",
+        "officer_forename",
+        "officer_surname",
+        "username",
+        "email",
+        "mobile",
+        "phone",
+        "job_title",
+        "import_date",
+        "import_day",
+        "import_month",
+        "import_year",
+        "snapshot_date",
+        "snapshot_year",
+        "snapshot_month",
+        "snapshot_day",
+    )
 
     # Return only latest snapshot
-
+
     df = get_latest_snapshot(df)
-    df = df.withColumn('counter_officer', lit(1))
-    df = df.withColumn('officer_name', concat(trim(col('officer_forename')), lit(" "), trim(col('officer_surname'))))
-    # Load User Teams Map Table
+    df = df.withColumn("counter_officer", lit(1))
+    df = df.withColumn(
+        "officer_name",
+        concat(trim(col("officer_forename")), lit(" "), trim(col("officer_surname"))),
+    )
+    # Load User Teams Map Table
     # convert to a data frame
     df2 = data_source2.toDF()
 
     # drop old snapshots
-
+
     df2 = get_latest_snapshot(df2)
-
+
     # Rename Relevant Columns
-    # df2 = df2.withColumnRenamed("user_id","officer_id")
+    # df2 = df2.withColumnRenamed("user_id","officer_id")
 
     # Keep Only Relevant Columns
-    df2 = df2.select("user_id",
-                     "user_team_id")
-
+    df2 = df2.select("user_id", "user_team_id")
 
     # convert to a data frame
     df3 = data_source3.toDF()
 
-
     # drop old snapshots
-
+
     df3 = get_latest_snapshot(df3)
-
-    df3 = df3.withColumnRenamed("id","team_id") \
-        .withColumnRenamed("name","team_name") \
-        .withColumnRenamed("description","team_description")
-
+
+    df3 = (
+        df3.withColumnRenamed("id", "team_id")
+        .withColumnRenamed("name", "team_name")
+        .withColumnRenamed("description", "team_description")
+    )
+
     # Keep Only Relevant Columns
-    df3 = df3.select("team_id","team_name",'team_description','location')
+    df3 = df3.select("team_id", "team_name", "team_description", "location")
     # Transform data using the functions defined outside the main block
     # Join
     df2 = df2.join(df3, df2.user_team_id == df3.team_id, "left")
     df = df.join(df2, df.officer_id == df2.user_id, "left")
     df = df.drop("team_id", "user_id")
-    ## Data Processing Ends
-    # Convert data frame to dynamic frame
+    # Data Processing Ends
+    # Convert data frame to dynamic frame
     dynamic_frame = DynamicFrame.fromDF(df, glueContext, "target_data_to_write")
 
-    # wipe out the target folder in the trusted zone
-    logger.info(f'clearing target bucket')
+    # wipe out the target folder in the trusted zone
+    logger.info("clearing target bucket")
     clear_target_folder(s3_bucket_target)
 
-    # Write the data to S3
+    # Write the data to S3
     parquet_data = glueContext.write_dynamic_frame.from_options(
         frame=dynamic_frame,
         connection_type="s3",
         format="parquet",
-        connection_options={"path": s3_bucket_target, "partitionKeys": ['snapshot_year','snapshot_month','snapshot_day','snapshot_date']},
-        transformation_ctx="target_data_to_write")
+        connection_options={
+            "path": s3_bucket_target,
+            "partitionKeys": [
+                "snapshot_year",
+                "snapshot_month",
+                "snapshot_day",
+                "snapshot_date",
+            ],
+        },
+        transformation_ctx="target_data_to_write",
+    )
     job.commit()
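
Note on the change: push_down_predicate lets Glue prune catalog partitions before they are read from S3, so only recent snapshot_date partitions are scanned. The helper create_pushdown_predicate comes from scripts/helpers/helpers.py and is not part of this diff; below is a minimal, hypothetical sketch of what such a helper might return, assuming a yyyyMMdd-formatted snapshot_date partition column (the real date format and comparison may differ).

# Hypothetical sketch only - the real create_pushdown_predicate lives in
# scripts/helpers/helpers.py and is not shown in this commit, so the
# date format and comparison below are assumptions.
from datetime import date, timedelta


def sketch_create_pushdown_predicate(partition_column, days_to_load):
    # Build a Spark SQL filter such as "snapshot_date >= '20240101'"
    # covering the last `days_to_load` days.
    cutoff = (date.today() - timedelta(days=days_to_load)).strftime("%Y%m%d")
    return f"{partition_column} >= '{cutoff}'"


# e.g. a 30-day window, matching days_to_load = 30 in the job above
predicate = sketch_create_pushdown_predicate("snapshot_date", 30)

Glue evaluates the returned expression against the table's partition columns inside create_dynamic_frame.from_catalog, so partitions outside the window are never read.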
