
Commit aba7b57

[ch] Refactor/unify s3 lambda, improve s3 backfill script (#5730)
Refactor the S3 replicator lambda to reduce duplicated code and improve error handling: failures are now written to the shared errors.gen_errors table instead of per-table ingest-error tables, so we don't have to create a new one each time.

Tested via:
* randomly selecting a key for merges, merge_bases, rerun_disabled_tests, and external_contribution_stats, and checking that it could be uploaded
* backfilling inductor_torchao_perf_stats and torchbench_userbenchmark
* re-backfilling queue_times_historical and removing the resulting duplicates
* raising an exception to confirm that the error handler wrote to the database (it did)

The S3 backfill script now reuses the replicator lambda's logic.
1 parent 533877f commit aba7b57


3 files changed: +50 −324 lines changed

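The diff below collapses several copy-pasted per-table adapters into calls to a single general_adapter that tries each compression in turn and routes failures to one shared errors.gen_errors table via log_failure_to_clickhouse. The following is a minimal runnable sketch of that pattern for orientation only: the function and helper names come from the diff, but the stub ClickHouse client, the reconstructed insert-query body, and the example argument values are illustrative assumptions rather than the lambda's actual wiring.

import json
from urllib.parse import quote


class _StubClickHouseClient:
    # Illustrative stand-in for the real ClickHouse client used by the lambda.
    def query(self, q: str) -> None:
        print("would run:", " ".join(q.split())[:100])


def get_clickhouse_client() -> _StubClickHouseClient:
    return _StubClickHouseClient()


def encode_url_component(component: str) -> str:
    # Assumption: URL-encode the S3 key for use inside the s3() table function.
    return quote(component)


def log_failure_to_clickhouse(table, bucket, key, error) -> None:
    # Mirrors the helper added in the diff: one shared errors table instead of
    # per-table errors.{table}_ingest_errors tables.
    row = {"table": table, "bucket": bucket, "key": key, "reason": str(error)}
    get_clickhouse_client().query(
        f"insert into errors.gen_errors format JSONEachRow {json.dumps(row)}"
    )


def general_adapter(table, bucket, key, schema, compressions, format) -> None:
    # Try each compression in order; only if every attempt fails is the
    # combined error logged to ClickHouse.
    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"

    def get_insert_query(compression):
        # Query body reconstructed from the deleted per-table adapters below.
        return f"""
        insert into {table}
        select *, ('{bucket}', '{key}')
        from s3('{url}', '{format}', '{schema}', '{compression}')
        """

    try:
        exceptions = []
        for compression in compressions:
            try:
                get_clickhouse_client().query(get_insert_query(compression))
                return
            except Exception as e:
                exceptions.append(e)
        raise Exception(
            f"Failed to insert into {table} with {[str(x) for x in exceptions]}"
        )
    except Exception as e:
        log_failure_to_clickhouse(table, bucket, key, e)


if __name__ == "__main__":
    # Hypothetical argument values, only to exercise the sketch end to end.
    general_adapter(
        table="merge_bases",
        bucket="example-bucket",
        key="merge_bases/some-key.json.gz",
        schema="`repo` String, `sha` String",
        compressions=["gzip", "none"],
        format="JSONEachRow",
    )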

aws/lambda/clickhouse-replicator-s3/lambda_function.py

Lines changed: 37 additions & 105 deletions
@@ -99,6 +99,8 @@ def get_skipped_failure_parser_helper(name, type, field_to_check_for_existence):
         ) as {name}
     """

+    # Cannot use general_adapter due to custom field for now()::DateTime64(9)
+    # time_inserted
     query = f"""
     insert into {table}
     select
@@ -158,14 +160,7 @@ def get_skipped_failure_parser_helper(name, type, field_to_check_for_existence):
     try:
         get_clickhouse_client().query(query)
     except Exception as e:
-        if "Expected not greater than" in str(e):
-            get_clickhouse_client().query(
-                f"insert into errors.{table}_ingest_errors values ('{key}', 'file is too large?')"
-            )
-        else:
-            get_clickhouse_client().query(
-                f"insert into errors.{table}_ingest_errors values ('{key}', '{json.dumps(str(e))}')"
-            )
+        log_failure_to_clickhouse(table, bucket, key, e)


 def rerun_disabled_tests_adapter(table, bucket, key):
@@ -180,26 +175,7 @@ def rerun_disabled_tests_adapter(table, bucket, key):
         `workflow_run_attempt` Int64
     """

-    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"
-
-    def get_insert_query(compression):
-        return f"""
-        insert into {table}
-        select *, ('{bucket}', '{key}')
-        from s3('{url}', 'JSONEachRow', '{schema}', '{compression}')
-        """
-
-    try:
-        get_clickhouse_client().query(get_insert_query("gzip"))
-    except Exception as e:
-        if "Expected not greater than" in str(e):
-            get_clickhouse_client().query(
-                f"insert into errors.{table}_ingest_errors values ('{key}', 'file is too large?')"
-            )
-        else:
-            get_clickhouse_client().query(
-                f"insert into errors.{table}_ingest_errors values ('{key}', '{json.dumps(str(e))}')"
-            )
+    general_adapter(table, bucket, key, schema, ["gzip"], "JSONEachRow")


 def handle_test_run_summary(table, bucket, key) -> None:
@@ -219,6 +195,7 @@ def handle_test_run_summary(table, bucket, key) -> None:
     """
     url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"

+    # Cannot use general_adapter due to custom field for now()::DateTime64(9)
     def get_insert_query(compression):
         return f"""
         insert into {table} SETTINGS async_insert=1, wait_for_async_insert=1
@@ -229,14 +206,7 @@ def get_insert_query(compression):
     try:
         get_clickhouse_client().query(get_insert_query("gzip"))
     except Exception as e:
-        if "Expected not greater than" in str(e):
-            get_clickhouse_client().query(
-                f"insert into errors.{table}_ingest_errors values ('{key}', 'file is too large?')"
-            )
-        else:
-            get_clickhouse_client().query(
-                f"insert into errors.{table}_ingest_errors values ('{key}', '{json.dumps(str(e))}')"
-            )
+        log_failure_to_clickhouse(table, bucket, key, e)


 def merges_adapter(table, bucket, key) -> None:
@@ -263,21 +233,7 @@ def merges_adapter(table, bucket, key) -> None:
         `unstable_checks` Array(Array(String))
     """

-    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"
-
-    def get_insert_query(compression):
-        return f"""
-        insert into {table}
-        select *, ('{bucket}', '{key}')
-        from s3('{url}', 'JSONEachRow', '{schema}', '{compression}')
-        """
-
-    try:
-        get_clickhouse_client().query(get_insert_query("none"))
-    except Exception as e:
-        get_clickhouse_client().query(
-            f"insert into errors.{table}_ingest_errors values ('{key}', '{json.dumps(str(e))}')"
-        )
+    general_adapter(table, bucket, key, schema, ["none"], "JSONEachRow")


 def merge_bases_adapter(table, bucket, key) -> None:
@@ -289,19 +245,7 @@ def merge_bases_adapter(table, bucket, key) -> None:
         `sha` String
     """

-    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"
-
-    def get_insert_query(compression):
-        return f"""
-        insert into {table}
-        select *, ('{bucket}', '{key}')
-        from s3('{url}', 'JSONEachRow', '{schema}', '{compression}')
-        """
-
-    try:
-        get_clickhouse_client().query(get_insert_query("gzip"))
-    except:
-        get_clickhouse_client().query(get_insert_query("none"))
+    general_adapter(table, bucket, key, schema, ["gzip", "none"], "JSONEachRow")


 def queue_times_historical_adapter(table, bucket, key) -> None:
@@ -311,20 +255,7 @@ def queue_times_historical_adapter(table, bucket, key) -> None:
         `count` Int64,
         `time` DateTime64(9)
     """
-
-    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"
-
-    def get_insert_query(compression):
-        return f"""
-        insert into {table}
-        select *, ('{bucket}', '{key}')
-        from s3('{url}', 'JSONEachRow', '{schema}', '{compression}')
-        """
-
-    try:
-        get_clickhouse_client().query(get_insert_query("gzip"))
-    except:
-        get_clickhouse_client().query(get_insert_query("none"))
+    general_adapter(table, bucket, key, schema, ["gzip", "none"], "JSONEachRow")


 def external_contribution_stats_adapter(table, bucket, key) -> None:
@@ -334,28 +265,22 @@ def external_contribution_stats_adapter(table, bucket, key) -> None:
         `user_count` Int64,
         `users` Array(String)
     """
-    url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"
-
-    def get_insert_query(compression):
-        return f"""
-        insert into {table}
-        select *, ('{bucket}', '{key}')
-        from s3('{url}', 'JSONEachRow', '{schema}', '{compression}',
-            extra_credentials(
-                role_arn = 'arn:aws:iam::308535385114:role/clickhouse_role'
-            )
-        )
-        """
-
-    try:
-        get_clickhouse_client().query(get_insert_query("gzip"))
-    except Exception as e:
-        get_clickhouse_client().query(
-            f"insert into errors.gen_errors VALUES ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
-        )
+    general_adapter(table, bucket, key, schema, ["gzip"], "JSONEachRow")
+
+
+def log_failure_to_clickhouse(table, bucket, key, error) -> None:
+    error = {
+        "table": table,
+        "bucket": bucket,
+        "key": key,
+        "reason": str(error),
+    }
+    get_clickhouse_client().query(
+        f"insert into errors.gen_errors format JSONEachRow {json.dumps(error)}"
+    )


-def general_adapter(table, bucket, key, schema, compression, format) -> None:
+def general_adapter(table, bucket, key, schema, compressions, format) -> None:
     url = f"https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}"

     def get_insert_query(compression):
@@ -370,11 +295,18 @@ def get_insert_query(compression):
         """

     try:
-        get_clickhouse_client().query(get_insert_query(compression))
-    except Exception as e:
-        get_clickhouse_client().query(
-            f"insert into errors.gen_errors values ('{table}', '{bucket}', '{key}', '{json.dumps(str(e))}')"
+        exceptions = []
+        for compression in compressions:
+            try:
+                get_clickhouse_client().query(get_insert_query(compression))
+                return
+            except Exception as e:
+                exceptions.append(e)
+        raise Exception(
+            f"Failed to insert into {table} with {[str(x) for x in exceptions]}"
         )
+    except Exception as e:
+        log_failure_to_clickhouse(table, bucket, key, e)


 def external_aggregated_test_metrics_adapter(table, bucket, key) -> None:
@@ -399,7 +331,7 @@ def external_aggregated_test_metrics_adapter(table, bucket, key) -> None:
         `workflow_name` String,
         `workflow_run_attempt` Int64
     """
-    general_adapter(table, bucket, key, schema, "gzip", "JSONEachRow")
+    general_adapter(table, bucket, key, schema, ["gzip"], "JSONEachRow")


 def torchao_perf_stats_adapter(table, bucket, key) -> None:
@@ -444,7 +376,7 @@ def torchao_perf_stats_adapter(table, bucket, key) -> None:
         `unique_graphs` String,
         `workflow_id` String
     """
-    general_adapter(table, bucket, key, schema, "none", "CSV")
+    general_adapter(table, bucket, key, schema, ["none"], "CSV")


 def torchbench_userbenchmark_adapter(table, bucket, key):
@@ -453,7 +385,7 @@ def torchbench_userbenchmark_adapter(table, bucket, key):
         `metrics` String,
         `name` String
     """
-    general_adapter(table, bucket, key, schema, "none", "JSONEachRow")
+    general_adapter(table, bucket, key, schema, ["none"], "JSONEachRow")


 SUPPORTED_PATHS = {
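The commit message also notes that the S3 backfill script now reuses this replicator logic. That script is not included in the excerpt above, so the snippet below is only a hypothetical illustration of what such reuse could look like; the import path, function name, and argument values are invented for the example and do not come from the actual change.

# Hypothetical sketch: the real backfill script is not shown in this diff.
# Assumes the lambda module is importable as lambda_function on the path.
from lambda_function import general_adapter


def backfill_key(table: str, bucket: str, key: str, schema: str) -> None:
    # Route backfilled objects through the same insert / compression-fallback /
    # error-logging path the lambda uses for live replication.
    general_adapter(table, bucket, key, schema, ["gzip", "none"], "JSONEachRow")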

tools/rockset_migration/dynamo2ch.py

Lines changed: 1 addition & 1 deletion
@@ -160,7 +160,7 @@ def upload_to_clickhouse(records, table):
     # Async insert to maybe make insertions more efficient on the ClickHouse side
     # https://clickhouse.com/docs/en/cloud/bestpractices/asynchronous-inserts
     get_clickhouse_client().query(
-        f"INSERT INTO `{table}` SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT JSONEachRow {body}"
+        f"INSERT INTO {table} SETTINGS async_insert=1, wait_for_async_insert=1 FORMAT JSONEachRow {body}"
     )
