Skip to content

Commit 77d33fa

Browse files
authored
Rework Iceberg deduplication example (#103)
1 parent 1aa8430 commit 77d33fa

File tree

12 files changed

+112
-123
lines changed

12 files changed

+112
-123
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ jobs:
7373
- example: iceberg-dedup
7474
path: iceberg-data-deduplication
7575
test_commands: |
76-
compile dedup_package.json
76+
compile data_deduplication_package.json
7777
7878
- example: iot-sensor
7979
path: iot-sensor-metrics

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,3 +224,8 @@ Temporary Items
224224

225225
py-venv
226226
# End of https://www.toptal.com/developers/gitignore/api/macos
227+
228+
##performance benchmarking
229+
node_modules/
230+
iceberg-data/
231+
sqrl-iceberg-data/
Lines changed: 10 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
-- This script runs a deduplication compaction on Iceberg tables
22
-- It currently assumes a single primary partition column and a single time bucket partition column based on seconds
33

4-
IMPORT tables.{{tableName}};
5-
IMPORT tables.compaction_tracker;
4+
IMPORT tables.{{tableName}}.*;
5+
IMPORT tables.compaction_tracker.*;
66
IMPORT udfs.read_partition_sizes;
7-
IMPORT udfs.delete_duplicated_data;
8-
9-
--
10-
-- Part 1: Gather partitions to compact
11-
--
127

138
_AllPartitions :=
149
SELECT partition_map['{{partitionCol}}'] AS partition_id,
@@ -52,41 +47,17 @@ SELECT DISTINCT '{{tableName}}' AS table_name,
5247
partition_id,
5348
new_rel_percentage,
5449
new_abs_percentage,
55-
FLOOR((CAST('${DEPLOYMENT_TIMESTAMP}' AS BIGINT) - {{bufferSeconds}}) / {{bucketSeconds}}) * {{bucketSeconds}} AS max_time_bucket,
56-
'initialize' AS action,
57-
NOW() AS action_time
50+
FLOOR((CAST('${DEPLOYMENT_TIMESTAMP}' AS BIGINT) - {{bufferSeconds}}) / {{bucketSeconds}}) * {{bucketSeconds}} AS max_time_bucket
5851
FROM _PartitionPriority
5952
WHERE cumulative_total_size <= {{maxTotalPartitionSizeInMB}} * 1024 * 1024
6053
OR row_num <= 1;
6154

62-
INSERT INTO CompactionTracker SELECT * FROM _PartitionsToCompact;
63-
64-
NEXT_BATCH;
65-
66-
--
67-
-- Part 2: Compact gathered partitions
68-
--
69-
70-
_DataToCompact :=
71-
SELECT DISTINCT table_name,
72-
job_id,
73-
partition_id,
74-
new_rel_percentage,
75-
new_abs_percentage,
76-
max_time_bucket
77-
FROM CompactionTracker
78-
WHERE LOWER(table_name) = LOWER('{{tableName}}')
79-
AND job_id = '${DEPLOYMENT_ID}'
80-
AND action = 'initialize'
81-
AND action_time > NOW() - INTERVAL '4' HOUR; -- last condition is for efficient pruning of iceberg read
82-
83-
_CastDataToCompact := SELECT * FROM _DataToCompact;
84-
_CastDataToCompact.partition_id := CAST(partition_id AS {{partitionColType}});
55+
_PartitionsToCompact.partition_id := CAST(partition_id AS {{partitionColType}});
8556

8657
_InputData :=
8758
SELECT /*+ BROADCAST(c) */ d.*
8859
FROM {{tableName}} AS d
89-
JOIN _CastDataToCompact AS c
60+
JOIN _PartitionsToCompact AS c
9061
ON d.{{partitionCol}} = c.partition_id
9162
AND d.{{timeBucketCol}} <= c.max_time_bucket;
9263

@@ -98,60 +69,20 @@ INSERT OVERWRITE {{tableName}}
9869
SELECT * FROM _DistInputData
9970
-- filter only when "deleteFlagCol" has a value
10071
{{#deleteFlagCol}}
101-
WHERE {{deleteFlagCol}} IS TRUE
72+
WHERE {{deleteFlagCol}} IS NOT TRUE
10273
{{/deleteFlagCol}}
10374
-- sort by partition column, and optionally any other column(s)
10475
ORDER BY {{partitionCol}} {{#sortKey}}, {{sortKey}}{{/sortKey}};
10576

10677
_CompactionResult :=
107-
SELECT table_name,
108-
job_id,
109-
partition_id,
78+
SELECT CAST(table_name AS STRING),
79+
CAST(job_id AS STRING),
80+
CAST(partition_id AS STRING),
11081
new_rel_percentage,
11182
new_abs_percentage,
11283
max_time_bucket,
11384
'overwrite' AS action,
11485
NOW() AS action_time
115-
FROM _DataToCompact;
86+
FROM _PartitionsToCompact;
11687

11788
INSERT INTO CompactionTracker SELECT * FROM _CompactionResult;
118-
119-
NEXT_BATCH;
120-
121-
--
122-
-- Part 3: Delete source partitions that were compacted
123-
--
124-
125-
_DataToDelete :=
126-
SELECT DISTINCT table_name,
127-
job_id,
128-
partition_id,
129-
new_rel_percentage,
130-
new_abs_percentage,
131-
max_time_bucket
132-
FROM CompactionTracker
133-
WHERE LOWER(table_name) = LOWER('{{tableName}}')
134-
AND job_id = '${DEPLOYMENT_ID}'
135-
AND action = 'overwrite'
136-
AND action_time > NOW() - INTERVAL '4' HOUR;
137-
138-
_DeleteFnResult :=
139-
SELECT delete_duplicated_data(
140-
'{{warehouse}}',
141-
'{{catalogType}}',
142-
'{{catalogName}}',
143-
'{{databaseName}}',
144-
MAX(table_name),
145-
MAX(max_time_bucket),
146-
COLLECT(MAP['{{partitionCol}}', partition_id])) AS res
147-
FROM _DataToDelete;
148-
149-
_DeleteResult :=
150-
SELECT
151-
d.*,
152-
CASE WHEN fn.res = true THEN 'delete' ELSE 'delete_failed' END AS action,
153-
NOW() AS action_time
154-
FROM _DataToDelete AS d
155-
CROSS JOIN (SELECT res FROM _DeleteFnResult LIMIT 1) AS fn;
156-
157-
INSERT INTO CompactionTracker SELECT * FROM _DeleteResult;

iceberg-data-deduplication/dedup_package.json renamed to iceberg-data-deduplication/data_deduplication_package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@
22
"version": "1",
33
"enabled-engines": ["flink"],
44
"script": {
5-
"main": "dedup.sqrl",
5+
"main": "data_deduplication.sqrl",
66
"config": {
7-
"warehouse": "/tmp/duckdb",
7+
"warehouse": "sqrl-iceberg-data",
88
"catalogType": "hadoop",
99
"catalogName": "mycatalog",
1010
"databaseName": "mydatabase",
11-
"tableName": "CdcData",
11+
"tableName": "cdc_data",
1212
"distinctKeyCol": "account_id",
1313
"partitionCol": "account_id",
1414
"partitionColType": "bigint",
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
IMPORT tables.compaction_tracker.*;
2+
IMPORT udfs.delete_duplicated_data;
3+
4+
_DataToDelete :=
5+
SELECT table_name,
6+
job_id,
7+
partition_id,
8+
new_rel_percentage,
9+
new_abs_percentage,
10+
max_time_bucket
11+
FROM (
12+
SELECT table_name,
13+
job_id,
14+
partition_id,
15+
new_rel_percentage,
16+
new_abs_percentage,
17+
max_time_bucket,
18+
action,
19+
ROW_NUMBER() OVER (PARTITION BY table_name, partition_id ORDER BY action_time DESC) AS row_num
20+
FROM CompactionTracker
21+
WHERE action_time < NOW() - INTERVAL '{{commit_wait_sec}}' SECOND AND action_time > NOW() - INTERVAL '24' HOUR -- time buffer to ensure commit happened and stop attempting to delete
22+
)
23+
WHERE row_num = 1 AND action <> 'delete'; -- compacted partitions haven't been deleted yet
24+
25+
--TODO: need extra safety: validate that the selected partitions should be deleted by checking that the last partition update time < partition 0 update time
26+
27+
_DeleteFnResult :=
28+
SELECT delete_duplicated_data(
29+
'{{warehouse}}',
30+
'{{catalogType}}',
31+
'{{catalogName}}',
32+
'{{databaseName}}',
33+
MAX(table_name),
34+
MAX(max_time_bucket),
35+
COLLECT(MAP['{{partitionCol}}', partition_id])) AS res
36+
FROM _DataToDelete;
37+
38+
_DeleteResult :=
39+
SELECT
40+
d.*,
41+
CASE WHEN fn.res = true THEN 'delete' ELSE 'delete_failed' END AS action,
42+
NOW() AS action_time
43+
FROM _DataToDelete AS d
44+
CROSS JOIN (SELECT res FROM _DeleteFnResult LIMIT 1) AS fn;
45+
46+
INSERT INTO CompactionTracker SELECT * FROM _DeleteResult;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"version": "1",
3+
"enabled-engines": ["flink"],
4+
"script": {
5+
"main": "data_delete.sqrl",
6+
"config": {
7+
"warehouse": "sqrl-iceberg-data",
8+
"catalogType": "hadoop",
9+
"catalogName": "mycatalog",
10+
"databaseName": "mydatabase",
11+
"partitionCol": "account_id",
12+
"commit_wait_sec": 1
13+
}
14+
},
15+
"engines": {
16+
"flink": {
17+
"config": {
18+
"execution.runtime-mode": "batch"
19+
}
20+
}
21+
}
22+
}
Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
IMPORT tables.min_source AS _CDC;
2-
IMPORT tables.CdcData;
2+
IMPORT tables.{{tableName}}.*;
33

4-
INSERT INTO CdcData SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket FROM _CDC c;
4+
CountRecords := SELECT COUNT(*) AS num_records FROM _CDC.SrcRecords;
5+
6+
EXPORT CountRecords TO logger.records;
7+
8+
INSERT INTO {{tableName}} SELECT c.*, EXTRACT(SECOND FROM c.ts) + 1 AS time_bucket FROM _CDC.SrcRecords c;

iceberg-data-deduplication/datagen_aws_package.json

Lines changed: 0 additions & 26 deletions
This file was deleted.

iceberg-data-deduplication/datagen_package.json

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,14 @@
22
"version": "1",
33
"enabled-engines": ["flink"],
44
"script": {
5-
"main": "datagen.sqrl"
5+
"main": "datagen.sqrl",
6+
"config": {
7+
"warehouse": "sqrl-iceberg-data",
8+
"catalogType": "hadoop",
9+
"catalogName": "mycatalog",
10+
"databaseName": "mydatabase",
11+
"tableName": "cdc_data"
12+
}
613
},
714
"engines": {
815
"flink": {
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CREATE TABLE `CdcData` (
1+
CREATE TABLE `cdc_data` (
22
`account_id` BIGINT NOT NULL,
33
`id` BIGINT NOT NULL,
44
`pt` STRING NOT NULL,
@@ -10,10 +10,10 @@ CREATE TABLE `CdcData` (
1010
) PARTITIONED BY (`account_id`, `id`, `time_bucket`)
1111
WITH (
1212
'connector' = 'iceberg',
13-
'catalog-name' = 'mycatalog',
14-
'catalog-database' = 'mydatabase',
15-
'catalog-table' = 'cdcdata',
16-
'catalog-type' = 'hadoop',
17-
'warehouse' = '/tmp/duckdb',
13+
'catalog-name' = '{{catalogName}}',
14+
'catalog-database' = '{{databaseName}}',
15+
'catalog-table' = '{{tableName}}',
16+
'catalog-type' = '{{catalogType}}',
17+
'warehouse' = '{{warehouse}}',
1818
'format-version' = '2'
1919
);

0 commit comments

Comments
 (0)