2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -73,7 +73,7 @@ jobs:
- example: iceberg-dedup
path: iceberg-data-deduplication
test_commands: |
compile dedup_package.json
compile data_deduplication_package.json

- example: iot-sensor
path: iot-sensor-metrics
5 changes: 5 additions & 0 deletions .gitignore
@@ -224,3 +224,8 @@ Temporary Items

py-venv
# End of https://www.toptal.com/developers/gitignore/api/macos

##performance benchmarking
node_modules/
iceberg-data/
sqrl-iceberg-data/
@@ -1,14 +1,9 @@
-- This script runs a deduplication compaction on Iceberg tables
-- It currently assumes a single primary partition column and a single time bucket partition column based on seconds

IMPORT tables.{{tableName}};
IMPORT tables.compaction_tracker;
IMPORT tables.{{tableName}}.*;
IMPORT tables.compaction_tracker.*;
IMPORT udfs.read_partition_sizes;
IMPORT udfs.delete_duplicated_data;

--
-- Part 1: Gather partitions to compact
--

_AllPartitions :=
SELECT partition_map['{{partitionCol}}'] AS partition_id,
@@ -52,41 +47,17 @@ SELECT DISTINCT '{{tableName}}' AS table_name,
partition_id,
new_rel_percentage,
new_abs_percentage,
FLOOR((CAST('${DEPLOYMENT_TIMESTAMP}' AS BIGINT) - {{bufferSeconds}}) / {{bucketSeconds}}) * {{bucketSeconds}} AS max_time_bucket,
'initialize' AS action,
NOW() AS action_time
FLOOR((CAST('${DEPLOYMENT_TIMESTAMP}' AS BIGINT) - {{bufferSeconds}}) / {{bucketSeconds}}) * {{bucketSeconds}} AS max_time_bucket
FROM _PartitionPriority
WHERE cumulative_total_size <= {{maxTotalPartitionSizeInMB}} * 1024 * 1024
OR row_num <= 1;
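
As a worked example of the max_time_bucket expression above (the numbers are illustrative, not from this PR): with DEPLOYMENT_TIMESTAMP = 1700000100, bufferSeconds = 60, and bucketSeconds = 10, the expression floors the buffered timestamp to a bucket boundary:

-- FLOOR((1700000100 - 60) / 10) * 10
-- = FLOOR(1700000040 / 10) * 10
-- = 170000004 * 10
-- = 1700000040   -- the newest bucket old enough (per the buffer) to compact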

INSERT INTO CompactionTracker SELECT * FROM _PartitionsToCompact;

NEXT_BATCH;

--
-- Part 2: Compact gathered partitions
--

_DataToCompact :=
SELECT DISTINCT table_name,
job_id,
partition_id,
new_rel_percentage,
new_abs_percentage,
max_time_bucket
FROM CompactionTracker
WHERE LOWER(table_name) = LOWER('{{tableName}}')
AND job_id = '${DEPLOYMENT_ID}'
AND action = 'initialize'
AND action_time > NOW() - INTERVAL '4' HOUR; -- last condition is for efficient pruning of iceberg read

_CastDataToCompact := SELECT * FROM _DataToCompact;
_CastDataToCompact.partition_id := CAST(partition_id AS {{partitionColType}});
_PartitionsToCompact.partition_id := CAST(partition_id AS {{partitionColType}});

_InputData :=
SELECT /*+ BROADCAST(c) */ d.*
FROM {{tableName}} AS d
JOIN _CastDataToCompact AS c
JOIN _PartitionsToCompact AS c
ON d.{{partitionCol}} = c.partition_id
AND d.{{timeBucketCol}} <= c.max_time_bucket;
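
The BROADCAST hint ships the small set of selected partitions to every reader of the large Iceberg table, so the join can prune partitions without a shuffle. A sketch of how this join might render, assuming partitionCol = account_id (from the package config) and timeBucketCol = time_bucket (matching the cdc_data schema; this variable's value is not shown in the diff):

SELECT /*+ BROADCAST(c) */ d.*
FROM cdc_data AS d
JOIN _PartitionsToCompact AS c
  ON d.account_id = c.partition_id         -- only partitions queued for compaction
 AND d.time_bucket <= c.max_time_bucket;   -- only buckets old enough to be stable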

@@ -98,60 +69,20 @@ INSERT OVERWRITE {{tableName}}
SELECT * FROM _DistInputData
-- filter only when "deleteFlagCol" has a value
{{#deleteFlagCol}}
WHERE {{deleteFlagCol}} IS TRUE
WHERE {{deleteFlagCol}} IS NOT TRUE
{{/deleteFlagCol}}
-- sort by partition column, and optionally any other column(s)
ORDER BY {{partitionCol}} {{#sortKey}}, {{sortKey}}{{/sortKey}};
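
To make the rendered output concrete: with deleteFlagCol set to a hypothetical _deleted column, partitionCol = account_id, and sortKey unset, the templated statement would read along these lines (a sketch, not part of the diff):

INSERT OVERWRITE cdc_data
SELECT * FROM _DistInputData
WHERE _deleted IS NOT TRUE   -- keep every row not explicitly flagged; the previous IS TRUE kept only the flagged rows
ORDER BY account_id;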

_CompactionResult :=
SELECT table_name,
job_id,
partition_id,
SELECT CAST(table_name AS STRING),
CAST(job_id AS STRING),
CAST(partition_id AS STRING),
new_rel_percentage,
new_abs_percentage,
max_time_bucket,
'overwrite' AS action,
NOW() AS action_time
FROM _DataToCompact;
FROM _PartitionsToCompact;

INSERT INTO CompactionTracker SELECT * FROM _CompactionResult;

NEXT_BATCH;

--
-- Part 3: Delete source partitions that were compacted
--

_DataToDelete :=
SELECT DISTINCT table_name,
job_id,
partition_id,
new_rel_percentage,
new_abs_percentage,
max_time_bucket
FROM CompactionTracker
WHERE LOWER(table_name) = LOWER('{{tableName}}')
AND job_id = '${DEPLOYMENT_ID}'
AND action = 'overwrite'
AND action_time > NOW() - INTERVAL '4' HOUR;

_DeleteFnResult :=
SELECT delete_duplicated_data(
'{{warehouse}}',
'{{catalogType}}',
'{{catalogName}}',
'{{databaseName}}',
MAX(table_name),
MAX(max_time_bucket),
COLLECT(MAP['{{partitionCol}}', partition_id])) AS res
FROM _DataToDelete;

_DeleteResult :=
SELECT
d.*,
CASE WHEN fn.res = true THEN 'delete' ELSE 'delete_failed' END AS action,
NOW() AS action_time
FROM _DataToDelete AS d
CROSS JOIN (SELECT res FROM _DeleteFnResult LIMIT 1) AS fn;

INSERT INTO CompactionTracker SELECT * FROM _DeleteResult;
@@ -2,13 +2,13 @@
"version": "1",
"enabled-engines": ["flink"],
"script": {
"main": "dedup.sqrl",
"main": "data_deduplication.sqrl",
"config": {
"warehouse": "/tmp/duckdb",
"warehouse": "sqrl-iceberg-data",
"catalogType": "hadoop",
"catalogName": "mycatalog",
"databaseName": "mydatabase",
"tableName": "CdcData",
"tableName": "cdc_data",
"distinctKeyCol": "account_id",
"partitionCol": "account_id",
"partitionColType": "bigint",
46 changes: 46 additions & 0 deletions iceberg-data-deduplication/data_delete.sqrl
@@ -0,0 +1,46 @@
IMPORT tables.compaction_tracker.*;
IMPORT udfs.delete_duplicated_data;

_DataToDelete :=
SELECT table_name,
job_id,
partition_id,
new_rel_percentage,
new_abs_percentage,
max_time_bucket
FROM (
SELECT table_name,
job_id,
partition_id,
new_rel_percentage,
new_abs_percentage,
max_time_bucket,
action,
ROW_NUMBER() OVER (PARTITION BY table_name, partition_id ORDER BY action_time DESC) AS row_num
FROM CompactionTracker
WHERE action_time < NOW() - INTERVAL '{{commit_wait_sec}}' SECOND AND action_time > NOW() - INTERVAL '24' HOUR -- time buffer to ensure commit happened and stop attempting to delete
)
WHERE row_num = 1 AND action <> 'delete'; -- compacted partitions haven't been deleted yet

--TODO: need extra safety: validate that the selected partitions should be deleted by checking that the last partition update time < partition 0 update time

_DeleteFnResult :=
SELECT delete_duplicated_data(
'{{warehouse}}',
'{{catalogType}}',
'{{catalogName}}',
'{{databaseName}}',
MAX(table_name),
MAX(max_time_bucket),
COLLECT(MAP['{{partitionCol}}', partition_id])) AS res
FROM _DataToDelete;

_DeleteResult :=
SELECT
d.*,
CASE WHEN fn.res = true THEN 'delete' ELSE 'delete_failed' END AS action,
NOW() AS action_time
FROM _DataToDelete AS d
CROSS JOIN (SELECT res FROM _DeleteFnResult LIMIT 1) AS fn;

INSERT INTO CompactionTracker SELECT * FROM _DeleteResult;
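
Taken together, the tracker rows written by the two scripts form a small per-partition state machine: 'initialize' when a partition is selected, 'overwrite' after compaction, then 'delete' or 'delete_failed' here. An illustrative inspection query over the columns used above (not part of the PR):

SELECT table_name, partition_id, action, action_time
FROM CompactionTracker
ORDER BY table_name, partition_id, action_time;   -- expect initialize -> overwrite -> delete per partition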
22 changes: 22 additions & 0 deletions iceberg-data-deduplication/data_delete_package.json
@@ -0,0 +1,22 @@
{
"version": "1",
"enabled-engines": ["flink"],
"script": {
"main": "data_delete.sqrl",
"config": {
"warehouse": "sqrl-iceberg-data",
"catalogType": "hadoop",
"catalogName": "mycatalog",
"databaseName": "mydatabase",
"partitionCol": "account_id",
"commit_wait_sec": 1
}
},
"engines": {
"flink": {
"config": {
"execution.runtime-mode": "batch"
}
}
}
}
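
With commit_wait_sec = 1 as configured above, the templated time filter in data_delete.sqrl renders roughly as (illustrative):

WHERE action_time < NOW() - INTERVAL '1' SECOND   -- commit_wait_sec: wait for the Iceberg commit to land
  AND action_time > NOW() - INTERVAL '24' HOUR    -- give up retrying deletes after a day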
8 changes: 6 additions & 2 deletions iceberg-data-deduplication/datagen.sqrl
@@ -1,4 +1,8 @@
IMPORT tables.min_source AS _CDC;
IMPORT tables.CdcData;
IMPORT tables.{{tableName}}.*;

INSERT INTO CdcData SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket FROM _CDC c;
CountRecords := SELECT COUNT(*) AS num_records FROM _CDC.SrcRecords;

EXPORT CountRecords TO logger.records;

INSERT INTO {{tableName}} SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket FROM _CDC.SrcRecords c;
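
With tableName = 'cdc_data' from datagen_package.json (next file), the templated insert renders along these lines (a sketch):

INSERT INTO cdc_data
SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket   -- bucket each record by the second of its event timestamp
FROM _CDC.SrcRecords c;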
26 changes: 0 additions & 26 deletions iceberg-data-deduplication/datagen_aws_package.json

This file was deleted.

9 changes: 8 additions & 1 deletion iceberg-data-deduplication/datagen_package.json
@@ -2,7 +2,14 @@
"version": "1",
"enabled-engines": ["flink"],
"script": {
"main": "datagen.sqrl"
"main": "datagen.sqrl",
"config": {
"warehouse": "sqrl-iceberg-data",
"catalogType": "hadoop",
"catalogName": "mycatalog",
"databaseName": "mydatabase",
"tableName": "cdc_data"
}
},
"engines": {
"flink": {
@@ -1,4 +1,4 @@
CREATE TABLE `CdcData` (
CREATE TABLE `cdc_data` (
`account_id` BIGINT NOT NULL,
`id` BIGINT NOT NULL,
`pt` STRING NOT NULL,
@@ -10,10 +10,10 @@ CREATE TABLE `CdcData` (
) PARTITIONED BY (`account_id`, `id`, `time_bucket`)
WITH (
'connector' = 'iceberg',
'catalog-name' = 'mycatalog',
'catalog-database' = 'mydatabase',
'catalog-table' = 'cdcdata',
'catalog-type' = 'hadoop',
'warehouse' = '/tmp/duckdb',
'catalog-name' = '{{catalogName}}',
'catalog-database' = '{{databaseName}}',
'catalog-table' = '{{tableName}}',
'catalog-type' = '{{catalogType}}',
'warehouse' = '{{warehouse}}',
'format-version' = '2'
);
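
Substituting the config values used elsewhere in this PR (catalogName = mycatalog, databaseName = mydatabase, tableName = cdc_data, catalogType = hadoop, warehouse = sqrl-iceberg-data), the templated connector options render as (a sketch):

WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'mycatalog',
  'catalog-database' = 'mydatabase',
  'catalog-table' = 'cdc_data',
  'catalog-type' = 'hadoop',
  'warehouse' = 'sqrl-iceberg-data',
  'format-version' = '2'
);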
@@ -10,10 +10,10 @@ CREATE TABLE `CompactionTracker` (
) PARTITIONED BY (`table_name`)
WITH (
'connector' = 'iceberg',
'catalog-name' = 'mycatalog',
'catalog-database' = 'mydatabase',
'catalog-table' = 'compactiontracker',
'catalog-type' = 'hadoop',
'warehouse' = '/tmp/duckdb',
'catalog-name' = '{{catalogName}}',
'catalog-database' = '{{databaseName}}',
'catalog-table' = 'compaction_tracker',
'catalog-type' = '{{catalogType}}',
'warehouse' = '{{warehouse}}',
'format-version' = '2'
);