
Commit 78bc7ac

add predicate to tascomi tascomi_officers_trusted
1 parent f2e4a49 commit 78bc7ac

1 file changed
Lines changed: 118 additions & 80 deletions
@@ -1,155 +1,193 @@
 import sys
+
 import boto3
+import pyspark.sql.functions as F
+from awsglue.context import GlueContext
+from awsglue.dynamicframe import DynamicFrame
+from awsglue.job import Job
 from awsglue.transforms import *
 from awsglue.utils import getResolvedOptions
 from pyspark.context import SparkContext
-from awsglue.context import GlueContext
-from awsglue.job import Job
-from awsglue.dynamicframe import DynamicFrame
 from pyspark.sql.functions import *
-import pyspark.sql.functions as F
-from scripts.helpers.helpers import get_glue_env_var, get_latest_partitions, create_pushdown_predicate, add_import_time_columns, PARTITION_KEYS
+
+from scripts.helpers.helpers import create_pushdown_predicate, get_glue_env_var
+
 
 # Function to ensure we only return the lates snapshot
 def get_latest_snapshot(df):
-    df = df.where(col('snapshot_date') == df.select(max('snapshot_date')).first()[0])
-    return df
+    df = df.where(col("snapshot_date") == df.select(max("snapshot_date")).first()[0])
+    return df
+
+
 # Creates a function that removes any columns that are entirely null values - useful for large tables
 
+
 def drop_null_columns(df):
-
+
     _df_length = df.count()
-    null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).collect()[0].asDict()
+    null_counts = (
+        df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns])
+        .collect()[0]
+        .asDict()
+    )
     to_drop = [k for k, v in null_counts.items() if v >= _df_length]
     df = df.drop(*to_drop)
-
+
     return df
-
-#function to clear target
+
+
+# function to clear target
 def clear_target_folder(s3_bucket_target):
-    s3 = boto3.resource('s3')
-    folderString = s3_bucket_target.replace('s3://', '')
-    bucketName = folderString.split('/')[0]
-    prefix = folderString.replace(bucketName+'/', '')+'/'
+    s3 = boto3.resource("s3")
+    folderString = s3_bucket_target.replace("s3://", "")
+    bucketName = folderString.split("/")[0]
+    prefix = folderString.replace(bucketName + "/", "") + "/"
     bucket = s3.Bucket(bucketName)
     bucket.objects.filter(Prefix=prefix).delete()
     return
-
+
 
 if __name__ == "__main__":
-
+
     # read job parameters
-    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
-    source_catalog_table = get_glue_env_var('source_catalog_table','')
-    source_catalog_table2 = get_glue_env_var('source_catalog_table2','')
-    source_catalog_table3 = get_glue_env_var('source_catalog_table3','')
-    source_catalog_database = get_glue_env_var('source_catalog_database', '')
-    s3_bucket_target = get_glue_env_var('s3_bucket_target', '')
-
+    args = getResolvedOptions(sys.argv, ["JOB_NAME"])
+    source_catalog_table = get_glue_env_var("source_catalog_table", "")
+    source_catalog_table2 = get_glue_env_var("source_catalog_table2", "")
+    source_catalog_table3 = get_glue_env_var("source_catalog_table3", "")
+    source_catalog_database = get_glue_env_var("source_catalog_database", "")
+    s3_bucket_target = get_glue_env_var("s3_bucket_target", "")
+    days_to_load = 30
+
     # start the Spark session and the logger
-    glueContext = GlueContext(SparkContext.getOrCreate())
+    glueContext = GlueContext(SparkContext.getOrCreate())
     logger = glueContext.get_logger()
     job = Job(glueContext)
-    job.init(args['JOB_NAME'], args)
+    job.init(args["JOB_NAME"], args)
 
-    logger.info(f'The job is starting. The source table is {source_catalog_database}.{source_catalog_table}')
+    logger.info(
+        f"The job is starting. The source table is {source_catalog_database}.{source_catalog_table}"
+    )
+
+    # Create a predicate to filter data for the last days_to_load days
+    predicate = create_pushdown_predicate("snapshot_date", days_to_load)
+    logger.info(
+        f"Loading data with predicate: {predicate} to filter for last {days_to_load} days"
+    )
 
     # Load data from glue catalog
     data_source = glueContext.create_dynamic_frame.from_catalog(
-        name_space = source_catalog_database,
-        table_name = source_catalog_table
+        name_space=source_catalog_database,
+        table_name=source_catalog_table,
+        push_down_predicate=predicate,
     )
     data_source2 = glueContext.create_dynamic_frame.from_catalog(
         name_space=source_catalog_database,
-        table_name=source_catalog_table2
+        table_name=source_catalog_table2,
+        push_down_predicate=predicate,
     )
-
+
     data_source3 = glueContext.create_dynamic_frame.from_catalog(
         name_space=source_catalog_database,
-        table_name=source_catalog_table3
+        table_name=source_catalog_table3,
+        push_down_predicate=predicate,
     )
 
-    # Load Officers Table
+    # Load Officers Table
 
     # convert to a data frame
     df = data_source.toDF()
-
+
     # Rename columns
-    df = df.withColumnRenamed("id", "officer_id") \
-        .withColumnRenamed("forename", "officer_forename") \
-        .withColumnRenamed("surname", "officer_surname")
-
+    df = (
+        df.withColumnRenamed("id", "officer_id")
+        .withColumnRenamed("forename", "officer_forename")
+        .withColumnRenamed("surname", "officer_surname")
+    )
+
     # Specify Columns to Keep
-    df = df.select('officer_id',
-        "officer_forename",
-        "officer_surname",
-        'username',
-        'email',
-        'mobile',
-        'phone',
-        'job_title',
-        'import_date',
-        'import_day',
-        'import_month',
-        'import_year',
-        'snapshot_date',
-        'snapshot_year',
-        'snapshot_month',
-        'snapshot_day')
+    df = df.select(
+        "officer_id",
+        "officer_forename",
+        "officer_surname",
+        "username",
+        "email",
+        "mobile",
+        "phone",
+        "job_title",
+        "import_date",
+        "import_day",
+        "import_month",
+        "import_year",
+        "snapshot_date",
+        "snapshot_year",
+        "snapshot_month",
+        "snapshot_day",
+    )
 
     # Return only latest snapshot
-
+
     df = get_latest_snapshot(df)
-    df = df.withColumn('counter_officer', lit(1))
-    df = df.withColumn('officer_name', concat(trim(col('officer_forename')), lit(" "), trim(col('officer_surname'))))
-    # Load User Teams Map Table
+    df = df.withColumn("counter_officer", lit(1))
+    df = df.withColumn(
+        "officer_name",
+        concat(trim(col("officer_forename")), lit(" "), trim(col("officer_surname"))),
+    )
+    # Load User Teams Map Table
     # convert to a data frame
     df2 = data_source2.toDF()
 
     # drop old snapshots
-
+
     df2 = get_latest_snapshot(df2)
-
+
     # Rename Relevant Columns
-    # df2 = df2.withColumnRenamed("user_id","officer_id")
+    # df2 = df2.withColumnRenamed("user_id","officer_id")
 
     # Keep Only Relevant Columns
-    df2 = df2.select("user_id",
-        "user_team_id")
-
+    df2 = df2.select("user_id", "user_team_id")
 
     # convert to a data frame
     df3 = data_source3.toDF()
 
-
     # drop old snapshots
-
+
     df3 = get_latest_snapshot(df3)
-
-    df3 = df3.withColumnRenamed("id","team_id") \
-        .withColumnRenamed("name","team_name") \
-        .withColumnRenamed("description","team_description")
-
+
+    df3 = (
+        df3.withColumnRenamed("id", "team_id")
+        .withColumnRenamed("name", "team_name")
+        .withColumnRenamed("description", "team_description")
+    )
+
     # Keep Only Relevant Columns
-    df3 = df3.select("team_id","team_name",'team_description','location')
+    df3 = df3.select("team_id", "team_name", "team_description", "location")
     # Transform data using the functions defined outside the main block
     # Join
     df2 = df2.join(df3, df2.user_team_id == df3.team_id, "left")
     df = df.join(df2, df.officer_id == df2.user_id, "left")
     df = df.drop("team_id", "user_id")
-    ## Data Processing Ends
-    # Convert data frame to dynamic frame
+    ## Data Processing Ends
+    # Convert data frame to dynamic frame
     dynamic_frame = DynamicFrame.fromDF(df, glueContext, "target_data_to_write")
 
-    # wipe out the target folder in the trusted zone
-    logger.info(f'clearing target bucket')
+    # wipe out the target folder in the trusted zone
+    logger.info(f"clearing target bucket")
     clear_target_folder(s3_bucket_target)
 
-    # Write the data to S3
+    # Write the data to S3
     parquet_data = glueContext.write_dynamic_frame.from_options(
         frame=dynamic_frame,
         connection_type="s3",
         format="parquet",
-        connection_options={"path": s3_bucket_target, "partitionKeys": ['snapshot_year','snapshot_month','snapshot_day','snapshot_date']},
-        transformation_ctx="target_data_to_write")
+        connection_options={
+            "path": s3_bucket_target,
+            "partitionKeys": [
+                "snapshot_year",
+                "snapshot_month",
+                "snapshot_day",
+                "snapshot_date",
+            ],
+        },
+        transformation_ctx="target_data_to_write",
+    )
     job.commit()
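
For reference, create_pushdown_predicate is imported from scripts.helpers.helpers and its implementation is not part of this diff. A minimal sketch of what such a helper could look like, assuming it returns a Glue partition-predicate string that compares a date-formatted partition column against a cutoff days_to_load days in the past (the repo's actual helper may use a different date format or comparison):

from datetime import datetime, timedelta


def create_pushdown_predicate(partition_key, days_to_load):
    # Hypothetical sketch, not the repository's actual implementation.
    # Builds a predicate string such as "snapshot_date >= '20240101'" so that
    # create_dynamic_frame.from_catalog only reads partitions from roughly
    # the last `days_to_load` days.
    cutoff = datetime.today() - timedelta(days=days_to_load)
    return f"{partition_key} >= '{cutoff.strftime('%Y%m%d')}'"

Whatever the exact string, Glue evaluates the push_down_predicate against catalog partition values before any data is read, so only the matching snapshot_date partitions are scanned.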
