Add stacking/sync script for long wide table #15

Open
yukinko-iwasaki wants to merge 22 commits into main from stacking-scripts

Conversation

@yukinko-iwasaki

No description provided.

Contributor

Copilot AI left a comment


Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Comment on lines 7 to 8
import pyspark.sql.functions as f


Copilot AI Feb 18, 2026


The PySpark module is imported twice, on lines 3 and 7. Remove the duplicate import to follow Python best practices and improve code maintainability.

Suggested change
import pyspark.sql.functions as f

Copilot uses AI. Check for mistakes.
Comment on lines 39 to 44
spark.sql(f"""
    UPDATE `prd_csc_mega`.`sgld48`.`test_ingestion_metadata`
    SET stacked_ouo_table_version = stacked_ouo_table_version + 1,
        stacked_all_table_version = stacked_all_table_version + 1
    WHERE table_name IN {names_tuple}
""")

Copilot AI Feb 18, 2026


This code uses string formatting to construct an SQL UPDATE statement, which is vulnerable to SQL injection. If the table_name field contains malicious SQL code, it could be executed. Use parameterized queries or proper SQL escaping to prevent SQL injection vulnerabilities.
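One lightweight mitigation, sketched here as an illustration rather than as the project's code, is to build the `IN` list from explicitly escaped SQL string literals instead of relying on Python's tuple `repr`. (On Spark 3.4+, passing values through `spark.sql`'s `args` parameter with named markers is generally preferable where the query shape allows it.) The helper names below are hypothetical:

```python
def sql_string_literal(value: str) -> str:
    """Render a Python string as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"

def build_in_clause(values: list[str]) -> str:
    """Build a SQL IN (...) clause from escaped literals; handles the single-item case."""
    if not values:
        raise ValueError("IN clause needs at least one value")
    return "(" + ", ".join(sql_string_literal(v) for v in values) + ")"

# A name containing a quote is escaped rather than terminating the literal:
# build_in_clause(["table_a", "bad'name"]) -> "('table_a', 'bad''name')"
```

This neutralizes quote-based injection through `table_name` values, though identifier allow-listing or true parameterization remains the more robust option.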

Comment on lines 9 to 48
# 1. Read the source table
# 2. Randomize using f.rand()
# 3. Limit to 20 rows
test_df = spark.table("`prd_csc_mega`.`sgld48`.`_ingestion_metadata`") \
    .orderBy(f.rand()) \
    .limit(20)

# 4. Save as the new test table
# 'overwrite' mode mimics 'CREATE OR REPLACE'
test_df.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("`prd_csc_mega`.`sgld48`.`test_ingestion_metadata`")

# 1. Grab 5 random table names and "freeze" them into a Python list
# .collect() pulls the data out of the Spark plan and into local memory
random_rows = spark.table("`prd_csc_mega`.`sgld48`.`test_ingestion_metadata`") \
    .select("table_name") \
    .orderBy(f.rand()) \
    .limit(5) \
    .collect()

# Create a list of strings: ['table_a', 'table_b', ...]
target_tables = [row.table_name for row in random_rows]

# 2. Update the Delta table using the fixed list
# This uses the standard SQL 'IN' syntax but fills it with our Python list
if target_tables:
    names_tuple = str(tuple(target_tables)).replace(",)", ")")  # Handle single-item edge case

    spark.sql(f"""
        UPDATE `prd_csc_mega`.`sgld48`.`test_ingestion_metadata`
        SET stacked_ouo_table_version = stacked_ouo_table_version + 1,
            stacked_all_table_version = stacked_all_table_version + 1
        WHERE table_name IN {names_tuple}
    """)

    print(f"Successfully updated: {target_tables}")
else:
    print("No rows found to update.")

Copilot AI Feb 18, 2026


This test file doesn't define any actual test functions. All pytest test functions must start with "test_" prefix. The code appears to be a script that directly executes operations rather than a test. Consider restructuring this as proper test functions with assertions to validate expected behavior.

Suggested change
def _prepare_and_update_test_ingestion_metadata():
    # 1. Read the source table
    # 2. Randomize using f.rand()
    # 3. Limit to 20 rows
    test_df = spark.table("`prd_csc_mega`.`sgld48`.`_ingestion_metadata`") \
        .orderBy(f.rand()) \
        .limit(20)

    # 4. Save as the new test table
    # 'overwrite' mode mimics 'CREATE OR REPLACE'
    test_df.write \
        .mode("overwrite") \
        .format("delta") \
        .saveAsTable("`prd_csc_mega`.`sgld48`.`test_ingestion_metadata`")

    # 1. Grab 5 random table names and "freeze" them into a Python list
    # .collect() pulls the data out of the Spark plan and into local memory
    random_rows = spark.table("`prd_csc_mega`.`sgld48`.`test_ingestion_metadata`") \
        .select("table_name") \
        .orderBy(f.rand()) \
        .limit(5) \
        .collect()

    # Create a list of strings: ['table_a', 'table_b', ...]
    target_tables = [row.table_name for row in random_rows]

    # 2. Update the Delta table using the fixed list
    # This uses the standard SQL 'IN' syntax but fills it with our Python list
    if target_tables:
        names_tuple = str(tuple(target_tables)).replace(",)", ")")  # Handle single-item edge case

        spark.sql(f"""
            UPDATE `prd_csc_mega`.`sgld48`.`test_ingestion_metadata`
            SET stacked_ouo_table_version = stacked_ouo_table_version + 1,
                stacked_all_table_version = stacked_all_table_version + 1
            WHERE table_name IN {names_tuple}
        """)

        print(f"Successfully updated: {target_tables}")
    else:
        print("No rows found to update.")

def test_prepare_and_update_test_ingestion_metadata():
    """
    Basic pytest-compatible test that runs the stacking pipeline and verifies
    that the target test table can be read and contains at most 20 rows.
    """
    _prepare_and_update_test_ingestion_metadata()
    # Verify the test table exists and the row count matches the expected limit.
    df = spark.table("`prd_csc_mega`.`sgld48`.`test_ingestion_metadata`")
    row_count = df.count()
    assert 0 <= row_count <= 20
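A complementary pattern to the suggestion above is to factor the pure string-building logic out of the Spark script entirely, so it can be unit-tested without a cluster. A minimal sketch (the function name is hypothetical, but the body mirrors the tuple-formatting trick from the script under review):

```python
def format_names_tuple(target_tables: list[str]) -> str:
    """Format a list of table names for a SQL IN clause, handling the single-item case."""
    return str(tuple(target_tables)).replace(",)", ")")

def test_format_names_tuple_single_item():
    # The trailing comma of a 1-tuple must not leak into the SQL
    assert format_names_tuple(["table_a"]) == "('table_a')"

def test_format_names_tuple_multiple_items():
    assert format_names_tuple(["table_a", "table_b"]) == "('table_a', 'table_b')"
```

Tests like these run under plain pytest with no Spark session, which keeps the fast feedback loop separate from the integration test that actually touches Delta tables.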

spark_write_table(ouo_df, HARMONIZED_OFFICIAL, mode = "overwrite", options = list("overwriteSchema" = "true"))
}

#

Copilot AI Feb 18, 2026


The comment on line 183 is incomplete or unnecessary. It consists only of a "#" with no content, which should either be removed or completed with meaningful information.

Suggested change
#

Comment on lines 1 to 2
# Databricks notebook source
#TESTING

Copilot AI Feb 18, 2026


This file has a Databricks notebook source comment, but it appears to be a test file in the pytest directory. Test files should not be Databricks notebooks and should follow the proper pytest structure. The comment "#TESTING" on line 2 also appears to be a placeholder or debugging comment that should be removed.

Suggested change
# Databricks notebook source
#TESTING

Comment on lines 59 to 71
if (SparkR::tableExists(HARMONIZED_CONFIDENTIAL)) {
  harmonized_all <- tbl(sc, HARMONIZED_CONFIDENTIAL)
} else {
  create_query <- paste0(
    "CREATE TABLE ", HARMONIZED_CONFIDENTIAL,
    " (", columns_sql, ") USING DELTA"
  )
  DBI::dbExecute(sc, create_query)
  harmonized_all <- tbl(sc, HARMONIZED_CONFIDENTIAL)
}

# Check and create HARMONIZED_OFFICIAL if needed
if (SparkR::tableExists(HARMONIZED_OFFICIAL)) {

Copilot AI Feb 18, 2026


Using SparkR::tableExists while the rest of the script uses sparklyr is inconsistent and may not work correctly. The sparklyr package provides its own table existence checking through DBI::dbExistsTable or by catching errors when trying to access the table. This mixing of Spark interfaces could lead to compatibility issues.
