Skip to content

Commit 56395ef

Browse files
authored
Add configurable filter and sort at INSERT OVERWRITE in iceberg dedup example (#85)
1 parent 128d44b commit 56395ef

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

iceberg-data-deduplication/dedup.sqrl

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,22 @@ _CastDataToCompact.partition_id := CAST(partition_id AS {{partitionColType}});
8686
_InputData :=
8787
SELECT /*+ BROADCAST(c) */ d.*
8888
FROM {{tableName}} AS d
89-
JOIN _CastDataToCompact AS c
90-
ON d.{{partitionCol}} = c.partition_id AND d.{{timeBucketCol}} <= c.max_time_bucket;
89+
JOIN _CastDataToCompact AS c
90+
ON d.{{partitionCol}} = c.partition_id
91+
AND d.{{timeBucketCol}} <= c.max_time_bucket;
9192

9293
_InputData.{{timeBucketCol}} := 0;
9394

9495
_DistInputData := DISTINCT _InputData ON {{distinctKeyCol}} ORDER BY {{timestampCol}} DESC;
9596

96-
INSERT OVERWRITE {{tableName}} SELECT * FROM _DistInputData;
97+
INSERT OVERWRITE {{tableName}}
98+
SELECT * FROM _DistInputData
99+
-- filter only when "deleteFlagCol" has a value
100+
{{#deleteFlagCol}}
101+
WHERE {{deleteFlagCol}} IS TRUE
102+
{{/deleteFlagCol}}
103+
-- sort by partition column, and optionally any other column(s)
104+
ORDER BY {{partitionCol}} {{#sortKey}}, {{sortKey}}{{/sortKey}};
97105

98106
_CompactionResult :=
99107
SELECT table_name,

iceberg-data-deduplication/dedup_package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
"partitionColType": "bigint",
1515
"timeBucketCol": "time_bucket",
1616
"timestampCol": "ts",
17+
"deleteFlagCol": null,
18+
"sortKey": null,
1719
"bufferSeconds": 2,
1820
"bucketSeconds": 5,
1921
"newDataNormalizerInMB": 1,

0 commit comments

Comments
 (0)