Commit 31413b5

NRL-1187 format data
1 parent 6d3fc80 commit 31413b5

3 files changed: +13 -3 lines changed

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 2 additions & 2 deletions

@@ -3,7 +3,7 @@
 from awsglue.utils import getResolvedOptions
 from pipeline import LogPipeline
 from pyspark.context import SparkContext
-from transformations import logSchema, placeholder
+from transformations import flatten_df, logSchema
 
 # Get arguments from AWS Glue job
 args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
@@ -20,7 +20,7 @@
     target_path=args["target_path"],
     schema=logSchema,
     partition_cols=partition_cols,
-    transformations=[placeholder],
+    transformations=[flatten_df],
 )
 
 # Run the job
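The transformations argument suggests LogPipeline applies a list of DataFrame-to-DataFrame functions in order during its transform step. The real transform body is not part of this diff, so the following is only a sketch of that assumed pattern (LogPipelineSketch is a hypothetical name, not repo code):

# Sketch of how LogPipeline plausibly applies its transformations list;
# the actual transform body is not shown in this diff, so this is assumed.
class LogPipelineSketch:
    def __init__(self, transformations):
        self.transformations = transformations

    def transform(self, dataframe):
        # Each entry is a DataFrame -> DataFrame function, applied in order.
        for fn in self.transformations:
            dataframe = fn(dataframe)
        return dataframe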

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 1 addition & 1 deletion

@@ -50,6 +50,6 @@ def transform(self, dataframe):
     def load(self, dataframe):
         """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
-        dataframe.write.mode("overwrite").partitionBy(*self.partition_cols).parquet(
+        dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
             self.target_path
         )
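For context on this one-word change: mode("overwrite") deletes everything under target_path before writing, so each Glue run replaces all previously written partitions, while mode("append") adds new files alongside existing ones, letting repeated runs accumulate data. A minimal illustration (the DataFrame, columns, and S3 path below are assumptions, not repo code):

# Illustrative only: how write modes interact with partitioned Parquet.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2024-01-01", "a")], ["date", "value"])

# mode("overwrite") clears the whole target path first, including
# partitions written by earlier runs.
df.write.mode("overwrite").partitionBy("date").parquet("s3://bucket/logs/")

# mode("append") leaves existing partitions in place and adds new files,
# which suits incremental jobs but can duplicate rows if a run is retried.
df.write.mode("append").partitionBy("date").parquet("s3://bucket/logs/")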

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 10 additions & 0 deletions

@@ -58,5 +58,15 @@
 )
 
 
+def flatten_df(df):
+    cols = []
+    for c in df.dtypes:
+        if "struct" in c[1]:
+            nested_col = c[0]
+        else:
+            cols.append(c[0])
+    return df.select(*cols, f"{nested_col}.*")
+
+
 def placeholder(df):
     return df
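The f"{nested_col}.*" string works because select accepts column-name strings, so "meta.*" expands every field of the meta struct to top level. As committed, though, flatten_df assumes exactly one struct column: with none present, nested_col is unbound and the select raises NameError, and with several present only the last one is expanded. A more defensive variant (a sketch, not part of this commit) could expand every struct and pass structless frames through:

# Sketch of a variant tolerating zero or many struct columns.
# Not part of the commit; names mirror the committed flatten_df.
def flatten_df_safe(df):
    flat_cols, struct_cols = [], []
    for name, dtype in df.dtypes:
        # dtype is a string such as "struct<level:string,msg:string>"
        if dtype.startswith("struct"):
            struct_cols.append(name)
        else:
            flat_cols.append(name)
    if not struct_cols:
        return df  # nothing to flatten
    return df.select(*flat_cols, *[f"{s}.*" for s in struct_cols])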
