Skip to content

Commit d9c0f29

Browse files
committed
NRL-1320 grab delta each run
1 parent 300c824 commit d9c0f29

File tree

1 file changed

+29
-5
lines changed
  • terraform/account-wide-infrastructure/modules/glue/src

1 file changed

+29
-5
lines changed

terraform/account-wide-infrastructure/modules/glue/src/pipeline.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import time
2+
13
import boto3
24
from instances import GlueContextSingleton, LoggerSingleton
35
from pyspark.sql.functions import col
@@ -28,6 +30,7 @@ def __init__(
2830
region_name="eu-west-2",
2931
endpoint_url="https://glue.eu-west-2.amazonaws.com",
3032
)
33+
self.job_name = job_name
3134
self.name_prefix = "-".join(job_name.split("-")[:4])
3235

3336
def run(self):
@@ -47,16 +50,37 @@ def run(self):
4750
self.logger.error(f"ETL process failed: {e}")
4851
raise e
4952

53+
def get_last_run(self):
    """Return when this job last started a successful run.

    Queries the Glue job-run history for ``self.job_name`` and returns the
    ``StartedOn`` value of the first run listed with state ``SUCCEEDED``,
    as a POSIX timestamp (float seconds). Returns ``None`` when no
    successful run is found.

    NOTE(review): "first listed" is treated as "most recent", which
    presumes Glue returns runs newest first -- confirm against the API docs.
    """
    request = {"JobName": self.job_name}
    while True:
        response = self.glue.get_job_runs(**request)
        for job_run in response.get("JobRuns") or []:
            if job_run["JobRunState"] == "SUCCEEDED":
                # datetime.timestamp() honours the value's tzinfo and keeps
                # sub-second precision; mktime(timetuple()) would re-read
                # the wall-clock fields in the host's local timezone.
                return job_run["StartedOn"].timestamp()

        # The history is paginated: keep going until a successful run is
        # found or there are no more pages.
        next_token = response.get("NextToken")
        if not next_token:
            return None
        request["NextToken"] = next_token
65+
5066
def extract(self):
    """Extract JSON data from S3.

    Reads ``self.source_path`` recursively once per entry in
    ``self.schemas`` and keeps the rows whose ``host`` column contains the
    schema name. When a previous successful run is known, only rows whose
    ``time`` column is greater than that run's start timestamp are kept.

    Returns:
        dict mapping each schema name to its filtered DataFrame.
    """
    self.logger.info(f"Extracting data from {self.source_path} as JSON")
    last_runtime = self.get_last_run()
    data = {}
    for name, schema in self.schemas.items():
        condition = col("host").contains(name)
        # Compare against None explicitly so a legitimate falsy timestamp
        # cannot silently disable the delta filter.
        if last_runtime is not None:
            # NOTE(review): assumes the `time` column is directly comparable
            # with the value returned by get_last_run() -- confirm its units.
            condition = condition & (col("time") > last_runtime)
        data[name] = (
            self.spark.read.option("recursiveFileLookup", "true")
            .schema(schema)
            .json(self.source_path)
            .where(condition)
        )
    return data
6185

6286
def transform(self, dataframe):

0 commit comments

Comments
 (0)