Skip to content

Commit 9c7fd56

Browse files
dorellang authored and copybara-github committed
Optimize table metadata retrieval for temp view creation in spark_sql_runner.
PiperOrigin-RevId: 715431653
1 parent 2528073 commit 9c7fd56

File tree

4 files changed

+209
-188
lines changed

4 files changed

+209
-188
lines changed

perfkitbenchmarker/dpb_sparksql_benchmark_helper.py

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -158,32 +158,6 @@ def GetQueryIdsToStage() -> list[str]:
158158
return FLAGS.dpb_sparksql_order
159159

160160

161-
def GetTableMetadata(benchmark_spec):
162-
"""Compute map of table metadata for spark_sql_runner --table_metadata."""
163-
metadata = {}
164-
# TODO(user) : we support CSV format only when create_hive_tables
165-
# is false.
166-
if not FLAGS.dpb_sparksql_create_hive_tables:
167-
for subdir in benchmark_spec.table_subdirs or []:
168-
# Subdir is table name
169-
option_params = {
170-
'path': os.path.join(benchmark_spec.data_dir, subdir),
171-
}
172-
# support csv data format which contains a header and has delimiter
173-
# defined by dpb_sparksql_csv_delimiter flag
174-
if FLAGS.dpb_sparksql_data_format == 'csv':
175-
# TODO(user): currently we only support csv with a header.
176-
# If the csv does not have a header it will not load properly.
177-
option_params['header'] = 'true'
178-
option_params['delimiter'] = FLAGS.dpb_sparksql_csv_delimiter
179-
180-
metadata[subdir] = (
181-
FLAGS.dpb_sparksql_data_format or 'parquet',
182-
option_params,
183-
)
184-
return metadata
185-
186-
187161
def StageMetadata(
188162
json_metadata: Any,
189163
storage_service: object_storage_service.ObjectStorageService,
@@ -224,6 +198,7 @@ def Prepare(benchmark_spec):
224198
storage_service.CopyToBucket(src_url, cluster.bucket, script)
225199

226200
benchmark_spec.table_subdirs = []
201+
benchmark_spec.data_dir = None
227202
if FLAGS.dpb_sparksql_data:
228203
# Replace s3a scheme (used for S3 Express in Spark) with s3
229204
table_dir = re.sub(r'^s3a://', 's3://', FLAGS.dpb_sparksql_data)

perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py

Lines changed: 49 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,17 @@
9494
worker_count: 2
9595
"""
9696

97-
flags.DEFINE_list(
98-
'bigquery_tables',
99-
[],
100-
'A list of BigQuery tables to load as Temporary Spark SQL views instead '
101-
'of reading from external Hive tables.',
97+
_BIGQUERY_DATASET = flags.DEFINE_string(
98+
'dpb_sparksql_bigquery_dataset',
99+
None,
100+
'BigQuery dataset with the tables to load as Temporary Spark SQL views'
101+
' instead of reading from external Hive tables.',
102+
)
103+
_BIGQUERY_TABLES = flags.DEFINE_list(
104+
'dpb_sparksql_bigquery_tables',
105+
None,
106+
'BigQuery table names (unqualified) to load as Temporary Spark SQL views'
107+
' instead of reading from external Hive tables.',
102108
)
103109
flags.DEFINE_string(
104110
'bigquery_record_format',
@@ -132,15 +138,30 @@ def CheckPrerequisites(benchmark_config):
132138
raise errors.Config.InvalidValue(
133139
'You cannot create hive tables in a custom database.'
134140
)
141+
if bool(_BIGQUERY_DATASET.value) != bool(_BIGQUERY_TABLES.value):
142+
raise errors.Config.InvalidValue(
143+
'--dpb_sparksql_bigquery_dataset and '
144+
'--dpb_sparksql_bigquery_tables must be passed together.'
145+
)
135146
if not (
136147
FLAGS.dpb_sparksql_data
137-
or FLAGS.bigquery_tables
148+
or _BIGQUERY_TABLES.value
138149
or FLAGS.dpb_sparksql_database
139150
):
140151
# In the case of a static dpb_service, data could pre-exist
141152
logging.warning(
142-
'You did not specify --dpb_sparksql_data, --bigquery_tables, '
143-
'or dpb_sparksql_database. You will probably not have data to query!'
153+
'You did not specify --dpb_sparksql_data,'
154+
' --dpb_sparksql_bigquery_tables, or dpb_sparksql_database. You will'
155+
' probably not have data to query!'
156+
)
157+
if sum([
158+
bool(FLAGS.dpb_sparksql_data),
159+
bool(_BIGQUERY_TABLES.value),
160+
bool(FLAGS.dpb_sparksql_database),
161+
]) == 1:
162+
logging.warning(
163+
'You should only pass one of them: --dpb_sparksql_data,'
164+
' --dpb_sparksql_bigquery_tables, or --dpb_sparksql_database.'
144165
)
145166
if bool(FLAGS.dpb_sparksql_order) == bool(FLAGS.dpb_sparksql_streams):
146167
raise errors.Config.InvalidValue(
@@ -284,7 +305,6 @@ def _GetSampleMetadata(benchmark_spec):
284305
def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
285306
"""Runs queries. Returns storage path with metrics and JobResult object."""
286307
cluster = benchmark_spec.dpb_service
287-
storage_service = cluster.storage_service
288308
report_dir = '/'.join([cluster.base_dir, f'report-{int(time.time()*1000)}'])
289309
args = ['--sql-scripts-dir', benchmark_spec.query_dir]
290310
if FLAGS.dpb_sparksql_simultaneous:
@@ -299,18 +319,29 @@ def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
299319
args += ['--report-dir', report_dir]
300320
if FLAGS.dpb_sparksql_database:
301321
args += ['--database', FLAGS.dpb_sparksql_database]
302-
table_metadata = _GetTableMetadata(benchmark_spec)
303-
if table_metadata:
304-
table_metadata_file = '/'.join([cluster.base_dir, 'metadata.json'])
305-
dpb_sparksql_benchmark_helper.StageMetadata(
306-
table_metadata, storage_service, table_metadata_file
307-
)
308-
args += ['--table-metadata', table_metadata_file]
309-
else:
310-
# If we don't pass in tables, we must be reading from hive.
322+
if FLAGS.dpb_sparksql_create_hive_tables:
311323
# Note you can even read from Hive without --create_hive_tables if they
312324
# were precreated.
313325
args += ['--enable-hive', 'True']
326+
else:
327+
table_names = []
328+
if _BIGQUERY_DATASET.value:
329+
args += ['--bigquery-dataset', _BIGQUERY_DATASET.value]
330+
table_names = _BIGQUERY_TABLES.value
331+
elif benchmark_spec.data_dir:
332+
args += ['--table-base-dir', benchmark_spec.data_dir]
333+
table_names = benchmark_spec.table_subdirs or []
334+
if table_names:
335+
args += ['--table-names', *table_names]
336+
if FLAGS.dpb_sparksql_data_format:
337+
args += ['--table-format', FLAGS.dpb_sparksql_data_format]
338+
if (
339+
FLAGS.dpb_sparksql_data_format == 'csv'
340+
and FLAGS.dpb_sparksql_csv_delimiter
341+
):
342+
args += ['--csv-delim', FLAGS.dpb_sparksql_csv_delimiter]
343+
if FLAGS.bigquery_record_format:
344+
args += ['--bigquery-read-data-format', FLAGS.bigquery_record_format]
314345
if FLAGS.dpb_sparksql_table_cache:
315346
args += ['--table-cache', FLAGS.dpb_sparksql_table_cache]
316347
if dpb_sparksql_benchmark_helper.DUMP_SPARK_CONF.value:
@@ -503,17 +534,6 @@ def _GetDistCpMetadata(base_dir: str, subdirs: List[str], extra_metadata=None):
503534
return metadata
504535

505536

506-
def _GetTableMetadata(benchmark_spec):
507-
metadata = dpb_sparksql_benchmark_helper.GetTableMetadata(benchmark_spec)
508-
for table in FLAGS.bigquery_tables:
509-
name = table.split('.')[-1]
510-
bq_options = {'table': table}
511-
if FLAGS.bigquery_record_format:
512-
bq_options['readDataFormat'] = FLAGS.bigquery_record_format
513-
metadata[name] = (FLAGS.dpb_sparksql_data_format or 'bigquery', bq_options)
514-
return metadata
515-
516-
517537
def Cleanup(benchmark_spec):
518538
"""Cleans up the Benchmark."""
519539
del benchmark_spec # unused

perfkitbenchmarker/scripts/spark_sql_test_scripts/spark_sql_runner.py

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
import argparse
1212
from concurrent import futures
13-
import json
1413
import logging
1514
import os
1615
import time
@@ -40,19 +39,44 @@ def parse_args(args=None):
4039
required=True,
4140
help='Object storage path where the SQL queries are located.',
4241
)
43-
parser.add_argument('--database', help='Hive database to look for data in.')
42+
group = parser.add_mutually_exclusive_group()
43+
group.add_argument('--database', help='Hive database to look for data in.')
44+
group.add_argument(
45+
'--table-base-dir',
46+
help=(
47+
'Base HCFS path containing the table data to be registered into Spark'
48+
' temporary view.'
49+
),
50+
)
51+
group.add_argument(
52+
'--bigquery-dataset',
53+
help=(
54+
'BQ Dataset containing the tables passed in --table-names to be'
55+
' registered into Spark temporary view.'
56+
),
57+
)
58+
parser.add_argument(
59+
'--table-names',
60+
nargs='+',
61+
help='Names of the tables to be registered into Spark temporary view.',
62+
)
63+
parser.add_argument(
64+
'--table-format',
65+
help=(
66+
'Format of data to be registered into Spark temporary view as passed'
67+
' to `spark.read.format()`. Assumed to be "parquet", or "bigquery" if'
68+
' a BQ dataset is also specified.'
69+
),
70+
)
71+
parser.add_argument(
72+
'--bigquery-read-data-format',
73+
help=(
74+
'The record format to use when connecting to BigQuery storage. See:'
75+
' https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties'
76+
),
77+
)
4478
parser.add_argument(
45-
'--table-metadata',
46-
metavar='METADATA_FILE',
47-
help="""\
48-
HCFS file containing JSON Object mapping table names to arrays of length 2.
49-
The arrays contain the format of the data and the options to pass to the
50-
dataframe reader. e.g.:
51-
{
52-
53-
"my_bq_table": ["bigquery", {"table": "bigquery_public_data:dataset.table"}],
54-
"my_parquet_table": ["parquet", {"path": "gs://some/directory"}]
55-
}""",
79+
'--csv-delimiter', help='CSV delimiter to load CSV files', default=','
5680
)
5781
parser.add_argument(
5882
'--enable-hive',
@@ -108,10 +132,7 @@ def main(args):
108132
spark = builder.getOrCreate()
109133
if args.database:
110134
spark.catalog.setCurrentDatabase(args.database)
111-
table_metadata = []
112-
if args.table_metadata:
113-
table_metadata = get_table_metadata(spark, args).items()
114-
for name, (fmt, options) in table_metadata:
135+
for name, (fmt, options) in get_table_metadata(args).items():
115136
logging.info('Loading %s', name)
116137
spark.read.format(fmt).options(**options).load().createTempView(name)
117138
if args.table_cache:
@@ -154,6 +175,27 @@ def main(args):
154175
)
155176

156177

178+
def get_table_metadata(args):
179+
"""Gets table metadata to create temporary views according to args passed."""
180+
metadata = {}
181+
if args.table_base_dir:
182+
for table_name in args.table_names:
183+
option_params = {'path': os.path.join(args.table_base_dir, table_name)}
184+
if args.table_format == 'csv':
185+
option_params['header'] = 'true'
186+
option_params['delimiter'] = args.csv_delimiter
187+
metadata[table_name] = (args.table_format or 'parquet', option_params)
188+
elif args.bigquery_dataset:
189+
for table_name in args.table_names:
190+
bq_options = {
191+
'table': '.'.join([args.bigquery_dataset, table_name])
192+
}
193+
if args.bigquery_read_data_format:
194+
bq_options['readDataFormat'] = args.bigquery_read_data_format
195+
metadata[table_name] = (args.table_format or 'bigquery', bq_options)
196+
return metadata
197+
198+
157199
def get_script_streams(args):
158200
"""Gets the script streams to run.
159201
@@ -170,11 +212,6 @@ def get_script_streams(args):
170212
]
171213

172214

173-
def get_table_metadata(spark, args):
174-
"""Gets table metadata to create temporary views."""
175-
return json.loads(_load_file(spark, args.table_metadata))
176-
177-
178215
def run_sql_script(
179216
spark_session, script_stream, stream_id, raise_query_execution_errors
180217
):

0 commit comments

Comments (0)