4 changes: 2 additions & 2 deletions usaspending_api/awards/models/award.py
@@ -5,7 +5,7 @@


class AwardManager(models.Manager):
def get_queryset(self):
def get_queryset(self) -> models.QuerySet:
"""
A generated award will have these set to null, but will also receive no
transactions. Thus, these will remain null. This finds those awards and
@@ -141,7 +141,7 @@ class Award(DataSourceTrackedModel):
verbose_name="Combined Base and Exercised Options",
help_text="The sum of the base_exercised_options_val from associated transactions",
)
last_modified_date = models.DateField(blank=True, null=True, help_text="The date this award was last modified")
last_modified_date = models.DateTimeField(blank=True, null=True, help_text="The date this award was last modified")
certified_date = models.DateField(blank=True, null=True, help_text="The date this record was certified")
create_date = models.DateTimeField(
auto_now_add=True, blank=True, null=True, help_text="The date this record was created in the API"
9 changes: 5 additions & 4 deletions usaspending_api/awards/models/transaction_fpds.py
@@ -286,7 +286,7 @@ class TransactionFPDS(models.Model):
domestic_or_foreign_entity = models.TextField(blank=True, null=True)
domestic_or_foreign_e_desc = models.TextField(blank=True, null=True)
pulled_from = models.TextField(blank=True, null=True)
last_modified = models.TextField(blank=True, null=True)
last_modified = models.DateTimeField(blank=True, null=True)
cage_code = models.TextField(blank=True, null=True)
inherently_government_func = models.TextField(blank=True, null=True)
inherently_government_desc = models.TextField(blank=True, null=True)
@@ -352,7 +352,6 @@ class Meta:
FPDS_CASTED_COL_MAP = {
# transaction_fpds col name : type casting search -> fpds
"action_date": "TEXT",
"last_modified": "TEXT",
"period_of_performance_star": "TEXT",
"period_of_performance_curr": "TEXT",
}
@@ -364,8 +363,10 @@ class Meta:
vw_transaction_fpds_sql = f"""
CREATE OR REPLACE VIEW rpt.vw_transaction_fpds AS
SELECT
{(','+os.linesep+' '*12).join([
(v+(f'::{FPDS_CASTED_COL_MAP[k]}' if k in FPDS_CASTED_COL_MAP else '')).ljust(62)+' AS '+k.ljust(48)
{(',' + os.linesep + ' ' * 12).join([
(
v + (f'::{FPDS_CASTED_COL_MAP[k]}' if k in FPDS_CASTED_COL_MAP else '')
).ljust(62) + ' AS ' + k.ljust(48)
for k, v in FPDS_TO_TRANSACTION_SEARCH_COL_MAP.items()])}
FROM
rpt.transaction_search
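Because `last_modified` is now a `DateTimeField`, it is dropped from `FPDS_CASTED_COL_MAP`, so the generated view no longer casts it back to `TEXT`. A rough, self-contained sketch of how that map drives the generated `SELECT` list follows; the column mappings below are hypothetical stand-ins, not the real `FPDS_TO_TRANSACTION_SEARCH_COL_MAP`.

```python
import os

# Illustrative stand-ins only -- the real maps in transaction_fpds.py are much larger.
FPDS_TO_TRANSACTION_SEARCH_COL_MAP = {
    "action_date": "action_date",
    "last_modified": "last_modified_date",
    "cage_code": "cage_code",
}
FPDS_CASTED_COL_MAP = {
    # transaction_fpds col name : type casting search -> fpds
    "action_date": "TEXT",
}

select_list = (',' + os.linesep + ' ' * 12).join(
    (
        source + (f"::{FPDS_CASTED_COL_MAP[fpds_col]}" if fpds_col in FPDS_CASTED_COL_MAP else "")
    ).ljust(62) + ' AS ' + fpds_col.ljust(48)
    for fpds_col, source in FPDS_TO_TRANSACTION_SEARCH_COL_MAP.items()
)

vw_transaction_fpds_sql = f"""
CREATE OR REPLACE VIEW rpt.vw_transaction_fpds AS
SELECT
    {select_list}
FROM
    rpt.transaction_search
"""
print(vw_transaction_fpds_sql)
```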
75 changes: 47 additions & 28 deletions usaspending_api/etl/management/commands/copy_table_metadata.py
@@ -1,9 +1,11 @@
import asyncio
import logging
import re
import asyncio
from pprint import pformat

import psycopg2
from django.core.management.base import BaseCommand, CommandParser
from django.db import connection
from django.core.management.base import BaseCommand

from usaspending_api.common.data_connectors.async_sql_query import async_run_creates
from usaspending_api.common.helpers.timing_helpers import ConsoleTimer as Timer
@@ -14,12 +16,20 @@
# copied from usaspending_api/database_scripts/matview_generator/shared_sql_generator.py
# since that script uses relative imports which do not work well with the rest of the repo code
TEMPLATE = {
"read_indexes": "SELECT indexname, indexdef FROM pg_indexes WHERE schemaname = '{}' AND tablename = '{}';",
"read_constraints": "select conname, pg_get_constraintdef(oid) from pg_constraint where contype IN ('p', 'f', 'u') and conrelid = '{}'::regclass;",
"read_indexes": (
"SELECT indexname, indexdef FROM pg_indexes"
" WHERE schemaname = '{}' AND tablename = '{}'"
" ORDER BY indexname;"
),
"read_constraints": (
"SELECT conname, pg_get_constraintdef(oid) FROM pg_constraint"
" WHERE contype IN ('p', 'f', 'u') AND conrelid = '{}'::regclass"
" ORDER BY conname;"
),
}


def make_read_indexes(table_name):
def make_read_indexes(table_name: str) -> list[str]:
if "." in table_name:
schema_name, table_name = table_name[: table_name.index(".")], table_name[table_name.index(".") + 1 :]
else:
@@ -28,21 +38,21 @@ def make_read_indexes(table_name):
return [TEMPLATE["read_indexes"].format(schema_name, table_name)]


def make_read_constraints(table_name):
def make_read_constraints(table_name: str) -> list[str]:
return [TEMPLATE["read_constraints"].format(table_name)]


def create_indexes(index_definitions, index_concurrency):
def create_indexes(index_definitions: list[str], index_concurrency: int) -> None:
loop = asyncio.new_event_loop()
loop.run_until_complete(index_with_concurrency(index_definitions, index_concurrency))
loop.close()


async def index_with_concurrency(index_definitions, index_concurrency):
async def index_with_concurrency(index_definitions: list[str], index_concurrency: int) -> list:
semaphore = asyncio.Semaphore(index_concurrency)
tasks = []

async def create_with_sem(sql, index):
async def create_with_sem(sql: str, index: int) -> None:
async with semaphore:
return await async_run_creates(
sql,
@@ -56,15 +66,15 @@ async def create_with_sem(sql, index):
return await asyncio.gather(*tasks)


def make_copy_constraints(
cursor,
source_table,
dest_table,
drop_foreign_keys=False,
source_suffix="",
dest_suffix="temp",
only_parent_partitioned_table=False,
):
def make_copy_constraints( # noqa: PLR0913
cursor: psycopg2.extensions.cursor,
source_table: str,
dest_table: str,
drop_foreign_keys: bool = False,
source_suffix: str = "",
dest_suffix: str = "temp",
only_parent_partitioned_table: bool = False,
) -> str:
# read the existing indexes
cursor.execute(make_read_constraints(source_table)[0])
src_constrs = dictfetchall(cursor)
@@ -88,8 +98,13 @@ def make_copy_constraints(


def make_copy_indexes(
cursor, source_table, dest_table, source_suffix="", dest_suffix="temp", only_parent_partitioned_table=False
):
cursor: psycopg2.extensions.cursor,
source_table: str,
dest_table: str,
source_suffix: str = "",
dest_suffix: str = "temp",
only_parent_partitioned_table: bool = False
) -> list[str]:
# read the existing indexes of source table
cursor.execute(make_read_indexes(source_table)[0])
src_indexes = dictfetchall(cursor)
@@ -114,7 +129,7 @@ def make_copy_indexes(
# for example, a table 'x' in the public schema could be provided and the string will include `public.x'
# Depending on whether the source table is a paritioned table or not, it may or may not already have the ONLY
# clause in its index definition(s)
ix_regex = rf"CREATE\s+.*INDEX\s+\S+\s+ON\s+(ONLY\s+)?(\S+)\s+.*"
ix_regex = r"CREATE\s+.*INDEX\s+\S+\s+ON\s+(ONLY\s+)?(\S+)\s+.*"
regex_groups = re.findall(ix_regex, create_ix_sql)[0]
contains_only = regex_groups[0]
src_table = regex_groups[1]
@@ -129,20 +144,24 @@ def make_copy_indexes(
return dest_ix_sql


def attach_child_partition_metadata(parent_partitioned_table, child_partition_name, dest_suffix="temp"):
# e.g. parent_parititioned_table=temp.transaction_search_temp
def attach_child_partition_metadata(
parent_partitioned_table: str,
child_partition_name: str,
dest_suffix: str = "temp"
) -> None:
# e.g. parent_partitioned_table=temp.transaction_search_temp
# child_partition_name=temp.transaction_search_fabs_temp

# The parent-child partitions must follow the convention of partition names just adding a suffix to parent tables
# And if a --dest-suffix was provided to this command, it needs to be stripped off first before deriving the
# child partition name suffix
dest_suffix_appendage = "" if not dest_suffix else f"_{dest_suffix}"
dest_suffix_chop_len = len(dest_suffix_appendage)
parent_table_without_schema = re.sub(rf"^.*?\.(.*?)$", rf"\g<1>", parent_partitioned_table)
child_partition_without_schema = re.sub(rf"^.*?\.(.*?)$", rf"\g<1>", child_partition_name)
parent_table_without_schema = re.sub(r"^.*?\.(.*?)$", r"\g<1>", parent_partitioned_table)
child_partition_without_schema = re.sub(r"^.*?\.(.*?)$", r"\g<1>", child_partition_name)
child_partition_suffix = re.sub(
rf"^.*?{parent_table_without_schema[:-dest_suffix_chop_len]}(.*?)$",
rf"\g<1>",
r"\g<1>",
child_partition_without_schema[:-dest_suffix_chop_len],
)

@@ -184,7 +203,7 @@ class Command(BaseCommand):
This command simply copies the constraints and indexes from one table to another in postgres
"""

def add_arguments(self, parser):
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--source-table",
type=str,
@@ -269,7 +288,7 @@ def add_arguments(self, parser):
"child suffix and then optional --dest-suffix",
)

def handle(self, *args, **options):
def handle(self, *args, **options) -> None:
# Resolve Parameters
source_table = options["source_table"]
source_suffix = options["source_suffix"]
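For context, a self-contained sketch of the semaphore-bounded pattern that `index_with_concurrency` uses to cap how many CREATE INDEX statements run at once. The real command delegates to `async_run_creates` against Postgres; that helper is replaced here by a simulated coroutine, and the index DDL is made up for illustration.

```python
import asyncio


async def run_create(sql: str, wrapper: str) -> None:
    # Stand-in for usaspending_api's async_run_creates helper: pretend to execute one statement.
    print(f"{wrapper} started")
    await asyncio.sleep(0.1)
    print(f"{wrapper} finished: {sql}")


async def index_with_concurrency(index_definitions: list[str], index_concurrency: int) -> list:
    # At most `index_concurrency` CREATE INDEX statements are in flight at any one time.
    semaphore = asyncio.Semaphore(index_concurrency)
    tasks = []

    async def create_with_sem(sql: str, index: int) -> None:
        async with semaphore:
            return await run_create(sql, wrapper=f"Index {index}")

    for i, sql in enumerate(index_definitions):
        tasks.append(create_with_sem(sql, i))

    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    ddl = [f"CREATE INDEX idx_{n}_temp ON temp.my_table_temp (col_{n});" for n in range(5)]
    asyncio.run(index_with_concurrency(ddl, index_concurrency=2))
```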
58 changes: 35 additions & 23 deletions usaspending_api/etl/management/commands/load_table_from_delta.py
@@ -1,31 +1,29 @@
import itertools
import logging
from datetime import datetime
from math import ceil
from typing import Any, Dict, List, Optional

import boto3
import numpy as np
import psycopg2

from django import db
from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandParser
from django.db.models import Model
from math import ceil
from pyspark.sql import SparkSession, DataFrame
from typing import Dict, Optional, List
from datetime import datetime
from pyspark.sql import DataFrame, SparkSession

from usaspending_api.common.csv_stream_s3_to_pg import copy_csvs_from_s3_to_pg
from usaspending_api.common.etl.spark import convert_array_cols_to_string
from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
from usaspending_api.common.helpers.spark_helpers import (
configure_spark_session,
get_active_spark_session,
get_jdbc_connection_properties,
get_usas_jdbc_url,
)
from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
from usaspending_api.config import CONFIG
from usaspending_api.settings import DEFAULT_TEXT_SEARCH_CONFIG

from usaspending_api.etl.management.commands.create_delta_table import TABLE_SPEC
from usaspending_api.settings import DEFAULT_TEXT_SEARCH_CONFIG

logger = logging.getLogger(__name__)

@@ -55,7 +53,7 @@ class Command(BaseCommand):
if a new table has been made.
"""

def add_arguments(self, parser):
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--delta-table",
type=str,
@@ -110,8 +108,15 @@ def add_arguments(self, parser):
help="In the case of a Postgres sequence for the provided 'delta-table' the sequence will be reset to 1. "
"If the job fails for some unexpected reason then the sequence will be reset to the previous value.",
)
parser.add_argument(
"--additional-columns",
type=str,
required=False,
help="A comma delimited list of column names that should be loaded in addition to the columns required "
"by the Delta tables schema. Useful for loading temporary columns from Delta to Postgres to swap in place."
)

def _split_dfs(self, df, special_columns):
def _split_dfs(self, df: DataFrame, special_columns: list[str]) -> list[DataFrame]:
"""Split a DataFrame into DataFrame subsets based on presence of NULL values in certain special columns

Unfortunately, pySpark with the JDBC doesn't handle UUIDs/JSON well.
@@ -145,7 +150,7 @@ def _split_dfs(self, df, special_columns):
split_dfs.append(split_df)
return split_dfs

def handle(self, *args, **options):
def handle(self, *args, **options) -> None: # noqa: C901,PLR0912,PLR0915
extra_conf = {
# Config for Delta Lake tables and SQL. Need these to keep Dela table metadata in the metastore
"spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
@@ -186,12 +191,11 @@ def handle(self, *args, **options):

# Postgres side - temp
temp_schema = "temp"
temp_table_suffix = "temp"
temp_table_suffix_appendage = f"_{temp_table_suffix}" if {temp_table_suffix} else ""
temp_table_suffix = "_temp"
if postgres_table:
temp_table_name = f"{postgres_table_name}{temp_table_suffix_appendage}"
temp_table_name = f"{postgres_table_name}{temp_table_suffix}"
else:
temp_table_name = f"{delta_table_name}{temp_table_suffix_appendage}"
temp_table_name = f"{delta_table_name}{temp_table_suffix}"
temp_table = f"{temp_schema}.{temp_table_name}"

summary_msg = f"Copying delta table {delta_table} to a Postgres temp table {temp_table}."
@@ -242,7 +246,7 @@ def handle(self, *args, **options):
(
f"CREATE TABLE "
# Below: e.g. my_tbl_temp -> my_tbl_part_temp
f"{temp_table[:-len(temp_table_suffix_appendage)]}{pt['table_suffix']}{temp_table_suffix_appendage} "
f"{temp_table[:-len(temp_table_suffix)]}{pt['table_suffix']}{temp_table_suffix} "
f"PARTITION OF {temp_table} {pt['partitioning_clause']} "
f"{storage_parameters}"
)
@@ -307,6 +311,7 @@ def handle(self, *args, **options):
# that of the Spark dataframe used to pull from the Postgres table. While not
# always needed, this should help to prevent any future mismatch between the two.
if column_names:
column_names = column_names + self.get_additional_column_names(options)
df = df.select(column_names)

# If we're working off an existing table, truncate before loading in all the data
@@ -360,7 +365,7 @@ def handle(self, *args, **options):
f"Command failed unexpectedly; resetting the sequence to previous value: {postgres_seq_last_value}"
)
self._set_sequence_value(table_spec["postgres_seq_name"], postgres_seq_last_value)
raise Exception(exc)
raise Exception(exc) from exc

logger.info(
f"LOAD (FINISH): Loaded data from Delta table {delta_table} to {temp_table} using {strategy} " f"strategy"
@@ -394,7 +399,7 @@ def _set_sequence_value(self, seq_name: str, val: Optional[int] = None) -> int:
cursor.execute(f"ALTER SEQUENCE IF EXISTS {seq_name} RESTART WITH {new_seq_val}")
return last_value

def _write_with_sql_bulk_copy_csv(
def _write_with_sql_bulk_copy_csv( # noqa: PLR0913
self,
spark: SparkSession,
df: DataFrame,
@@ -403,8 +408,8 @@ def _write_with_sql_bulk_copy_csv(
temp_table: str,
ordered_col_names: List[str],
spark_s3_bucket_name: str,
keep_csv_files=False,
):
keep_csv_files: bool = False,
) -> None:
"""
Write-from-delta-to-postgres strategy that relies on SQL bulk COPY of CSV files to Postgres. It uses the SQL
COPY command on CSV files, which are created from the Delta table's underlying parquet files.
@@ -549,7 +554,7 @@ def _write_with_sql_bulk_copy_csv(

logger.info(f"LOAD: Finished SQL bulk COPY of {file_count} CSV files to Postgres {temp_table} table")

def _write_with_jdbc_inserts(
def _write_with_jdbc_inserts( # noqa: PLR0913
self,
spark: SparkSession,
df: DataFrame,
Expand All @@ -558,7 +563,7 @@ def _write_with_jdbc_inserts(
postgres_model: Optional[Model] = None,
postgres_cols: Optional[Dict[str, str]] = None,
overwrite: bool = False,
):
) -> None:
"""
Write-from-delta-to-postgres strategy that leverages the native Spark ``DataFrame.write.jdbc`` approach.
This will issue a series of individual INSERT statements over a JDBC connection-per-executor.
@@ -625,3 +630,10 @@ def _write_with_jdbc_inserts(
mode=save_mode,
properties=get_jdbc_connection_properties(),
)

@staticmethod
def get_additional_column_names(options: dict[str, Any]) -> list[str]:
column_names = options.get("additional_columns")
if column_names:
column_names = [name.strip() for name in column_names.split(",")]
return column_names or []
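Finally, a rough usage sketch of the new `--additional-columns` handling: the comma-delimited option value is split into trimmed names and appended to the Delta schema's column list before the `df.select(...)` in `handle()`. The standalone function below mirrors the `get_additional_column_names` helper added above; the option value and column names are made up for illustration.

```python
from typing import Any


def get_additional_column_names(options: dict[str, Any]) -> list[str]:
    # Comma-delimited CLI value -> list of trimmed names; missing/empty -> [].
    column_names = options.get("additional_columns")
    if column_names:
        column_names = [name.strip() for name in column_names.split(",")]
    return column_names or []


# Hypothetical example: columns required by the Delta table spec, plus temporary
# swap-in columns requested on the command line.
options = {"additional_columns": "is_new_award, etl_update_date"}
column_names = ["award_id", "action_date"]
column_names = column_names + get_additional_column_names(options)
# df = df.select(column_names)   # as in handle() above, before writing to Postgres
print(column_names)  # ['award_id', 'action_date', 'is_new_award', 'etl_update_date']
```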