diff --git a/.env.template b/.env.template index 7a9afd135f..8438dd955f 100644 --- a/.env.template +++ b/.env.template @@ -39,8 +39,8 @@ USASPENDING_DB_PASSWORD=usaspender # The Broker configuration below supports tests creating a Broker DB on the usaspending-db # container as part of standing up the test suite. -# All values of BROKER_DB_* must match what is in DATA_BROKER_DATABASE_URL if BOTH are given -DATA_BROKER_DATABASE_URL=postgres://usaspending:usaspender@usaspending-db:5432/data_broker +# All values of BROKER_DB_* must match what is in BROKER_DB if BOTH are given +BROKER_DB=postgres://usaspending:usaspender@usaspending-db:5432/data_broker # Configuration values for a connection string to a Broker database # Only necessary for some management commands BROKER_DB_HOST=usaspending-db diff --git a/.github/actions/init-test-environment/action.yaml b/.github/actions/init-test-environment/action.yaml index 01d6129690..f5b6f2d85d 100644 --- a/.github/actions/init-test-environment/action.yaml +++ b/.github/actions/init-test-environment/action.yaml @@ -17,7 +17,7 @@ runs: - name: Set combined ENV shell: bash run: | - echo "DATA_BROKER_DATABASE_URL=postgres://$BROKER_DB_USER:$BROKER_DB_PASSWORD@$BROKER_DB_HOST:$BROKER_DB_PORT/$BROKER_DB_NAME" >> $GITHUB_ENV + echo "BROKER_DB=postgres://$BROKER_DB_USER:$BROKER_DB_PASSWORD@$BROKER_DB_HOST:$BROKER_DB_PORT/$BROKER_DB_NAME" >> $GITHUB_ENV echo "DATABASE_URL=postgres://$USASPENDING_DB_USER:$USASPENDING_DB_PASSWORD@$USASPENDING_DB_HOST:$USASPENDING_DB_PORT/$USASPENDING_DB_NAME" >> $GITHUB_ENV echo "DOWNLOAD_DATABASE_URL=postgres://$USASPENDING_DB_USER:$USASPENDING_DB_PASSWORD@$USASPENDING_DB_HOST:$USASPENDING_DB_PORT/$USASPENDING_DB_NAME" >> $GITHUB_ENV echo "ES_HOSTNAME=$ES_SCHEME://$ES_HOST:$ES_PORT" >> $GITHUB_ENV diff --git a/README.md b/README.md index 57d7bfbeda..06a89a5a32 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ Create a `.envrc` file in the repo root, which will be ignored by git. Change cr ```shell export DATABASE_URL=postgres://usaspending:usaspender@localhost:5432/data_store_api export ES_HOSTNAME=http://localhost:9200 -export DATA_BROKER_DATABASE_URL=postgres://admin:root@localhost:5435/data_broker +export BROKER_DB=postgres://admin:root@localhost:5435/data_broker ``` If `direnv` does not pick this up after saving the file, type @@ -220,10 +220,10 @@ Deployed production API endpoints and docs are found by following links here: `h 3. To run all USAspending tests in the docker services run ```shell - docker compose run --rm -e DATA_BROKER_DATABASE_URL='' usaspending-test + docker compose run --rm -e BROKER_DB='' usaspending-test ``` -_**NOTE**: If an env var named `DATA_BROKER_DATABASE_URL` is set, Broker Integration tests will attempt to be run as well. If doing so, Broker dependencies must be met (see below) or ALL tests will fail hard. Running the above command with `-e DATA_BROKER_DATABASE_URL=''` is a precaution to keep them excluded, unless you really want them (see below if so)._ +_**NOTE**: If an env var named `BROKER_DB` is set, Broker Integration tests will attempt to be run as well. If doing so, Broker dependencies must be met (see below) or ALL tests will fail hard. 
Running the above command with `-e BROKER_DB=''` is a precaution to keep them excluded, unless you really want them (see below if so)._ To run tests locally and not in the docker services, you need: @@ -273,7 +273,7 @@ To satisfy these dependencies and include execution of these tests, do the follo ```shell docker build -t dataact-broker-backend ../data-act-broker-backend ``` -1. Ensure you have the `DATA_BROKER_DATABASE_URL` environment variable set, and it points to what will be a live PostgreSQL server (no database required) at the time tests are run. +1. Ensure you have the `BROKER_DB` environment variable set, and it points to what will be a live PostgreSQL server (no database required) at the time tests are run. 1. _WARNING: If this is set at all, then ALL above dependencies must be met or ALL tests will fail (Django will try this connection on ALL tests' run)_ 1. This DB could be one you always have running in a local Postgres instance, or one you spin up in a Docker container just before tests are run 1. If invoking `pytest` within a docker container (e.g. using the `usaspending-test` container), you _must_ mount the host's docker socket. This is declared already in the `docker-compose.yml` file services, but would be done manually with: `-v /var/run/docker.sock:/var/run/docker.sock` @@ -286,7 +286,7 @@ Re-running the test suite using `pytest -rs` with these dependencies satisfied s _From within a container_ -_**NOTE**: `DATA_BROKER_DATABASE_URL` is set in the `docker-compose.yml` file (and could pick up `.env` values, if set)_ +_**NOTE**: `BROKER_DB` is set in the `docker-compose.yml` file (and could pick up `.env` values, if set)_ ```shell docker compose run --rm usaspending-test pytest --capture=no --verbose --tb=auto --no-cov --log-cli-level=INFO -k test_broker_integration @@ -294,7 +294,7 @@ docker compose run --rm usaspending-test pytest --capture=no --verbose --tb=auto _From Developer Desktop_ -_**NOTE**: `DATA_BROKER_DATABASE_URL` is set in the `.envrc` file and available in the shell_ +_**NOTE**: `BROKER_DB` is set in the `.envrc` file and available in the shell_ ```shell pytest --capture=no --verbose --tb=auto --no-cov --log-cli-level=INFO -k test_broker_integration ``` diff --git a/docker-compose.yml b/docker-compose.yml index fc90ec1cf6..9417d31f7a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,7 +44,7 @@ services: DJANGO_DEBUG: ${DJANGO_DEBUG} DATABASE_URL: postgres://${USASPENDING_DB_USER}:${USASPENDING_DB_PASSWORD}@${USASPENDING_DB_HOST}:${USASPENDING_DB_PORT}/data_store_api ES_HOSTNAME: ${ES_HOSTNAME} - DATA_BROKER_DATABASE_URL: postgresql://${BROKER_DB_USER}:${BROKER_DB_PASSWORD}@${BROKER_DB_HOST}:${BROKER_DB_PORT}/data_broker + BROKER_DB: postgresql://${BROKER_DB_USER}:${BROKER_DB_PASSWORD}@${BROKER_DB_HOST}:${BROKER_DB_PORT}/data_broker usaspending-test: profiles: @@ -68,7 +68,7 @@ services: DATABASE_URL: postgres://${USASPENDING_DB_USER}:${USASPENDING_DB_PASSWORD}@${USASPENDING_DB_HOST}:${USASPENDING_DB_PORT}/data_store_api ES_HOST: ${ES_HOST} ES_HOSTNAME: ${ES_HOSTNAME} - DATA_BROKER_DATABASE_URL: postgresql://${BROKER_DB_USER}:${BROKER_DB_PASSWORD}@${BROKER_DB_HOST}:${BROKER_DB_PORT}/data_broker + BROKER_DB: postgresql://${BROKER_DB_USER}:${BROKER_DB_PASSWORD}@${BROKER_DB_HOST}:${BROKER_DB_PORT}/data_broker MINIO_HOST: ${MINIO_HOST} DOWNLOAD_DATABASE_URL: postgres://${USASPENDING_DB_USER}:${USASPENDING_DB_PASSWORD}@${USASPENDING_DB_HOST}:${USASPENDING_DB_PORT}/data_store_api # Location in host machine where broker src code root can be found @@ 
-107,7 +107,7 @@ services: environment: DATABASE_URL: postgres://${USASPENDING_DB_USER}:${USASPENDING_DB_PASSWORD}@${USASPENDING_DB_HOST}:${USASPENDING_DB_PORT}/data_store_api ES_HOSTNAME: ${ES_HOSTNAME} - DATA_BROKER_DATABASE_URL: postgresql://${BROKER_DB_USER}:${BROKER_DB_PASSWORD}@${BROKER_DB_HOST}:${BROKER_DB_PORT}/data_broker + BROKER_DB: postgresql://${BROKER_DB_USER}:${BROKER_DB_PASSWORD}@${BROKER_DB_HOST}:${BROKER_DB_PORT}/data_broker # Location in host machine where broker src code root can be found DATA_BROKER_SRC_PATH: "${PWD}/../data-act-broker-backend" @@ -233,7 +233,11 @@ services: mkdir -p data/dti-da-public-files-nonprod/user_reference_docs # Create the bucket within MinIO used for endpoints that list generated downloads mkdir -p data/bulk-download + # Create the bucket for MinIO used for Spark + mkdir -p data/data/files + # Populate initial files in buckets cp dockermount/usaspending_api/data/Data_Dictionary_Crosswalk.xlsx data/dti-da-public-files-nonprod/user_reference_docs/Data_Dictionary_Crosswalk.xlsx + cp dockermount/usaspending_api/data/COVID-19_download_readme.txt data/data/files/COVID-19_download_readme.txt minio server --address ":10001" --console-address ":10002" /data " healthcheck: diff --git a/loading_data.md b/loading_data.md index ab49949fd6..3031b3c3bd 100644 --- a/loading_data.md +++ b/loading_data.md @@ -35,7 +35,7 @@ To load in the reference data, from the same directory as manage.py: To load certified submission data from the broker, you will need a read-only (or higher) connection string to the broker PostgreSQL database. If not running locally, you will also need to ensure your IP address has been whitelisted in the appropriate AWS Security Groups. Set this environment variable before running the **load_submission** command: - DATA_BROKER_DATABASE_URL=postgres://user:password@url:5432/data_broker + BROKER_DB=postgres://user:password@url:5432/data_broker To load a submission from data broker database: diff --git a/usaspending_api/awards/delta_models/award_id_lookup.py b/usaspending_api/awards/delta_models/award_id_lookup.py new file mode 100644 index 0000000000..2c1830de6f --- /dev/null +++ b/usaspending_api/awards/delta_models/award_id_lookup.py @@ -0,0 +1,11 @@ +from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType + + +AWARD_ID_LOOKUP_SCHEMA = StructType( + [ + StructField("award_id", LongType(), False), + StructField("is_fpds", BooleanType(), False), + StructField("transaction_unique_id", StringType(), False), + StructField("generated_unique_award_id", StringType(), False), + ] +) diff --git a/usaspending_api/awards/management/commands/generate_unlinked_awards_download.py b/usaspending_api/awards/management/commands/generate_unlinked_awards_download.py index ac490d67fb..bf13ac253f 100644 --- a/usaspending_api/awards/management/commands/generate_unlinked_awards_download.py +++ b/usaspending_api/awards/management/commands/generate_unlinked_awards_download.py @@ -16,6 +16,7 @@ from usaspending_api.awards.management.sql.spark.unlinked_awards_summary_file import summary_file from usaspending_api.awards.management.sql.spark.unlinked_assistance_file_d2 import file_d2_sql_string from usaspending_api.awards.management.sql.spark.unlinked_accounts_file_c import file_c_sql_string +from usaspending_api.config import CONFIG from usaspending_api.download.filestreaming.file_description import build_file_description, save_file_description from usaspending_api.download.filestreaming.zip_file import append_files_to_zip_file from 
usaspending_api.references.models.toptier_agency import ToptierAgency @@ -108,9 +109,10 @@ def handle(self, *args, **options): # Save queries as delta tables for efficiency for delta_table_name, sql_file, final_name in self.download_file_list: df = self.spark.sql(sql_file) - df.write.format(source="delta").mode(saveMode="overwrite").option("overwriteSchema", "True").saveAsTable( - name=delta_table_name - ) + df.write.format(source="delta").mode(saveMode="overwrite").options( + overwriteSchema=True, + path=f"s3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/temp/{delta_table_name}", + ).saveAsTable(name=f"temp.{delta_table_name}") for agency in toptier_agencies: agency_name = agency["name"] @@ -140,7 +142,7 @@ def process_data_copy_jobs(self, zip_file_path): self.filepaths_to_delete.append(zip_file_path) for delta_table_name, sql_file, final_name in self.download_file_list: - df = self.spark.sql(f"select * from {delta_table_name} where toptier_code = '{self._toptier_code}'") + df = self.spark.sql(f"select * from temp.{delta_table_name} where toptier_code = '{self._toptier_code}'") sql_file = None final_path = self._create_data_csv_dest_path(final_name) intermediate_data_file_path = final_path.parent / (final_path.name + "_temp") diff --git a/usaspending_api/awards/migrations/0114_alter_ctodlinkageupdates_award_id.py b/usaspending_api/awards/migrations/0114_alter_ctodlinkageupdates_award_id.py new file mode 100644 index 0000000000..8e049fa934 --- /dev/null +++ b/usaspending_api/awards/migrations/0114_alter_ctodlinkageupdates_award_id.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.23 on 2025-10-23 17:39 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("awards", "0113_financialaccountsbyawards_ussgl480210_rein_undel_obs_cpe_and_more"), + ] + + operations = [ + # This recreates the table that was originally created as part of Awards migration "0100_ctodlinkageupdates.py". + # A management command was previously recreating this table without any indexes or constraints. The SQL below + # was taken directly from the sqlmigrate for the migration mentioned above. Additionally, the "NOT NULL" + # constraint is carried forward so that Django can manage the table in the following AlterField statement. 
+ migrations.RunSQL( + sql=""" + DROP TABLE IF EXISTS c_to_d_linkage_updates; + CREATE TABLE "c_to_d_linkage_updates" ("financial_accounts_by_awards_id" integer NOT NULL PRIMARY KEY, "award_id" integer NOT NULL); + """ + ), + migrations.AlterField( + model_name="ctodlinkageupdates", + name="award_id", + field=models.IntegerField(null=True), + ), + ] diff --git a/usaspending_api/awards/models/c_to_d_linkage_updates.py b/usaspending_api/awards/models/c_to_d_linkage_updates.py index b09f60205a..589d51c5da 100644 --- a/usaspending_api/awards/models/c_to_d_linkage_updates.py +++ b/usaspending_api/awards/models/c_to_d_linkage_updates.py @@ -4,7 +4,7 @@ class CToDLinkageUpdates(models.Model): financial_accounts_by_awards_id = models.IntegerField(primary_key=True) - award_id = models.IntegerField(unique=False) + award_id = models.IntegerField(unique=False, null=True) class Meta: managed = True diff --git a/usaspending_api/broker/helpers/delete_fabs_transactions.py b/usaspending_api/broker/helpers/delete_fabs_transactions.py index 8b79969ec7..684f8becdd 100644 --- a/usaspending_api/broker/helpers/delete_fabs_transactions.py +++ b/usaspending_api/broker/helpers/delete_fabs_transactions.py @@ -43,7 +43,7 @@ def get_delete_pks_for_afa_keys(afa_ids_to_delete): is_active is not true """ - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(sql, [uppercased]) rows = cursor.fetchall() diff --git a/usaspending_api/broker/management/commands/derive_office_names.py b/usaspending_api/broker/management/commands/derive_office_names.py index c061403fb9..0ca78c49c6 100644 --- a/usaspending_api/broker/management/commands/derive_office_names.py +++ b/usaspending_api/broker/management/commands/derive_office_names.py @@ -16,16 +16,16 @@ class Command(load_base.Command): """ help = "Derives all FABS office names from the office codes in the Office table in Data broker. The \ - DATA_BROKER_DATABASE_URL environment variable must set so we can pull Office data from their db." + BROKER_DB environment variable must be set so we can pull Office data from their db." def handle(self, *args, **options): # Grab data broker database connections if not options["test"]: try: - db_conn = connections[settings.DATA_BROKER_DB_ALIAS] + db_conn = connections[settings.BROKER_DB_ALIAS] db_cursor = db_conn.cursor() except Exception as err: - logger.critical("Could not connect to database. Is DATA_BROKER_DATABASE_URL set?") + logger.critical("Could not connect to database. Is BROKER_DB set?") logger.critical(print(err)) raise else: diff --git a/usaspending_api/broker/management/commands/load_broker_table.py b/usaspending_api/broker/management/commands/load_broker_table.py index 41d5a898c5..51a2eb29f4 100644 --- a/usaspending_api/broker/management/commands/load_broker_table.py +++ b/usaspending_api/broker/management/commands/load_broker_table.py @@ -63,7 +63,7 @@ def handle(self, *args, **options): f'Copying "{broker_schema_name}"."{broker_table_name}" from Broker to ' f'"{usas_schema_name}"."{usas_table_name}" in USAspending.'
) - broker_conn = connections[settings.DATA_BROKER_DB_ALIAS] + broker_conn = connections[settings.BROKER_DB_ALIAS] usas_conn = connections[settings.DEFAULT_DB_ALIAS] table_exists_query = f""" SELECT EXISTS ( diff --git a/usaspending_api/broker/management/commands/update_agency_code_name_fabs_fpds.py b/usaspending_api/broker/management/commands/update_agency_code_name_fabs_fpds.py index 9211afdd05..81407837e9 100644 --- a/usaspending_api/broker/management/commands/update_agency_code_name_fabs_fpds.py +++ b/usaspending_api/broker/management/commands/update_agency_code_name_fabs_fpds.py @@ -95,7 +95,7 @@ def get_broker_data(table_type, fiscal_year, fy_start, fy_end, year_range=None, fy=fiscal_year, broker_where=broker_where, usaspending_where=usaspending_where, - broker_server=settings.DATA_BROKER_DBLINK_NAME, + broker_server=settings.BROKER_DBLINK_NAME, ) return sql_statement diff --git a/usaspending_api/broker/management/commands/update_duns.py b/usaspending_api/broker/management/commands/update_duns.py index 17881544a6..af0a306285 100644 --- a/usaspending_api/broker/management/commands/update_duns.py +++ b/usaspending_api/broker/management/commands/update_duns.py @@ -82,7 +82,7 @@ def handle(self, *args, **options): total_start = datetime.now() new_update_date = total_start.strftime("%Y-%m-%d") - db_cursor = connections[settings.DATA_BROKER_DB_ALIAS].cursor() + db_cursor = connections[settings.BROKER_DB_ALIAS].cursor() update_date_query = DUNS.objects.all().aggregate(Max("update_date")) update_date = update_date_query["update_date__max"] diff --git a/usaspending_api/broker/management/commands/update_table_value_from_broker.py b/usaspending_api/broker/management/commands/update_table_value_from_broker.py index b8dc25229e..f1f8a79ac5 100644 --- a/usaspending_api/broker/management/commands/update_table_value_from_broker.py +++ b/usaspending_api/broker/management/commands/update_table_value_from_broker.py @@ -5,7 +5,7 @@ from django.db import connections from usaspending_api.common.helpers.timing_helpers import Timer -from usaspending_api.settings import DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS +from usaspending_api.settings import BROKER_DB_ALIAS, DEFAULT_DB_ALIAS logger = logging.getLogger(__name__) @@ -85,7 +85,7 @@ def id_ranges(self, min_id: int, max_id: int) -> Iterator[tuple[int, int]]: yield n, min(n + self.CHUNK_SIZE, max_id) def get_match_field_range(self) -> tuple[int, int]: - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( f""" SELECT min({self.broker_match_field}), max({self.broker_match_field}) diff --git a/usaspending_api/broker/management/commands/update_transactions.py b/usaspending_api/broker/management/commands/update_transactions.py index 6f9c51160f..3b9a4f92d2 100644 --- a/usaspending_api/broker/management/commands/update_transactions.py +++ b/usaspending_api/broker/management/commands/update_transactions.py @@ -379,7 +379,7 @@ def add_arguments(self, parser): def handle(self, *args, **options): logger.info("Starting historical data load...") - db_cursor = connections[settings.DATA_BROKER_DB_ALIAS].cursor() + db_cursor = connections[settings.BROKER_DB_ALIAS].cursor() fiscal_year = options.get("fiscal_year") page = options.get("page") limit = options.get("limit") diff --git a/usaspending_api/broker/tests/integration/test_broker_integration.py b/usaspending_api/broker/tests/integration/test_broker_integration.py index ec1035c2ce..df549f9af2 100644 --- 
a/usaspending_api/broker/tests/integration/test_broker_integration.py +++ b/usaspending_api/broker/tests/integration/test_broker_integration.py @@ -6,7 +6,7 @@ class BrokerIntegrationTestCase(TestCase): - databases = {settings.DEFAULT_DB_ALIAS, settings.DATA_BROKER_DB_ALIAS} + databases = {settings.DEFAULT_DB_ALIAS, settings.BROKER_DB_ALIAS} dummy_table_name = "dummy_broker_table_to_be_rolled_back" @classmethod @@ -16,7 +16,7 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): # Follow-up of test_broker_transactional_test - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("select * from pg_tables where tablename = '{}'".format(cls.dummy_table_name)) results = cursor.fetchall() assert results is not None @@ -29,7 +29,7 @@ def tearDownClass(cls): @pytest.mark.usefixtures("broker_db_setup") def test_can_connect_to_broker(self): """Simple 'integration test' that checks a Broker DB exists to integrate with""" - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: cursor.execute("SELECT now()") results = cursor.fetchall() @@ -50,7 +50,7 @@ def test_broker_transactional_test(self): dummy_contents = "dummy_text" # Make sure the table and the data get in there - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: cursor.execute("create table {} (contents text)".format(self.dummy_table_name)) cursor.execute("insert into {} values ('{}')".format(self.dummy_table_name, dummy_contents)) @@ -68,7 +68,7 @@ def test_broker_transactional_test(self): @pytest.mark.usefixtures("broker_db_setup") def test_broker_db_fully_setup(self): """Simple 'integration test' that checks a Broker DB had its schema setup""" - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: cursor.execute("select * from pg_tables where tablename = 'alembic_version'") results = cursor.fetchall() @@ -84,16 +84,15 @@ def test_can_connect_to_broker_by_dblink(broker_server_dblink_setup, db): """ connection = connections[DEFAULT_DB_ALIAS] with connection.cursor() as cursor: - cursor.execute(f"select srvname from pg_foreign_server where srvname = '{settings.DATA_BROKER_DBLINK_NAME}'") + cursor.execute(f"select srvname from pg_foreign_server where srvname = '{settings.BROKER_DBLINK_NAME}'") results = cursor.fetchall() - if not results or not results[0][0] == settings.DATA_BROKER_DBLINK_NAME: + if not results or not results[0][0] == settings.BROKER_DBLINK_NAME: pytest.skip( - f"No foreign server named '{settings.DATA_BROKER_DBLINK_NAME}' has been setup on this " + f"No foreign server named '{settings.BROKER_DBLINK_NAME}' has been setup on this " "USAspending database. 
Skipping the test of integration with that server via dblink" ) cursor.execute( - f"SELECT * FROM dblink('{settings.DATA_BROKER_DBLINK_NAME}','SELECT now()') " - "AS broker_time(the_now timestamp)" + f"SELECT * FROM dblink('{settings.BROKER_DBLINK_NAME}','SELECT now()') " "AS broker_time(the_now timestamp)" ) results = cursor.fetchall() assert results is not None diff --git a/usaspending_api/broker/tests/integration/test_get_delete_pks_for_afa_keys.py b/usaspending_api/broker/tests/integration/test_get_delete_pks_for_afa_keys.py index ca5e84fe05..65095f6aab 100644 --- a/usaspending_api/broker/tests/integration/test_get_delete_pks_for_afa_keys.py +++ b/usaspending_api/broker/tests/integration/test_get_delete_pks_for_afa_keys.py @@ -13,7 +13,7 @@ class TestThingWithMultipleDatabases(TestCase): @classmethod def setUpTestData(cls): - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: cursor.execute("select count(*) from published_fabs") diff --git a/usaspending_api/broker/tests/integration/test_load_broker_data.py b/usaspending_api/broker/tests/integration/test_load_broker_data.py index d1baad1f7d..a522e88937 100644 --- a/usaspending_api/broker/tests/integration/test_load_broker_data.py +++ b/usaspending_api/broker/tests/integration/test_load_broker_data.py @@ -4,7 +4,7 @@ from django.core.management import call_command from django.db import connections -from usaspending_api.settings import DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS +from usaspending_api.settings import BROKER_DB_ALIAS, DEFAULT_DB_ALIAS @pytest.fixture(scope="module") @@ -14,7 +14,7 @@ def now(): @pytest.fixture def broker_zips_grouped(now): - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ CREATE TABLE zips_grouped_test ( @@ -37,11 +37,11 @@ def broker_zips_grouped(now): """ ) yield - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("DROP TABLE zips_grouped_test;") -@pytest.mark.django_db(databases=[DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) def test_load_broker_table(broker_zips_grouped, now): call_command( "load_broker_table", @@ -61,7 +61,7 @@ def test_load_broker_table(broker_zips_grouped, now): assert rows == [(now, now, 1, "00001", "KS", "01", "01"), (now, now, 2, "00002", "KS", "02", "02")] -@pytest.mark.django_db(databases=[DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) def test_load_broker_table_exists_failure(broker_zips_grouped): with pytest.raises(ValueError): call_command( diff --git a/usaspending_api/broker/tests/integration/test_update_table_value_from_broker.py b/usaspending_api/broker/tests/integration/test_update_table_value_from_broker.py index e36251ed28..90cf70cf12 100644 --- a/usaspending_api/broker/tests/integration/test_update_table_value_from_broker.py +++ b/usaspending_api/broker/tests/integration/test_update_table_value_from_broker.py @@ -3,7 +3,7 @@ from django.db import connections -from usaspending_api.settings import DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS +from usaspending_api.settings import BROKER_DB_ALIAS, DEFAULT_DB_ALIAS @pytest.fixture @@ -32,7 +32,7 @@ def setup_usas_test_table(): @pytest.fixture def setup_broker_matching_table(): - 
with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ CREATE TABLE test_update_usas ( @@ -50,13 +50,13 @@ def setup_broker_matching_table(): """ ) yield - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("DROP TABLE test_update_usas;") @pytest.fixture def setup_broker_different_table(): - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ CREATE TABLE test_update_usas_from_broker ( @@ -74,7 +74,7 @@ def setup_broker_different_table(): """ ) yield - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("DROP TABLE test_update_usas_from_broker;") @@ -96,7 +96,7 @@ def validate_usas_test_data(is_updated: bool): def validate_broker_test_data(is_table_match: bool): sql_parts_suffix = "" if is_table_match else "_from_broker" - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( f""" SELECT match_field{sql_parts_suffix}, load_field{sql_parts_suffix} @@ -108,7 +108,7 @@ def validate_broker_test_data(is_table_match: bool): assert rows == [(1, "UPDATED TEXT 1"), (3, "UPDATED TEXT 3")] -@pytest.mark.django_db(databases=[DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) def test_update_with_only_usas_fields(broker_server_dblink_setup, setup_usas_test_table, setup_broker_matching_table): validate_usas_test_data(is_updated=False) validate_broker_test_data(is_table_match=True) @@ -124,7 +124,7 @@ def test_update_with_only_usas_fields(broker_server_dblink_setup, setup_usas_tes validate_usas_test_data(is_updated=True) -@pytest.mark.django_db(databases=[DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) def test_update_with_broker_fields(broker_server_dblink_setup, setup_usas_test_table, setup_broker_different_table): validate_usas_test_data(is_updated=False) validate_broker_test_data(is_table_match=False) diff --git a/usaspending_api/common/helpers/s3_helpers.py b/usaspending_api/common/helpers/s3_helpers.py index e289e34027..de6f84291e 100644 --- a/usaspending_api/common/helpers/s3_helpers.py +++ b/usaspending_api/common/helpers/s3_helpers.py @@ -13,7 +13,7 @@ from usaspending_api.config import CONFIG -logger = logging.getLogger("script") +logger = logging.getLogger(__name__) def _get_boto3(method_name: str, *args, region_name=CONFIG.AWS_REGION, **kwargs): @@ -111,7 +111,9 @@ def download_s3_object( s3_client = _get_boto3("client", "s3", region_name=region_name) for attempt in range(retry_count + 1): try: + logger.info(f"Retrieving file from S3. 
Bucket: {bucket_name} Key: {key}") s3_client.download_file(bucket_name, key, file_path) + logger.info(f"Saving {key} to: {file_path}") return except ClientError as e: logger.info( diff --git a/usaspending_api/common/helpers/spark_helpers.py b/usaspending_api/common/helpers/spark_helpers.py index ee45cd255f..91421aa671 100644 --- a/usaspending_api/common/helpers/spark_helpers.py +++ b/usaspending_api/common/helpers/spark_helpers.py @@ -365,7 +365,7 @@ def attach_java_gateway( return gateway -def get_jdbc_connection_properties(fix_strings=True) -> dict: +def get_jdbc_connection_properties(fix_strings: bool = True, truncate: bool = False) -> dict: jdbc_props = {"driver": "org.postgresql.Driver", "fetchsize": str(CONFIG.SPARK_PARTITION_ROWS)} if fix_strings: # This setting basically tells the JDBC driver how to process the strings, which could be a special type casted # tells the driver to not make that assumption and let the schema try to infer the type on insertion. # See the `stringtype` param on https://jdbc.postgresql.org/documentation/94/connect.html for details. jdbc_props["stringtype"] = "unspecified" + if truncate: + jdbc_props["truncate"] = "true" return jdbc_props @@ -397,10 +399,10 @@ def get_usas_jdbc_url(): def get_broker_jdbc_url(): """Getting a JDBC-compliant Broker Postgres DB connection string hard-wired to the POSTGRES vars set in CONFIG""" - if not CONFIG.DATA_BROKER_DATABASE_URL: - raise ValueError("DATA_BROKER_DATABASE_URL config val must provided") + if not CONFIG.BROKER_DB: + raise ValueError("BROKER_DB config val must be provided") - return get_jdbc_url_from_pg_uri(CONFIG.DATA_BROKER_DATABASE_URL) + return get_jdbc_url_from_pg_uri(CONFIG.BROKER_DB) def get_es_config(): # pragma: no cover -- will be used eventually diff --git a/usaspending_api/common/helpers/sql_helpers.py b/usaspending_api/common/helpers/sql_helpers.py index eb53872cac..335e7b262f 100644 --- a/usaspending_api/common/helpers/sql_helpers.py +++ b/usaspending_api/common/helpers/sql_helpers.py @@ -29,8 +29,8 @@ def get_database_dsn_string(db_alias: str = DEFAULT_DB_ALIAS): def get_broker_dsn_string(): - if settings.DATA_BROKER_DB_ALIAS in settings.DATABASES: # Primary DB connection in a deployed environment - return build_dsn_string(settings.DATABASES[settings.DATA_BROKER_DB_ALIAS]) + if settings.BROKER_DB_ALIAS in settings.DATABASES: # Primary DB connection in a deployed environment + return build_dsn_string(settings.DATABASES[settings.BROKER_DB_ALIAS]) else: raise Exception("No valid Broker database connection is configured") diff --git a/usaspending_api/config/envs/default.py b/usaspending_api/config/envs/default.py index e12a25aa48..01f660f2f6 100644 --- a/usaspending_api/config/envs/default.py +++ b/usaspending_api/config/envs/default.py @@ -46,7 +46,7 @@ class DefaultConfig(BaseSettings): USASPENDING_DB_PASSWORD: Password for the user used to connect to the USAspending DB USASPENDING_DB_HOST: Host on which to to connect to the USAspending DB USASPENDING_DB_PORT: Port on which to connect to the USAspending DB - DATA_BROKER_DATABASE_URL: (optional) Full URL to Broker DB that can be used to override the URL-by-parts + BROKER_DB: (optional) Full URL to Broker DB that can be used to override the URL-by-parts BROKER_DB_NAME: The name of the Postgres DB that contains Broker data BROKER_DB_USER: Authorized user used to connect to the Broker DB BROKER_DB_PASSWORD: Password for the user used to connect to the Broker DB @@ -86,7
+86,7 @@ def __new__(cls, *args, **kwargs): USASPENDING_DB_HOST: str = ENV_SPECIFIC_OVERRIDE USASPENDING_DB_PORT: str = ENV_SPECIFIC_OVERRIDE - DATA_BROKER_DATABASE_URL: str = None # FACTORY_PROVIDED_VALUE. See its root validator-factory below + BROKER_DB: str = None # FACTORY_PROVIDED_VALUE. See its root validator-factory below BROKER_DB_SCHEME: str = "postgres" BROKER_DB_NAME: str = "data_broker" BROKER_DB_USER: str = ENV_SPECIFIC_OVERRIDE @@ -158,8 +158,8 @@ def _DATABASE_URL_and_parts_factory(cls, values): # noinspection PyMethodParameters # Pydantic returns a classmethod for its validators, so the cls param is correct @root_validator - def _DATA_BROKER_DATABASE_URL_and_parts_factory(cls, values): - """A root validator to backfill DATA_BROKER_DATABASE_URL and BROKER_DB_* part config vars and validate + def _BROKER_DB_and_parts_factory(cls, values): + """A root validator to backfill BROKER_DB and BROKER_DB_* part config vars and validate that they are all consistent. - Serves as a factory function to fill out all places where we track the database URL as both one complete @@ -171,7 +171,7 @@ def _DATA_BROKER_DATABASE_URL_and_parts_factory(cls, values): cls._validate_database_url( cls=cls, values=values, - url_conf_name="DATA_BROKER_DATABASE_URL", + url_conf_name="BROKER_DB", resource_conf_prefix="BROKER_DB", required=False, ) @@ -314,9 +314,6 @@ def _validate_http_url(cls, values, url_conf_name, resource_conf_prefix, require # Ex: 3-data-node cluster of i3.large.elasticsearch = 2 vCPU * 3 nodes = 6 vCPU: 300*6 = 1800 doc batches # Ex: 5-data-node cluster of i3.xlarge.elasticsearch = 4 vCPU * 5 nodes = 20 vCPU: 300*20 = 6000 doc batches ES_BATCH_ENTRIES: int = 4000 - # Setting SPARK_COVID19_DOWNLOAD_README_FILE_PATH to the unique location of the README - # for the COVID-19 download generation using spark. - SPARK_COVID19_DOWNLOAD_README_FILE_PATH: str = f"/dbfs/FileStore/{BRANCH}/COVID-19_download_readme.txt" # ==== [AWS] ==== USE_AWS: bool = True @@ -332,6 +329,11 @@ def _validate_http_url(cls, values, url_conf_name, resource_conf_prefix, require AWS_S3_ENDPOINT: str = "s3.us-gov-west-1.amazonaws.com" AWS_STS_ENDPOINT: str = "sts.us-gov-west-1.amazonaws.com" + # ==== [MISC] ==== + # Miscellaneous configs that are used through the codebase but don't fall into one of the categories above + COVID19_DOWNLOAD_README_FILE_NAME: str = "COVID-19_download_readme.txt" + COVID19_DOWNLOAD_README_OBJECT_KEY: str = f"files/{COVID19_DOWNLOAD_README_FILE_NAME}" + class Config: pass # supporting use of a user-provided (ang git-ignored) .env file for overrides diff --git a/usaspending_api/config/envs/local.py b/usaspending_api/config/envs/local.py index 7c137962d3..6fa385020f 100644 --- a/usaspending_api/config/envs/local.py +++ b/usaspending_api/config/envs/local.py @@ -68,7 +68,6 @@ class LocalConfig(DefaultConfig): # Sensible defaults to underneath the project root dir. 
But look in .env for overriding of these SPARK_SQL_WAREHOUSE_DIR: str = str(_PROJECT_ROOT_DIR / "spark-warehouse") HIVE_METASTORE_DERBY_DB_DIR: str = str(_PROJECT_ROOT_DIR / "spark-warehouse" / "metastore_db") - SPARK_COVID19_DOWNLOAD_README_FILE_PATH = str(_PROJECT_ROOT_DIR / "data" / "COVID-19_download_readme.txt") # ==== [MinIO] ==== MINIO_HOST: str = "localhost" diff --git a/usaspending_api/config/envs/production.py b/usaspending_api/config/envs/production.py index f451ec47b1..05f9dbc526 100644 --- a/usaspending_api/config/envs/production.py +++ b/usaspending_api/config/envs/production.py @@ -25,6 +25,6 @@ class ProductionConfig(DefaultConfig): # ==== [AWS] ==== AWS_PROFILE: Union[str, None] = None - SPARK_S3_BUCKET = "dti-da-usaspending-spark-prod" + SPARK_S3_BUCKET = "dti-usaspending-emr-prod" BULK_DOWNLOAD_S3_BUCKET_NAME: str = "dti-usaspending-bulk-download" DATABASE_DOWNLOAD_S3_BUCKET_NAME = "dti-usaspending-db-prod" diff --git a/usaspending_api/config/envs/qat.py b/usaspending_api/config/envs/qat.py index 59ae88a48a..608ed4ebd1 100644 --- a/usaspending_api/config/envs/qat.py +++ b/usaspending_api/config/envs/qat.py @@ -25,6 +25,6 @@ class QATConfig(DefaultConfig): # ==== [AWS] ==== AWS_PROFILE: Union[str, None] = None - SPARK_S3_BUCKET = "dti-da-usaspending-spark-qat" + SPARK_S3_BUCKET = "dti-usaspending-emr-qat" BULK_DOWNLOAD_S3_BUCKET_NAME: str = "dti-usaspending-bulk-download-qat" DATABASE_DOWNLOAD_S3_BUCKET_NAME = "dti-usaspending-db-nonprod" diff --git a/usaspending_api/config/envs/sandbox.py b/usaspending_api/config/envs/sandbox.py index 7bdf1720ea..6b96c8fcf8 100644 --- a/usaspending_api/config/envs/sandbox.py +++ b/usaspending_api/config/envs/sandbox.py @@ -25,9 +25,9 @@ class SandboxConfig(DefaultConfig): # ==== [AWS] ==== AWS_PROFILE: Union[str, None] = None - SPARK_S3_BUCKET = "dti-da-usaspending-spark-qat" + SPARK_S3_BUCKET = "dti-usaspending-emr-qat" BULK_DOWNLOAD_S3_BUCKET_NAME: str = "dti-usaspending-bulk-download-qat" # Prefix data paths with data/sandbox to not interfere with qat data in the same shared bucket - DELTA_LAKE_S3_PATH: str = "data/sandbox/delta" - SPARK_CSV_S3_PATH: str = "data/sandbox/csv" + # DELTA_LAKE_S3_PATH: str = "data/sandbox/delta" + # SPARK_CSV_S3_PATH: str = "data/sandbox/csv" DATABASE_DOWNLOAD_S3_BUCKET_NAME = "dti-usaspending-db-nonprod" diff --git a/usaspending_api/config/envs/staging.py b/usaspending_api/config/envs/staging.py index 1260b9f74a..871035d7f5 100644 --- a/usaspending_api/config/envs/staging.py +++ b/usaspending_api/config/envs/staging.py @@ -25,6 +25,6 @@ class StagingConfig(DefaultConfig): # ==== [AWS] ==== AWS_PROFILE: Union[str, None] = None - SPARK_S3_BUCKET = "dti-da-usaspending-spark-staging" + SPARK_S3_BUCKET = "dti-usaspending-emr-staging" BULK_DOWNLOAD_S3_BUCKET_NAME: str = "dti-usaspending-bulk-download-staging" DATABASE_DOWNLOAD_S3_BUCKET_NAME: str = "dti-usaspending-db-prod" diff --git a/usaspending_api/config/tests/unit/test_config.py b/usaspending_api/config/tests/unit/test_config.py index 127e876e56..f9736156e3 100644 --- a/usaspending_api/config/tests/unit/test_config.py +++ b/usaspending_api/config/tests/unit/test_config.py @@ -389,7 +389,7 @@ class _UnitTestDbPartsPlaceholderConfig(DefaultConfig): def test_config_values(): """Test that config values are picked up. Also convenient for eyeballing the parsed config vals when - pytest is configurd with flags to output printed statements. Note: unlike DATABASE_URL, DATA_BROKER_DATABASE_URL + pytest is configurd with flags to output printed statements. 
Note: unlike DATABASE_URL, BROKER_DB is not required, and so it is not included in this test as it can vary depending on the local settings.""" config_values: dict = CONFIG.dict() assert len(config_values) > 0 @@ -411,7 +411,7 @@ def test_config_loading(): def test_database_urls_and_parts_config_populated(): """Validate that DATABASE_URL and all USASPENDING_DB_* parts are populated after config is loaded. - Note: unlike DATABASE_URL, DATA_BROKER_DATABASE_URL is not required, and so it is not included in this test as it + Note: unlike DATABASE_URL, BROKER_DB is not required, and so it is not included in this test as it can vary depending on the local settings.""" assert CONFIG.DATABASE_URL is not None assert CONFIG.USASPENDING_DB_HOST is not None @@ -423,19 +423,19 @@ def test_database_urls_and_parts_config_populated(): def test_database_urls_only_backfills_none_parts(): """Test that only providing a value for DATABASE_URL backfills the CONFIG.USASPENDING_DB_* parts and keeps them - consistent. Similarly with DATA_BROKER_DATABASE_URL and CONFIG.BROKER_DB_* parts. + consistent. Similarly with BROKER_DB and CONFIG.BROKER_DB_* parts. - Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to be defaulted to None (not set) - - Instantiate the config with a DATABASE_URL and DATA_BROKER_DATABASE_URL env vars (ONLY) set + - Instantiate the config with a DATABASE_URL and BROKER_DB env vars (ONLY) set """ with mock.patch.dict( os.environ, { ENV_CODE_VAR: _UnitTestDbPartsNoneConfig.ENV_CODE, "DATABASE_URL": "postgres://dummy:pwd@foobar:12345/fresh_new_db_name", - "DATA_BROKER_DATABASE_URL": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", + "BROKER_DB": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", }, clear=True, ): @@ -454,7 +454,7 @@ def test_database_urls_only_backfills_none_parts(): assert cfg.USASPENDING_DB_USER == "dummy" assert cfg.USASPENDING_DB_PASSWORD.get_secret_value() == "pwd" - assert cfg.DATA_BROKER_DATABASE_URL is not None + assert cfg.BROKER_DB is not None assert cfg.BROKER_DB_HOST is not None assert cfg.BROKER_DB_PORT is not None assert cfg.BROKER_DB_NAME is not None @@ -470,19 +470,19 @@ def test_database_urls_only_backfills_none_parts(): def test_database_url_only_backfills_placeholder_parts(): """Test that only providing a value for DATABASE_URL backfills the CONFIG.USASPENDING_DB_* parts and keeps them - consistent. Similarly with DATA_BROKER_DATABASE_URL and CONFIG.BROKER_DB_* parts. + consistent. Similarly with BROKER_DB and CONFIG.BROKER_DB_* parts. 
- Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to ENV_SPECIFIC_PLACEHOLDERs - - Instantiate the config with a DATABASE_URL and DATA_BROKER_DATABASE_URL env vars (ONLY) set + - Instantiate the config with a DATABASE_URL and BROKER_DB env vars (ONLY) set """ with mock.patch.dict( os.environ, { ENV_CODE_VAR: _UnitTestDbPartsPlaceholderConfig.ENV_CODE, "DATABASE_URL": "postgres://dummy:pwd@foobar:12345/fresh_new_db_name", - "DATA_BROKER_DATABASE_URL": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", + "BROKER_DB": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", }, clear=True, ): @@ -501,7 +501,7 @@ def test_database_url_only_backfills_placeholder_parts(): assert cfg.USASPENDING_DB_USER == "dummy" assert cfg.USASPENDING_DB_PASSWORD.get_secret_value() == "pwd" - assert cfg.DATA_BROKER_DATABASE_URL is not None + assert cfg.BROKER_DB is not None assert cfg.BROKER_DB_HOST is not None assert cfg.BROKER_DB_PORT is not None assert cfg.BROKER_DB_NAME is not None @@ -517,13 +517,13 @@ def test_database_url_only_backfills_placeholder_parts(): def test_database_url_none_parts_will_build_database_url_with_only_parts_set(): """Test that if only the CONFIG.USASPENDING_DB_* parts are provided, the DATABASE_URL will be built-up from - parts, set on the CONFIG object, and consistent with the parts. Similarly with DATA_BROKER_DATABASE_URL and + parts, set on the CONFIG object, and consistent with the parts. Similarly with BROKER_DB and CONFIG.BROKER_DB_* parts. - Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to be defaulted to None (not set) - - DefaultConfig leaves DATABASE_URL and DATA_BROKER_DATABASE_URL unset, and the subclass does not set it + - DefaultConfig leaves DATABASE_URL and BROKER_DB unset, and the subclass does not set it - Instantiate the config with a env vars for each part """ with mock.patch.dict( @@ -559,7 +559,7 @@ def test_database_url_none_parts_will_build_database_url_with_only_parts_set(): assert cfg.USASPENDING_DB_PASSWORD.get_secret_value() == "pwd" assert cfg.DATABASE_URL == "postgres://dummy:pwd@foobar:12345/fresh_new_db_name" - assert cfg.DATA_BROKER_DATABASE_URL is not None + assert cfg.BROKER_DB is not None assert cfg.BROKER_DB_HOST is not None assert cfg.BROKER_DB_PORT is not None assert cfg.BROKER_DB_NAME is not None @@ -571,18 +571,18 @@ def test_database_url_none_parts_will_build_database_url_with_only_parts_set(): assert cfg.BROKER_DB_NAME == "fresh_new_db_name_broker" assert cfg.BROKER_DB_USER == "broker" assert cfg.BROKER_DB_PASSWORD.get_secret_value() == "pass" - assert cfg.DATA_BROKER_DATABASE_URL == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" + assert cfg.BROKER_DB == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" def test_database_url_placeholder_parts_will_build_database_url_with_only_parts_set(): """Test that if only the CONFIG.USASPENDING_DB_* parts are provided, the DATABASE_URL will be built-up from - parts, set on the CONFIG object, and consistent with the parts. Similarly with DATA_BROKER_DATABASE_URL and + parts, set on the CONFIG object, and consistent with the parts. Similarly with BROKER_DB and CONFIG.BROKER_DB_* parts. 
- Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to ENV_SPECIFIC_PLACEHOLDERs - - DefaultConfig leaves DATABASE_URL and DATA_BROKER_DATABASE_URL unset, and the subclass does not set it + - DefaultConfig leaves DATABASE_URL and BROKER_DB unset, and the subclass does not set it - Instantiate the config with a env vars for each part """ with mock.patch.dict( @@ -618,7 +618,7 @@ def test_database_url_placeholder_parts_will_build_database_url_with_only_parts_ assert cfg.USASPENDING_DB_PASSWORD.get_secret_value() == "pwd" assert cfg.DATABASE_URL == "postgres://dummy:pwd@foobar:12345/fresh_new_db_name" - assert cfg.DATA_BROKER_DATABASE_URL is not None + assert cfg.BROKER_DB is not None assert cfg.BROKER_DB_HOST is not None assert cfg.BROKER_DB_PORT is not None assert cfg.BROKER_DB_NAME is not None @@ -630,19 +630,19 @@ def test_database_url_placeholder_parts_will_build_database_url_with_only_parts_ assert cfg.BROKER_DB_NAME == "fresh_new_db_name_broker" assert cfg.BROKER_DB_USER == "broker" assert cfg.BROKER_DB_PASSWORD.get_secret_value() == "pass" - assert cfg.DATA_BROKER_DATABASE_URL == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" + assert cfg.BROKER_DB == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" def test_database_url_and_parts_defined_ok_if_consistent_none_parts(): """Test that if BOTH the CONFIG.DATABASE_URL and the CONFIG.USASPENDING_DB_* parts are provided, neither is built-up or backfilled, but they are validated to ensure they are equal. This should validate fine. Similarly - with DATA_BROKER_DATABASE_URL and CONFIG.BROKER_DB_* parts. + with BROKER_DB and CONFIG.BROKER_DB_* parts. - Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to be defaulted to None (not set) - - DefaultConfig leaves DATABASE_URL and DATA_BROKER_DATABASE_URL unset, and the subclass does not set it - - Instantiate the config with a env vars for each part and with a DATABASE_URL and DATA_BROKER_DATABASE_URL env vars + - DefaultConfig leaves DATABASE_URL and BROKER_DB unset, and the subclass does not set it + - Instantiate the config with a env vars for each part and with a DATABASE_URL and BROKER_DB env vars made up of those parts. 
""" with mock.patch.dict( @@ -655,7 +655,7 @@ def test_database_url_and_parts_defined_ok_if_consistent_none_parts(): "USASPENDING_DB_NAME": "fresh_new_db_name", "USASPENDING_DB_USER": "dummy", "USASPENDING_DB_PASSWORD": "pwd", - "DATA_BROKER_DATABASE_URL": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", + "BROKER_DB": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", "BROKER_DB_HOST": "broker-foobar", "BROKER_DB_PORT": "54321", "BROKER_DB_NAME": "fresh_new_db_name_broker", @@ -680,7 +680,7 @@ def test_database_url_and_parts_defined_ok_if_consistent_none_parts(): assert cfg.USASPENDING_DB_PASSWORD.get_secret_value() == "pwd" assert cfg.DATABASE_URL == "postgres://dummy:pwd@foobar:12345/fresh_new_db_name" - assert cfg.DATA_BROKER_DATABASE_URL is not None + assert cfg.BROKER_DB is not None assert cfg.BROKER_DB_HOST is not None assert cfg.BROKER_DB_PORT is not None assert cfg.BROKER_DB_NAME is not None @@ -692,19 +692,19 @@ def test_database_url_and_parts_defined_ok_if_consistent_none_parts(): assert cfg.BROKER_DB_NAME == "fresh_new_db_name_broker" assert cfg.BROKER_DB_USER == "broker" assert cfg.BROKER_DB_PASSWORD.get_secret_value() == "pass" - assert cfg.DATA_BROKER_DATABASE_URL == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" + assert cfg.BROKER_DB == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" def test_database_url_and_parts_defined_ok_if_consistent_placeholder_parts(): """Test that if BOTH the CONFIG.DATABASE_URL and the CONFIG.USASPENDING_DB_* parts are provided, neither is built-up or backfilled, but they are validated to ensure they are equal. This should validate fine. Similarly - with DATA_BROKER_DATABASE_URL and CONFIG.BROKER_DB_* parts. + with BROKER_DB and CONFIG.BROKER_DB_* parts. - Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to ENV_SPECIFIC_PLACEHOLDERs - - DefaultConfig leaves DATABASE_URL and DATA_BROKER_DATABASE_URL unset, and the subclass does not set it - - Instantiate the config with a env vars for each part and with a DATABASE_URL and DATA_BROKER_DATABASE_URL env vars + - DefaultConfig leaves DATABASE_URL and BROKER_DB unset, and the subclass does not set it + - Instantiate the config with a env vars for each part and with a DATABASE_URL and BROKER_DB env vars made up of those parts. 
""" with mock.patch.dict( @@ -717,7 +717,7 @@ def test_database_url_and_parts_defined_ok_if_consistent_placeholder_parts(): "USASPENDING_DB_NAME": "fresh_new_db_name", "USASPENDING_DB_USER": "dummy", "USASPENDING_DB_PASSWORD": "pwd", - "DATA_BROKER_DATABASE_URL": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", + "BROKER_DB": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", "BROKER_DB_HOST": "broker-foobar", "BROKER_DB_PORT": "54321", "BROKER_DB_NAME": "fresh_new_db_name_broker", @@ -742,7 +742,7 @@ def test_database_url_and_parts_defined_ok_if_consistent_placeholder_parts(): assert cfg.USASPENDING_DB_PASSWORD.get_secret_value() == "pwd" assert cfg.DATABASE_URL == "postgres://dummy:pwd@foobar:12345/fresh_new_db_name" - assert cfg.DATA_BROKER_DATABASE_URL is not None + assert cfg.BROKER_DB is not None assert cfg.BROKER_DB_HOST is not None assert cfg.BROKER_DB_PORT is not None assert cfg.BROKER_DB_NAME is not None @@ -754,7 +754,7 @@ def test_database_url_and_parts_defined_ok_if_consistent_placeholder_parts(): assert cfg.BROKER_DB_NAME == "fresh_new_db_name_broker" assert cfg.BROKER_DB_USER == "broker" assert cfg.BROKER_DB_PASSWORD.get_secret_value() == "pass" - assert cfg.DATA_BROKER_DATABASE_URL == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" + assert cfg.BROKER_DB == "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker" def test_database_url_and_parts_error_if_inconsistent_none_parts(): @@ -854,14 +854,14 @@ def test_database_url_and_parts_error_if_inconsistent_placeholder_parts(): def test_data_act_database_url_and_parts_error_if_inconsistent_none_parts(): - """Test that if BOTH the CONFIG.DATA_BROKER_DATABASE_URL and the CONFIG.BROKER_DB_* parts are provided, + """Test that if BOTH the CONFIG.BROKER_DB and the CONFIG.BROKER_DB_* parts are provided, but their values are not consistent with each other, than the validation will catch that and throw an error - Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values to be defaulted to None (not set) - - DefaultConfig leaves DATA_BROKER_DATABASE_URL unset, and the subclass does not set it - - Instantiate the config with a env vars for each part and with a DATA_BROKER_DATABASE_URL env var made up of those + - DefaultConfig leaves BROKER_DB unset, and the subclass does not set it + - Instantiate the config with a env vars for each part and with a BROKER_DB env var made up of those parts. 
- Force the values to not match - Iterate through each part and test it fails validation @@ -869,7 +869,7 @@ def test_data_act_database_url_and_parts_error_if_inconsistent_none_parts(): consistent_dict = { ENV_CODE_VAR: _UnitTestDbPartsNoneConfig.ENV_CODE, "DATABASE_URL": "postgres://dummy:pwd@foobar:12345/fresh_new_db_name", - "DATA_BROKER_DATABASE_URL": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", + "BROKER_DB": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", "BROKER_DB_HOST": "broker-foobar", "BROKER_DB_PORT": "54321", "BROKER_DB_NAME": "fresh_new_db_name_broker", @@ -897,23 +897,19 @@ def test_data_act_database_url_and_parts_error_if_inconsistent_none_parts(): # The error keeps the provided password obfuscated as a SecretStr provided = SecretStr(provided) expected = "*" * len(expected) if expected else None - expected_error = ( - f"Part: {part}, Part Value Provided: {provided}, " - f"Value found in DATA_BROKER_DATABASE_URL:" - f" {expected}" - ) + expected_error = f"Part: {part}, Part Value Provided: {provided}, Value found in BROKER_DB: {expected}" assert exc_info.match(re.escape(expected_error)) def test_data_act_database_url_and_parts_error_if_inconsistent_placeholder_parts(): - """Test that if BOTH the CONFIG.DATA_BROKER_DATABASE_URL and the CONFIG.BROKER_DB_* parts are provided, + """Test that if BOTH the CONFIG.BROKER_DB and the CONFIG.BROKER_DB_* parts are provided, but their values are not consistent with each other, than the validation will catch that and throw an error - Use a FRESH (empty) set of environment variables - Use NO .env file - Build-out a new subclass of DefaultConfig, which overrides the part values ENV_SPECIFIC_PLACEHOLDERs - - DefaultConfig leaves DATA_BROKER_DATABASE_URL unset, and the subclass does not set it - - Instantiate the config with a env vars for each part and with a DATA_BROKER_DATABASE_URL env var made up of those + - DefaultConfig leaves BROKER_DB unset, and the subclass does not set it + - Instantiate the config with a env vars for each part and with a BROKER_DB env var made up of those parts. 
- Force the values to not match - Iterate through each part and test it fails validation @@ -921,7 +917,7 @@ def test_data_act_database_url_and_parts_error_if_inconsistent_placeholder_parts consistent_dict = { ENV_CODE_VAR: _UnitTestDbPartsPlaceholderConfig.ENV_CODE, "DATABASE_URL": "postgres://dummy:pwd@foobar:12345/fresh_new_db_name", - "DATA_BROKER_DATABASE_URL": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", + "BROKER_DB": "postgres://broker:pass@broker-foobar:54321/fresh_new_db_name_broker", "BROKER_DB_HOST": "broker-foobar", "BROKER_DB_PORT": "54321", "BROKER_DB_NAME": "fresh_new_db_name_broker", @@ -949,11 +945,7 @@ def test_data_act_database_url_and_parts_error_if_inconsistent_placeholder_parts # The error keeps the provided password obfuscated as a SecretStr provided = SecretStr(provided) expected = "*" * len(expected) if expected else None - expected_error = ( - f"Part: {part}, Part Value Provided: {provided}, " - f"Value found in DATA_BROKER_DATABASE_URL:" - f" {expected}" - ) + expected_error = f"Part: {part}, Part Value Provided: {provided}, Value found in BROKER_DB: {expected}" assert exc_info.match(re.escape(expected_error)) @@ -969,7 +961,7 @@ def test_postgres_dsn_constructed_with_only_url_leaves_none_parts(): assert pg_dsn.path is None assert pg_dsn.scheme is not None - pg_dsn = PostgresDsn(str(CONFIG.DATA_BROKER_DATABASE_URL), scheme="postgres") + pg_dsn = PostgresDsn(str(CONFIG.BROKER_DB), scheme="postgres") assert pg_dsn.host is None assert pg_dsn.port is None diff --git a/usaspending_api/conftest.py b/usaspending_api/conftest.py index c3f5683d24..8b17215603 100644 --- a/usaspending_api/conftest.py +++ b/usaspending_api/conftest.py @@ -265,22 +265,22 @@ def django_db_setup( {**settings.DATABASES[settings.DEFAULT_DB_ALIAS], **{"NAME": test_usas_db_name}} ) - old_broker_db_url = CONFIG.DATA_BROKER_DATABASE_URL + old_broker_db_url = CONFIG.BROKER_DB old_broker_ps_db = CONFIG.BROKER_DB_NAME - if settings.DATA_BROKER_DB_ALIAS in settings.DATABASES: - test_broker_db = settings.DATABASES[settings.DATA_BROKER_DB_ALIAS].get("NAME") + if settings.BROKER_DB_ALIAS in settings.DATABASES: + test_broker_db = settings.DATABASES[settings.BROKER_DB_ALIAS].get("NAME") if test_broker_db is None: raise ValueError( - f"DB 'NAME' for DB alias {settings.DATA_BROKER_DB_ALIAS} came back as None. Check config." + f"DB 'NAME' for DB alias {settings.BROKER_DB_ALIAS} came back as None. Check config." ) if "test" not in test_broker_db: raise ValueError( - f"DB 'NAME' for DB alias {settings.DATA_BROKER_DB_ALIAS} does not contain 'test' when expected to." + f"DB 'NAME' for DB alias {settings.BROKER_DB_ALIAS} does not contain 'test' when expected to." 
) CONFIG.BROKER_DB_NAME = test_broker_db - CONFIG.DATA_BROKER_DATABASE_URL = build_dsn_string( - {**settings.DATABASES[settings.DATA_BROKER_DB_ALIAS], **{"NAME": test_broker_db}} + CONFIG.BROKER_DB = build_dsn_string( + {**settings.DATABASES[settings.BROKER_DB_ALIAS], **{"NAME": test_broker_db}} ) # This will be added to the finalizer which will be run when the newly made test database is being torn down @@ -288,7 +288,7 @@ def reset_postgres_dsn(): CONFIG.DATABASE_URL = old_usas_db_url CONFIG.USASPENDING_DB_NAME = old_usas_ps_db - CONFIG.DATA_BROKER_DATABASE_URL = old_broker_db_url + CONFIG.BROKER_DB = old_broker_db_url CONFIG.BROKER_DB_NAME = old_broker_ps_db request.addfinalizer(reset_postgres_dsn) @@ -453,7 +453,7 @@ def broker_db_setup(django_db_setup, django_db_use_migrations, worker_id): broker_integrationtest_secrets_file = f"{broker_config_env_envvar}_secrets.yml" # Check that a Connection to the Broker DB has been configured - if settings.DATA_BROKER_DB_ALIAS not in settings.DATABASES: + if settings.BROKER_DB_ALIAS not in settings.DATABASES: logger.error("Error finding 'data_broker' database configured in django settings.DATABASES.") pytest.skip( "'data_broker' database not configured in django settings.DATABASES. " @@ -486,7 +486,7 @@ def broker_db_setup(django_db_setup, django_db_use_migrations, worker_id): # ==== Run the DB setup script using the Broker docker image. ==== - broker_test_db_name = settings.DATABASES[settings.DATA_BROKER_DB_ALIAS]["NAME"] + broker_test_db_name = settings.DATABASES[settings.BROKER_DB_ALIAS]["NAME"] # Using a mounts to copy the broker source code into the container, and create a modifiable copy # of the broker config dir in order to execute broker DB setup code using that config @@ -534,13 +534,13 @@ def broker_db_setup(django_db_setup, django_db_use_migrations, worker_id): # Setup Broker config files to work with the same DB configured via the Broker DB URL env var # This will ensure that when the broker script is run, it uses the same test broker DB broker_db_config_cmds = rf""" \ - sed -i.bak -E "s/host:.*$/host: {settings.DATABASES[settings.DATA_BROKER_DB_ALIAS]["HOST"]}/" \ + sed -i.bak -E "s/host:.*$/host: {settings.DATABASES[settings.BROKER_DB_ALIAS]["HOST"]}/" \ {broker_src_target}/{broker_config_dir}/{broker_integrationtest_config_file}; \ - sed -i.bak -E "s/port:.*$/port: {settings.DATABASES[settings.DATA_BROKER_DB_ALIAS]["PORT"]}/" \ + sed -i.bak -E "s/port:.*$/port: {settings.DATABASES[settings.BROKER_DB_ALIAS]["PORT"]}/" \ {broker_src_target}/{broker_config_dir}/{broker_integrationtest_config_file}; \ - sed -i.bak -E "s/username:.*$/username: {settings.DATABASES[settings.DATA_BROKER_DB_ALIAS]["USER"]}/" \ + sed -i.bak -E "s/username:.*$/username: {settings.DATABASES[settings.BROKER_DB_ALIAS]["USER"]}/" \ {broker_src_target}/{broker_config_dir}/{broker_integrationtest_secrets_file}; \ - sed -i.bak -E "s/password:.*$/password: {settings.DATABASES[settings.DATA_BROKER_DB_ALIAS]["PASSWORD"]}/" \ + sed -i.bak -E "s/password:.*$/password: {settings.DATABASES[settings.BROKER_DB_ALIAS]["PASSWORD"]}/" \ {broker_src_target}/{broker_config_dir}/{broker_integrationtest_secrets_file}; \ """ diff --git a/usaspending_api/conftest_helpers.py b/usaspending_api/conftest_helpers.py index 2ee4d817fd..4cd4ae209b 100644 --- a/usaspending_api/conftest_helpers.py +++ b/usaspending_api/conftest_helpers.py @@ -265,11 +265,11 @@ def ensure_broker_server_dblink_exists(): # Gather tokens from database connection strings if settings.DEFAULT_DB_ALIAS not in 
settings.DATABASES: raise Exception(f"'{settings.DEFAULT_DATABASE_ALIAS}' database not configured in django settings.DATABASES") - if settings.DATA_BROKER_DB_ALIAS not in settings.DATABASES: - raise Exception(f"'{settings.DATA_BROKER_DB_ALIAS}' database not configured in django settings.DATABASES") + if settings.BROKER_DB_ALIAS not in settings.DATABASES: + raise Exception(f"'{settings.BROKER_DB_ALIAS}' database not configured in django settings.DATABASES") db_conn_tokens_dict = { **{"USASPENDING_DB_" + k: v for k, v in settings.DATABASES[settings.DEFAULT_DB_ALIAS].items()}, - **{"BROKER_DB_" + k: v for k, v in settings.DATABASES[settings.DATA_BROKER_DB_ALIAS].items()}, + **{"BROKER_DB_" + k: v for k, v in settings.DATABASES[settings.BROKER_DB_ALIAS].items()}, } extensions_script_path = str(settings.APP_DIR / "database_scripts" / "extensions" / "extensions.sql") diff --git a/usaspending_api/database_scripts/etl/award_delta_view.sql b/usaspending_api/database_scripts/etl/award_delta_view.sql index 0910ad12c9..f48a0ebb33 100644 --- a/usaspending_api/database_scripts/etl/award_delta_view.sql +++ b/usaspending_api/database_scripts/etl/award_delta_view.sql @@ -99,7 +99,7 @@ SELECT "pop_zip4", "cfda_number", - "cfda_program_title" as cfda_title, + "cfda_program_title" AS cfda_title, "cfdas", "sai_number", @@ -116,7 +116,7 @@ SELECT "tas_components", "disaster_emergency_fund_codes", - "spending_by_defc"::JSON, + CAST("spending_by_defc" AS VARCHAR(65535)) AS spending_by_defc, "total_covid_outlay", "total_covid_obligation", @@ -125,7 +125,7 @@ SELECT "total_iija_obligation", "generated_pragmatic_obligation", - "program_activities"::JSON, + CAST("program_activities" AS VARCHAR(65535)) AS program_activities, "subaward_count", "total_subaward_amount", "transaction_count" diff --git a/usaspending_api/database_scripts/etl/drop_view.sql b/usaspending_api/database_scripts/etl/drop_view.sql index f0b4743f6d..e8ba65d964 100644 --- a/usaspending_api/database_scripts/etl/drop_view.sql +++ b/usaspending_api/database_scripts/etl/drop_view.sql @@ -1,3 +1,5 @@ DROP VIEW IF EXISTS transaction_delta_view; DROP VIEW IF EXISTS award_delta_view; -DROP VIEW IF EXISTS subaward_es_view; \ No newline at end of file +DROP VIEW IF EXISTS subaward_es_view; +DROP VIEW IF EXISTS location_delta_view; +DROP VIEW IF EXISTS recipient_profile_delta_view; diff --git a/usaspending_api/database_scripts/etl/subaward_es_view.sql b/usaspending_api/database_scripts/etl/subaward_es_view.sql index 82dae3374f..82c7a1f287 100644 --- a/usaspending_api/database_scripts/etl/subaward_es_view.sql +++ b/usaspending_api/database_scripts/etl/subaward_es_view.sql @@ -118,7 +118,7 @@ SELECT a.disaster_emergency_fund_codes, a.recipient_hash, a.parent_uei, - s.program_activities::JSON, + CAST(s.program_activities AS VARCHAR(65535)) AS program_activities, s.prime_award_recipient_id, a.tas_paths, a.tas_components, diff --git a/usaspending_api/database_scripts/etl/transaction_delta_view.sql b/usaspending_api/database_scripts/etl/transaction_delta_view.sql index a29f44dc28..e637656d22 100644 --- a/usaspending_api/database_scripts/etl/transaction_delta_view.sql +++ b/usaspending_api/database_scripts/etl/transaction_delta_view.sql @@ -124,8 +124,8 @@ SELECT "funding_subtier_agency_abbreviation", "tas_paths", "tas_components", - "federal_accounts"::JSON, + CAST("federal_accounts" AS VARCHAR(65535)) AS federal_accounts, "disaster_emergency_fund_codes", - "program_activities"::JSON + CAST("program_activities" AS VARCHAR(65535)) AS program_activities FROM 
"transaction_search" WHERE "action_date" >= '2007-10-01'; diff --git a/usaspending_api/database_scripts/job_archive/backfill_per_transaction_exec_comp.py b/usaspending_api/database_scripts/job_archive/backfill_per_transaction_exec_comp.py index 7446f6b871..4d1145e4e8 100644 --- a/usaspending_api/database_scripts/job_archive/backfill_per_transaction_exec_comp.py +++ b/usaspending_api/database_scripts/job_archive/backfill_per_transaction_exec_comp.py @@ -28,7 +28,7 @@ # DEFINE THESE ENVIRONMENT VARIABLES BEFORE RUNNING! SPENDING_CONNECTION_STRING = environ["DATABASE_URL"] -BROKER_CONNECTION_STRING = environ["DATA_BROKER_DATABASE_URL"] +BROKER_CONNECTION_STRING = environ["BROKER_DB"] BROKER_FABS_SELECT_SQL = """ diff --git a/usaspending_api/database_scripts/job_archive/backfill_primary_place_of_performance_scope.py b/usaspending_api/database_scripts/job_archive/backfill_primary_place_of_performance_scope.py index 9421de0392..d974b3405a 100644 --- a/usaspending_api/database_scripts/job_archive/backfill_primary_place_of_performance_scope.py +++ b/usaspending_api/database_scripts/job_archive/backfill_primary_place_of_performance_scope.py @@ -25,7 +25,7 @@ # DEFINE THESE ENVIRONMENT VARIABLES BEFORE RUNNING! SPENDING_CONNECTION_STRING = environ["DATABASE_URL"] -BROKER_CONNECTION_STRING = environ["DATA_BROKER_DATABASE_URL"] +BROKER_CONNECTION_STRING = environ["BROKER_DB"] BROKER_FABS_SELECT_SQL = """ diff --git a/usaspending_api/database_scripts/job_archive/backfill_solicitation_date.py b/usaspending_api/database_scripts/job_archive/backfill_solicitation_date.py index 6d23f4961e..cac76afe1e 100644 --- a/usaspending_api/database_scripts/job_archive/backfill_solicitation_date.py +++ b/usaspending_api/database_scripts/job_archive/backfill_solicitation_date.py @@ -28,7 +28,7 @@ # DEFINE THESE ENVIRONMENT VARIABLES BEFORE RUNNING! 
SPENDING_CONNECTION_STRING = environ["DATABASE_URL"] -BROKER_CONNECTION_STRING = environ["DATA_BROKER_DATABASE_URL"] +BROKER_CONNECTION_STRING = environ["BROKER_DB"] BROKER_FPDS_SELECT_SQL = """ diff --git a/usaspending_api/database_scripts/job_archive/management/commands/resynchronize_fabs_and_fpds.py b/usaspending_api/database_scripts/job_archive/management/commands/resynchronize_fabs_and_fpds.py index 625557c44c..de9ef64f25 100644 --- a/usaspending_api/database_scripts/job_archive/management/commands/resynchronize_fabs_and_fpds.py +++ b/usaspending_api/database_scripts/job_archive/management/commands/resynchronize_fabs_and_fpds.py @@ -392,7 +392,7 @@ def validate_deletions(source_temp_table, transaction_temp_table, key_column, br return sql = broker_sql % (str(ids) if len(ids) > 1 else f"({ids[0]})") - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: cursor.execute(sql) results = cursor.fetchall() diff --git a/usaspending_api/disaster/management/commands/generate_covid19_download.py b/usaspending_api/disaster/management/commands/generate_covid19_download.py index 13b793f6e3..b1e34b77b5 100644 --- a/usaspending_api/disaster/management/commands/generate_covid19_download.py +++ b/usaspending_api/disaster/management/commands/generate_covid19_download.py @@ -8,7 +8,7 @@ from pathlib import Path from usaspending_api.common.etl.spark import create_ref_temp_views -from usaspending_api.common.helpers.s3_helpers import upload_download_file_to_s3 +from usaspending_api.common.helpers.s3_helpers import download_s3_object, upload_download_file_to_s3 from usaspending_api.common.helpers.spark_helpers import configure_spark_session, get_active_spark_session from usaspending_api.common.helpers.sql_helpers import read_sql_file_to_text from usaspending_api.common.helpers.download_csv_strategies import ( @@ -67,7 +67,6 @@ class Command(BaseCommand): "disaster_covid19_file_f_grants": "usaspending_api/disaster/management/sql/disaster_covid19_file_f_grants.sql", }, "download_to_csv_strategy": PostgresToCSVStrategy(logger=logger), - "readme_path": Path(settings.COVID19_DOWNLOAD_README_FILE_PATH), }, ComputeTypeEnum.SPARK.value: { "source_sql_strategy": { @@ -79,7 +78,6 @@ class Command(BaseCommand): "disaster_covid19_file_f_grants": f_grants_sql_string, }, "download_to_csv_strategy": SparkToCSVStrategy(logger=logger), - "readme_path": Path(CONFIG.SPARK_COVID19_DOWNLOAD_README_FILE_PATH), }, } @@ -119,7 +117,6 @@ def handle(self, *args, **options): self.spark = configure_spark_session(**extra_conf, spark_context=self.spark) create_ref_temp_views(self.spark) - self.readme_path = self.compute_types[self.compute_type_arg]["readme_path"] self.download_csv_strategy = self.compute_types[self.compute_type_arg]["download_to_csv_strategy"] self.download_source_sql = self.compute_types[self.compute_type_arg]["source_sql_strategy"] self.zip_file_path = ( @@ -210,13 +207,16 @@ def cleanup(self): path.unlink() def finalize_zip_contents(self): - self.filepaths_to_delete.append(self.working_dir_path / "Data_Dictionary_Crosswalk.xlsx") + # Handle the data dictionary + self.filepaths_to_delete.append(self.working_dir_path / settings.DATA_DICTIONARY_FILE_NAME) + add_data_dictionary_to_zip(str(self.working_dir_path), str(self.zip_file_path)) - add_data_dictionary_to_zip(str(self.zip_file_path.parent), str(self.zip_file_path)) - - file_description = build_file_description(str(self.readme_path), dict()) + # Handle the COVID-19 readme + 
covid_readme_path = self.working_dir_path / CONFIG.COVID19_DOWNLOAD_README_FILE_NAME + download_s3_object(CONFIG.SPARK_S3_BUCKET, CONFIG.COVID19_DOWNLOAD_README_OBJECT_KEY, str(covid_readme_path)) + file_description = build_file_description(str(covid_readme_path), dict()) file_description_path = save_file_description( - str(self.zip_file_path.parent), self.readme_path.name, file_description + str(self.working_dir_path), covid_readme_path.name, file_description ) self.filepaths_to_delete.append(Path(file_description_path)) append_files_to_zip_file([file_description_path], str(self.zip_file_path)) diff --git a/usaspending_api/download/filestreaming/download_generation.py b/usaspending_api/download/filestreaming/download_generation.py index 8badb283d7..02daeb2a02 100644 --- a/usaspending_api/download/filestreaming/download_generation.py +++ b/usaspending_api/download/filestreaming/download_generation.py @@ -927,11 +927,6 @@ def add_data_dictionary_to_zip(working_dir, zip_file_path): write_to_log(message="Adding data dictionary to zip file") data_dictionary_file_name = "Data_Dictionary_Crosswalk.xlsx" data_dictionary_file_path = os.path.join(working_dir, data_dictionary_file_name) - - logger.info( - f"Retrieving the data dictionary from S3. Bucket: {settings.DATA_DICTIONARY_S3_BUCKET_NAME} Key: {settings.DATA_DICTIONARY_S3_KEY}" - ) - logger.info(f"Saving the data dictionary to: {data_dictionary_file_path}") download_s3_object( bucket_name=settings.DATA_DICTIONARY_S3_BUCKET_NAME, key=settings.DATA_DICTIONARY_S3_KEY, diff --git a/usaspending_api/download/tests/integration/test_populate_monthly_files.py b/usaspending_api/download/tests/integration/test_populate_monthly_files.py index f2e559b244..79d0410b58 100644 --- a/usaspending_api/download/tests/integration/test_populate_monthly_files.py +++ b/usaspending_api/download/tests/integration/test_populate_monthly_files.py @@ -529,7 +529,7 @@ def monthly_download_data(db, monkeypatch): monkeypatch.setattr("usaspending_api.settings.MONTHLY_DOWNLOAD_S3_BUCKET_NAME", "whatever") -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) def test_all_agencies(client, fake_csv_local_path, monthly_download_data, monkeypatch): download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string(settings.DOWNLOAD_DB_ALIAS)) call_command("populate_monthly_files", "--fiscal_year=2020", "--local", "--clobber") @@ -539,7 +539,7 @@ def test_all_agencies(client, fake_csv_local_path, monthly_download_data, monkey assert f"FY2020_All_Assistance_Full_{TODAY}.zip" in file_list -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) def test_specific_agency(client, fake_csv_local_path, monthly_download_data, monkeypatch): download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string(settings.DOWNLOAD_DB_ALIAS)) contract_data = generate_contract_data(2020, 1) @@ -586,7 +586,7 @@ def test_specific_agency(client, fake_csv_local_path, monthly_download_data, mon assert row_count >= 1 -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) def test_agency_no_data(client, fake_csv_local_path, monthly_download_data, monkeypatch): download_generation.retrieve_db_string = 
Mock(return_value=get_database_dsn_string(settings.DOWNLOAD_DB_ALIAS)) call_command("populate_monthly_files", "--agencies=2", "--fiscal_year=2022", "--local", "--clobber") @@ -609,7 +609,7 @@ def test_agency_no_data(client, fake_csv_local_path, monthly_download_data, monk assert row_count == 1, f"{csv_file} was not empty" -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) def test_fiscal_years(client, fake_csv_local_path, monthly_download_data, monkeypatch): download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string(settings.DOWNLOAD_DB_ALIAS)) # contract_data = generate_contract_data(2020, 1) @@ -628,7 +628,7 @@ def test_fiscal_years(client, fake_csv_local_path, monthly_download_data, monkey assert expected_file in file_list -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) def test_award_type(client, fake_csv_local_path, monthly_download_data, monkeypatch): download_generation.retrieve_db_string = Mock(return_value=get_database_dsn_string(settings.DOWNLOAD_DB_ALIAS)) call_command( diff --git a/usaspending_api/etl/broker_etl_helpers.py b/usaspending_api/etl/broker_etl_helpers.py index 45f18ff928..084c26d5cd 100644 --- a/usaspending_api/etl/broker_etl_helpers.py +++ b/usaspending_api/etl/broker_etl_helpers.py @@ -44,7 +44,7 @@ def setup_broker_fdw(): with connection.cursor() as cursor: with open("usaspending_api/etl/management/setup_broker_fdw.sql") as infile: - logger.info(connections.databases[settings.DATA_BROKER_DB_ALIAS]) + logger.info(connections.databases[settings.BROKER_DB_ALIAS]) for raw_sql in infile.read().split("\n\n\n"): logger.info("SETUP BROKER FDW: Running SQL => " + str(raw_sql)) - cursor.execute(raw_sql, connections.databases[settings.DATA_BROKER_DB_ALIAS]) + cursor.execute(raw_sql, connections.databases[settings.BROKER_DB_ALIAS]) diff --git a/usaspending_api/etl/management/commands/create_delta_table.py b/usaspending_api/etl/management/commands/create_delta_table.py index 52462f76e3..b47b1ee46a 100644 --- a/usaspending_api/etl/management/commands/create_delta_table.py +++ b/usaspending_api/etl/management/commands/create_delta_table.py @@ -3,7 +3,7 @@ from django.core.management.base import BaseCommand from pyspark.sql.types import StructType - +from usaspending_api.awards.delta_models.award_id_lookup import AWARD_ID_LOOKUP_SCHEMA from usaspending_api.config import CONFIG from usaspending_api.common.helpers.spark_helpers import ( configure_spark_session, @@ -12,9 +12,21 @@ from usaspending_api.etl.management.commands.archive_table_in_delta import TABLE_SPEC as ARCHIVE_TABLE_SPEC from usaspending_api.etl.management.commands.load_query_to_delta import TABLE_SPEC as LOAD_QUERY_TABLE_SPEC from usaspending_api.etl.management.commands.load_table_to_delta import TABLE_SPEC as LOAD_TABLE_TABLE_SPEC - - -TABLE_SPEC = {**ARCHIVE_TABLE_SPEC, **LOAD_TABLE_TABLE_SPEC, **LOAD_QUERY_TABLE_SPEC} +from usaspending_api.transactions.delta_models.transaction_id_lookup import TRANSACTION_ID_LOOKUP_SCHEMA + +TABLE_SPEC = { + **ARCHIVE_TABLE_SPEC, + **LOAD_TABLE_TABLE_SPEC, + **LOAD_QUERY_TABLE_SPEC, + "award_id_lookup": { + "destination_database": "int", + "delta_table_create_sql": AWARD_ID_LOOKUP_SCHEMA, + }, + "transaction_id_lookup": { + "destination_database": "int", + "delta_table_create_sql": 
TRANSACTION_ID_LOOKUP_SCHEMA, + }, +} logger = logging.getLogger(__name__) diff --git a/usaspending_api/etl/management/commands/load_submission.py b/usaspending_api/etl/management/commands/load_submission.py index b70bc86d0b..1c8e7355ff 100644 --- a/usaspending_api/etl/management/commands/load_submission.py +++ b/usaspending_api/etl/management/commands/load_submission.py @@ -33,7 +33,7 @@ class Command(load_base.Command): db_cursor = None help = ( - "Loads a single submission from Data Broker. The DATA_BROKER_DATABASE_URL environment variable " + "Loads a single submission from Data Broker. The BROKER_DB environment variable " "must set so we can pull submission data from their db." ) diff --git a/usaspending_api/etl/management/commands/load_transactions_in_delta.py b/usaspending_api/etl/management/commands/load_transactions_in_delta.py index 9cd89f64eb..b7c90a00f5 100644 --- a/usaspending_api/etl/management/commands/load_transactions_in_delta.py +++ b/usaspending_api/etl/management/commands/load_transactions_in_delta.py @@ -1064,7 +1064,7 @@ def prepare_orphaned_transaction_temp_table(): # managed table in the temp database. self.spark.sql("CREATE DATABASE IF NOT EXISTS temp") self.spark.sql( - """ + f""" CREATE OR REPLACE TABLE temp.orphaned_transaction_info ( transaction_id LONG NOT NULL, transaction_unique_id STRING NOT NULL, @@ -1072,6 +1072,7 @@ def prepare_orphaned_transaction_temp_table(): unique_award_key STRING NOT NULL ) USING DELTA + LOCATION 's3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/temp/orphaned_transaction_info' """ ) @@ -1086,11 +1087,12 @@ def prepare_orphaned_transaction_temp_table(): def prepare_orphaned_award_temp_table(): # We actually need another temporary table to handle orphaned awards self.spark.sql( - """ + f""" CREATE OR REPLACE TABLE temp.orphaned_award_info ( award_id LONG NOT NULL ) USING DELTA + LOCATION 's3a://{CONFIG.SPARK_S3_BUCKET}/{CONFIG.DELTA_LAKE_S3_PATH}/temp/orphaned_award_info' """ ) diff --git a/usaspending_api/etl/management/commands/update_covid_awards_in_delta.py b/usaspending_api/etl/management/commands/update_covid_awards_in_delta.py index 406e2d325f..bd8b486317 100644 --- a/usaspending_api/etl/management/commands/update_covid_awards_in_delta.py +++ b/usaspending_api/etl/management/commands/update_covid_awards_in_delta.py @@ -12,8 +12,7 @@ logger = logging.getLogger(__name__) -UPDATE_AWARDS_SQL = """ - WITH recent_covid_awards AS ( +DISTINCT_AWARD_SQL = """ SELECT DISTINCT award_id FROM @@ -46,30 +45,34 @@ AND submission_fiscal_month = {this_quarters_month} AND is_quarter = TRUE ) - ) - {operation_sql} - WHERE - id IN ( - SELECT - award_id - FROM - recent_covid_awards - ) - AND update_date < '{submission_reveal_date}' """ -UPDATE_OPERATION_SQL = """ -UPDATE - int.awards -SET - update_date = NOW() +CONDITION_SQL = """ + update_date < '{submission_reveal_date}' """ COUNT_OPERATION_SQL = """ -SELECT - count(*) -FROM - int.awards AS award_to_update_count + WITH recent_covid_awards AS ( + {distinct_award_sql} + ) + SELECT COUNT(*) + FROM int.awards + WHERE + {condition_sql} + AND EXISTS ( + SELECT 1 + FROM recent_covid_awards + WHERE awards.id = recent_covid_awards.award_id + ) +""" + +UPDATE_OPERATION_SQL = """ + MERGE INTO int.awards USING ({distinct_award_sql}) AS recent_covid_awards + ON + {condition_sql} + AND awards.id = recent_covid_awards.award_id + WHEN MATCHED + THEN UPDATE SET update_date = NOW() """ @@ -118,23 +121,28 @@ def handle(self, *args, **options): # Use the dry_run option to determine whether to actually 
update awards or only determine the count of # awards that would be updated + distinct_award_sql = DISTINCT_AWARD_SQL.format( + last_months_year=periods["last_month"]["year"], + last_months_month=periods["last_month"]["month"], + last_quarters_year=periods["last_quarter"]["year"], + last_quarters_month=periods["last_quarter"]["month"], + this_months_year=periods["this_month"]["year"], + this_months_month=periods["this_month"]["month"], + this_quarters_year=periods["this_quarter"]["year"], + this_quarters_month=periods["this_quarter"]["month"], + ) + condition_sql = CONDITION_SQL.format( + submission_reveal_date=periods["this_month"]["submission_reveal_date"], + ) operation_sql = UPDATE_OPERATION_SQL if dry_run: logger.info("Dry run flag provided. No records will be updated.") operation_sql = COUNT_OPERATION_SQL results = self.spark.sql( - UPDATE_AWARDS_SQL.format( - last_months_year=periods["last_month"]["year"], - last_months_month=periods["last_month"]["month"], - last_quarters_year=periods["last_quarter"]["year"], - last_quarters_month=periods["last_quarter"]["month"], - this_months_year=periods["this_month"]["year"], - this_months_month=periods["this_month"]["month"], - this_quarters_year=periods["this_quarter"]["year"], - this_quarters_month=periods["this_quarter"]["month"], - submission_reveal_date=periods["this_month"]["submission_reveal_date"], - operation_sql=operation_sql, + operation_sql.format( + distinct_award_sql=distinct_award_sql, + condition_sql=condition_sql, ) ) diff --git a/usaspending_api/etl/management/commands/update_file_c_linkages_in_delta.py b/usaspending_api/etl/management/commands/update_file_c_linkages_in_delta.py index 210ff8c4ce..d0a0e550c2 100644 --- a/usaspending_api/etl/management/commands/update_file_c_linkages_in_delta.py +++ b/usaspending_api/etl/management/commands/update_file_c_linkages_in_delta.py @@ -5,6 +5,7 @@ from pyspark.sql import SparkSession from usaspending_api.awards.delta_models import c_to_d_linkage_view_sql_strings, c_to_d_linkage_drop_view_sql_strings +from usaspending_api.awards.models import CToDLinkageUpdates from usaspending_api.common.helpers.spark_helpers import ( configure_spark_session, get_active_spark_session, @@ -116,7 +117,7 @@ def handle(self, *args, **options): self.spark.sql(delete_sql) # Clean up deletion view(s) - self.spark.sql("DROP VIEW identify_faba_deletions_query;") + self.spark.sql("DROP VIEW IF EXISTS identify_faba_deletions_query;") # Setup int table. Creates a shallow clone of the `raw` FABA table in the `int` schema. # If the --no-clone option is provided a full table is created instead. 
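The `update_covid_awards_in_delta.py` change above splits one monolithic statement into a reusable `DISTINCT_AWARD_SQL` subquery and a `CONDITION_SQL` fragment, then formats them into either a `COUNT` (dry run) or a `MERGE`. A simplified, self-contained sketch of that composition follows; the fragment bodies here are placeholders, not the module's real SQL, which also interpolates submission period values.

```python
# Placeholder fragments standing in for the real DISTINCT_AWARD_SQL / CONDITION_SQL
DISTINCT_AWARD_SQL = (
    "SELECT DISTINCT award_id FROM int.financial_accounts_by_awards "
    "WHERE disaster_emergency_fund_code IS NOT NULL"
)
CONDITION_SQL = "update_date < '{submission_reveal_date}'"

COUNT_OPERATION_SQL = """
    WITH recent_covid_awards AS ({distinct_award_sql})
    SELECT COUNT(*)
    FROM int.awards
    WHERE {condition_sql}
      AND EXISTS (SELECT 1 FROM recent_covid_awards WHERE awards.id = recent_covid_awards.award_id)
"""

UPDATE_OPERATION_SQL = """
    MERGE INTO int.awards USING ({distinct_award_sql}) AS recent_covid_awards
    ON {condition_sql} AND awards.id = recent_covid_awards.award_id
    WHEN MATCHED THEN UPDATE SET update_date = NOW()
"""


def build_covid_award_sql(submission_reveal_date: str, dry_run: bool) -> str:
    """Compose the statement that would be handed to spark.sql()."""
    condition_sql = CONDITION_SQL.format(submission_reveal_date=submission_reveal_date)
    operation_sql = COUNT_OPERATION_SQL if dry_run else UPDATE_OPERATION_SQL
    return operation_sql.format(distinct_award_sql=DISTINCT_AWARD_SQL, condition_sql=condition_sql)


print(build_covid_award_sql("2024-01-30", dry_run=True))
```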
@@ -135,6 +136,8 @@ def handle(self, *args, **options): "INSERT OVERWRITE int.financial_accounts_by_awards SELECT * FROM raw.financial_accounts_by_awards;" ) else: + if self.spark.catalog.tableExists("int.financial_accounts_by_awards"): + self.spark.sql("DELETE FROM int.financial_accounts_by_awards;") self.spark.sql( f""" CREATE OR REPLACE TABLE int.financial_accounts_by_awards @@ -197,9 +200,9 @@ def handle(self, *args, **options): c_to_d_linkage_updates_df = self.spark.sql("SELECT * FROM union_all_priority;") c_to_d_linkage_updates_df.write.jdbc( url=get_usas_jdbc_url(), - table="public.c_to_d_linkage_updates", + table=f"public.{CToDLinkageUpdates._meta.db_table}", mode="overwrite", - properties=get_jdbc_connection_properties(), + properties=get_jdbc_connection_properties(truncate=True), ) # Run Linkage Queries diff --git a/usaspending_api/etl/management/load_base.py b/usaspending_api/etl/management/load_base.py index 0ab2fe83e5..6da546d223 100644 --- a/usaspending_api/etl/management/load_base.py +++ b/usaspending_api/etl/management/load_base.py @@ -38,10 +38,10 @@ def handle(self, *args, **options): # Grab data broker database connections if not options["test"]: try: - db_conn = connections[settings.DATA_BROKER_DB_ALIAS] + db_conn = connections[settings.BROKER_DB_ALIAS] db_cursor = db_conn.cursor() except Exception as err: - logger.critical("Could not connect to database. Is DATA_BROKER_DATABASE_URL set?") + logger.critical("Could not connect to database. Is BROKER_DB set?") logger.critical(print(err)) raise else: diff --git a/usaspending_api/etl/submission_loader_helpers/file_c.py b/usaspending_api/etl/submission_loader_helpers/file_c.py index 014e1316cc..1a006f8ba2 100644 --- a/usaspending_api/etl/submission_loader_helpers/file_c.py +++ b/usaspending_api/etl/submission_loader_helpers/file_c.py @@ -47,7 +47,7 @@ def _retrieve_and_prepare_next_chunk(self): limit {self.chunk_size} """ - award_financial_frame = pd.read_sql(sql, connections[settings.DATA_BROKER_DB_ALIAS]) + award_financial_frame = pd.read_sql(sql, connections[settings.BROKER_DB_ALIAS]) if award_financial_frame.size > 0: award_financial_frame["object_class"] = award_financial_frame.apply(get_object_class_row, axis=1) diff --git a/usaspending_api/etl/submission_loader_helpers/program_activities.py b/usaspending_api/etl/submission_loader_helpers/program_activities.py index ef51966be3..3f81c653c4 100644 --- a/usaspending_api/etl/submission_loader_helpers/program_activities.py +++ b/usaspending_api/etl/submission_loader_helpers/program_activities.py @@ -37,7 +37,7 @@ def update_program_activities(submission_id): now() from dblink( - '{settings.DATA_BROKER_DBLINK_NAME}', + '{settings.BROKER_DBLINK_NAME}', ' select b.program_activity_code, upper(b.program_activity_name) program_activity_name, diff --git a/usaspending_api/etl/submission_loader_helpers/submission_ids.py b/usaspending_api/etl/submission_loader_helpers/submission_ids.py index 6a8402f724..450c0a929a 100644 --- a/usaspending_api/etl/submission_loader_helpers/submission_ids.py +++ b/usaspending_api/etl/submission_loader_helpers/submission_ids.py @@ -13,7 +13,7 @@ def get_new_or_updated_submission_ids(since_datetime=None): bs.submission_id from dblink( - '{settings.DATA_BROKER_DBLINK_NAME}', + '{settings.BROKER_DBLINK_NAME}', ' select s.submission_id, diff --git a/usaspending_api/etl/tests/integration/test_model.py b/usaspending_api/etl/tests/data/delta_model_for_test.py similarity index 100% rename from usaspending_api/etl/tests/integration/test_model.py rename to 
usaspending_api/etl/tests/data/delta_model_for_test.py diff --git a/usaspending_api/etl/tests/integration/test_create_delta_table.py b/usaspending_api/etl/tests/integration/test_create_delta_table.py index 61bbe145fc..53b62652c8 100644 --- a/usaspending_api/etl/tests/integration/test_create_delta_table.py +++ b/usaspending_api/etl/tests/integration/test_create_delta_table.py @@ -111,3 +111,11 @@ def test_create_delta_table_for_object_class_program_activity_download( spark, s3_unittest_data_bucket, hive_unittest_metastore_db ): _verify_delta_table_creation(spark, "object_class_program_activity_download", s3_unittest_data_bucket) + + +def test_create_delta_table_for_award_id_lookup(spark, s3_unittest_data_bucket, hive_unittest_metastore_db): + _verify_delta_table_creation(spark, "award_id_lookup", s3_unittest_data_bucket) + + +def test_create_delta_table_for_transaction_id_lookup(spark, s3_unittest_data_bucket, hive_unittest_metastore_db): + _verify_delta_table_creation(spark, "transaction_id_lookup", s3_unittest_data_bucket) diff --git a/usaspending_api/etl/tests/integration/test_load_multiple_submissions.py b/usaspending_api/etl/tests/integration/test_load_multiple_submissions.py index 2573d9a605..eca991fc94 100644 --- a/usaspending_api/etl/tests/integration/test_load_multiple_submissions.py +++ b/usaspending_api/etl/tests/integration/test_load_multiple_submissions.py @@ -107,7 +107,7 @@ def setUp(self): is_quarter=True, ) baker.make("references.ProgramActivityPark", code="ABCD0000", name="TEST PARK") - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: self._nuke_broker_data() @@ -314,7 +314,7 @@ def _nuke_broker_data(): back Broker changes. I spent entirely too much time trying to figure out a more graceful way, sooooo, brute force it is. """ - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: cursor.execute( """ @@ -477,7 +477,7 @@ def test_all_the_things(self): SubmissionAttributes.objects.filter(submission_id=1).delete() # Make it really old. - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("update submission set updated_at = '1999-01-01' where submission_id = 1") # Make sure it DOESN'T reload. @@ -496,7 +496,7 @@ def test_all_the_things(self): assert SubmissionAttributes.objects.get(submission_id=2).update_date > update_date_sub_2 # Let's test the new certified_date change detection code. But first, bring submission 1 back to the present. 
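The submission tests above (and most Broker-dependent tests in this diff) share a pattern: opt the test into both database aliases, then reach the Broker test DB through Django's `connections` registry. A minimal sketch of that pattern is shown below, assuming the `broker_db_setup` fixture has already created the Broker `submission` table; the test body itself is illustrative only.

```python
import pytest
from django.conf import settings
from django.db import connections


@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True)
def test_broker_submission_table_is_reachable(broker_db_setup):
    # Raw SQL goes through the Broker alias, not the default USAspending connection
    with connections[settings.BROKER_DB_ALIAS].cursor() as cursor:
        cursor.execute("UPDATE submission SET updated_at = now() WHERE submission_id = 1")
        cursor.execute("SELECT COUNT(*) FROM submission")
        assert cursor.fetchone()[0] >= 0
```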
- with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("update submission set updated_at = now() where submission_id = 1") call_command("load_multiple_submissions", "--submission-ids", 1) diff --git a/usaspending_api/etl/tests/integration/test_load_submission_mgmt_cmd.py b/usaspending_api/etl/tests/integration/test_load_submission_mgmt_cmd.py index 745a1f6a06..b965f7f9be 100644 --- a/usaspending_api/etl/tests/integration/test_load_submission_mgmt_cmd.py +++ b/usaspending_api/etl/tests/integration/test_load_submission_mgmt_cmd.py @@ -68,7 +68,7 @@ def setUpTestData(cls): "conflict_column": "published_award_financial_id", }, } - connection = connections[settings.DATA_BROKER_DB_ALIAS] + connection = connections[settings.BROKER_DB_ALIAS] with connection.cursor() as cursor: for broker_table_name, value in broker_objects_to_insert.items(): diff --git a/usaspending_api/etl/tests/integration/test_load_to_from_delta.py b/usaspending_api/etl/tests/integration/test_load_to_from_delta.py index f1d78664d9..ab13fc56bf 100644 --- a/usaspending_api/etl/tests/integration/test_load_to_from_delta.py +++ b/usaspending_api/etl/tests/integration/test_load_to_from_delta.py @@ -26,7 +26,7 @@ from usaspending_api.etl.management.commands.create_delta_table import ( TABLE_SPEC, ) -from usaspending_api.etl.tests.integration.test_model import TestModel, TEST_TABLE_POSTGRES, TEST_TABLE_SPEC +from usaspending_api.etl.tests.data.delta_model_for_test import TestModel, TEST_TABLE_POSTGRES, TEST_TABLE_SPEC from usaspending_api.recipient.models import RecipientLookup from usaspending_api.tests.conftest_spark import create_and_load_all_delta_tables from copy import deepcopy @@ -223,7 +223,7 @@ def verify_delta_table_loaded_to_delta( dummy_data = list(dummy_query.all().values()) elif is_from_broker: # model can be None if loading from the Broker - broker_connection = connections[settings.DATA_BROKER_DB_ALIAS] + broker_connection = connections[settings.BROKER_DB_ALIAS] source_broker_name = TABLE_SPEC[delta_table_name]["source_table"] with broker_connection.cursor() as cursor: dummy_query = f"SELECT * from {source_broker_name}" @@ -306,7 +306,7 @@ def verify_delta_table_loaded_from_delta( ) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_load_table_to_from_delta_for_recipient_lookup( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): @@ -436,7 +436,7 @@ def test_load_table_to_delta_for_published_fabs(spark, s3_unittest_data_bucket, verify_delta_table_loaded_to_delta(spark, "published_fabs", s3_unittest_data_bucket) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_load_table_to_from_delta_for_recipient_profile( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): @@ -653,7 +653,7 @@ def test_load_table_to_from_delta_for_recipient_profile_testing( ) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def 
test_load_table_to_from_delta_for_transaction_search( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): @@ -771,7 +771,7 @@ def test_load_table_to_from_delta_for_transaction_search_alt_db_and_name( # ) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_load_table_to_from_delta_for_award_search( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): @@ -813,7 +813,7 @@ def test_load_table_to_from_delta_for_award_search( verify_delta_table_loaded_from_delta(spark, "award_search", jdbc_inserts=True) # test alt write strategy -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_incremental_load_table_to_delta_for_award_search( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): @@ -893,7 +893,7 @@ def test_incremental_load_table_to_delta_for_award_search( pd.testing.assert_frame_equal(result, expected) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_incremental_load_table_to_delta_for_transaction_search( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): @@ -973,7 +973,7 @@ def test_incremental_load_table_to_delta_for_transaction_search( pd.testing.assert_frame_equal(result, expected) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_load_table_to_delta_for_sam_recipient(spark, s3_unittest_data_bucket, populate_broker_data): expected_data = [ { @@ -1004,10 +1004,10 @@ def test_load_table_to_delta_for_sam_recipient(spark, s3_unittest_data_bucket, p @pytest.mark.skipif( - settings.DATA_BROKER_DB_ALIAS not in settings.DATABASES, + settings.BROKER_DB_ALIAS not in settings.DATABASES, reason="'data_broker' database not configured in django settings.DATABASES.", ) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_load_table_to_delta_for_summary_state_view( spark, s3_unittest_data_bucket, populate_usas_data_and_recipients_from_broker, hive_unittest_metastore_db ): diff --git a/usaspending_api/etl/tests/integration/test_load_to_from_delta_subawards.py b/usaspending_api/etl/tests/integration/test_load_to_from_delta_subawards.py index 8edb3a0c9e..dd88ae5fc5 100644 --- a/usaspending_api/etl/tests/integration/test_load_to_from_delta_subawards.py +++ b/usaspending_api/etl/tests/integration/test_load_to_from_delta_subawards.py @@ -31,7 +31,7 @@ } -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_load_table_to_from_delta_for_subawards( 
spark, s3_unittest_data_bucket, diff --git a/usaspending_api/etl/tests/integration/test_spark_app.py b/usaspending_api/etl/tests/integration/test_spark_app.py index 8fa6a42e9b..09387d99f1 100644 --- a/usaspending_api/etl/tests/integration/test_spark_app.py +++ b/usaspending_api/etl/tests/integration/test_spark_app.py @@ -252,7 +252,7 @@ def test_spark_write_to_s3_delta_from_db( @mark.skipif( - settings.DATA_BROKER_DB_ALIAS not in settings.DATABASES, + settings.BROKER_DB_ALIAS not in settings.DATABASES, reason="'data_broker' database not configured in django settings.DATABASES.", ) @mark.django_db(transaction=True) diff --git a/usaspending_api/etl/tests/integration/test_update_delta_award_with_subaward_counts.py b/usaspending_api/etl/tests/integration/test_update_delta_award_with_subaward_counts.py index 8e2682ca30..f485566153 100644 --- a/usaspending_api/etl/tests/integration/test_update_delta_award_with_subaward_counts.py +++ b/usaspending_api/etl/tests/integration/test_update_delta_award_with_subaward_counts.py @@ -63,7 +63,7 @@ def initial_award_and_subaward_data(): "sub_action_date": "2024-01-01", }, ] - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: for subaward in broker_subawards: cursor.execute( """ @@ -80,7 +80,7 @@ def initial_award_and_subaward_data(): "subaward_to_delete": broker_subawards[2], } - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(f"TRUNCATE subaward") cursor.execute(f"SELECT COUNT(*) FROM subaward") assert cursor.fetchall()[0][0] == 0 @@ -88,7 +88,7 @@ def initial_award_and_subaward_data(): @pytest.fixture def create_new_subaward(initial_award_and_subaward_data): - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: subaward = { "unique_award_key": initial_award_and_subaward_data[ "manipulated_award_with_subawards" @@ -111,7 +111,7 @@ def create_new_subaward(initial_award_and_subaward_data): @pytest.fixture def delete_one_subaward(initial_award_and_subaward_data): subaward_to_delete = initial_award_and_subaward_data["subaward_to_delete"] - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ DELETE FROM subaward WHERE subaward_number = %(subaward_number)s @@ -124,7 +124,7 @@ def delete_one_subaward(initial_award_and_subaward_data): @pytest.fixture def delete_all_subawards(initial_award_and_subaward_data): subaward_to_delete = initial_award_and_subaward_data["subaward_to_delete"] - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ DELETE FROM subaward WHERE unique_award_key = %(unique_award_key)s @@ -134,7 +134,7 @@ def delete_all_subawards(initial_award_and_subaward_data): return initial_award_and_subaward_data -@pytest.mark.django_db(transaction=True, databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) +@pytest.mark.django_db(transaction=True, databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS]) @pytest.mark.parametrize( "fixture_name,expected_manipulated_value", [ diff --git a/usaspending_api/etl/tests/integration/test_update_file_c_linkages_in_delta.py b/usaspending_api/etl/tests/integration/test_update_file_c_linkages_in_delta.py index 9a0271a656..45697ee805 100644 --- 
a/usaspending_api/etl/tests/integration/test_update_file_c_linkages_in_delta.py +++ b/usaspending_api/etl/tests/integration/test_update_file_c_linkages_in_delta.py @@ -1,7 +1,9 @@ from datetime import datetime from django.core.management import call_command +from django.db import connection from pytest import mark +from usaspending_api.awards.models import CToDLinkageUpdates from usaspending_api.common.helpers.spark_helpers import load_dict_to_delta_table starting_update_date = datetime.utcfromtimestamp(0) @@ -226,8 +228,18 @@ def test_update_file_c_linkages_in_delta(spark, s3_unittest_data_bucket, hive_un spark, s3_unittest_data_bucket, "int", "financial_accounts_by_awards", full_int_faba_records, True ) + # Validate that the table has indexes before running the update + with connection.cursor() as cursor: + cursor.execute(f"SELECT COUNT(*) FROM pg_indexes WHERE tablename = '{CToDLinkageUpdates._meta.db_table}'") + assert cursor.fetchone()[0] > 0 + call_command("update_file_c_linkages_in_delta", "--no-clone", "--spark-s3-bucket", s3_unittest_data_bucket) + # Validate that indexes are still present after the command + with connection.cursor() as cursor: + cursor.execute(f"SELECT COUNT(*) FROM pg_indexes WHERE tablename = '{CToDLinkageUpdates._meta.db_table}'") + assert cursor.fetchone()[0] > 0 + # Verify mapping of FABA->Awards matches the expected results output_faba_to_award_id = read_output_faba_to_award_id(spark) for faba_id, expected_award_id in expected_faba_to_award_id.items(): diff --git a/usaspending_api/references/delta_models/__init__.py b/usaspending_api/references/delta_models/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/usaspending_api/references/delta_models/world_cities.py b/usaspending_api/references/delta_models/world_cities.py new file mode 100644 index 0000000000..12e8c41266 --- /dev/null +++ b/usaspending_api/references/delta_models/world_cities.py @@ -0,0 +1,37 @@ +from pyspark.sql.types import ( + BooleanType, + DecimalType, + FloatType, + IntegerType, + LongType, + StringType, + StructField, + StructType, +) + +schema = StructType( + [ + StructField("city", StringType(), False), + StructField("city_ascii", StringType()), + StructField("city_alt", StringType()), + StructField("city_local", StringType()), + StructField("city_local_lang", StringType()), + StructField("lat", DecimalType(6, 4), False), + StructField("lng", DecimalType(7, 4), False), + StructField("country", StringType(), False), + StructField("iso2", StringType(), False), + StructField("iso3", StringType(), False), + StructField("admin_name", StringType()), + StructField("admin_name_ascii", StringType()), + StructField("admin_code", StringType()), + StructField("admin_type", StringType()), + StructField("capital", StringType()), + StructField("density", FloatType(), False), + StructField("population", IntegerType()), + StructField("population_proper", IntegerType()), + StructField("ranking", StringType(), False), + StructField("timezone", StringType(), False), + StructField("same_name", BooleanType(), False), + StructField("id", LongType(), False), + ] +) diff --git a/usaspending_api/references/management/commands/load_gtas.py b/usaspending_api/references/management/commands/load_gtas.py index cbc18c7455..4875324cc0 100644 --- a/usaspending_api/references/management/commands/load_gtas.py +++ b/usaspending_api/references/management/commands/load_gtas.py @@ -54,7 +54,7 @@ def handle(self, *args, **options): @transaction.atomic() def process_data(self): - broker_cursor = 
connections[settings.DATA_BROKER_DB_ALIAS].cursor() + broker_cursor = connections[settings.BROKER_DB_ALIAS].cursor() logger.info("Extracting data from Broker") broker_cursor.execute(self.broker_fetch_sql) diff --git a/usaspending_api/references/management/commands/load_offices.py b/usaspending_api/references/management/commands/load_offices.py index 0084fba7e1..48916f7134 100644 --- a/usaspending_api/references/management/commands/load_offices.py +++ b/usaspending_api/references/management/commands/load_offices.py @@ -27,7 +27,7 @@ def handle(self, *args, **options): @transaction.atomic() def process_data(self): - broker_cursor = connections[settings.DATA_BROKER_DB_ALIAS].cursor() + broker_cursor = connections[settings.BROKER_DB_ALIAS].cursor() logger.info("Extracting data from Broker") broker_cursor.execute(self.broker_fetch_sql) diff --git a/usaspending_api/references/tests/integration/test_assistance_listing.py b/usaspending_api/references/tests/integration/test_assistance_listing.py index f1b2c9c90a..4e3a746477 100644 --- a/usaspending_api/references/tests/integration/test_assistance_listing.py +++ b/usaspending_api/references/tests/integration/test_assistance_listing.py @@ -91,8 +91,18 @@ def test_filter_without_code(client, assistance_listings_test_data): resp = client.get("/api/v2/references/assistance_listing/?filter=Title 1", content_type="application/json") expected_results = [ - {"code": "10", "description": None, "count": 1}, - {"code": "11", "description": None, "count": 1}, + { + "code": "10", + "description": None, + "count": 1, + "children": [{"code": "10.001", "description": "CFDA Title 1"}], + }, + { + "code": "11", + "description": None, + "count": 1, + "children": [{"code": "11.004", "description": "CFDA Title 1"}], + }, ] assert resp.status_code == status.HTTP_200_OK assert resp.json() == expected_results diff --git a/usaspending_api/references/tests/integration/test_load_gtas_mgmt_cmd.py b/usaspending_api/references/tests/integration/test_load_gtas_mgmt_cmd.py index ee9a5c3aa2..de70c3df2a 100644 --- a/usaspending_api/references/tests/integration/test_load_gtas_mgmt_cmd.py +++ b/usaspending_api/references/tests/integration/test_load_gtas_mgmt_cmd.py @@ -23,7 +23,7 @@ def test_program_activity_fresh_load(monkeypatch): data_broker_mock.cursor.return_value = PhonyCursor("usaspending_api/references/tests/data/broker_gtas.json") mock_connections = { settings.DEFAULT_DB_ALIAS: MagicMock(), - settings.DATA_BROKER_DB_ALIAS: data_broker_mock, + settings.BROKER_DB_ALIAS: data_broker_mock, } monkeypatch.setattr("usaspending_api.references.management.commands.load_gtas.connections", mock_connections) diff --git a/usaspending_api/references/tests/integration/test_load_offices.py b/usaspending_api/references/tests/integration/test_load_offices.py index a6b7b7b01d..58a203d64a 100644 --- a/usaspending_api/references/tests/integration/test_load_offices.py +++ b/usaspending_api/references/tests/integration/test_load_offices.py @@ -19,7 +19,7 @@ def test_load_offices(monkeypatch): data_broker_mock.cursor.return_value = PhonyCursor("usaspending_api/references/tests/data/broker_offices.json") mock_connections = { settings.DEFAULT_DB_ALIAS: MagicMock(), - settings.DATA_BROKER_DB_ALIAS: data_broker_mock, + settings.BROKER_DB_ALIAS: data_broker_mock, } monkeypatch.setattr("usaspending_api.references.management.commands.load_offices.connections", mock_connections) diff --git a/usaspending_api/references/tests/integration/test_load_park.py 
b/usaspending_api/references/tests/integration/test_load_park.py index 0b05e8783d..e3c6802608 100644 --- a/usaspending_api/references/tests/integration/test_load_park.py +++ b/usaspending_api/references/tests/integration/test_load_park.py @@ -4,12 +4,12 @@ from django.db import DEFAULT_DB_ALIAS, connections from usaspending_api.references.models import ProgramActivityPark -from usaspending_api.settings import DATA_BROKER_DB_ALIAS +from usaspending_api.settings import BROKER_DB_ALIAS @pytest.fixture def setup_broker_data(broker_db_setup): - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ INSERT INTO program_activity_park (program_activity_park_id, fiscal_year, period, agency_id, allocation_transfer_id, main_account_number, sub_account_number, park_code, park_name) @@ -19,11 +19,11 @@ def setup_broker_data(broker_db_setup): """ ) yield - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute("TRUNCATE program_activity_park;") -@pytest.mark.django_db(databases=[DATA_BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[BROKER_DB_ALIAS, DEFAULT_DB_ALIAS], transaction=True) def test_load_park(setup_broker_data): actual_park_count = ProgramActivityPark.objects.count() assert actual_park_count == 0 @@ -41,7 +41,7 @@ def test_load_park(setup_broker_data): actual_park = list(ProgramActivityPark.objects.order_by("code").values_list("code", "name")) assert actual_park == expected_results - with connections[DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[BROKER_DB_ALIAS].cursor() as cursor: cursor.execute( """ INSERT INTO program_activity_park (program_activity_park_id, fiscal_year, period, agency_id, allocation_transfer_id, main_account_number, sub_account_number, park_code, park_name) diff --git a/usaspending_api/references/v2/views/filter_tree/assistance_listing.py b/usaspending_api/references/v2/views/filter_tree/assistance_listing.py index df3e136352..7a8e849e97 100644 --- a/usaspending_api/references/v2/views/filter_tree/assistance_listing.py +++ b/usaspending_api/references/v2/views/filter_tree/assistance_listing.py @@ -44,6 +44,8 @@ def _business_logic(self, cfda_code: int | None, cfda_filter: str | None) -> Que if cfda_code: qs = qs.filter(program_number__startswith=cfda_code) + + if cfda_code or cfda_filter: annotations["children"] = ArrayAgg( Func( Cast(Value("code"), TextField()), diff --git a/usaspending_api/search/delta_models/award_search.py b/usaspending_api/search/delta_models/award_search.py index 8610436623..b313e7f90f 100644 --- a/usaspending_api/search/delta_models/award_search.py +++ b/usaspending_api/search/delta_models/award_search.py @@ -659,7 +659,7 @@ """, f""" MERGE INTO {{DESTINATION_DATABASE}}.{{DESTINATION_TABLE}} AS t - USING (SELECT * FROM temp_award_search_view) AS s + USING temp_award_search_view AS s ON t.award_id = s.award_id WHEN MATCHED AND ({" OR ".join([f"NOT (s.{col} <=> t.{col})" for col in AWARD_SEARCH_DELTA_COLUMNS])}) diff --git a/usaspending_api/settings.py b/usaspending_api/settings.py index 1073a249ad..52a231f915 100644 --- a/usaspending_api/settings.py +++ b/usaspending_api/settings.py @@ -133,12 +133,14 @@ ) SERVER_BASE_URL = FILES_SERVER_BASE_URL[FILES_SERVER_BASE_URL.find(".") + 1 :] +DATA_DICTIONARY_FILE_NAME = "Data_Dictionary_Crosswalk.xlsx" + AGENCY_DOWNLOAD_URL = f"{FILES_SERVER_BASE_URL}/reference_data/agency_codes.csv" 
-DATA_DICTIONARY_DOWNLOAD_URL = f"{FILES_SERVER_BASE_URL}/docs/Data_Dictionary_Crosswalk.xlsx" +DATA_DICTIONARY_DOWNLOAD_URL = f"{FILES_SERVER_BASE_URL}/docs/{DATA_DICTIONARY_FILE_NAME}" # S3 Bucket and Key to retrieve the Data Dictionary DATA_DICTIONARY_S3_BUCKET_NAME = f"dti-da-public-files-{'nonprod' if CONFIG.ENV_CODE not in ('prd', 'stg') else 'prod'}" -DATA_DICTIONARY_S3_KEY = "user_reference_docs/Data_Dictionary_Crosswalk.xlsx" +DATA_DICTIONARY_S3_KEY = f"user_reference_docs/{DATA_DICTIONARY_FILE_NAME}" # Local download files IDV_DOWNLOAD_README_FILE_PATH = str(APP_DIR / "data" / "idv_download_readme.txt") @@ -324,12 +326,12 @@ def _configure_database_connection(environment_variable, test_options=None): ) # import a second database connection for ETL, connecting to data broker -# using the environment variable, DATA_BROKER_DATABASE_URL - only if it is set -DATA_BROKER_DB_ALIAS = "data_broker" -if os.environ.get("DATA_BROKER_DATABASE_URL"): - DATABASES[DATA_BROKER_DB_ALIAS] = _configure_database_connection("DATA_BROKER_DATABASE_URL") +# using the environment variable, BROKER_DB - only if it is set +BROKER_DB_ALIAS = "data_broker" +if os.environ.get("BROKER_DB"): + DATABASES[BROKER_DB_ALIAS] = _configure_database_connection("BROKER_DB") -DATA_BROKER_DBLINK_NAME = "broker_server" +BROKER_DBLINK_NAME = "broker_server" # Password validation # https://docs.djangoproject.com/en/3.2/ref/settings/#auth-password-validators diff --git a/usaspending_api/submissions/management/commands/load_dabs_submission_window_schedule.py b/usaspending_api/submissions/management/commands/load_dabs_submission_window_schedule.py index 8b15d202c0..291e711159 100644 --- a/usaspending_api/submissions/management/commands/load_dabs_submission_window_schedule.py +++ b/usaspending_api/submissions/management/commands/load_dabs_submission_window_schedule.py @@ -145,7 +145,7 @@ def handle(self, *args, **options): def generate_schedules_from_broker(self): logger.info("Creating broker cursor") - broker_cursor = connections[settings.DATA_BROKER_DB_ALIAS].cursor() + broker_cursor = connections[settings.BROKER_DB_ALIAS].cursor() logger.info("Running MONTH_SCHEDULE_SQL") broker_cursor.execute(MONTH_SCHEDULE_SQL) diff --git a/usaspending_api/tests/conftest_spark.py b/usaspending_api/tests/conftest_spark.py index 86075fc2bb..587c877b63 100644 --- a/usaspending_api/tests/conftest_spark.py +++ b/usaspending_api/tests/conftest_spark.py @@ -237,7 +237,7 @@ def populate_broker_data(broker_server_dblink_setup): ), } insert_statement = "INSERT INTO %(table_name)s (%(columns)s) VALUES %(values)s" - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: for table_name, rows in broker_data.items(): # An assumption is made that each set of rows have the same columns in the same order columns = list(rows[0]) @@ -249,7 +249,7 @@ def populate_broker_data(broker_server_dblink_setup): cursor.execute(sql_string) yield # Cleanup test data for each Broker test table - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: for table in broker_data: cursor.execute(f"TRUNCATE TABLE {table} RESTART IDENTITY CASCADE") diff --git a/usaspending_api/transactions/agnostic_transaction_loader.py b/usaspending_api/transactions/agnostic_transaction_loader.py index 1a50484821..fad5f01c43 100644 --- a/usaspending_api/transactions/agnostic_transaction_loader.py +++ 
b/usaspending_api/transactions/agnostic_transaction_loader.py @@ -184,7 +184,7 @@ def combine_sql(self): def copy_broker_table_data(self, source_tablename, dest_tablename, primary_key): """Loop through the batches of IDs and load using the ETL tables""" destination = ETLTable(dest_tablename, schema_name="raw") - source = ETLDBLinkTable(source_tablename, settings.DATA_BROKER_DBLINK_NAME, destination.data_types) + source = ETLDBLinkTable(source_tablename, settings.BROKER_DBLINK_NAME, destination.data_types) transactions_remaining_count = self.total_ids_to_process for id_list in read_file_for_database_ids(str(self.file_path), self.chunk_size, is_numeric=False): diff --git a/usaspending_api/transactions/delta_models/transaction_id_lookup.py b/usaspending_api/transactions/delta_models/transaction_id_lookup.py new file mode 100644 index 0000000000..098851ba4f --- /dev/null +++ b/usaspending_api/transactions/delta_models/transaction_id_lookup.py @@ -0,0 +1,10 @@ +from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType + + +TRANSACTION_ID_LOOKUP_SCHEMA = StructType( + [ + StructField("transaction_id", LongType(), False), + StructField("is_fpds", BooleanType(), False), + StructField("transaction_unique_id", StringType(), False), + ] +) diff --git a/usaspending_api/transactions/management/commands/delete_assistance_records.py b/usaspending_api/transactions/management/commands/delete_assistance_records.py index d100c62985..4c45f648fa 100644 --- a/usaspending_api/transactions/management/commands/delete_assistance_records.py +++ b/usaspending_api/transactions/management/commands/delete_assistance_records.py @@ -39,7 +39,7 @@ def fetch_deleted_transactions(self) -> Optional[dict]: ) and is_active is not true """ - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(sql, [date_time]) results = cursor.fetchall() @@ -67,7 +67,7 @@ def store_delete_records(self, deleted_dict: dict) -> None: id_list += id_list if len(id_list) > 0: - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(sql.format(ids=tuple(id_list))) afa_id_list = cursor.fetchall() records = [afa_id[0] for afa_id in afa_id_list] diff --git a/usaspending_api/transactions/tests/integration/test_assistance_etl.py b/usaspending_api/transactions/tests/integration/test_assistance_etl.py index f2b80c7006..39c17ff538 100644 --- a/usaspending_api/transactions/tests/integration/test_assistance_etl.py +++ b/usaspending_api/transactions/tests/integration/test_assistance_etl.py @@ -131,20 +131,20 @@ def load_broker_data(db, broker_server_dblink_setup): (E'2018-11-28 20:12:42.541441',E'2018-11-28 20:12:42.541445',89126462,E'20181003',E'A',E'10',E'AGRICULTURAL RISK COVERAGE PROG - COUNTY',E'Redacted due to PII',NULL,E'012',E'12346T',E'12D2',NULL,E'NON',E'P',E'10.112',NULL,NULL,E'12FA00PY58106320',1499,NULL,E'012',E'12346T',E'12D2',NULL,NULL,NULL,E'USA',NULL,NULL,NULL,E'52565',NULL,0,NULL,E'20181003',E'20181003',E'IA40935',E'02',E'USA',NULL,E'52565',3,E'SAI EXEMPT',NULL,E'02',E'1499.00',E'Price Loss Coverage',E'Department of Agriculture (USDA)',E'Farm Service Agency',E'Department of Agriculture (USDA)',E'Farm Service Agency',NULL,E'Van Buren',E'Iowa',E'KEOSAUQUA',E'KEOSAUQUA',E'177',E'Van Buren',E'IA',E'Iowa',E'2018-11-28 20:12:42.540405',E'12D2_12FA00PY58106320_-none-_10.112_-none-',TRUE,E'Deputy Administrator Farm Programs',E'Deputy 
Administrator Farm Programs',E'40935',NULL,E'UNITED STATES',E'UNITED STATES',E'177',14903,NULL,E'52565',NULL,E'{{individuals}}',E'New',E'direct payment with unrestricted use (retirement, pension, veterans benefits, etc.) (D)',E'Not Recovery Act',E'Individual',NULL,E'Non-Aggregate Record to an Individual Recipient (PII-Redacted)',NULL,NULL,E'ASST_NON_12FA00PY58106320_12D2',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,E'Single ZIP Code',NULL,NULL,NULL,NULL,NULL), (E'2017-09-16 23:41:06.101705',E'2017-09-16 23:41:06.101705',46763320,E'09/22/2016',E'C',E'06',E'GRANT PROGRAM',E'Central Carolina Technical College',E'073708414',E'091',NULL,E'9100',E'0008',E'NON',E'06',E'84.063',NULL,0,E'P063P162482',2665,NULL,NULL,NULL,NULL,E'506 N Guignard Dr',NULL,NULL,E'USA',NULL,NULL,NULL,E'29150',E'2468',0,0,E'08/31/2022',E'03/23/2016',E'4570405',E'05',E'USA',NULL,E'291502468',2,E'SAI NOT AVAILABLE',NULL,E'05',E'2665.0',E'PELL',E'Department of Education (ED)',E'Department of Education',NULL,NULL,TRUE,E'Sumter',E'SOUTH CAROLINA',E'SUMTER',E'Sumter',E'085',E'Sumter',E'SC',E'South Carolina',E'2016-10-05 00:00:00',E'9100_P063P162482_-none-_84.063_0008',TRUE,NULL,NULL,E'70405',NULL,E'UNITED STATES',E'UNITED STATES',E'085',NULL,E'SC',E'29150',E'2468',NULL,NULL,NULL,NULL,NULL,NULL,NULL,E'CENTRAL CAROLINA TECHNICAL COLLEGE',E'073708414',E'ASST_NON_P063P162482_9100',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL); """ - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(insert_test_data) cursor.execute(f"SELECT COUNT(*) FROM {BROKER_TABLE}") assert cursor.fetchall()[0][0] == NUMBER_OF_SOURCE_RECORDS yield - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(f"TRUNCATE {BROKER_TABLE}") cursor.execute(f"SELECT COUNT(*) FROM {BROKER_TABLE}") assert cursor.fetchall()[0][0] == 0 -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_data_transfer_from_broker(load_broker_data): call_command("transfer_assistance_records", "--reload-all") table = SourceAssistanceTransaction().table_name @@ -266,7 +266,7 @@ def test_data_transfer_from_broker(load_broker_data): ) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_correction_overwrites_when_afa_casing_is_different(load_broker_data): """Verify that if a correction comes in for a FABS record that has the same case-INSENSITIVE afa_generated_unique key, but in fact has different letter-casing than the original, that it will STILL replace the record and put its @@ -302,7 +302,7 @@ def test_correction_overwrites_when_afa_casing_is_different(load_broker_data): (E'2017-09-16 22:22:42.760993','{insert_now_time}',{new_published_fabs_id},E'07/12/2017',E'C',E'06',E'UNKNOWN TITLE',E'Columbus State Community College',NULL,E'091',NULL,E'9100',E'3',E'NON',E'06',E'84.033',E'C',0,E'P033A173267',520000,NULL,E'091',NULL,NULL,E'550 E Spring St',NULL,NULL,E'USA',NULL,NULL,NULL,E'43215',E'1722',0,0,E'08/31/2023',NULL,E'OH18000',E'03',E'USA',NULL,E'432151722',2,NULL,NULL,E'03',E'520000.0',E'Federal Work-Study Program',E'Department 
of Education (ED)',E'Department of Education',E'EDUCATION, DEPARTMENT OF (9100)',NULL,TRUE,E'Delaware',E'Ohio',E'COLUMBUS',E'Columbus',E'049',E'Franklin',E'OH',E'Ohio',E'2017-07-21 00:00:00','{case_change_afa}',TRUE,NULL,NULL,NULL,NULL,E'UNITED STATES',E'UNITED STATES',E'041',NULL,E'OH',E'43215',E'1722',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,E'ASST_NON_P033A173267_9100',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,E'Single ZIP Code',E'awardee-uei',E'parent-uei',E'funding-opportunity-goals',E'funding-opportunity-number',123456) ; """ - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(update_fabs_record) cursor.execute(insert_corrected_fabs_record) cursor.execute(f"SELECT COUNT(*) FROM {BROKER_TABLE}") @@ -430,7 +430,7 @@ def test_correction_overwrites_when_afa_casing_is_different(load_broker_data): @override_settings(IS_LOCAL=False) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_delete(load_broker_data): # Load initial Broker data into USAspending call_command("transfer_assistance_records", "--reload-all") @@ -475,7 +475,7 @@ def test_delete(load_broker_data): WHERE UPPER({transaction_unique_field}) IN %s """ - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(insert_deleted_transactions) cursor.execute(f"SELECT COUNT(*) FROM {BROKER_TABLE}") assert cursor.fetchall()[0][0] == NUMBER_OF_SOURCE_RECORDS + 4 diff --git a/usaspending_api/transactions/tests/integration/test_procurement_etl.py b/usaspending_api/transactions/tests/integration/test_procurement_etl.py index 6d983e92d5..50fcc6f80e 100644 --- a/usaspending_api/transactions/tests/integration/test_procurement_etl.py +++ b/usaspending_api/transactions/tests/integration/test_procurement_etl.py @@ -128,20 +128,20 @@ def load_broker_data(db, broker_server_dblink_setup): (E'2019-09-29 13:18:51.335141',E'2019-09-30 13:02:22.556656',120435586,E'4732_4732_47QSEA19FAH3G_0_GS07Q1647AJN5N_0',E'47QSEA19FAH3G',E'4732',E'4732',E'FEDERAL ACQUISITION SERVICE',E'047',E'General Services Administration (GSA)',E'GS07Q1647AJN5N',E'0',E'K',E'FIXED PRICE WITH ECONOMIC PRICE ADJUSTMENT',E'A',E'BPA CALL',E'424130',E'INDUSTRIAL AND PERSONAL SERVICE PAPER MERCHANT WHOLESALERS',E'014654966',E'CAPP INC.',E'014654966',E'PAPER, TOILET: - SEE ATTACHED DOCUMENT FOR DETAIL.',E'190182414',E'CLIFTON HEIGHTS',E'DELAWARE',E'05',E'CAPP, INC.',E'CLIFTON HEIGHTS',E'PA',E'PENNSYLVANIA',E'190182414',E'05',E'201 MARPLE AVE',NULL,NULL,E'USA',E'UNITED STATES',E'2019-09-26 00:00:00',E'2019-10-02 00:00:00',E'2019-10-02 00:00:00',NULL,E'2019-09-26 00:00:00',NULL,NULL,231.4,E'231.40',E'231.40',E'4732',E'FEDERAL ACQUISITION SERVICE',E'47QSEA',E'GSA/FAS ADMIN SVCS ACQUISITION BR(2',E'47QSEA',E'GSA/FAS ADMIN SVCS ACQUISITION BR(2',E'4732',E'FEDERAL ACQUISITION SERVICE',E'047',E'General Services Administration (GSA)',NULL,E'PA',E'PENNSYLVANIA',E'USA',E'UNITED STATES',NULL,NULL,E'E',E'BPA',NULL,E'6103941169',E'6103941110',NULL,NULL,E'S',E'SINGLE AWARD',NULL,NULL,E'N',E'NO',NULL,NULL,E'N',E'NO',E'A',E'COMMERCIAL ITEM',NULL,NULL,E'D',E'NOT CONSOLIDATED',E'X',E'NOT APPLICABLE',E'H',E'NOT BUNDLED',NULL,NULL,E'S',E'SMALL BUSINESS',NULL,NULL,NULL,NULL,E'USA',E'UNITED STATES',E'N',E'NO',E'NONE',E'NO PREFERENCE 
USED',E'A',E'FULL AND OPEN COMPETITION',NULL,NULL,E'X',E'NOT APPLICABLE',E'N',E'Transaction does not use GFE/GFP',NULL,NULL,E'X',E'NOT APPLICABLE',E'N',E'NO',E'JANSAN REQUISITION CHANNEL BPA',E'N',E'NO',NULL,NULL,E'NONE',E'NONE',E'1',NULL,NULL,E'X',E'NOT APPLICABLE',E'D',E'MFG IN U.S.',NULL,E'7510',E'OFFICE SUPPLIES',NULL,NULL,NULL,E'A',E'FAR 52.223-4 INCLUDED',NULL,NULL,NULL,NULL,E'X',E'NOT APPLICABLE',NULL,E'MAFO',E'SUBJECT TO MULTIPLE AWARD FAIR OPPORTUNITY',E'FAIR',E'FAIR OPPORTUNITY GIVEN',NULL,NULL,NULL,NULL,NULL,NULL,E'A',E'MEETS REQUIREMENTS',E'X',E'NOT APPLICABLE',E'0',NULL,NULL,E'PM0011',E'X',E'NO',E'A',E'U.S. OWNED BUSINESS',E'award',E'2019-09-28 13:25:13',NULL,NULL,NULL,NULL,NULL,E'190182414',NULL,NULL,NULL,E'014654966',E'2019-09-28 13:25:10',E'231.40',E'231.40',E'231.40',NULL,NULL,NULL,NULL,E'045',E'DELAWARE',E'19018',E'2414',E'045',E'19018',E'2414',E'{{category_business,small_business,us_owned_business,special_designations,corporate_entity_not_tax_exempt}}',E'2B510',NULL,E'CORPORATE NOT TAX EXEMPT',NULL,E'CONT_AWD_47QSEA19FAH3G_4732_GS07Q1647AJN5N_4732',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,E'NONE: NONE OF THE ABOVE',NULL,NULL,NULL,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,TRUE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,TRUE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,TRUE,FALSE,FALSE,FALSE,E'D&B'), (E'2019-10-10 13:04:45.747034',E'2020-01-04 14:03:12.506889',121983859,E'1540_1540_15B50120FVB120016_0_DJBP0700NASBPA095A_0',E'15B50120FVB120016',E'1540',E'1540',E'FEDERAL PRISON SYSTEM / BUREAU OF PRISONS',E'015',E'Department of Justice (DOJ)',E'DJBP0700NASBPA095A',E'0',E'J',E'FIRM FIXED PRICE',E'A',E'BPA CALL',E'621511',E'MEDICAL LABORATORIES',E'056354640',E'QUEST DIAGNOSTICS INCORPORATED',E'056354640',E'QUEST DIAGNOSTICS - HEPATITIS TESTING - 31J INCREASE AMOUNT',E'079401027',E'MADISON',E'MORRIS',E'11',E'QUEST DIAGNOSTICS INCORPORATED',E'MADISON',E'NJ',E'NEW JERSEY',E'079401027',E'11',E'3 GIRALDA FARMS',NULL,NULL,E'USA',E'UNITED STATES',E'2019-10-01 00:00:00',E'2020-01-01 00:00:00',E'2020-01-01 00:00:00',NULL,E'2019-10-09 00:00:00',NULL,NULL,13500,E'13500.00',E'13500.00',E'1540',E'FEDERAL PRISON SYSTEM / BUREAU OF PRISONS',E'15B501',E'FCI BASTROP',E'15B501',E'FCI BASTROP',E'1540',E'FEDERAL PRISON SYSTEM / BUREAU OF PRISONS',E'015',E'Department of Justice (DOJ)',NULL,E'NJ',E'NEW JERSEY',E'USA',E'UNITED STATES',NULL,NULL,E'E',E'BPA',E'QUEST DIAGNOSTICS',E'6156729798',E'6156726746',NULL,NULL,E'S',E'SINGLE AWARD',NULL,NULL,E'N',E'NO',NULL,NULL,E'N',E'NO',E'A',E'COMMERCIAL ITEM',NULL,NULL,E'D',E'NOT CONSOLIDATED',E'X',E'NOT APPLICABLE',E'H',E'NOT BUNDLED',NULL,NULL,E'O',E'OTHER THAN SMALL BUSINESS',NULL,NULL,NULL,NULL,E'USA',E'UNITED STATES',E'X',E'NOT APPLICABLE',E'NONE',E'NO PREFERENCE USED',E'A',E'FULL AND OPEN COMPETITION',NULL,NULL,E'X',E'NOT APPLICABLE',E'N',E'Transaction does not use GFE/GFP',NULL,NULL,E'X',E'NOT APPLICABLE',E'N',E'NO',NULL,E'N',E'NO',NULL,NULL,E'NONE',E'NONE',E'1',NULL,NULL,E'N',E'NO - SERVICE WHERE PBA IS NOT USED.',E'C',E'NOT A MANUFACTURED END PRODUCT',NULL,E'Q301',E'MEDICAL- LABORATORY TESTING',NULL,NULL,NULL,E'C',E'NO CLAUSES INCLUDED AND NO SUSTAINABILITY 
INCLUDED',NULL,NULL,NULL,NULL,E'X',E'NOT APPLICABLE',NULL,E'MAFO',E'SUBJECT TO MULTIPLE AWARD FAIR OPPORTUNITY',E'FAIR',E'FAIR OPPORTUNITY GIVEN',NULL,NULL,NULL,NULL,NULL,NULL,E'E',E'NOT REQUIRED',E'X',E'NOT APPLICABLE',E'0',NULL,NULL,E'P00008',E'X',E'NO',E'A',E'U.S. OWNED BUSINESS',E'award',E'2020-01-02 09:17:47',NULL,NULL,NULL,NULL,NULL,E'194262998',NULL,NULL,NULL,E'0561682140000PR',E'2019-10-09 08:20:24',E'13500.00',E'13500.00',E'13500.00',NULL,NULL,NULL,NULL,E'027',E'MORRIS',E'07940',E'1027',E'027',E'07940',E'1027',E'{{us_owned_business,other_than_small_business,category_business,special_designations}}',E'1FHB9',E'OT',NULL,E'OTHER FUNCTIONS',E'CONT_AWD_15B50120FVB120016_1540_DJBP0700NASBPA095A_1540',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,E'NONE: NONE OF THE ABOVE',NULL,NULL,NULL,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,FALSE,E'D&B'); """ - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(insert_test_data) cursor.execute(f"SELECT COUNT(*) FROM {BROKER_TABLE}") assert cursor.fetchall()[0][0] == NUMBER_OF_RECORDS yield - with connections[settings.DATA_BROKER_DB_ALIAS].cursor() as cursor: + with connections[settings.BROKER_DB_ALIAS].cursor() as cursor: cursor.execute(f"TRUNCATE {BROKER_TABLE}") cursor.execute(f"SELECT COUNT(*) FROM {BROKER_TABLE}") assert cursor.fetchall()[0][0] == 0 -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_data_transfer_from_broker(load_broker_data): call_command("transfer_procurement_records", "--reload-all") table = SourceProcurementTransaction().table_name @@ -473,7 +473,7 @@ def test_data_transfer_from_broker(load_broker_data): @override_settings(IS_LOCAL=False) -@pytest.mark.django_db(databases=[settings.DATA_BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) +@pytest.mark.django_db(databases=[settings.BROKER_DB_ALIAS, settings.DEFAULT_DB_ALIAS], transaction=True) def test_delete(monkeypatch, load_broker_data): # Load initial Broker data into USAspending call_command("transfer_procurement_records", "--reload-all")
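Editor's note (not part of the changeset): the sketch below is a hedged illustration of how the renamed pieces above fit together, i.e. the `BROKER_DB` environment variable gating the `BROKER_DB_ALIAS` Django connection and the new `TRANSACTION_ID_LOOKUP_SCHEMA` Delta model. It assumes a configured Django settings module and a local PySpark installation; the variable names and the `SELECT 1` probe are illustrative only and do not appear in this diff.

```python
# Hedged sketch, assuming DJANGO_SETTINGS_MODULE is already configured and
# pyspark is installed; names not shown in the diff are illustrative.
import os

from django.conf import settings
from django.db import connections
from pyspark.sql import SparkSession

from usaspending_api.transactions.delta_models.transaction_id_lookup import TRANSACTION_ID_LOOKUP_SCHEMA

# The "data_broker" alias is only registered when BROKER_DB was set at startup
# (see the settings.py hunk above), so guard any broker access the same way.
if os.environ.get("BROKER_DB"):
    with connections[settings.BROKER_DB_ALIAS].cursor() as cursor:
        cursor.execute("SELECT 1")  # simple connectivity probe, illustrative only

# The StructType can seed an empty DataFrame before any transaction IDs are loaded.
spark = SparkSession.builder.getOrCreate()
empty_lookup = spark.createDataFrame([], schema=TRANSACTION_ID_LOOKUP_SCHEMA)
assert empty_lookup.columns == ["transaction_id", "is_fpds", "transaction_unique_id"]
```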