Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .env.test.docker
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ REDSHIFT_USER=
REDSHIFT_PASSWORD=
REDSHIFT_IAM_COPY_ROLE=
REDSHIFT_CERT_PATH=
PGTEST_DATABASE_URL=postgresql://postgres:postgres_root_password@druzhba_postgres_1:5432/druzhba_test
MYSQLTEST_DATABASE_URL=mysql://root:mysql_root_password@druzhba_mysql_1:3306/druzhba_test
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ RUN apt-get update && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

RUN curl https://pyenv.run | bash
ENV PATH="/root/.pyenv/bin:${PATH}"
ENV PATH="/root/.pyenv/shims:/root/.pyenv/bin:${PATH}"
WORKDIR /app
COPY . .
ARG PYTHON_VERSION=3.10.4
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ services:
build:
context: .
dockerfile: Dockerfile
command: "python setup.py test"
command: "pytest test/unit/"
env_file:
- ./.env.test.docker
volumes:
Expand Down
17 changes: 13 additions & 4 deletions druzhba/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -785,16 +785,25 @@ def get_destination_table_status(self):
elif len(unexpected_dw_columns) > 0:
msg = (
"Columns exist in the warehouse table that are not in "
"the source: `%s`"
"the source: `%s` (source table: %s, destination table: %s)"
) % (
"`, `".join(unexpected_dw_columns),
self.source_table_name,
self.destination_table_name,
)
self.logger.warning(msg, "`, `".join(unexpected_dw_columns))
self.logger.warning(msg)
return self.DESTINATION_TABLE_INCORRECT
elif len(unexpected_source_columns) > 0:
msg = (
"Columns exist in the source table that are not in the "
+ "warehouse. Skipping column(s): `%s`"
"warehouse. Skipping column(s): `%s` "
"(source table: %s, destination table: %s)"
) % (
"`, `".join(unexpected_source_columns),
self.source_table_name,
self.destination_table_name,
)
self.logger.warning(msg, "`, `".join(unexpected_source_columns))
self.logger.warning(msg)

# Copy from avro will just ignore the extra columns so we can proceed
return self.DESTINATION_TABLE_OK
Expand Down
126 changes: 74 additions & 52 deletions test/unit/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@


class IgnoreWhitespace(str):
""" Wrap a string so it compares equal to other strings
except for repeated whitespace characters. """
"""Wrap a string so it compares equal to other strings
except for repeated whitespace characters."""

r = re.compile(r"(\s+)")

Expand Down Expand Up @@ -207,7 +207,7 @@ def test_where_clause(self):
self.assertEqual(
table_config.where_clause(), "\nWHERE id > '13' AND id <= '42'"
)

