Skip to content

Commit 551c084

Browse files
committed
NRL-1411 Handle empty dataframes
1 parent 164157a commit 551c084

File tree

2 files changed

+23
-14
lines changed

2 files changed

+23
-14
lines changed

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 6 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -104,9 +104,12 @@ def load(self, data):
104 104
self.logger.info(f"Loading data into {self.target_path} as Parquet")
105 105
for name, dataframe in data.items():
106 106
name = name.replace("--", "_")
107-
dataframe.coalesce(1).write.mode("append").partitionBy(
108-
*self.partition_cols
109-
).parquet(f"{self.target_path}{name}")
107+
try:
108+
dataframe.coalesce(1).write.mode("append").partitionBy(
109+
*self.partition_cols
110+
).parquet(f"{self.target_path}{name}")
111+
except:
112+
self.logger.info(f"{name} dataframe has no rows. Skipping.")
110 113

111 114
def trigger_crawler(self):
112 115
self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,25 @@ def rename_cols(df):
39 39

40 40

41 41
def dtype_conversion(df):
42-
df = (
43-
df.withColumn(
44-
"event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
42+
try:
43+
df = (
44+
df.withColumn(
45+
"event_timestamp_cleaned",
46+
regexp_replace(col("event_timestamp"), ",", "."),
47+
)
48+
.withColumn(
49+
"event_timestamp",
50+
to_timestamp(
51+
col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
52+
),
53+
)
54+
.withColumn("time", from_unixtime(col("time")).cast("timestamp"))
55+
.withColumn("date", to_date(col("time")))
45 56
)
46-
.withColumn(
47-
"event_timestamp",
48-
to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
49-
)
50-
.withColumn("time", from_unixtime(col("time")).cast("timestamp"))
51-
.withColumn("date", to_date(col("time")))
52-
)
53 57

54-
df = df.drop("event_timestamp_cleaned")
58+
df = df.drop("event_timestamp_cleaned")
59+
except:
60+
...
55 61

56 62
select_exprs = []
57 63
for column_name in df.columns:

0 commit comments

Comments
 (0)