diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index a9524dd3..30f8673f 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -73,7 +73,7 @@ jobs:
           - example: iceberg-dedup
             path: iceberg-data-deduplication
             test_commands: |
-              compile dedup_package.json
+              compile data_deduplication_package.json
 
           - example: iot-sensor
             path: iot-sensor-metrics
diff --git a/.gitignore b/.gitignore
index 99069b9c..3ff07544 100644
--- a/.gitignore
+++ b/.gitignore
@@ -224,3 +224,8 @@ Temporary Items
 py-venv
 
 # End of https://www.toptal.com/developers/gitignore/api/macos
+
+## performance benchmarking
+node_modules/
+iceberg-data/
+sqrl-iceberg-data/
\ No newline at end of file
diff --git a/iceberg-data-deduplication/dedup.sqrl b/iceberg-data-deduplication/data_deduplication.sqrl
similarity index 58%
rename from iceberg-data-deduplication/dedup.sqrl
rename to iceberg-data-deduplication/data_deduplication.sqrl
index f3e75331..26f10b83 100644
--- a/iceberg-data-deduplication/dedup.sqrl
+++ b/iceberg-data-deduplication/data_deduplication.sqrl
@@ -1,14 +1,9 @@
 -- This script runs a deduplication compaction on Iceberg tables
 -- It currently assumes a single primary partition column and a single time bucket partition column based on seconds
 
-IMPORT tables.{{tableName}};
-IMPORT tables.compaction_tracker;
+IMPORT tables.{{tableName}}.*;
+IMPORT tables.compaction_tracker.*;
 IMPORT udfs.read_partition_sizes;
-IMPORT udfs.delete_duplicated_data;
-
---
--- Part 1: Gather partitions to compact
---
 
 _AllPartitions :=
 SELECT partition_map['{{partitionCol}}'] AS partition_id,
@@ -52,41 +47,17 @@
 SELECT DISTINCT '{{tableName}}' AS table_name,
                 partition_id,
                 new_rel_percentage,
                 new_abs_percentage,
-                FLOOR((CAST('${DEPLOYMENT_TIMESTAMP}' AS BIGINT) - {{bufferSeconds}}) / {{bucketSeconds}}) * {{bucketSeconds}} AS max_time_bucket,
-                'initialize' AS action,
-                NOW() AS action_time
+                FLOOR((CAST('${DEPLOYMENT_TIMESTAMP}' AS BIGINT) - {{bufferSeconds}}) / {{bucketSeconds}}) * {{bucketSeconds}} AS max_time_bucket
 FROM _PartitionPriority
 WHERE cumulative_total_size <= {{maxTotalPartitionSizeInMB}} * 1024 * 1024
    OR row_num <= 1;
 
-INSERT INTO CompactionTracker SELECT * FROM _PartitionsToCompact;
-
-NEXT_BATCH;
-
---
--- Part 2: Compact gathered partitions
---
-
-_DataToCompact :=
-SELECT DISTINCT table_name,
-                job_id,
-                partition_id,
-                new_rel_percentage,
-                new_abs_percentage,
-                max_time_bucket
-FROM CompactionTracker
-WHERE LOWER(table_name) = LOWER('{{tableName}}')
-  AND job_id = '${DEPLOYMENT_ID}'
-  AND action = 'initialize'
-  AND action_time > NOW() - INTERVAL '4' HOUR; -- last condition is for efficient pruning of iceberg read
-
-_CastDataToCompact := SELECT * FROM _DataToCompact;
-_CastDataToCompact.partition_id := CAST(partition_id AS {{partitionColType}});
+_PartitionsToCompact.partition_id := CAST(partition_id AS {{partitionColType}});
 
 _InputData :=
 SELECT /*+ BROADCAST(c) */ d.*
 FROM {{tableName}} AS d
-JOIN _CastDataToCompact AS c
+JOIN _PartitionsToCompact AS c
   ON d.{{partitionCol}} = c.partition_id
  AND d.{{timeBucketCol}} <= c.max_time_bucket;
