Commit ebced37

documentation: Added Colab notebook for the Ingesting data from DB into Database example (#3059)
Added a Colab notebook for the Ingesting data from DB into Database example (#3059), to make it easier for users to work with the library in a Google Colab notebook. Approved-pr-by: @duyguHsnHsn Signed-off-by: Mahesh Kumar Kadireddy <[email protected]>
1 parent 3d97f64 commit ebced37

16 files changed, +1578 -7 lines
examples/ingest-from-db-example/ingest-from-db-example-notebook/Ingesting_data_from_DB_into_Database.ipynb

Lines changed: 1124 additions & 0 deletions
Large diffs are not rendered by default.
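Since the notebook diff is not rendered, the general pattern the example demonstrates (read rows from a source database, then send them to the configured ingestion destination) can be sketched as follows. This is a hedged, minimal sketch rather than the notebook's exact code: the table and column names are hypothetical, and it assumes the standard VDK job-input query and ingestion APIs.

# A minimal sketch of the DB-to-database ingestion pattern; all names are illustrative.
from vdk.api.job_input import IJobInput


def run(job_input: IJobInput):
    # Read rows from the source database configured for the job
    # ("source_orders" is a hypothetical table name).
    rows = job_input.execute_query("SELECT id, name, amount FROM source_orders")

    # Send the rows to the configured ingestion target;
    # "orders" is a hypothetical destination table.
    job_input.send_tabular_data_for_ingestion(
        rows=rows,
        column_names=["id", "name", "amount"],
        destination_table="orders",
    )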

projects/control-service/projects/job-builder-secure/Dockerfile.python.vdk

Lines changed: 2 additions & 2 deletions
@@ -52,14 +52,14 @@ RUN : \
    && pip install --no-cache-dir --disable-pip-version-check -q -r "$job_name/$requirements_file" \
    || ( echo ">requirements_failed<" && exit 1 ) \
    && echo "Removing native dependencies ..." \
-   && yum autoremove build-essential gcc glibc-devel git unzip -y \
+   && yum remove build-essential gcc glibc-devel git unzip -y \
    && yum remove freetype2-devel libpng-devel -y; fi \
    && echo "Installing native dependencies ..." \
    && yum install libstdc++ findutils openssl-c_rehash -y \
    && echo "Refreshing CA certificates ..." \
    && /usr/bin/rehash_ca_certificates.sh \
    && echo "Deleting system packages ..." \
-   && yum autoremove shadow toybox openssl-c_rehash -y \
+   && yum remove shadow toybox openssl-c_rehash -y \
    && echo "Deleting system directories ..." \
    && rm -rf /boot /home /media /mnt /root /srv /usr/lib/ldscripts /usr/lib/rpm /usr/lib/sysimage \
       /usr/lib/tdnf /usr/lib/perl5 /usr/lib/gcc /usr/share/locale /tmp/* /usr/include /usr/libexec /usr/libexec \
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
- 1.3.11
+ 1.3.12
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import logging

from vdk.api.job_input import IJobInput
from vdk.plugin.trino.trino_utils import TrinoTemplateQueries

log = logging.getLogger(__name__)


def run(job_input: IJobInput):
    """
    In this step we try to recover a potentially missing target table from its backup.
    In some cases the template might fail during the step where new data is written to the target table
    (the last step, where the tmp_target_table contents are moved to target_table). If this happens, the job
    fails and the target table is no longer present. Fortunately, it has a backup.
    So when the job is retried, this first step should recover the target (if the reason for the previous
    failure is no longer present).
    """

    args = job_input.get_arguments()
    target_schema = args.get("target_schema")
    target_table = args.get("target_table")
    trino_queries = TrinoTemplateQueries(job_input)

    trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table)
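For orientation, a template whose first step is the recovery logic above is normally invoked from a data job through the job input's template API. The sketch below is a hedged illustration: the template name ("scd1_upsert") and all schema, table, and column values are assumptions, not taken from this commit.

def run(job_input):
    # Invoke a Trino template; the name and arguments below are hypothetical examples.
    job_input.execute_template(
        template_name="scd1_upsert",
        template_args={
            "source_schema": "history",    # hypothetical source schema
            "source_view": "vw_orders",    # hypothetical source view
            "target_schema": "warehouse",  # hypothetical target schema
            "target_table": "dim_orders",  # hypothetical target table
            "id_column": "order_id",       # column used to match source and target rows
        },
    )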
@@ -0,0 +1,3 @@
(SELECT * FROM "{source_schema}"."{source_view}" LIMIT 0)
UNION ALL
(SELECT * FROM "{target_schema}"."{target_table}" LIMIT 0)
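This LIMIT 0 / UNION ALL query returns no rows; it appears to serve as a schema-compatibility probe, since the UNION ALL fails if the source view and the target table do not have matching column counts and compatible types.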
@@ -0,0 +1,12 @@
CREATE TABLE "{target_schema_staging}"."{target_table_staging}" AS
(
    SELECT t.*
    FROM "{target_schema}"."{target_table}" AS t
    LEFT JOIN "{source_schema}"."{source_view}" AS s ON s."{id_column}" = t."{id_column}"
    WHERE s."{id_column}" IS NULL
)
UNION ALL
(
    SELECT *
    FROM "{source_schema}"."{source_view}"
)
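The LEFT JOIN with the WHERE s."{id_column}" IS NULL filter keeps only the target rows that have no counterpart in the source view, and the UNION ALL then appends every source row, so rows present in both are effectively replaced by the source version in the staging table.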
@@ -0,0 +1 @@
DROP TABLE IF EXISTS "{target_schema}"."{target_table}"
@@ -0,0 +1,2 @@
INSERT INTO "{target_schema}"."{target_table}"
SELECT * FROM "{source_schema}"."{source_table}"
@@ -0,0 +1 @@
SHOW CREATE TABLE "{target_schema}"."{target_table}"
@@ -0,0 +1,146 @@
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import logging
import os
import re

from vdk.api.job_input import IJobInput
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
from vdk.plugin.trino.trino_utils import CommonUtilities

log = logging.getLogger(__name__)

SQL_FILES_FOLDER = (
    os.path.dirname(os.path.abspath(__file__)) + "/02-requisite-sql-scripts"
)


"""
This step is intended to handle quality checks, if such are provided,
and stop the data from being populated into the target table if the check has a negative outcome.
Otherwise the data will be directly processed according to the used template type.
"""


def run(job_input: IJobInput):
    """
    0. Drop the staging table.
    1. Insert the target table data, upserted with the source view data, into the staging table.
    2. If a check is provided:
       - send the temp/staging table for check validation
       - if validated, copy the data from the staging table to the target table
       - else raise an error
       else:
       - copy the data from the staging table to the target table
    3. Copying the data:
       - truncate the target table and insert the data from the staging table
    """

    job_arguments = job_input.get_arguments()

    check = job_arguments.get("check")
    source_schema = job_arguments.get("source_schema")
    source_view = job_arguments.get("source_view")
    target_schema = job_arguments.get("target_schema")
    target_table = job_arguments.get("target_table")
    id_column = job_arguments.get("id_column")

    staging_schema = job_arguments.get("staging_schema", target_schema)
    staging_table = CommonUtilities.get_staging_table_name(target_schema, target_table)

    # Drop the staging table
    drop_table_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "02-drop-table.sql"
    )
    drop_table = drop_table_query.format(
        target_schema=staging_schema, target_table=staging_table
    )
    job_input.execute_query(drop_table)

    # Create the staging table and insert data
    create_table_and_insert_data_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "02-create-table-and-insert-data.sql"
    )
    create_staging_table_and_insert_data = create_table_and_insert_data_query.format(
        target_schema=target_schema,
        target_table=target_table,
        source_schema=source_schema,
        source_view=source_view,
        target_schema_staging=staging_schema,
        target_table_staging=staging_table,
        id_column=id_column,
    )
    job_input.execute_query(create_staging_table_and_insert_data)

    staging_table_full_name = f"{staging_schema}.{staging_table}"

    # Copy the data if there is no quality check configured or if it passes
    if not check or check(staging_table_full_name):
        copy_staging_table_to_target_table(
            job_input, target_schema, target_table, staging_schema, staging_table
        )
    else:
        target_table_full_name = f"{target_schema}.{target_table}"
        raise DataQualityException(
            checked_object=staging_table_full_name,
            source_view=f"{source_schema}.{source_view}",
            target_table=target_table_full_name,
        )


def copy_staging_table_to_target_table(
    job_input: IJobInput,
    target_schema,
    target_table,
    source_schema,
    source_table,
):
    # Non-partitioned tables:
    # - Since TRUNCATE and DELETE do not work for non-partitioned tables, get the create statement,
    #   drop the table and then re-create it; this preserves metadata such as user comments.
    # - Insert the contents of the staging table into the target table.
    # - Delete the staging table.
    show_create_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "02-show-create-table.sql"
    )
    show_create_target_table = show_create_query.format(
        target_schema=target_schema, target_table=target_table
    )

    table_create_statement = job_input.execute_query(show_create_target_table)
    # Remove the "external_location" clause from the create statement, as it might lead to
    # data not being cleaned up properly in Hive.
    table_create_statement = remove_external_location(table_create_statement[0][0])

    # Drop the table
    drop_table_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "02-drop-table.sql"
    )
    drop_table = drop_table_query.format(
        target_schema=target_schema, target_table=target_table
    )
    job_input.execute_query(drop_table)

    # Re-create the table
    job_input.execute_query(table_create_statement)

    # Insert the data
    insert_into_table_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "02-insert-into-table.sql"
    )
    insert_into_table = insert_into_table_query.format(
        target_schema=target_schema,
        target_table=target_table,
        source_schema=source_schema,
        source_table=source_table,
    )
    job_input.execute_query(insert_into_table)


def remove_external_location(sql_statement):
    # Regular expression pattern to match the external_location clause
    pattern = r"external_location\s*=\s*'[^']*',?\s*"

    # Remove the external_location clause from the SQL statement
    cleaned_sql = re.sub(pattern, "", sql_statement, flags=re.IGNORECASE)

    return cleaned_sql
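The "check" argument consumed above is a callable that receives the fully qualified staging table name and returns a truthy value when the data may be promoted to the target table. A hedged sketch of supplying such a check from a data job follows; the row-count rule, the template name, and all argument values are illustrative assumptions.

def run(job_input):
    def staging_has_rows(staging_table: str) -> bool:
        # Illustrative rule: only promote the staging data if it is non-empty.
        result = job_input.execute_query(f"SELECT COUNT(*) FROM {staging_table}")
        return result[0][0] > 0

    job_input.execute_template(
        template_name="scd1_upsert",  # assumed template name, as in the earlier sketch
        template_args={
            "source_schema": "history",
            "source_view": "vw_orders",
            "target_schema": "warehouse",
            "target_table": "dim_orders",
            "id_column": "order_id",
            "check": staging_has_rows,  # called with the staging table's full name
        },
    )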
