Skip to content

Commit 524ef8c

Browse files
perf(ingest/redshift): limit copy lineage (#11662)
Co-authored-by: Mayuri Nehate <[email protected]>
1 parent 8638bf9 commit 524ef8c

File tree

1 file changed

+29
-18
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/redshift

1 file changed

+29
-18
lines changed

metadata-ingestion/src/datahub/ingestion/source/redshift/query.py

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
# We use 290 instead instead of the standard 320, because escape characters can add to the length.
1010
_QUERY_SEQUENCE_LIMIT = 290
1111

12+
_MAX_COPY_ENTRIES_PER_TABLE = 20
13+
1214

1315
class RedshiftCommonQuery:
1416
CREATE_TEMP_TABLE_CLAUSE = "create temp table"
@@ -293,28 +295,37 @@ def alter_table_rename_query(
293295
def list_copy_commands_sql(
294296
db_name: str, start_time: datetime, end_time: datetime
295297
) -> str:
296-
return """
297-
select
298-
distinct
299-
"schema" as target_schema,
300-
"table" as target_table,
301-
c.file_name as filename
302-
from
303-
SYS_QUERY_DETAIL as si
304-
join SYS_LOAD_DETAIL as c on
305-
si.query_id = c.query_id
306-
join SVV_TABLE_INFO sti on
307-
sti.table_id = si.table_id
308-
where
309-
database = '{db_name}'
310-
and si.start_time >= '{start_time}'
311-
and si.start_time < '{end_time}'
312-
order by target_schema, target_table, si.start_time asc
313-
""".format(
298+
return """\
299+
SELECT DISTINCT
300+
target_schema,
301+
target_table,
302+
filename
303+
FROM (
304+
SELECT
305+
sti."schema" AS target_schema,
306+
sti."table" AS target_table,
307+
c.file_name AS filename,
308+
ROW_NUMBER() OVER (
309+
PARTITION BY sti."schema", sti."table"
310+
ORDER BY si.start_time DESC
311+
) AS rn
312+
FROM
313+
SYS_QUERY_DETAIL AS si
314+
JOIN SYS_LOAD_DETAIL AS c ON si.query_id = c.query_id
315+
JOIN SVV_TABLE_INFO sti ON sti.table_id = si.table_id
316+
WHERE
317+
sti.database = '{db_name}'
318+
AND si.start_time >= '{start_time}'
319+
AND si.start_time < '{end_time}'
320+
) subquery
321+
WHERE rn <= {_MAX_COPY_ENTRIES_PER_TABLE}
322+
ORDER BY target_schema, target_table, filename
323+
""".format(
314324
# We need the original database name for filtering
315325
db_name=db_name,
316326
start_time=start_time.strftime(redshift_datetime_format),
317327
end_time=end_time.strftime(redshift_datetime_format),
328+
_MAX_COPY_ENTRIES_PER_TABLE=_MAX_COPY_ENTRIES_PER_TABLE,
318329
)
319330

320331
@staticmethod

0 commit comments

Comments
 (0)