
Commit baa09ca

[DEV-14237] Adds new command to run Spark SQL commands. Updates docker compose to allow override of project dir for volume in spark-submit
1 parent bf527b0 commit baa09ca

File tree

2 files changed: +124 / -1 lines

docker-compose.yml

Lines changed: 1 addition & 1 deletion
@@ -395,7 +395,7 @@ services:
     command: --help
     volumes:
       - type: bind
-        source: .
+        source: ${PROJECT_DIRECTORY:-.}
         target: /project
         read_only: false
     # NOTE: The hive metastore_db Derby database folder is expected to be configured to show up as a subfolder of the spark-warehouse dir
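The `${PROJECT_DIRECTORY:-.}` value is standard Compose variable substitution with a default: the spark-submit bind mount still points at `.` (the checkout the compose file is run from) unless a `PROJECT_DIRECTORY` environment variable is set, in which case that directory is mounted at `/project` instead. Below is a minimal Python sketch of the equivalent fallback rule, purely illustrative since Compose performs this substitution itself:

import os

# Mirrors Compose's ${PROJECT_DIRECTORY:-.}: fall back to the current directory
# when PROJECT_DIRECTORY is unset or set to an empty string.
bind_source = os.environ.get("PROJECT_DIRECTORY") or "."
print(f"spark-submit will bind-mount {bind_source} at /project")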
Lines changed: 123 additions & 0 deletions
@@ -0,0 +1,123 @@
import logging

from django.core.management.base import BaseCommand, CommandError
from pyspark.sql import SparkSession

from usaspending_api.common.etl.spark import create_ref_temp_views
from usaspending_api.common.helpers.spark_helpers import (
    configure_spark_session,
    get_active_spark_session,
)
from usaspending_api.common.retrieve_file_from_uri import RetrieveFileFromUri

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = """
    This command executes Spark SQL commands from one of two possible sources:
        - SQL string - provide exactly one SQL string
        - File - a local, S3, or HTTP file containing one or more SQL strings separated by semicolons
    The resulting dataframe(s) will be printed to standard out using the df.show() method
    """

    # Value defined in the handler
    spark: SparkSession

    def add_arguments(self, parser):
        parser.add_argument(
            "--sql",
            type=str,
            required=False,
            help="Single Spark SQL statement to execute",
        )

        parser.add_argument(
            "--file",
            type=str,
            required=False,
            help="Path to a file containing semicolon-separated SQL statements. Can be a local file path or an S3/HTTP URL",
        )

        parser.add_argument(
            "--create-temp-views",
            action="store_true",
            required=False,
            help="Controls whether all (USAspending and Broker) temp views are created before SQL execution",
        )

        parser.add_argument(
            "--result-limit",
            type=int,
            required=False,
            default=20,
            help="Maximum number of result records to display from each PySpark dataframe",
        )

        parser.add_argument(
            "--dry-run",
            action="store_true",
            required=False,
            help="Print SQL statements without executing them",
        )

    def handle(self, *args, **options):
        extra_conf = {
            # Config for Delta Lake tables and SQL. Needed to keep Delta table metadata in the metastore
            "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
            "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
            # Old date and time values cannot be parsed without these settings
            "spark.sql.legacy.parquet.datetimeRebaseModeInWrite": "LEGACY",  # for dates at/before 1900
            "spark.sql.legacy.parquet.int96RebaseModeInWrite": "LEGACY",  # for timestamps at/before 1900
            "spark.sql.jsonGenerator.ignoreNullFields": "false",  # keep nulls in our json
        }

        # Reuse an active Spark session if one exists; otherwise create one for the duration of this command
        self.spark = get_active_spark_session()
        spark_created_by_command = False
        if not self.spark:
            spark_created_by_command = True
            self.spark = configure_spark_session(**extra_conf, spark_context=self.spark)  # type: SparkSession

        # Resolve parameters
        sql_input = options.get("sql")
        file_path = options.get("file")
        create_temp_views = options.get("create_temp_views")
        result_limit = options.get("result_limit")
        dry_run = options.get("dry_run")

        if create_temp_views:
            create_ref_temp_views(self.spark, create_broker_views=True)

        # Prepare SQL statements from either the provided string or the file
        if file_path and sql_input:
            raise CommandError("Cannot use both --sql and --file. Choose one.")
        elif file_path:
            with RetrieveFileFromUri(file_path).get_file_object(text=True) as f:
                file_contents = f.read()
            sql_statements = [query.strip() for query in file_contents.split(";") if query.strip()]
        elif sql_input:
            sql_statements = [sql_input.strip()]
        else:
            raise CommandError("Either --sql or --file must be provided")

        logger.info(f"Found {len(sql_statements)} SQL statement(s)")

        # Execute SQL statements
        for idx, statement in enumerate(sql_statements, 1):
            logger.info(f"--- Statement {idx} ---")
            logger.info(statement)

            if dry_run:
                logger.info("[DRY RUN - Not executed]")
            else:
                try:
                    df = self.spark.sql(statement)
                    df.show(result_limit)
                    logger.info("Executed successfully")
                except Exception as e:
                    logger.error(f"Error: {e}")

        if spark_created_by_command:
            self.spark.stop()
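
For reference, a hedged usage sketch of the new management command via Django's call_command. The command module's filename is not shown in this diff, so "run_spark_sql" below is only a stand-in name; the option names come from add_arguments above.

# Hypothetical usage; "run_spark_sql" is a stand-in for the real command name,
# which is not visible in this diff. Equivalent CLI form (same caveat):
#   python manage.py run_spark_sql --sql "SELECT 1 AS one" --dry-run
from django.core.management import call_command

# Single inline statement, printed but not executed because of dry_run
call_command("run_spark_sql", sql="SELECT 1 AS one", dry_run=True)

# Semicolon-separated statements from a file (local path or S3/HTTP URL),
# creating the reference temp views first and showing up to 50 rows per result
call_command(
    "run_spark_sql",
    file="s3://example-bucket/queries.sql",  # placeholder URI
    create_temp_views=True,
    result_limit=50,
)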

0 commit comments