@@ -98,60 +69,20 @@ INSERT OVERWRITE {{tableName}}
 SELECT * FROM _DistInputData
 -- filter only when "deleteFlagCol" has a value
 {{#deleteFlagCol}}
-    WHERE {{deleteFlagCol}} IS TRUE
+    WHERE {{deleteFlagCol}} IS NOT TRUE
 {{/deleteFlagCol}}
 -- sort by partition column, and optionally any other column(s)
 ORDER BY {{partitionCol}} {{#sortKey}}, {{sortKey}}{{/sortKey}};
 
 _CompactionResult :=
-SELECT table_name,
-       job_id,
-       partition_id,
+SELECT CAST(table_name AS STRING),
+       CAST(job_id AS STRING),
+       CAST(partition_id AS STRING),
        new_rel_percentage,
        new_abs_percentage,
        max_time_bucket,
        'overwrite' AS action,
        NOW() AS action_time
-FROM _DataToCompact;
+FROM _PartitionsToCompact;
 
 INSERT INTO CompactionTracker SELECT * FROM _CompactionResult;
-
-NEXT_BATCH;
-
---
--- Part 3: Delete source partitions that were compacted
---
-
-_DataToDelete :=
-SELECT DISTINCT table_name,
-                job_id,
-                partition_id,
-                new_rel_percentage,
-                new_abs_percentage,
-                max_time_bucket
-FROM CompactionTracker
-WHERE LOWER(table_name) = LOWER('{{tableName}}')
-  AND job_id = '${DEPLOYMENT_ID}'
-  AND action = 'overwrite'
-  AND action_time > NOW() - INTERVAL '4' HOUR;
-
-_DeleteFnResult :=
-SELECT delete_duplicated_data(
-           '{{warehouse}}',
-           '{{catalogType}}',
-           '{{catalogName}}',
-           '{{databaseName}}',
-           MAX(table_name),
-           MAX(max_time_bucket),
-           COLLECT(MAP['{{partitionCol}}', partition_id])) AS res
-FROM _DataToDelete;
-
-_DeleteResult :=
-SELECT
-    d.*,
-    CASE WHEN fn.res = true THEN 'delete' ELSE 'delete_failed' END AS action,
-    NOW() AS action_time
-FROM _DataToDelete AS d
-    CROSS JOIN (SELECT res FROM _DeleteFnResult LIMIT 1) AS fn;
-
-INSERT INTO CompactionTracker SELECT * FROM _DeleteResult;
diff --git a/iceberg-data-deduplication/dedup_package.json b/iceberg-data-deduplication/data_deduplication_package.json
similarity index 86%
rename from iceberg-data-deduplication/dedup_package.json
rename to iceberg-data-deduplication/data_deduplication_package.json
index 6e5ba1e5..b7fff2b0 100644
--- a/iceberg-data-deduplication/dedup_package.json
+++ b/iceberg-data-deduplication/data_deduplication_package.json
@@ -2,13 +2,13 @@
   "version": "1",
   "enabled-engines": ["flink"],
   "script": {
-    "main": "dedup.sqrl",
+    "main": "data_deduplication.sqrl",
     "config": {
-      "warehouse": "/tmp/duckdb",
+      "warehouse": "sqrl-iceberg-data",
       "catalogType": "hadoop",
       "catalogName": "mycatalog",
       "databaseName": "mydatabase",
-      "tableName": "CdcData",
+      "tableName": "cdc_data",
       "distinctKeyCol": "account_id",
       "partitionCol": "account_id",
       "partitionColType": "bigint",
diff --git a/iceberg-data-deduplication/data_delete.sqrl b/iceberg-data-deduplication/data_delete.sqrl
new file mode 100644
index 00000000..5be30809
--- /dev/null
+++ b/iceberg-data-deduplication/data_delete.sqrl
@@ -0,0 +1,46 @@
+IMPORT tables.compaction_tracker.*;
+IMPORT udfs.delete_duplicated_data;
+
+_DataToDelete :=
+SELECT table_name,
+       job_id,
+       partition_id,
+       new_rel_percentage,
+       new_abs_percentage,
+       max_time_bucket
+FROM (
+    SELECT table_name,
+           job_id,
+           partition_id,
+           new_rel_percentage,
+           new_abs_percentage,
+           max_time_bucket,
+           action,
+           ROW_NUMBER() OVER (PARTITION BY table_name, partition_id ORDER BY action_time DESC) AS row_num
+    FROM CompactionTracker
+    WHERE action_time < NOW() - INTERVAL '{{commit_wait_sec}}' SECOND AND action_time > NOW() - INTERVAL '24' HOUR -- time buffer to ensure the commit happened, and to stop attempting deletes after a day
+)
+WHERE row_num = 1 AND action <> 'delete'; -- compacted partitions that haven't been deleted yet
+
+-- TODO: extra safety needed: before deleting, validate that each selected partition's last update time is earlier than partition 0's update time
+
+_DeleteFnResult :=
+SELECT delete_duplicated_data(
+           '{{warehouse}}',
+           '{{catalogType}}',
+           '{{catalogName}}',
+           '{{databaseName}}',
+           MAX(table_name),
+           MAX(max_time_bucket),
+           COLLECT(MAP['{{partitionCol}}', partition_id])) AS res
+FROM _DataToDelete;
+
+_DeleteResult :=
+SELECT
+    d.*,
+    CASE WHEN fn.res = true THEN 'delete' ELSE 'delete_failed' END AS action,
+    NOW() AS action_time
+FROM _DataToDelete AS d
+    CROSS JOIN (SELECT res FROM _DeleteFnResult LIMIT 1) AS fn;
+
+INSERT INTO CompactionTracker SELECT * FROM _DeleteResult;
diff --git a/iceberg-data-deduplication/data_delete_package.json b/iceberg-data-deduplication/data_delete_package.json
new file mode 100644
index 00000000..9c85017d
--- /dev/null
+++ b/iceberg-data-deduplication/data_delete_package.json
@@ -0,0 +1,22 @@
+{
+  "version": "1",
+  "enabled-engines": ["flink"],
+  "script": {
+    "main": "data_delete.sqrl",
+    "config": {
+      "warehouse": "sqrl-iceberg-data",
+      "catalogType": "hadoop",
+      "catalogName": "mycatalog",
+      "databaseName": "mydatabase",
+      "partitionCol": "account_id",
+      "commit_wait_sec": 1
+    }
+  },
+  "engines": {
+    "flink": {
+      "config": {
+        "execution.runtime-mode": "batch"
+      }
+    }
+  }
+}
diff --git a/iceberg-data-deduplication/datagen.sqrl b/iceberg-data-deduplication/datagen.sqrl
index ff238792..93a320c7 100644
--- a/iceberg-data-deduplication/datagen.sqrl
+++ b/iceberg-data-deduplication/datagen.sqrl
@@ -1,4 +1,8 @@
 IMPORT tables.min_source AS _CDC;
-IMPORT tables.CdcData;
+IMPORT tables.{{tableName}}.*;
 
-INSERT INTO CdcData SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket FROM _CDC c;
+CountRecords := SELECT COUNT(*) AS num_records FROM _CDC.SrcRecords;
+
+EXPORT CountRecords TO logger.records;
+
+INSERT INTO {{tableName}} SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket FROM _CDC.SrcRecords c;
diff --git a/iceberg-data-deduplication/datagen_aws_package.json b/iceberg-data-deduplication/datagen_aws_package.json
deleted file mode 100644
index 7cb0a6dd..00000000
--- a/iceberg-data-deduplication/datagen_aws_package.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "version": "1",
-  "enabled-engines": ["flink"],
-  "script": {
-    "main": "datagen.sqrl"
-  },
-  "engines": {
-    "flink": {
-      "config": {
-        "execution.runtime-mode": "batch",
-        "execution.checkpointing.interval": "8 s",
-        "table.exec.source.idle-timeout": "1 ms"
-      }
-    }
-  },
-  "connectors" : {
-    "iceberg" : {
-      "connector": "iceberg",
-      "warehouse": "s3://ferenc-iceberg-glue-test",
-      "catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
-      "catalog-name": "mycatalog",
-      "catalog-database": "mydatabase",
-      "format-version": 2
-    }
-  }
-}
diff --git a/iceberg-data-deduplication/datagen_package.json b/iceberg-data-deduplication/datagen_package.json
index 41022341..f916c5dd 100644
--- a/iceberg-data-deduplication/datagen_package.json
+++ b/iceberg-data-deduplication/datagen_package.json
@@ -2,7 +2,14 @@
   "version": "1",
   "enabled-engines": ["flink"],
   "script": {
-    "main": "datagen.sqrl"
+    "main": "datagen.sqrl",
+    "config": {
+      "warehouse": "sqrl-iceberg-data",
+      "catalogType": "hadoop",
+      "catalogName": "mycatalog",
+      "databaseName": "mydatabase",
+      "tableName": "cdc_data"
+    }
   },
   "engines": {
     "flink": {
diff --git a/iceberg-data-deduplication/tables/CdcData.table.sql b/iceberg-data-deduplication/tables/cdc_data.sqrl
similarity index 66%
rename from iceberg-data-deduplication/tables/CdcData.table.sql
rename to iceberg-data-deduplication/tables/cdc_data.sqrl
index 4aee44fd..e4932321 100644
--- a/iceberg-data-deduplication/tables/CdcData.table.sql
+++ b/iceberg-data-deduplication/tables/cdc_data.sqrl
@@ -1,4 +1,4 @@
-CREATE TABLE `CdcData` (
+CREATE TABLE `cdc_data` (
     `account_id` BIGINT NOT NULL,
     `id` BIGINT NOT NULL,
     `pt` STRING NOT NULL,
@@ -10,10 +10,10 @@
 ) PARTITIONED BY (`account_id`, `id`, `time_bucket`) WITH (
     'connector' = 'iceberg',
-    'catalog-name' = 'mycatalog',
-    'catalog-database' = 'mydatabase',
-    'catalog-table' = 'cdcdata',
-    'catalog-type' = 'hadoop',
-    'warehouse' = '/tmp/duckdb',
+    'catalog-name' = '{{catalogName}}',
+    'catalog-database' = '{{databaseName}}',
+    'catalog-table' = '{{tableName}}',
+    'catalog-type' = '{{catalogType}}',
+    'warehouse' = '{{warehouse}}',
     'format-version' = '2'
 );
diff --git a/iceberg-data-deduplication/tables/compaction_tracker.table.sql b/iceberg-data-deduplication/tables/compaction_tracker.sqrl
similarity index 70%
rename from iceberg-data-deduplication/tables/compaction_tracker.table.sql
rename to iceberg-data-deduplication/tables/compaction_tracker.sqrl
index 27bf5d28..7df2bb62 100644
--- a/iceberg-data-deduplication/tables/compaction_tracker.table.sql
+++ b/iceberg-data-deduplication/tables/compaction_tracker.sqrl
@@ -10,10 +10,10 @@ CREATE TABLE `CompactionTracker` (
 ) PARTITIONED BY (`table_name`) WITH (
     'connector' = 'iceberg',
-    'catalog-name' = 'mycatalog',
-    'catalog-database' = 'mydatabase',
-    'catalog-table' = 'compactiontracker',
-    'catalog-type' = 'hadoop',
-    'warehouse' = '/tmp/duckdb',
+    'catalog-name' = '{{catalogName}}',
+    'catalog-database' = '{{databaseName}}',
+    'catalog-table' = 'compaction_tracker',
+    'catalog-type' = '{{catalogType}}',
+    'warehouse' = '{{warehouse}}',
     'format-version' = '2'
 );
diff --git a/iceberg-data-deduplication/tables/min_source.table.sql b/iceberg-data-deduplication/tables/min_source.sqrl
similarity index 100%
rename from iceberg-data-deduplication/tables/min_source.table.sql
rename to iceberg-data-deduplication/tables/min_source.sqrl