Skip to content

Commit 47205c1

Browse files
committed
NRL-1268 configure timestamp and partition by date
1 parent 3ad2c3c commit 47205c1

File tree

3 files changed

+24
-8
lines changed

3 files changed

+24
-8
lines changed

terraform/account-wide-infrastructure/modules/glue/glue.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ resource "aws_glue_job" "glue_job" {
5252
"--source_path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
5353
"--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
5454
"--job_name" = "${var.name_prefix}-glue-job"
55+
"--partition_cols" = "date"
5556
"--enable-continuous-log-filter" = "true"
5657
"--enable-metrics" = "true"
5758
"--extra-py-files" = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"

terraform/account-wide-infrastructure/modules/glue/src/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
from transformations import dtype_conversion, flatten_df, logSchema
77

88
# Get arguments from AWS Glue job
9-
args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
9+
args = getResolvedOptions(
10+
sys.argv, ["job_name", "source_path", "target_path", "partition_cols"]
11+
)
1012

1113
# Start Glue context
1214
sc = SparkContext()

terraform/account-wide-infrastructure/modules/glue/src/transformations.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,21 @@
1-
from pyspark.sql.functions import col, to_timestamp
1+
from pyspark.sql.functions import (
2+
col,
3+
from_unixtime,
4+
regexp_replace,
5+
to_date,
6+
to_timestamp,
7+
)
28
from pyspark.sql.types import (
39
BooleanType,
10+
DoubleType,
411
StringType,
512
StructField,
613
StructType,
7-
TimestampType,
814
)
915

1016
logSchema = StructType(
1117
[
12-
StructField("time", TimestampType(), True),
18+
StructField("time", DoubleType(), True),
1319
StructField("index", StringType(), True),
1420
StructField("host", StringType(), True),
1521
StructField("source", StringType(), True),
@@ -80,8 +86,15 @@ def flatten(schema, prefix=""):
8086

8187

8288
def dtype_conversion(df):
83-
df = df.withColumn(
84-
"event_timestamp",
85-
to_timestamp(df["event_timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX"),
89+
df = (
90+
df.withColumn(
91+
"event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
92+
)
93+
.withColumn(
94+
"event_timestamp",
95+
to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
96+
)
97+
.withColumn("time", from_unixtime(col("time")).cast("timestamp"))
98+
.withColumn("date", to_date(col("time")))
8699
)
87-
return df
100+
return df.drop("event_timestamp_cleaned")

0 commit comments

Comments (0)