Skip to content

Commit 2f82232

Browse files
authored
[ch] Set up ingestion for benchmarks tables and aggregated_test_metrics (#5720)
S3 ingestion was tested by running locally and verifying the data got inserted. For Dynamo, only ran `python aws/lambda/clickhouse-replicator-dynamo/test_lambda_function.py` to make sure other ingestions didn't break. Also includes a small fix for the external_contribution_stats error handler.
1 parent 84f2e16 commit 2f82232

File tree

2 files changed

+110
-1
lines changed

2 files changed

+110
-1
lines changed

aws/lambda/clickhouse-replicator-dynamo/lambda_function.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
"torchci-pull-request-review": "default.pull_request_review",
2828
"torchci-pull-request-review-comment": "default.pull_request_review_comment",
2929
"torchci-metrics-ci-wait-time": "misc.metrics_ci_wait_time",
30+
"torchci-dynamo-perf-stats": "benchmark.inductor_torch_dynamo_perf_stats",
31+
"torchci-oss-ci-benchmark": "benchmark.oss_ci_benchmark_v2",
3032
}
3133

3234

aws/lambda/clickhouse-replicator-s3/lambda_function.py

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,111 @@ def get_insert_query(compression):
351351
get_clickhouse_client().query(get_insert_query("gzip"))
352352
except Exception as e:
353353
get_clickhouse_client().query(
354-
f"insert into errors.gen_errors ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
354+
f"insert into errors.gen_errors VALUES ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
355355
)
356356

357357

358+
def general_adapter(table, bucket, key, schema, compression, format) -> None:
    """Replicate one S3 object into a ClickHouse table.

    Issues an ``INSERT ... SELECT ... FROM s3(...)`` so ClickHouse pulls the
    object directly from S3 (using the dedicated IAM role for credentials)
    instead of the Lambda streaming the data itself.

    Args:
        table: Fully qualified destination table, e.g. ``benchmark.foo``.
        bucket: Source S3 bucket name.
        key: Object key within the bucket.
        schema: ClickHouse column definitions for the object's contents.
        compression: Compression of the object (e.g. ``gzip`` or ``none``).
        format: ClickHouse input format (e.g. ``JSONEachRow``, ``CSV``).
            NOTE(review): shadows the ``format`` builtin; kept for
            backward compatibility with keyword callers.

    On failure the exception is recorded in ``errors.gen_errors`` rather than
    re-raised, so one bad object does not stop the replication Lambda.
    """
    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"

    def get_insert_query(compression):
        # (bucket, key) is attached as _meta so rows can be traced back to
        # the exact source object.
        return f"""
        insert into {table}
        select *, ('{bucket}', '{key}') as _meta
        from s3('{url}', '{format}', '{schema}', '{compression}',
            extra_credentials(
                role_arn = 'arn:aws:iam::308535385114:role/clickhouse_role'
            )
        )
        """

    try:
        get_clickhouse_client().query(get_insert_query(compression))
    except Exception as e:
        # Best-effort error logging. Escape single quotes in the message so
        # the error text itself cannot break (or inject into) the logging
        # INSERT statement.
        error = json.dumps(str(e)).replace("'", "\\'")
        get_clickhouse_client().query(
            f"insert into errors.gen_errors values ('{table}', '{bucket}', '{key}', '{error}')"
        )
378+
379+
380+
def external_aggregated_test_metrics_adapter(table, bucket, key) -> None:
    """Ingest a gzipped JSONEachRow aggregated-test-metrics object into ClickHouse.

    Thin wrapper around ``general_adapter`` that supplies the column schema
    for ``misc.aggregated_test_metrics``.
    """
    # Column definitions must match the destination table exactly
    # (including the existing `occurences` spelling).
    columns = """
        `avg_duration_in_second` Int64,
        `avg_skipped` Int64,
        `avg_tests` Int64,
        `base_name` String,
        `date` DateTime64(3),
        `job_name` String,
        `max_errors` Int64,
        `max_failures` Int64,
        `occurences` Int64,
        `oncalls` Array(String),
        `sum_duration_in_second` Int64,
        `sum_skipped` Int64,
        `sum_tests` Int64,
        `test_class` String,
        `test_config` String,
        `test_file` String,
        `workflow_id` Int64,
        `workflow_name` String,
        `workflow_run_attempt` Int64
    """
    general_adapter(table, bucket, key, columns, compression="gzip", format="JSONEachRow")
