Commit c129e38

NRL-1346 Handle SSP logs
Parent: 3d9c90a

4 files changed (+75, -22 lines)

4 files changed

+75
-22
lines changed

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 3 additions & 0 deletions
@@ -46,6 +46,9 @@ resource "aws_glue_crawler" "log_crawler" {
   s3_target {
     path = "s3://${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/"
   }
+  s3_target {
+    path = "s3://${aws_s3_bucket.target-data-bucket.id}/ssp/"
+  }
   schema_change_policy {
     delete_behavior = "LOG"
   }

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 3 additions & 2 deletions
@@ -5,7 +5,7 @@
 from awsglue.utils import getResolvedOptions
 from pipeline import LogPipeline
 from pyspark.sql import SparkSession
-from transformations import dtype_conversion, rename_cols, resolve_dupes
+from transformations import dtype_conversion, format_ssp, rename_cols, resolve_dupes

 # Spark and Glue Context initialization
 spark = SparkSession.builder.config("spark.sql.caseSensitive", "true").getOrCreate()
@@ -37,6 +37,7 @@
     "producer--updateDocumentReference",
     "producer--deleteDocumentReference",
     "producer--createDocumentReference",
+    "s2c",
 ]

 # Initialize ETL process
@@ -49,7 +50,7 @@
     host_prefixes=host_prefixes,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[rename_cols, resolve_dupes, dtype_conversion],
+    transformations=[rename_cols, resolve_dupes, dtype_conversion, format_ssp],
 )

 # Run the job

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 7 additions & 5 deletions
@@ -81,7 +81,7 @@ def extract_dynamic(self):
             },
             format="json",
         ).filter(
-            f=lambda x, n=name: (x["host"].endswith(n))
+            f=lambda x, n=name: (x["host"] is not None and n in x["host"])
             and (x["time"] > last_runtime)
         )

@@ -95,7 +95,7 @@ def extract_dynamic(self):
                 "groupSize": "134217728",
             },
             format="json",
-        ).filter(f=lambda x, n=name: x["host"].endswith(n))
+        ).filter(f=lambda x, n=name: (x["host"] is not None and n in x["host"]))

         return data

@@ -107,23 +107,25 @@ def transform(self, dataframe, name):
         )
         for transformation in self.transformations:
             self.logger.info(f"Applying transformation: {transformation.__name__}")
-            dataframe = transformation(dataframe, self.logger)
+            dataframe = transformation(dataframe, self.logger, name)
         return dataframe

     def load(self, data):
         """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
             name = name.replace("--", "_")
+            if name == "s2c":
+                name = "ssp"
             try:
                 self.logger.info(
                     f"Attempting to load dataframe {name} into {self.target_path}{name}"
                 )
                 dataframe.write.mode("append").partitionBy(
                     *self.partition_cols
                 ).parquet(f"{self.target_path}{name}")
-            except:
-                self.logger.info(f"{name} dataframe has no rows. Skipping.")
+            except Exception as e:
+                self.logger.info(f"{name} failed to write with error: {e}")

     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
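
Note that transform() now passes the per-source name as a third positional argument, so every function in the transformations list must accept it. A minimal sketch of a conforming step (the function name and log message are illustrative, not part of the commit):

# Illustrative only: the pipeline now calls transformation(dataframe, self.logger, name),
# so any custom step must take (df, logger, name) even if it ignores the name.
def passthrough(df, logger, name):
    logger.info(f"passthrough applied to source: {name}")
    return df  # unchanged; name-specific steps (e.g. format_ssp) branch on name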

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 62 additions & 15 deletions
@@ -15,7 +15,43 @@
 from pyspark.sql.types import NullType


-def resolve_dupes(df, logger):
+def format_ssp(df, logger, name):
+    if name != "s2c":
+        logger.info(f"Not SSP logs, returning df: {name}")
+        return df
+
+    logger.info(f"Processing SSP logs")
+    noODSCode = df.filter(col("logReference") != "SSP0001")
+    ODSCode = df.filter(col("logReference") == "SSP0001")
+
+    noODSCode = noODSCode.select(
+        "time",
+        "host",
+        "internalID",
+        "logReference",
+        "interaction",
+        "responseCode",
+        "responseErrorMessage",
+        "totalDuration",
+    )
+    ODSCode = ODSCode.select(
+        "sspFrom",
+        "fromOrgName",
+        "fromOdsCode",
+        "fromPostCode",
+        "sspTo",
+        "toOrgName",
+        "toOdsCode",
+        "toPostCode",
+        "internalID",
+    )
+
+    df = noODSCode.join(ODSCode, on="internalID", how="left")
+
+    return df
+
+
+def resolve_dupes(df, logger, name):
     column_groups = defaultdict(list)
     for column_name in df.columns:
         normalised_name = column_name.lower().rstrip("_")
@@ -27,7 +63,9 @@ def resolve_dupes(df, logger):
         if len(original_names) == 1:
             final_select_exprs.append(col(original_names[0]).alias(lower_name))
         else:
-            logger.info(f"Resolving duplicate group '{lower_name}': {original_names}")
+            logger.info(
+                f"Resolving duplicate group '{lower_name}': {original_names} for df: {name}"
+            )

             merge_logic = lambda col1, col2: when(
                 col1.isNull() | col2.isNull(), coalesce(col1, col2)
@@ -40,34 +78,43 @@ def resolve_dupes(df, logger):
     return df.select(*final_select_exprs)


-def rename_cols(df, logger):
-    logger.info("Replacing '.' with '_'")
+def rename_cols(df, logger, name):
+    logger.info(f"Replacing '.' with '_' for df: {name}")
     for col_name in df.columns:
         df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
     return df


-def dtype_conversion(df, logger):
+def dtype_conversion(df, logger, name):
     try:
-        logger.info("Formatting event_timestamp")
-        df = (
-            df.withColumn(
+        logger.info(f"Formatting event_timestamp, time and date columns for df: {name}")
+        if "event_timestamp" in df.columns:
+            df = df.withColumn(
                 "event_timestamp_cleaned",
                 regexp_replace(col("event_timestamp"), ",", "."),
-            )
-            .withColumn(
+            ).withColumn(
                 "event_timestamp",
                 to_timestamp(
                     col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
                 ),
             )
-            .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
-            .withColumn("date", to_date(col("time")))
-        )

-        df = df.drop("event_timestamp_cleaned")
+            df = df.drop("event_timestamp_cleaned")
+
+        if "time" in df.columns:
+            df = df.withColumn(
+                "time", from_unixtime(col("time")).cast("timestamp")
+            ).withColumn("date", to_date(col("time")))
+
+        if "_time" in df.columns:
+            df = df.withColumn(
+                "time", to_timestamp(col("_time"), "yyyy-MM-dd HH:mm:ss.SSSZ")
+            ).withColumn("date", to_date(col("time")))
+
+            df = df.drop("_time")
+
     except Exception as e:
-        logger.info(f"Failed formatting of timestamp column with error: {e}")
+        logger.info(f"Failed formatting of timestamp columns with error: {e}")

     logger.info("Handling Null Type columns")
     select_exprs = []