
Commit be94f13

Addressing ruff issues
1 parent b799dcd commit be94f13

File tree

6 files changed: +266 / -129 lines


usaspending_api/etl/management/commands/archive_table_in_delta.py

Lines changed: 25 additions & 15 deletions
@@ -1,18 +1,20 @@
 import logging
-import psycopg2
-
 from datetime import datetime, timedelta
-from django.core.management.base import BaseCommand

-from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
+import psycopg2
+from django.core.management.base import BaseCommand, CommandParser
+
 from usaspending_api.common.etl.spark import load_delta_table
 from usaspending_api.common.helpers.spark_helpers import (
     configure_spark_session,
     get_active_spark_session,
     get_jdbc_connection_properties,
     get_usas_jdbc_url,
 )
-from usaspending_api.download.delta_models.download_job import download_job_create_sql_string
+from usaspending_api.common.helpers.sql_helpers import get_database_dsn_string
+from usaspending_api.download.delta_models.download_job import (
+    download_job_create_sql_string,
+)
 from usaspending_api.etl.table_specs import ArchiveTableSpec

 logger = logging.getLogger(__name__)
@@ -38,7 +40,8 @@ class Command(BaseCommand):
     those records from Postgres.
     """

-    def add_arguments(self, parser):
+    @staticmethod
+    def add_arguments(parser: CommandParser) -> None:
         parser.add_argument(
             "--destination-table",
             type=str,
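
The hunk above drops the unused `self` by making `add_arguments` a static method and adds type hints, the kind of finding ruff surfaces. A minimal, self-contained sketch of the resulting pattern (the command name and argument below are illustrative, not taken from this repo):

from django.core.management.base import BaseCommand, CommandParser


class Command(BaseCommand):
    help = "Illustrative command using a static, typed add_arguments."

    @staticmethod
    def add_arguments(parser: CommandParser) -> None:
        # Django passes its own parser in; `self` is not needed to register arguments.
        parser.add_argument("--destination-table", type=str, required=True)

    def handle(self, *args, **options) -> None:
        # argparse maps --destination-table to the "destination_table" key.
        self.stdout.write(f"Would operate on {options['destination_table']}")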
@@ -57,7 +60,8 @@ def add_arguments(self, parser):
             "--alt-db",
             type=str,
             required=False,
-            help="An alternate Delta Database (aka schema) in which to archive this table, overriding the TABLE_SPEC's destination_database",
+            help="An alternate Delta Database (aka schema) in which to archive this table, overriding the TABLE_SPEC's"
+            " destination_database",
         )
         parser.add_argument(
             "--alt-name",
@@ -66,7 +70,7 @@ def add_arguments(self, parser):
             help="An alternate Delta Table name which to archive this table, overriding the destination_table",
         )

-    def handle(self, *args, **options):
+    def handle(self, *args, **options) -> None:
         extra_conf = {
             # Config for Delta Lake tables and SQL. Need these to keep Dela table metadata in the metastore
             "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
@@ -107,14 +111,16 @@ def handle(self, *args, **options):
         # Resolve JDBC URL for Source Database
         jdbc_url = get_usas_jdbc_url()
         if not jdbc_url:
-            raise RuntimeError(f"Couldn't find JDBC url, please properly configure your CONFIG.")
+            raise RuntimeError(
+                "Couldn't find JDBC url, please properly configure your CONFIG."
+            )
         if not jdbc_url.startswith("jdbc:postgresql://"):
-            raise ValueError("JDBC URL given is not in postgres JDBC URL format (e.g. jdbc:postgresql://...")
+            raise ValueError(
+                "JDBC URL given is not in postgres JDBC URL format (e.g. jdbc:postgresql://..."
+            )

         # Retrieve data from Postgres
-        query_with_predicate = (
-            f"(SELECT * FROM {qualified_source_table} WHERE {archive_date_field} < '{archive_date_string}') AS tmp"
-        )
+        query_with_predicate = f"(SELECT * FROM {qualified_source_table} WHERE {archive_date_field} < '{archive_date_string}') AS tmp"

         df = spark.read.jdbc(
             url=jdbc_url,
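
The subquery aliased as `tmp` is how a filtered Postgres read is pushed down through Spark's JDBC source: Postgres applies the predicate before rows are shipped to Spark. A hedged sketch of that pattern, with placeholder connection details, table, and column names (the real command resolves these from its CONFIG and ArchiveTableSpec):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical connection details for illustration only.
jdbc_url = "jdbc:postgresql://localhost:5432/data_store_api"
conn_props = {"user": "usaspending", "password": "secret", "driver": "org.postgresql.Driver"}

# Wrapping the filtered SELECT as a derived table lets the database evaluate the predicate.
query_with_predicate = (
    "(SELECT * FROM public.download_job WHERE update_date < '2020-01-01') AS tmp"
)
df = spark.read.jdbc(url=jdbc_url, table=query_with_predicate, properties=conn_props)
print(df.count())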
@@ -125,7 +131,9 @@ def handle(self, *args, **options):
         # Write data to Delta Lake in Append Mode
         load_delta_table(spark, df, destination_table_name, overwrite=False)
         archived_count = df.count()
-        logger.info(f"Archived {archived_count} records from the {qualified_source_table}")
+        logger.info(
+            f"Archived {archived_count} records from the {qualified_source_table}"
+        )

         # Delete data from
         with psycopg2.connect(dsn=get_database_dsn_string()) as connection:
@@ -135,7 +143,9 @@ def handle(self, *args, **options):
                 )
                 deleted_count = cursor.rowcount

-        logger.info(f"Deleted {deleted_count} records from the {qualified_source_table} table")
+        logger.info(
+            f"Deleted {deleted_count} records from the {qualified_source_table} table"
+        )

         # Shut down spark
         if spark_created_by_command:
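
The archive step pairs the Delta append with a Postgres delete, and the reformatted `logger.info` calls report both counts. A minimal sketch of the delete-and-count half under assumed names (the DSN, table, and date column are placeholders, and the date is parameterized here where the command interpolates a precomputed string):

import logging

import psycopg2

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Placeholder values; the real command derives these from get_database_dsn_string()
# and its ArchiveTableSpec.
dsn = "postgresql://usaspending:secret@localhost:5432/data_store_api"
qualified_source_table = "public.download_job"
archive_date_field = "update_date"
archive_date_string = "2020-01-01"

with psycopg2.connect(dsn=dsn) as connection:
    with connection.cursor() as cursor:
        cursor.execute(
            f"DELETE FROM {qualified_source_table} WHERE {archive_date_field} < %s",
            (archive_date_string,),
        )
        deleted_count = cursor.rowcount

logger.info(f"Deleted {deleted_count} records from the {qualified_source_table} table")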

usaspending_api/etl/management/commands/create_delta_table.py

Lines changed: 3 additions & 3 deletions
@@ -1,6 +1,6 @@
 import logging

-from django.core.management.base import BaseCommand
+from django.core.management.base import BaseCommand, CommandParser
 from pyspark.sql.types import StructType

 from usaspending_api.awards.delta_models.award_id_lookup import AWARD_ID_LOOKUP_SCHEMA
@@ -50,7 +50,7 @@ class Command(BaseCommand):
     This command creates an empty Delta Table based on the provided --destination-table argument.
     """

-    def add_arguments(self, parser):
+    def add_arguments(self, parser: CommandParser) -> None:
         parser.add_argument(
             "--destination-table",
             type=str,
@@ -79,7 +79,7 @@ def add_arguments(self, parser):
             "name",
         )

-    def handle(self, *args, **options):
+    def handle(self, *args, **options) -> None:
         spark = get_active_spark_session()
         spark_created_by_command = False
         if not spark:

usaspending_api/etl/management/commands/load_query_to_delta.py

Lines changed: 67 additions & 23 deletions
@@ -2,7 +2,7 @@
 from argparse import ArgumentTypeError
 from typing import Callable

-from django.core.management.base import BaseCommand
+from django.core.management.base import BaseCommand, CommandParser
 from pyspark.sql import SparkSession

 from usaspending_api.common.etl.spark import create_ref_temp_views
@@ -35,7 +35,9 @@
     load_object_class_program_activity_incremental,
     object_class_program_activity_schema,
 )
-from usaspending_api.download.delta_models.transaction_download import transaction_download_schema
+from usaspending_api.download.delta_models.transaction_download import (
+    transaction_download_schema,
+)
 from usaspending_api.etl.table_specs import QueryTableSpec
 from usaspending_api.recipient.delta_models import (
     RECIPIENT_LOOKUP_POSTGRES_COLUMNS,
@@ -58,7 +60,10 @@
     AWARD_SEARCH_POSTGRES_GOLD_COLUMNS,
     award_search_create_sql_string,
 )
-from usaspending_api.search.delta_models.dataframes.award_search import load_award_search, load_award_search_incremental
+from usaspending_api.search.delta_models.dataframes.award_search import (
+    load_award_search,
+    load_award_search_incremental,
+)
 from usaspending_api.search.delta_models.dataframes.transaction_search import (
     load_transaction_search,
     load_transaction_search_incremental,
@@ -70,7 +75,12 @@
     subaward_search_create_sql_string,
     subaward_search_load_sql_string,
 )
-from usaspending_api.search.models import AwardSearch, SubawardSearch, SummaryStateView, TransactionSearch
+from usaspending_api.search.models import (
+    AwardSearch,
+    SubawardSearch,
+    SummaryStateView,
+    TransactionSearch,
+)
 from usaspending_api.settings import HOST
 from usaspending_api.transactions.delta_models import (
     SUMMARY_STATE_VIEW_COLUMNS,
@@ -226,8 +236,14 @@
         "partition_keys": ["is_fpds"],
         "partitioning_form": "LIST",
         "partitions": [
-            {"table_suffix": "_fpds", "partitioning_clause": "FOR VALUES IN (TRUE)"},
-            {"table_suffix": "_fabs", "partitioning_clause": "FOR VALUES IN (FALSE)"},
+            {
+                "table_suffix": "_fpds",
+                "partitioning_clause": "FOR VALUES IN (TRUE)",
+            },
+            {
+                "table_suffix": "_fabs",
+                "partitioning_clause": "FOR VALUES IN (FALSE)",
+            },
         ],
     },
 }
@@ -286,8 +302,11 @@
             "partition_column_type": "numeric",
             "delta_table_create_sql": account_balances_schema,
             "delta_table_create_options": {"delta.enableChangeDataFeed": True},
-            "column_names": list(),
-            "delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
+            "column_names": [],
+            "delta_table_create_partitions": [
+                "reporting_fiscal_year",
+                "funding_toptier_agency_id",
+            ],
         }
     ),
     "award_financial_download": QueryTableSpec(
@@ -299,8 +318,11 @@
             "partition_column_type": "numeric",
             "delta_table_create_sql": award_financial_schema,
             "delta_table_create_options": {"delta.enableChangeDataFeed": True},
-            "column_names": list(),
-            "delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
+            "column_names": [],
+            "delta_table_create_partitions": [
+                "reporting_fiscal_year",
+                "funding_toptier_agency_id",
+            ],
         }
     ),
     "object_class_program_activity_download": QueryTableSpec(
@@ -312,8 +334,11 @@
             "partition_column_type": "numeric",
             "delta_table_create_sql": object_class_program_activity_schema,
             "delta_table_create_options": {"delta.enableChangeDataFeed": True},
-            "column_names": list(),
-            "delta_table_create_partitions": ["reporting_fiscal_year", "funding_toptier_agency_id"],
+            "column_names": [],
+            "delta_table_create_partitions": [
+                "reporting_fiscal_year",
+                "funding_toptier_agency_id",
+            ],
         }
     ),
     "transaction_download": QueryTableSpec(
@@ -323,8 +348,12 @@
             "partition_column_type": "numeric",
             "delta_table_create_sql": transaction_download_schema,
             "delta_table_create_options": {"delta.enableChangeDataFeed": True},
-            "column_names": list(),
-            "delta_table_create_partitions": ["awarding_agency_code", "is_fpds", "action_date_fiscal_year"],
+            "column_names": [],
+            "delta_table_create_partitions": [
+                "awarding_agency_code",
+                "is_fpds",
+                "action_date_fiscal_year",
+            ],
         }
     ),
 }
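
Replacing `list()` with `[]` in these four hunks matches ruff's preference for literal syntax (likely the flake8-comprehensions rule C408, unnecessary collection call); the literal is equivalent and avoids a builtin name lookup. A tiny illustration, not from the commit:

# Flagged: calling the builtin requires a global name lookup.
column_names = list()

# Preferred: the literal form produces the same empty list.
column_names = []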
@@ -342,7 +371,8 @@ class Command(BaseCommand):
     destination_table_name: str
     spark: SparkSession

-    def add_arguments(self, parser):
+    @staticmethod
+    def add_arguments(parser: CommandParser) -> None:
         parser.add_argument(
             "--destination-table",
             type=str,
@@ -370,7 +400,7 @@ def add_arguments(self, parser):
             help="Whether or not the table will be updated incrementally",
         )

-    def handle(self, *args, **options):
+    def handle(self, *args, **options) -> None:
         extra_conf = {
             # Config for Delta Lake tables and SQL. Need these to keep Dela table metadata in the metastore
             "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
@@ -385,17 +415,25 @@ def handle(self, *args, **options):
         spark_created_by_command = False
         if not self.spark:
             spark_created_by_command = True
-            self.spark = configure_spark_session(**extra_conf, spark_context=self.spark)  # type: SparkSession
+            self.spark = configure_spark_session(
+                **extra_conf, spark_context=self.spark
+            )  # type: SparkSession

         # Resolve Parameters
         destination_table = options["destination_table"]
         table_spec = TABLE_SPEC[destination_table]
         self.destination_database = options["alt_db"] or table_spec.destination_database
-        self.destination_table_name = options["alt_name"] or destination_table.split(".")[-1]
-        source_query_key = "source_query_incremental" if options["incremental"] else "source_query"
+        self.destination_table_name = (
+            options["alt_name"] or destination_table.split(".")[-1]
+        )
+        source_query_key = (
+            "source_query_incremental" if options["incremental"] else "source_query"
+        )
         load_query = getattr(table_spec, source_query_key)
         if load_query is None:
-            raise ArgumentTypeError(f"Invalid source query. `{source_query_key}` must be specified in the TABLE_SPEC.")
+            raise ArgumentTypeError(
+                f"Invalid source query. `{source_query_key}` must be specified in the TABLE_SPEC."
+            )

         # Set the database that will be interacted with for all Delta Lake table Spark-based activity
         logger.info(f"Using Spark Database: {self.destination_database}")
@@ -405,15 +443,19 @@ def handle(self, *args, **options):

         if isinstance(load_query, list):
             for index, query in enumerate(load_query):
-                logger.info(f"Running query number: {index + 1}\nPreview of query: {query[:100]}")
+                logger.info(
+                    f"Running query number: {index + 1}\nPreview of query: {query[:100]}"
+                )
                 self.run_spark_sql(query)
         else:
             self.run_spark_sql(load_query)

         if spark_created_by_command:
             self.spark.stop()

-    def run_spark_sql(self, query: str | Callable[[SparkSession, str, str], None]):
+    def run_spark_sql(
+        self, query: str | Callable[[SparkSession, str, str], None]
+    ) -> None:
         if isinstance(query, str):
             jdbc_conn_props = get_jdbc_connection_properties()
             self.spark.sql(
@@ -430,4 +472,6 @@ def run_spark_sql(self, query: str | Callable[[SparkSession, str, str], None]):
         elif isinstance(query, Callable):
             query(self.spark, self.destination_database, self.destination_table_name)
         else:
-            raise ArgumentTypeError(f"Invalid query. `{query}` must be a string or a Callable.")
+            raise ArgumentTypeError(
+                f"Invalid query. `{query}` must be a string or a Callable."
+            )
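
The reflowed `run_spark_sql` keeps the same dispatch behavior: a string is executed as Spark SQL, a callable is invoked with the session and destination names, anything else raises. A compact standalone sketch of that shape (the free function and the bare `spark.sql(query)` call are simplifications; the real method also interpolates JDBC and naming details into the SQL):

from argparse import ArgumentTypeError
from typing import Callable

from pyspark.sql import SparkSession


def run_spark_sql(
    spark: SparkSession,
    database: str,
    table: str,
    query: str | Callable[[SparkSession, str, str], None],
) -> None:
    # Strings are treated as SQL text; callables receive the session and target names.
    if isinstance(query, str):
        spark.sql(query)
    elif isinstance(query, Callable):
        query(spark, database, table)
    else:
        raise ArgumentTypeError(f"Invalid query. `{query}` must be a string or a Callable.")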
