03-history-loader.py — 40 lines (33 loc) · 1.61 KB
# Databricks notebook source
# MAGIC %run ./01-config
# COMMAND ----------
class HistoryLoader():
    """Load historical seed data into lakehouse tables and validate row counts.

    Relies on names provided by the Databricks notebook environment:
    `spark` (the active SparkSession), `env` (target catalog name), and the
    `Config` class — the latter two come from the `%run ./01-config` cell.
    """

    def __init__(self):
        # NOTE(review): `env` is assumed to be defined by ./01-config — confirm.
        conf = Config()
        self.landing_zone = conf.base_dir_data + "/raw"
        self.test_data_dir = conf.base_dir_data + "/test_data"
        self.catalog = env
        self.db_name = conf.db_name

    def load_date_lookup(self):
        """Overwrite the date_lookup table from the seed JSON file."""
        print("Loading date lookup table.....")
        # Spark SQL direct file query: json.`<path>` reads the file without a
        # registered table. (Fixed: removed the stray space after `json.`.)
        spark.sql(f"""INSERT OVERWRITE TABLE {self.catalog}.{self.db_name}.date_lookup
                SELECT date,week,year,month,dayofweek,dayofmonth,dayofyear,week_part
                FROM json.`{self.test_data_dir}/6-date-lookup.json/`""")
        print("Done")

    def load_history(self):
        """Run the full historical load, reporting elapsed wall-clock seconds."""
        import time
        start = int(time.time())
        print("Starting historical data load....")
        self.load_date_lookup()
        print(f"Historical data load completed in {int(time.time())-start} seconds")

    def assert_count(self, table_name, expected_count):
        """Assert that `table_name` holds exactly `expected_count` rows.

        Raises AssertionError with both counts when they differ.
        """
        print(f"validating records count in {table_name}")
        actual_count = spark.read.table(f"{self.catalog}.{self.db_name}.{table_name}").count()
        assert actual_count == expected_count, f"Expected {expected_count:,} records, found {actual_count:,} in {table_name}"
        # Fixed: the original spec `{actual_count:, }` (trailing space inside the
        # format spec) raised ValueError at runtime, so success never printed.
        print(f"Found {actual_count:,}/Expected {expected_count:,} records: Success")

    def validate(self):
        """Validate the historical load, reporting elapsed wall-clock seconds."""
        import time
        start = int(time.time())
        print("Starting historical data load validation.....")
        self.assert_count("date_lookup", 365)
        print(f"Historical data load validation completed in {int(time.time())-start} seconds")