403+
404+
405+
def torchao_perf_stats_adapter(table, bucket, key) -> None:
    """Ingest an uncompressed CSV of torchao perf stats into ClickHouse.

    Thin wrapper around ``general_adapter`` supplying the column schema for
    ``benchmark.inductor_torchao_perf_stats``. All columns are kept as
    String, mirroring the raw CSV contents.
    """
    columns = """
        `CachingAutotuner.benchmark_all_configs` String,
        `GraphLowering.compile_to_module` String,
        `GraphLowering.run` String,
        `OutputGraph.call_user_compiler` String,
        `Scheduler.__init__` String,
        `Scheduler.codegen` String,
        `WrapperCodeGen.generate` String,
        `_compile.<locals>.compile_inner` String,
        `_compile.compile_inner` String,
        `abs_latency` String,
        `accuracy` String,
        `autograd_captures` String,
        `autograd_compiles` String,
        `batch_size` String,
        `calls_captured` String,
        `compilation_latency` String,
        `compile_fx.<locals>.bw_compiler` String,
        `compile_fx.<locals>.fw_compiler_base` String,
        `compile_fx_inner` String,
        `compression_ratio` String,
        `create_aot_dispatcher_function` String,
        `cudagraph_skips` String,
        `dev` String,
        `dynamo_peak_mem` String,
        `eager_peak_mem` String,
        `filename` String,
        `graph_breaks` String,
        `head_branch` String,
        `head_repo` String,
        `head_sha` String,
        `job_id` String,
        `name` String,
        `run_attempt` String,
        `runner` String,
        `speedup` String,
        `test_name` String,
        `unique_graph_breaks` String,
        `unique_graphs` String,
        `workflow_id` String
    """
    general_adapter(table, bucket, key, columns, compression="none", format="CSV")
448+
449+
450+
def torchbench_userbenchmark_adapter(table, bucket, key):
    """Ingest an uncompressed JSONEachRow torchbench userbenchmark object.

    Thin wrapper around ``general_adapter`` supplying the column schema for
    ``benchmark.torchbench_userbenchmark``.
    """
    columns = """
        `environ` String,
        `metrics` String,
        `name` String
    """
    general_adapter(table, bucket, key, columns, compression="none", format="JSONEachRow")
457+
458+
358459
SUPPORTED_PATHS = {
359460
"merges": "default.merges",
360461
"queue_times_historical": "default.queue_times_historical",
@@ -364,6 +465,9 @@ def get_insert_query(compression):
364465
"failed_test_runs": "default.failed_test_runs",
365466
"rerun_disabled_tests": "default.rerun_disabled_tests",
366467
"external_contribution_counts": "misc.external_contribution_stats",
468+
"test_data_aggregates": "misc.aggregated_test_metrics",
469+
"torchbench-csv/torchao": "benchmark.inductor_torchao_perf_stats",
470+
"torchbench-userbenchmark": "benchmark.torchbench_userbenchmark",
367471
}
368472

369473
OBJECT_CONVERTER = {
@@ -375,6 +479,9 @@ def get_insert_query(compression):
375479
"default.rerun_disabled_tests": rerun_disabled_tests_adapter,
376480
"default.queue_times_historical": queue_times_historical_adapter,
377481
"misc.external_contribution_stats": external_contribution_stats_adapter,
482+
"misc.aggregated_test_metrics": external_aggregated_test_metrics_adapter,
483+
"benchmark.inductor_torchao_perf_stats": torchao_perf_stats_adapter,
484+
"benchmark.torchbench_userbenchmark": torchbench_userbenchmark_adapter,
378485
}
379486

380487

0 commit comments

Comments
 (0)