7 changes: 6 additions & 1 deletion bigframes/session/metrics.py
@@ -45,12 +45,17 @@ def count_job_stats(
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
query_char_count = len(getattr(row_iterator, "query", "") or "")
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
exec_seconds = 0.0
created = getattr(row_iterator, "created", None)
ended = getattr(row_iterator, "ended", None)
exec_seconds = (
(ended - created).total_seconds() if created and ended else 0.0
)

self.execution_count += 1
self.query_char_count += query_char_count
self.bytes_processed += bytes_processed
self.slot_millis += slot_millis
self.execution_secs += exec_seconds

elif query_job.configuration.dry_run:
query_char_count = len(query_job.query)
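This hunk replaces the hardcoded `exec_seconds = 0.0` with a value derived from the row iterator's `created` and `ended` timestamps, and accumulates it into the session's `execution_secs` total. A minimal sketch of the fallback behavior, using a hypothetical stand-in object rather than the real BigQuery `RowIterator`:

```python
import datetime

class FakeRowIterator:
    """Hypothetical stand-in exposing only the attributes the metrics code reads."""
    def __init__(self, created=None, ended=None):
        self.created = created
        self.ended = ended

def exec_seconds_from(row_iterator) -> float:
    created = getattr(row_iterator, "created", None)
    ended = getattr(row_iterator, "ended", None)
    # Fall back to 0.0 whenever either timestamp is unavailable.
    return (ended - created).total_seconds() if created and ended else 0.0

start = datetime.datetime(2024, 1, 1, 12, 0, 0)
print(exec_seconds_from(FakeRowIterator(start, start + datetime.timedelta(seconds=2.5))))  # 2.5
print(exec_seconds_from(FakeRowIterator()))  # 0.0
```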
134 changes: 53 additions & 81 deletions scripts/run_and_publish_benchmark.py
@@ -84,43 +84,36 @@ def collect_benchmark_result(
path = pathlib.Path(benchmark_path)
try:
results_dict: Dict[str, List[Union[int, float, None]]] = {}
bytes_files = sorted(path.rglob("*.bytesprocessed"))
millis_files = sorted(path.rglob("*.slotmillis"))
bq_seconds_files = sorted(path.rglob("*.bq_exec_time_seconds"))
# Use local_seconds_files as the baseline
local_seconds_files = sorted(path.rglob("*.local_exec_time_seconds"))
query_char_count_files = sorted(path.rglob("*.query_char_count"))

error_files = sorted(path.rglob("*.error"))

if not (
len(millis_files)
== len(bq_seconds_files)
<= len(bytes_files)
== len(query_char_count_files)
== len(local_seconds_files)
):
raise ValueError(
"Mismatch in the number of report files for bytes, millis, seconds and query char count: \n"
f"millis_files: {len(millis_files)}\n"
f"bq_seconds_files: {len(bq_seconds_files)}\n"
f"bytes_files: {len(bytes_files)}\n"
f"query_char_count_files: {len(query_char_count_files)}\n"
f"local_seconds_files: {len(local_seconds_files)}\n"
)

has_full_metrics = len(bq_seconds_files) == len(local_seconds_files)

for idx in range(len(local_seconds_files)):
query_char_count_file = query_char_count_files[idx]
local_seconds_file = local_seconds_files[idx]
bytes_file = bytes_files[idx]
filename = query_char_count_file.relative_to(path).with_suffix("")
if filename != local_seconds_file.relative_to(path).with_suffix(
""
) or filename != bytes_file.relative_to(path).with_suffix(""):
raise ValueError(
"File name mismatch among query_char_count, bytes and seconds reports."
)
benchmarks_with_missing_files = []

for local_seconds_file in local_seconds_files:
base_name = local_seconds_file.name.removesuffix(".local_exec_time_seconds")
base_path = local_seconds_file.parent / base_name
filename = base_path.relative_to(path)

# Construct paths for other metric files
bytes_file = pathlib.Path(f"{base_path}.bytesprocessed")
millis_file = pathlib.Path(f"{base_path}.slotmillis")
bq_seconds_file = pathlib.Path(f"{base_path}.bq_exec_time_seconds")
query_char_count_file = pathlib.Path(f"{base_path}.query_char_count")

# Check if all corresponding files exist
missing_files = []
if not bytes_file.exists():
missing_files.append(bytes_file.name)
if not millis_file.exists():
missing_files.append(millis_file.name)
if not bq_seconds_file.exists():
missing_files.append(bq_seconds_file.name)
if not query_char_count_file.exists():
missing_files.append(query_char_count_file.name)

if missing_files:
benchmarks_with_missing_files.append((str(filename), missing_files))
continue

with open(query_char_count_file, "r") as file:
lines = file.read().splitlines()
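The rewritten loop above treats each `*.local_exec_time_seconds` report as the baseline, derives the sibling metric paths from its base name, and records benchmarks whose companion files are absent instead of raising on a count mismatch. A minimal sketch of that discovery step, assuming the reports share a base name within the benchmark directory (`find_missing_metric_files` is a hypothetical helper, not part of the script):

```python
import pathlib
from typing import Dict, List

def find_missing_metric_files(benchmark_path: str) -> Dict[str, List[str]]:
    """Map each benchmark (relative base path) to the metric files it lacks."""
    path = pathlib.Path(benchmark_path)
    suffixes = [".bytesprocessed", ".slotmillis", ".bq_exec_time_seconds", ".query_char_count"]
    missing: Dict[str, List[str]] = {}
    for local_seconds_file in sorted(path.rglob("*.local_exec_time_seconds")):
        base = local_seconds_file.parent / local_seconds_file.name.removesuffix(
            ".local_exec_time_seconds"
        )
        # Every other metric report is expected to share this base name.
        absent = [
            f"{base.name}{ext}"
            for ext in suffixes
            if not pathlib.Path(f"{base}{ext}").exists()
        ]
        if absent:
            missing[str(base.relative_to(path))] = absent
    return missing
```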
@@ -135,26 +128,13 @@ def collect_benchmark_result(
lines = file.read().splitlines()
total_bytes = sum(int(line) for line in lines) / iterations

if not has_full_metrics:
total_slot_millis = None
bq_seconds = None
else:
millis_file = millis_files[idx]
bq_seconds_file = bq_seconds_files[idx]
if filename != millis_file.relative_to(path).with_suffix(
""
) or filename != bq_seconds_file.relative_to(path).with_suffix(""):
raise ValueError(
"File name mismatch among query_char_count, bytes, millis, and seconds reports."
)

with open(millis_file, "r") as file:
lines = file.read().splitlines()
total_slot_millis = sum(int(line) for line in lines) / iterations
with open(millis_file, "r") as file:
lines = file.read().splitlines()
total_slot_millis = sum(int(line) for line in lines) / iterations

with open(bq_seconds_file, "r") as file:
lines = file.read().splitlines()
bq_seconds = sum(float(line) for line in lines) / iterations
with open(bq_seconds_file, "r") as file:
lines = file.read().splitlines()
bq_seconds = sum(float(line) for line in lines) / iterations

results_dict[str(filename)] = [
query_count,
@@ -207,13 +187,9 @@ def collect_benchmark_result(
f"{index} - query count: {row['Query_Count']},"
+ f" query char count: {row['Query_Char_Count']},"
+ f" bytes processed sum: {row['Bytes_Processed']},"
+ (f" slot millis sum: {row['Slot_Millis']}," if has_full_metrics else "")
+ f" local execution time: {formatted_local_exec_time} seconds"
+ (
f", bigquery execution time: {round(row['BigQuery_Execution_Time_Sec'], 1)} seconds"
if has_full_metrics
else ""
)
+ f" slot millis sum: {row['Slot_Millis']},"
+ f" local execution time: {formatted_local_exec_time}"
+ f", bigquery execution time: {round(row['BigQuery_Execution_Time_Sec'], 1)} seconds"
)

geometric_mean_queries = geometric_mean_excluding_zeros(
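The summary above relies on a `geometric_mean_excluding_zeros` helper that sits outside this hunk. A plausible sketch of such a helper, assuming it ignores zero or missing values and rounds the result (the script's actual implementation may differ):

```python
import math
from typing import Iterable

def geometric_mean_excluding_zeros(values: Iterable[float]) -> float:
    # Keep only positive values so the logarithm is defined; zeros, NaNs, and None drop out.
    filtered = [v for v in values if v and v > 0]
    if not filtered:
        return 0.0
    return round(math.exp(sum(math.log(v) for v in filtered) / len(filtered)), 1)

print(geometric_mean_excluding_zeros([2.0, 8.0, 0.0]))  # 4.0 -- the zero is excluded
```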
@@ -239,30 +215,26 @@ def collect_benchmark_result(
f"---Geometric mean of queries: {geometric_mean_queries},"
+ f" Geometric mean of queries char counts: {geometric_mean_query_char_count},"
+ f" Geometric mean of bytes processed: {geometric_mean_bytes},"
+ (
f" Geometric mean of slot millis: {geometric_mean_slot_millis},"
if has_full_metrics
else ""
)
+ f" Geometric mean of slot millis: {geometric_mean_slot_millis},"
+ f" Geometric mean of local execution time: {geometric_mean_local_seconds} seconds"
+ (
f", Geometric mean of BigQuery execution time: {geometric_mean_bq_seconds} seconds---"
if has_full_metrics
else ""
)
+ f", Geometric mean of BigQuery execution time: {geometric_mean_bq_seconds} seconds---"
)

error_message = (
"\n"
+ "\n".join(
[
f"Failed: {error_file.relative_to(path).with_suffix('')}"
for error_file in error_files
]
all_errors: List[str] = []
if error_files:
all_errors.extend(
f"Failed: {error_file.relative_to(path).with_suffix('')}"
for error_file in error_files
)
if error_files
else None
)
if (
benchmarks_with_missing_files
and os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true"
):
all_errors.extend(
f"Missing files for benchmark '{name}': {files}"
for name, files in benchmarks_with_missing_files
)
error_message = "\n" + "\n".join(all_errors) if all_errors else None
return (
benchmark_metrics.reset_index().rename(columns={"index": "Benchmark_Name"}),
error_message,
2 changes: 1 addition & 1 deletion testing/constraints-3.11.txt
@@ -152,7 +152,7 @@ google-auth==2.38.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.2
google-cloud-aiplatform==1.106.0
google-cloud-bigquery==3.35.1
google-cloud-bigquery==3.36.0
google-cloud-bigquery-connection==1.18.3
google-cloud-bigquery-storage==2.32.0
google-cloud-core==2.4.3