with patch(
"druzhba.table.TableConfig.old_index_value", new_callable=PropertyMock
) as oiv:
Expand All @@ -217,7 +217,8 @@ def test_where_clause(self):
) as niv:
niv.return_value = 42
with patch(
"druzhba.table.TableConfig.lookback_index_value", new_callable=PropertyMock
"druzhba.table.TableConfig.lookback_index_value",
new_callable=PropertyMock,
) as liv:
liv.return_value = 10
table_config = TableConfig(
Expand Down Expand Up @@ -589,6 +590,44 @@ def test_error_if_destination_table_incorrect(self, r):
tt3.check_destination_table_status()
self.assertEqual(tt3._destination_table_status, tt.DESTINATION_TABLE_REBUILD)

def test_unexpected_dw_columns_log_message(self):
    """Check the warning emitted when the warehouse has an extra column.

    ``_dw_columns`` contains ``extra_col`` while ``_columns`` does not, so
    ``get_destination_table_status`` is expected to return
    ``DESTINATION_TABLE_INCORRECT`` and log exactly one WARNING whose text
    is already fully formatted and names both the source and destination
    tables.
    """
    table = self.MockTable()
    table._dw_columns = ["col1", "col2", "extra_col"]
    table._columns = ["col1", "col2"]
    expected_record = (
        "WARNING:test_logger:Columns exist in the warehouse table that are "
        "not in the source: `extra_col` "
        "(source table: org_table, destination table: my_table)"
    )

    # Lift any global logging.disable() override so the warning can reach
    # the assertLogs handler.
    logging.disable(logging.NOTSET)
    try:
        with self.assertLogs(table.logger, level="WARNING") as captured:
            status = table.get_destination_table_status()
        self.assertEqual(status, TableConfig.DESTINATION_TABLE_INCORRECT)
        self.assertEqual(len(captured.output), 1)
        self.assertEqual(captured.output[0], expected_record)
    finally:
        # NOTE(review): presumes the suite normally runs with logging
        # disabled at CRITICAL -- confirm against the module setup.
        logging.disable(logging.CRITICAL)

def test_unexpected_source_columns_log_message(self):
    """Check the warning emitted when the source has an extra column.

    ``_columns`` contains ``new_col`` while ``_dw_columns`` does not, so
    ``get_destination_table_status`` is expected to return
    ``DESTINATION_TABLE_OK`` and log exactly one WARNING that lists the
    skipped column together with the source and destination table names.
    """
    table = self.MockTable()
    table._dw_columns = ["col1", "col2"]
    table._columns = ["col1", "col2", "new_col"]
    expected_record = (
        "WARNING:test_logger:Columns exist in the source table that are "
        "not in the warehouse. Skipping column(s): `new_col` "
        "(source table: org_table, destination table: my_table)"
    )

    # Lift any global logging.disable() override so the warning can reach
    # the assertLogs handler.
    logging.disable(logging.NOTSET)
    try:
        with self.assertLogs(table.logger, level="WARNING") as captured:
            status = table.get_destination_table_status()
        self.assertEqual(status, TableConfig.DESTINATION_TABLE_OK)
        self.assertEqual(len(captured.output), 1)
        self.assertEqual(captured.output[0], expected_record)
    finally:
        # NOTE(review): presumes the suite normally runs with logging
        # disabled at CRITICAL -- confirm against the module setup.
        logging.disable(logging.CRITICAL)

def test_query_description_to_avro(self):
tt = self.MockTable()
tt._desc = [["column_1", "db_str("], ["column_2", "db_int"]]
Expand Down Expand Up @@ -673,7 +712,9 @@ def test_extract_full_single(self, mock_io):

tt.write_manifest_file.assert_not_called()
tt._upload_s3.assert_called_once_with(
ANY, "my-bucket", "my_prefix/my_db.org_table.20190101T010203.avro",
ANY,
"my-bucket",
"my_prefix/my_db.org_table.20190101T010203.avro",
)

self.assertEqual(tt.row_count, 3)
Expand Down Expand Up @@ -744,17 +785,13 @@ def test_redshift_copy_create(self):
call('LOCK TABLE "my_table";'),
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
call(
IgnoreWhitespace(
f"""
call(IgnoreWhitespace(f"""
COPY "my_db_my_table_staging" FROM 's3://my-bucket/my_prefix/my_db.org_table.20190101T010203.avro'
CREDENTIALS 'aws_iam_role=iam_copy_role'
FORMAT AS AVRO 'auto'
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
COMPUPDATE OFF STATUPDATE OFF;
"""
)
),
""")),
call('DELETE FROM "my_table";'),
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
call('DROP TABLE "my_db_my_table_staging";'),
Expand Down Expand Up @@ -789,17 +826,13 @@ def test_redshift_copy_incremental_single(self):
call('LOCK TABLE "my_table";'),
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
call(
IgnoreWhitespace(
"""
call(IgnoreWhitespace("""
COPY "my_db_my_table_staging" FROM 's3://my-bucket/my_prefix/my_db.org_table.20190101T010203.avro'
CREDENTIALS 'aws_iam_role=iam_copy_role'
FORMAT AS AVRO 'auto'
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
COMPUPDATE OFF STATUPDATE OFF;
"""
)
),
""")),
call('DELETE FROM "my_table" ' + where_clause + ";"),
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
call('DROP TABLE "my_db_my_table_staging";'),
Expand All @@ -819,7 +852,10 @@ def test_redshift_copy_incremental_manifest(self, r):
tt.manifest_mode = True
tt.s3 = Mock()
tt.s3.list_objects = Mock(
return_value=[{"ContentLength": 30}, {"ContentLength": 21},]
return_value=[
{"ContentLength": 30},
{"ContentLength": 21},
]
)
tt.key_name = None
tt._destination_table_status = tt.DESTINATION_TABLE_OK
Expand All @@ -837,18 +873,14 @@ def test_redshift_copy_incremental_manifest(self, r):
call('LOCK TABLE "my_table";'),
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
call(
IgnoreWhitespace(
f"""
call(IgnoreWhitespace(f"""
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.manifest'
CREDENTIALS 'aws_iam_role=iam_copy_role'
MANIFEST
FORMAT AS AVRO 'auto'
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
COMPUPDATE OFF STATUPDATE OFF;
"""
)
),
""")),
call('DELETE FROM "my_table" ' + where_clause + ";"),
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
call('DROP TABLE "my_db_my_table_staging";'),
Expand Down Expand Up @@ -891,17 +923,13 @@ def test_redshift_copy_full_refresh(self):
call('LOCK TABLE "my_table";'),
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
call(
IgnoreWhitespace(
f"""
call(IgnoreWhitespace(f"""
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro'
CREDENTIALS 'aws_iam_role=iam_copy_role'
FORMAT AS AVRO 'auto'
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
COMPUPDATE OFF STATUPDATE OFF;
"""
)
),
""")),
call('DELETE FROM "my_table";'),
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
call('DROP TABLE "my_db_my_table_staging";'),
Expand Down Expand Up @@ -956,7 +984,9 @@ def test_write_manifest_file(self, mock_io):
self.assertListEqual(manifest["entries"], expected_entries)

tt._upload_s3.assert_called_once_with(
ANY, "my-bucket", "my_prefix/my_db.org_table.20190101T010203.manifest",
ANY,
"my-bucket",
"my_prefix/my_db.org_table.20190101T010203.manifest",
)

def test_invalid_manifest_state(self):
Expand Down Expand Up @@ -989,17 +1019,13 @@ def test_redshift_copy_full_refresh_with_index_col(self):
call('LOCK TABLE "my_table";'),
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
call(
IgnoreWhitespace(
f"""
call(IgnoreWhitespace(f"""
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro'
CREDENTIALS 'aws_iam_role=iam_copy_role'
FORMAT AS AVRO 'auto'
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
COMPUPDATE OFF STATUPDATE OFF;
"""
)
),
""")),
call('DELETE FROM "my_table";'),
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
call('DROP TABLE "my_db_my_table_staging";'),
Expand Down Expand Up @@ -1035,17 +1061,13 @@ def test_redshift_copy_rebuild(self):
call(ANY), # big get permissions query
call("CREATE TABLE my_db_my_table_staging ();"),
call("GRANT SELECT ON my_db_my_table_staging TO GROUP group_name;"),
call(
IgnoreWhitespace(
f"""
call(IgnoreWhitespace(f"""
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro'
CREDENTIALS 'aws_iam_role=iam_copy_role'
FORMAT AS AVRO 'auto'
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
COMPUPDATE OFF STATUPDATE OFF;
"""
)
),
""")),
call('DROP TABLE "my_table";'),
call('ALTER TABLE "my_db_my_table_staging" RENAME TO "my_table";'),
call("SELECT COUNT(*) FROM my_table;"),
Expand Down Expand Up @@ -1092,16 +1114,16 @@ def test_override_db_name(self):
"""Test that override_db_name parameter works correctly for index tracking"""
from druzhba.table import TableConfig
from druzhba.db import ConnectionParams

connection_params = ConnectionParams(
name="test_db",
host="localhost",
host="localhost",
port=5432,
user="test_user",
password="test_pass",
additional={}
additional={},
)

# Test without override - should use actual db_name
table_config = TableConfig(
database_alias="test_alias",
Expand All @@ -1110,12 +1132,12 @@ def test_override_db_name(self):
destination_schema_name="public",
source_table_name="source_table",
index_schema="idx_schema",
index_table="idx_table"
index_table="idx_table",
)

self.assertEqual(table_config.index_db_name, "test_db")
self.assertEqual(table_config.db_name, "test_db")

# Test with override - should use override_db_name
table_config_with_override = TableConfig(
database_alias="test_alias",
Expand All @@ -1125,9 +1147,9 @@ def test_override_db_name(self):
source_table_name="source_table",
index_schema="idx_schema",
index_table="idx_table",
override_db_name="override_name"
override_db_name="override_name",
)

self.assertEqual(table_config_with_override.index_db_name, "override_name")
self.assertEqual(table_config_with_override.db_name, "test_db")
self.assertEqual(table_config_with_override.override_db_name, "override_name")
Expand Down
Loading