Skip to content

Commit cfa7eff

Browse files
Add tables to column warning logs (#91)
* feat: add table names to warning logs about column mismatches * fix: ensure that unit tests run correctly using docker * format test_table with black * add tests for updated logs * run `black` on druzhba/table.py * Revert "run `black` on druzhba/table.py" This reverts commit 9fe0306. --------- Co-authored-by: Adam J Hartz <ahartz@seatgeek.com>
1 parent ee468d3 commit cfa7eff

File tree

5 files changed

+91
-58
lines changed

5 files changed

+91
-58
lines changed

.env.test.docker

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,5 @@ REDSHIFT_USER=
1313
REDSHIFT_PASSWORD=
1414
REDSHIFT_IAM_COPY_ROLE=
1515
REDSHIFT_CERT_PATH=
16+
PGTEST_DATABASE_URL=postgresql://postgres:postgres_root_password@druzhba_postgres_1:5432/druzhba_test
17+
MYSQLTEST_DATABASE_URL=mysql://root:mysql_root_password@druzhba_mysql_1:3306/druzhba_test

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ RUN apt-get update && \
1616
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
1717

1818
RUN curl https://pyenv.run | bash
19-
ENV PATH="/root/.pyenv/bin:${PATH}"
19+
ENV PATH="/root/.pyenv/shims:/root/.pyenv/bin:${PATH}"
2020
WORKDIR /app
2121
COPY . .
2222
ARG PYTHON_VERSION=3.10.4

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ services:
3333
build:
3434
context: .
3535
dockerfile: Dockerfile
36-
command: "python setup.py test"
36+
command: "pytest test/unit/"
3737
env_file:
3838
- ./.env.test.docker
3939
volumes:

druzhba/table.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -785,16 +785,25 @@ def get_destination_table_status(self):
785785
elif len(unexpected_dw_columns) > 0:
786786
msg = (
787787
"Columns exist in the warehouse table that are not in "
788-
"the source: `%s`"
788+
"the source: `%s` (source table: %s, destination table: %s)"
789+
) % (
790+
"`, `".join(unexpected_dw_columns),
791+
self.source_table_name,
792+
self.destination_table_name,
789793
)
790-
self.logger.warning(msg, "`, `".join(unexpected_dw_columns))
794+
self.logger.warning(msg)
791795
return self.DESTINATION_TABLE_INCORRECT
792796
elif len(unexpected_source_columns) > 0:
793797
msg = (
794798
"Columns exist in the source table that are not in the "
795-
+ "warehouse. Skipping column(s): `%s`"
799+
"warehouse. Skipping column(s): `%s` "
800+
"(source table: %s, destination table: %s)"
801+
) % (
802+
"`, `".join(unexpected_source_columns),
803+
self.source_table_name,
804+
self.destination_table_name,
796805
)
797-
self.logger.warning(msg, "`, `".join(unexpected_source_columns))
806+
self.logger.warning(msg)
798807

799808
# Copy from avro will just ignore the extra columns so we can proceed
800809
return self.DESTINATION_TABLE_OK

test/unit/test_table.py

Lines changed: 74 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222

2323

2424
class IgnoreWhitespace(str):
25-
""" Wrap a string so it compares equal to other strings
26-
except for repeated whitespace characters. """
25+
"""Wrap a string so it compares equal to other strings
26+
except for repeated whitespace characters."""
2727

2828
r = re.compile(r"(\s+)")
2929

@@ -207,7 +207,7 @@ def test_where_clause(self):
207207
self.assertEqual(
208208
table_config.where_clause(), "\nWHERE id > '13' AND id <= '42'"
209209
)
210-
210+
211211
with patch(
212212
"druzhba.table.TableConfig.old_index_value", new_callable=PropertyMock
213213
) as oiv:
@@ -217,7 +217,8 @@ def test_where_clause(self):
217217
) as niv:
218218
niv.return_value = 42
219219
with patch(
220-
"druzhba.table.TableConfig.lookback_index_value", new_callable=PropertyMock
220+
"druzhba.table.TableConfig.lookback_index_value",
221+
new_callable=PropertyMock,
221222
) as liv:
222223
liv.return_value = 10
223224
table_config = TableConfig(
@@ -589,6 +590,44 @@ def test_error_if_destination_table_incorrect(self, r):
589590
tt3.check_destination_table_status()
590591
self.assertEqual(tt3._destination_table_status, tt.DESTINATION_TABLE_REBUILD)
591592

593+
def test_unexpected_dw_columns_log_message(self):
594+
tt = self.MockTable()
595+
tt._dw_columns = ["col1", "col2", "extra_col"]
596+
tt._columns = ["col1", "col2"]
597+
logging.disable(logging.NOTSET)
598+
try:
599+
with self.assertLogs(tt.logger, level="WARNING") as cm:
600+
result = tt.get_destination_table_status()
601+
self.assertEqual(result, TableConfig.DESTINATION_TABLE_INCORRECT)
602+
self.assertEqual(len(cm.output), 1)
603+
self.assertEqual(
604+
cm.output[0],
605+
"WARNING:test_logger:Columns exist in the warehouse table that are "
606+
"not in the source: `extra_col` "
607+
"(source table: org_table, destination table: my_table)",
608+
)
609+
finally:
610+
logging.disable(logging.CRITICAL)
611+
612+
def test_unexpected_source_columns_log_message(self):
613+
tt = self.MockTable()
614+
tt._dw_columns = ["col1", "col2"]
615+
tt._columns = ["col1", "col2", "new_col"]
616+
logging.disable(logging.NOTSET)
617+
try:
618+
with self.assertLogs(tt.logger, level="WARNING") as cm:
619+
result = tt.get_destination_table_status()
620+
self.assertEqual(result, TableConfig.DESTINATION_TABLE_OK)
621+
self.assertEqual(len(cm.output), 1)
622+
self.assertEqual(
623+
cm.output[0],
624+
"WARNING:test_logger:Columns exist in the source table that are "
625+
"not in the warehouse. Skipping column(s): `new_col` "
626+
"(source table: org_table, destination table: my_table)",
627+
)
628+
finally:
629+
logging.disable(logging.CRITICAL)
630+
592631
def test_query_description_to_avro(self):
593632
tt = self.MockTable()
594633
tt._desc = [["column_1", "db_str("], ["column_2", "db_int"]]
@@ -673,7 +712,9 @@ def test_extract_full_single(self, mock_io):
673712

674713
tt.write_manifest_file.assert_not_called()
675714
tt._upload_s3.assert_called_once_with(
676-
ANY, "my-bucket", "my_prefix/my_db.org_table.20190101T010203.avro",
715+
ANY,
716+
"my-bucket",
717+
"my_prefix/my_db.org_table.20190101T010203.avro",
677718
)
678719

679720
self.assertEqual(tt.row_count, 3)
@@ -744,17 +785,13 @@ def test_redshift_copy_create(self):
744785
call('LOCK TABLE "my_table";'),
745786
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
746787
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
747-
call(
748-
IgnoreWhitespace(
749-
f"""
788+
call(IgnoreWhitespace(f"""
750789
COPY "my_db_my_table_staging" FROM 's3://my-bucket/my_prefix/my_db.org_table.20190101T010203.avro'
751790
CREDENTIALS 'aws_iam_role=iam_copy_role'
752791
FORMAT AS AVRO 'auto'
753792
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
754793
COMPUPDATE OFF STATUPDATE OFF;
755-
"""
756-
)
757-
),
794+
""")),
758795
call('DELETE FROM "my_table";'),
759796
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
760797
call('DROP TABLE "my_db_my_table_staging";'),
@@ -789,17 +826,13 @@ def test_redshift_copy_incremental_single(self):
789826
call('LOCK TABLE "my_table";'),
790827
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
791828
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
792-
call(
793-
IgnoreWhitespace(
794-
"""
829+
call(IgnoreWhitespace("""
795830
COPY "my_db_my_table_staging" FROM 's3://my-bucket/my_prefix/my_db.org_table.20190101T010203.avro'
796831
CREDENTIALS 'aws_iam_role=iam_copy_role'
797832
FORMAT AS AVRO 'auto'
798833
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
799834
COMPUPDATE OFF STATUPDATE OFF;
800-
"""
801-
)
802-
),
835+
""")),
803836
call('DELETE FROM "my_table" ' + where_clause + ";"),
804837
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
805838
call('DROP TABLE "my_db_my_table_staging";'),
@@ -819,7 +852,10 @@ def test_redshift_copy_incremental_manifest(self, r):
819852
tt.manifest_mode = True
820853
tt.s3 = Mock()
821854
tt.s3.list_objects = Mock(
822-
return_value=[{"ContentLength": 30}, {"ContentLength": 21},]
855+
return_value=[
856+
{"ContentLength": 30},
857+
{"ContentLength": 21},
858+
]
823859
)
824860
tt.key_name = None
825861
tt._destination_table_status = tt.DESTINATION_TABLE_OK
@@ -837,18 +873,14 @@ def test_redshift_copy_incremental_manifest(self, r):
837873
call('LOCK TABLE "my_table";'),
838874
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
839875
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
840-
call(
841-
IgnoreWhitespace(
842-
f"""
876+
call(IgnoreWhitespace(f"""
843877
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.manifest'
844878
CREDENTIALS 'aws_iam_role=iam_copy_role'
845879
MANIFEST
846880
FORMAT AS AVRO 'auto'
847881
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
848882
COMPUPDATE OFF STATUPDATE OFF;
849-
"""
850-
)
851-
),
883+
""")),
852884
call('DELETE FROM "my_table" ' + where_clause + ";"),
853885
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
854886
call('DROP TABLE "my_db_my_table_staging";'),
@@ -891,17 +923,13 @@ def test_redshift_copy_full_refresh(self):
891923
call('LOCK TABLE "my_table";'),
892924
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
893925
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
894-
call(
895-
IgnoreWhitespace(
896-
f"""
926+
call(IgnoreWhitespace(f"""
897927
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro'
898928
CREDENTIALS 'aws_iam_role=iam_copy_role'
899929
FORMAT AS AVRO 'auto'
900930
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
901931
COMPUPDATE OFF STATUPDATE OFF;
902-
"""
903-
)
904-
),
932+
""")),
905933
call('DELETE FROM "my_table";'),
906934
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
907935
call('DROP TABLE "my_db_my_table_staging";'),
@@ -956,7 +984,9 @@ def test_write_manifest_file(self, mock_io):
956984
self.assertListEqual(manifest["entries"], expected_entries)
957985

958986
tt._upload_s3.assert_called_once_with(
959-
ANY, "my-bucket", "my_prefix/my_db.org_table.20190101T010203.manifest",
987+
ANY,
988+
"my-bucket",
989+
"my_prefix/my_db.org_table.20190101T010203.manifest",
960990
)
961991

962992
def test_invalid_manifest_state(self):
@@ -989,17 +1019,13 @@ def test_redshift_copy_full_refresh_with_index_col(self):
9891019
call('LOCK TABLE "my_table";'),
9901020
call('DROP TABLE IF EXISTS "my_db_my_table_staging";'),
9911021
call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'),
992-
call(
993-
IgnoreWhitespace(
994-
f"""
1022+
call(IgnoreWhitespace(f"""
9951023
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro'
9961024
CREDENTIALS 'aws_iam_role=iam_copy_role'
9971025
FORMAT AS AVRO 'auto'
9981026
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
9991027
COMPUPDATE OFF STATUPDATE OFF;
1000-
"""
1001-
)
1002-
),
1028+
""")),
10031029
call('DELETE FROM "my_table";'),
10041030
call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'),
10051031
call('DROP TABLE "my_db_my_table_staging";'),
@@ -1035,17 +1061,13 @@ def test_redshift_copy_rebuild(self):
10351061
call(ANY), # big get permissions query
10361062
call("CREATE TABLE my_db_my_table_staging ();"),
10371063
call("GRANT SELECT ON my_db_my_table_staging TO GROUP group_name;"),
1038-
call(
1039-
IgnoreWhitespace(
1040-
f"""
1064+
call(IgnoreWhitespace(f"""
10411065
COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro'
10421066
CREDENTIALS 'aws_iam_role=iam_copy_role'
10431067
FORMAT AS AVRO 'auto'
10441068
EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS
10451069
COMPUPDATE OFF STATUPDATE OFF;
1046-
"""
1047-
)
1048-
),
1070+
""")),
10491071
call('DROP TABLE "my_table";'),
10501072
call('ALTER TABLE "my_db_my_table_staging" RENAME TO "my_table";'),
10511073
call("SELECT COUNT(*) FROM my_table;"),
@@ -1092,16 +1114,16 @@ def test_override_db_name(self):
10921114
"""Test that override_db_name parameter works correctly for index tracking"""
10931115
from druzhba.table import TableConfig
10941116
from druzhba.db import ConnectionParams
1095-
1117+
10961118
connection_params = ConnectionParams(
10971119
name="test_db",
1098-
host="localhost",
1120+
host="localhost",
10991121
port=5432,
11001122
user="test_user",
11011123
password="test_pass",
1102-
additional={}
1124+
additional={},
11031125
)
1104-
1126+
11051127
# Test without override - should use actual db_name
11061128
table_config = TableConfig(
11071129
database_alias="test_alias",
@@ -1110,12 +1132,12 @@ def test_override_db_name(self):
11101132
destination_schema_name="public",
11111133
source_table_name="source_table",
11121134
index_schema="idx_schema",
1113-
index_table="idx_table"
1135+
index_table="idx_table",
11141136
)
1115-
1137+
11161138
self.assertEqual(table_config.index_db_name, "test_db")
11171139
self.assertEqual(table_config.db_name, "test_db")
1118-
1140+
11191141
# Test with override - should use override_db_name
11201142
table_config_with_override = TableConfig(
11211143
database_alias="test_alias",
@@ -1125,9 +1147,9 @@ def test_override_db_name(self):
11251147
source_table_name="source_table",
11261148
index_schema="idx_schema",
11271149
index_table="idx_table",
1128-
override_db_name="override_name"
1150+
override_db_name="override_name",
11291151
)
1130-
1152+
11311153
self.assertEqual(table_config_with_override.index_db_name, "override_name")
11321154
self.assertEqual(table_config_with_override.db_name, "test_db")
11331155
self.assertEqual(table_config_with_override.override_db_name, "override_name")

0 commit comments

Comments
 (0)