diff --git a/.env.test.docker b/.env.test.docker index 0339556..84f3c48 100644 --- a/.env.test.docker +++ b/.env.test.docker @@ -13,3 +13,5 @@ REDSHIFT_USER= REDSHIFT_PASSWORD= REDSHIFT_IAM_COPY_ROLE= REDSHIFT_CERT_PATH= +PGTEST_DATABASE_URL=postgresql://postgres:postgres_root_password@druzhba_postgres_1:5432/druzhba_test +MYSQLTEST_DATABASE_URL=mysql://root:mysql_root_password@druzhba_mysql_1:3306/druzhba_test diff --git a/Dockerfile b/Dockerfile index ce23447..ac0e60a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,7 +16,7 @@ RUN apt-get update && \ rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* RUN curl https://pyenv.run | bash -ENV PATH="/root/.pyenv/bin:${PATH}" +ENV PATH="/root/.pyenv/shims:/root/.pyenv/bin:${PATH}" WORKDIR /app COPY . . ARG PYTHON_VERSION=3.10.4 diff --git a/docker-compose.yml b/docker-compose.yml index be033f3..7be7f51 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -33,7 +33,7 @@ services: build: context: . dockerfile: Dockerfile - command: "python setup.py test" + command: "pytest test/unit/" env_file: - ./.env.test.docker volumes: diff --git a/druzhba/table.py b/druzhba/table.py index 8f7e2ad..532e337 100644 --- a/druzhba/table.py +++ b/druzhba/table.py @@ -785,16 +785,25 @@ def get_destination_table_status(self): elif len(unexpected_dw_columns) > 0: msg = ( "Columns exist in the warehouse table that are not in " - "the source: `%s`" + "the source: `%s` (source table: %s, destination table: %s)" + ) % ( + "`, `".join(unexpected_dw_columns), + self.source_table_name, + self.destination_table_name, ) - self.logger.warning(msg, "`, `".join(unexpected_dw_columns)) + self.logger.warning(msg) return self.DESTINATION_TABLE_INCORRECT elif len(unexpected_source_columns) > 0: msg = ( "Columns exist in the source table that are not in the " - + "warehouse. Skipping column(s): `%s`" + "warehouse. Skipping column(s): `%s` " + "(source table: %s, destination table: %s)" + ) % ( + "`, `".join(unexpected_source_columns), + self.source_table_name, + self.destination_table_name, ) - self.logger.warning(msg, "`, `".join(unexpected_source_columns)) + self.logger.warning(msg) # Copy from avro will just ignore the extra columns so we can proceed return self.DESTINATION_TABLE_OK diff --git a/test/unit/test_table.py b/test/unit/test_table.py index 4e3fe2e..8dbf3a4 100644 --- a/test/unit/test_table.py +++ b/test/unit/test_table.py @@ -22,8 +22,8 @@ class IgnoreWhitespace(str): - """ Wrap a string so it compares equal to other strings - except for repeated whitespace characters. """ + """Wrap a string so it compares equal to other strings + except for repeated whitespace characters.""" r = re.compile(r"(\s+)") @@ -207,7 +207,7 @@ def test_where_clause(self): self.assertEqual( table_config.where_clause(), "\nWHERE id > '13' AND id <= '42'" ) - + with patch( "druzhba.table.TableConfig.old_index_value", new_callable=PropertyMock ) as oiv: @@ -217,7 +217,8 @@ def test_where_clause(self): ) as niv: niv.return_value = 42 with patch( - "druzhba.table.TableConfig.lookback_index_value", new_callable=PropertyMock + "druzhba.table.TableConfig.lookback_index_value", + new_callable=PropertyMock, ) as liv: liv.return_value = 10 table_config = TableConfig( @@ -589,6 +590,44 @@ def test_error_if_destination_table_incorrect(self, r): tt3.check_destination_table_status() self.assertEqual(tt3._destination_table_status, tt.DESTINATION_TABLE_REBUILD) + def test_unexpected_dw_columns_log_message(self): + tt = self.MockTable() + tt._dw_columns = ["col1", "col2", "extra_col"] + tt._columns = ["col1", "col2"] + logging.disable(logging.NOTSET) + try: + with self.assertLogs(tt.logger, level="WARNING") as cm: + result = tt.get_destination_table_status() + self.assertEqual(result, TableConfig.DESTINATION_TABLE_INCORRECT) + self.assertEqual(len(cm.output), 1) + self.assertEqual( + cm.output[0], + "WARNING:test_logger:Columns exist in the warehouse table that are " + "not in the source: `extra_col` " + "(source table: org_table, destination table: my_table)", + ) + finally: + logging.disable(logging.CRITICAL) + + def test_unexpected_source_columns_log_message(self): + tt = self.MockTable() + tt._dw_columns = ["col1", "col2"] + tt._columns = ["col1", "col2", "new_col"] + logging.disable(logging.NOTSET) + try: + with self.assertLogs(tt.logger, level="WARNING") as cm: + result = tt.get_destination_table_status() + self.assertEqual(result, TableConfig.DESTINATION_TABLE_OK) + self.assertEqual(len(cm.output), 1) + self.assertEqual( + cm.output[0], + "WARNING:test_logger:Columns exist in the source table that are " + "not in the warehouse. Skipping column(s): `new_col` " + "(source table: org_table, destination table: my_table)", + ) + finally: + logging.disable(logging.CRITICAL) + def test_query_description_to_avro(self): tt = self.MockTable() tt._desc = [["column_1", "db_str("], ["column_2", "db_int"]] @@ -673,7 +712,9 @@ def test_extract_full_single(self, mock_io): tt.write_manifest_file.assert_not_called() tt._upload_s3.assert_called_once_with( - ANY, "my-bucket", "my_prefix/my_db.org_table.20190101T010203.avro", + ANY, + "my-bucket", + "my_prefix/my_db.org_table.20190101T010203.avro", ) self.assertEqual(tt.row_count, 3) @@ -744,17 +785,13 @@ def test_redshift_copy_create(self): call('LOCK TABLE "my_table";'), call('DROP TABLE IF EXISTS "my_db_my_table_staging";'), call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'), - call( - IgnoreWhitespace( - f""" + call(IgnoreWhitespace(f""" COPY "my_db_my_table_staging" FROM 's3://my-bucket/my_prefix/my_db.org_table.20190101T010203.avro' CREDENTIALS 'aws_iam_role=iam_copy_role' FORMAT AS AVRO 'auto' EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF; - """ - ) - ), + """)), call('DELETE FROM "my_table";'), call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'), call('DROP TABLE "my_db_my_table_staging";'), @@ -789,17 +826,13 @@ def test_redshift_copy_incremental_single(self): call('LOCK TABLE "my_table";'), call('DROP TABLE IF EXISTS "my_db_my_table_staging";'), call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'), - call( - IgnoreWhitespace( - """ + call(IgnoreWhitespace(""" COPY "my_db_my_table_staging" FROM 's3://my-bucket/my_prefix/my_db.org_table.20190101T010203.avro' CREDENTIALS 'aws_iam_role=iam_copy_role' FORMAT AS AVRO 'auto' EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF; - """ - ) - ), + """)), call('DELETE FROM "my_table" ' + where_clause + ";"), call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'), call('DROP TABLE "my_db_my_table_staging";'), @@ -819,7 +852,10 @@ def test_redshift_copy_incremental_manifest(self, r): tt.manifest_mode = True tt.s3 = Mock() tt.s3.list_objects = Mock( - return_value=[{"ContentLength": 30}, {"ContentLength": 21},] + return_value=[ + {"ContentLength": 30}, + {"ContentLength": 21}, + ] ) tt.key_name = None tt._destination_table_status = tt.DESTINATION_TABLE_OK @@ -837,18 +873,14 @@ def test_redshift_copy_incremental_manifest(self, r): call('LOCK TABLE "my_table";'), call('DROP TABLE IF EXISTS "my_db_my_table_staging";'), call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'), - call( - IgnoreWhitespace( - f""" + call(IgnoreWhitespace(f""" COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.manifest' CREDENTIALS 'aws_iam_role=iam_copy_role' MANIFEST FORMAT AS AVRO 'auto' EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF; - """ - ) - ), + """)), call('DELETE FROM "my_table" ' + where_clause + ";"), call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'), call('DROP TABLE "my_db_my_table_staging";'), @@ -891,17 +923,13 @@ def test_redshift_copy_full_refresh(self): call('LOCK TABLE "my_table";'), call('DROP TABLE IF EXISTS "my_db_my_table_staging";'), call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'), - call( - IgnoreWhitespace( - f""" + call(IgnoreWhitespace(f""" COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro' CREDENTIALS 'aws_iam_role=iam_copy_role' FORMAT AS AVRO 'auto' EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF; - """ - ) - ), + """)), call('DELETE FROM "my_table";'), call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'), call('DROP TABLE "my_db_my_table_staging";'), @@ -956,7 +984,9 @@ def test_write_manifest_file(self, mock_io): self.assertListEqual(manifest["entries"], expected_entries) tt._upload_s3.assert_called_once_with( - ANY, "my-bucket", "my_prefix/my_db.org_table.20190101T010203.manifest", + ANY, + "my-bucket", + "my_prefix/my_db.org_table.20190101T010203.manifest", ) def test_invalid_manifest_state(self): @@ -989,17 +1019,13 @@ def test_redshift_copy_full_refresh_with_index_col(self): call('LOCK TABLE "my_table";'), call('DROP TABLE IF EXISTS "my_db_my_table_staging";'), call('CREATE TABLE "my_db_my_table_staging" (LIKE "my_table");'), - call( - IgnoreWhitespace( - f""" + call(IgnoreWhitespace(f""" COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro' CREDENTIALS 'aws_iam_role=iam_copy_role' FORMAT AS AVRO 'auto' EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF; - """ - ) - ), + """)), call('DELETE FROM "my_table";'), call('INSERT INTO "my_table" SELECT * FROM "my_db_my_table_staging";'), call('DROP TABLE "my_db_my_table_staging";'), @@ -1035,17 +1061,13 @@ def test_redshift_copy_rebuild(self): call(ANY), # big get permissions query call("CREATE TABLE my_db_my_table_staging ();"), call("GRANT SELECT ON my_db_my_table_staging TO GROUP group_name;"), - call( - IgnoreWhitespace( - f""" + call(IgnoreWhitespace(f""" COPY "my_db_my_table_staging" FROM 's3://{get_redshift().s3_config.bucket}/{get_redshift().s3_config.prefix}/my_db.org_table.20190101T010203.avro' CREDENTIALS 'aws_iam_role=iam_copy_role' FORMAT AS AVRO 'auto' EXPLICIT_IDS ACCEPTINVCHARS TRUNCATECOLUMNS COMPUPDATE OFF STATUPDATE OFF; - """ - ) - ), + """)), call('DROP TABLE "my_table";'), call('ALTER TABLE "my_db_my_table_staging" RENAME TO "my_table";'), call("SELECT COUNT(*) FROM my_table;"), @@ -1092,16 +1114,16 @@ def test_override_db_name(self): """Test that override_db_name parameter works correctly for index tracking""" from druzhba.table import TableConfig from druzhba.db import ConnectionParams - + connection_params = ConnectionParams( name="test_db", - host="localhost", + host="localhost", port=5432, user="test_user", password="test_pass", - additional={} + additional={}, ) - + # Test without override - should use actual db_name table_config = TableConfig( database_alias="test_alias", @@ -1110,12 +1132,12 @@ def test_override_db_name(self): destination_schema_name="public", source_table_name="source_table", index_schema="idx_schema", - index_table="idx_table" + index_table="idx_table", ) - + self.assertEqual(table_config.index_db_name, "test_db") self.assertEqual(table_config.db_name, "test_db") - + # Test with override - should use override_db_name table_config_with_override = TableConfig( database_alias="test_alias", @@ -1125,9 +1147,9 @@ def test_override_db_name(self): source_table_name="source_table", index_schema="idx_schema", index_table="idx_table", - override_db_name="override_name" + override_db_name="override_name", ) - + self.assertEqual(table_config_with_override.index_db_name, "override_name") self.assertEqual(table_config_with_override.db_name, "test_db") self.assertEqual(table_config_with_override.override_db_name, "override_name")