Skip to content

Commit 4eb1273

Browse files
esracosgunzohrehasadi00
authored and committed
[SYSTEMDS-3178] Builtin for tuple deduplication
Closes #2293. Co-authored-by: Zohreh Asadi <[email protected]>
1 parent d43088a commit 4eb1273

File tree

7 files changed

+466
-3
lines changed

7 files changed

+466
-3
lines changed

scripts/builtin/dedup.dml

Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
#-------------------------------------------------------------
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
#-------------------------------------------------------------
21+
22+
# Builtin for deduplication using distributed representations (DRs) and
23+
# locality-sensitive hashing (LSH) based blocking.
24+
#
25+
# The function encodes each input tuple as a dense vector using pre-trained GloVe embeddings (simple averaging),
26+
# groups semantically similar tuples via LSH into buckets, and compares only those pairs for deduplication.
27+
#
28+
#
29+
# INPUT:
30+
# --------------------------------------------------------------------------------------
31+
# X Input Frame[String] with n rows and d columns (raw tuples)
32+
# gloveMatrix Matrix[Double] of size |V| × e (pretrained GloVe embeddings) -> |V| number of words and e = embedding dimension
33+
# vocab Frame[String] of size |V| × 1 (vocabulary aligned with gloveMatrix)
34+
# similarityMeasure (optional) String specifying similarity metric: "cosine", "euclidean"
35+
# threshold (optional) Double: threshold value above which tuples are considered duplicates
36+
# --------------------------------------------------------------------------------------
37+
#
38+
# OUTPUT:
39+
# --------------------------------------------------------------------------------------
40+
# Y_unique Frame[String] with deduplicated tuples
41+
# (first occurrence of each duplicate group is retained)
42+
# Y_duplicates Frame[String] with all detected duplicates
43+
# (i.e., tuples removed from the input)
44+
# --------------------------------------------------------------------------------------
45+
46+
f_dedup = function(Frame[String] X, Matrix[Double] gloveMatrix, Frame[String] vocab,
    String similarityMeasure = "cosine", Double threshold = 0.8)
  return(Frame[String] Y_unique, Frame[String] Y_duplicates)
{
  # Step 1: Distributed Representations (DRs) - one embedding vector per tuple
  V = computeDRMatrix(X, vocab, gloveMatrix);

  # Step 2: generate random LSH hyperplanes
  K = 10; # number of hash functions
  d = ncol(V);
  H = rand(rows=K, cols=d, pdf="uniform", seed=-1);

  # Step 3: compute LSH hash codes
  hashCodes = computeLSH(V, H);

  # Step 4: form buckets
  bucketIDs = formBuckets(hashCodes);

  # Step 5: candidate pair generation
  pairs = findCandidatePairs(bucketIDs);

  # Step 6: compute similarity for candidate pairs
  sim = computeSimilarity(V, pairs, similarityMeasure);

  # Step 7: filter duplicates by threshold
  matches = filterDuplicates(pairs, sim, threshold);

  # Step 8: build keep mask - for every matched pair retain the earlier
  # (smaller-index) tuple and drop the later one.
  # BUGFIX: the original kept only the single global minimum index over ALL
  # matched pairs, which wrongly removed the first occurrence of every
  # duplicate group except one; it also misused the elementwise ifelse()
  # to select between differently-sized matrices (both branches of ifelse
  # are evaluated, so matches[1:0,...] failed on empty match sets).
  keepMask = matrix(1, rows=nrow(X), cols=1);
  if (nrow(matches) > 0) {
    for (k in 1:nrow(matches)) {
      i = as.scalar(matches[k, 1]);
      j = as.scalar(matches[k, 2]);
      # the later occurrence of the pair is the duplicate
      keepMask[max(i, j), 1] = 0;
    }
  }

  # Step 9: collect kept / dropped row indices from the mask
  keepIDs = matrix(0, rows=0, cols=1);
  dupIndices = matrix(0, rows=0, cols=1);
  for (i in 1:nrow(keepMask)) {
    if (as.scalar(keepMask[i, 1]) == 1) {
      keepIDs = rbind(keepIDs, matrix(i, 1, 1));
    }
    else {
      dupIndices = rbind(dupIndices, matrix(i, 1, 1));
    }
  }

  # Step 10: materialize output frames.
  # BUGFIX: the original seeded both outputs with X[1,], so the first input
  # row always leaked into Y_duplicates (and appeared twice in Y_unique).
  # At least one row is always kept, so Y_unique is seeded from keepIDs.
  Y_unique = X[as.scalar(keepIDs[1, 1]), ];
  if (nrow(keepIDs) > 1) {
    for (i in 2:nrow(keepIDs)) {
      Y_unique = rbind(Y_unique, X[as.scalar(keepIDs[i, 1]), ]);
    }
  }
  if (nrow(dupIndices) > 0) {
    Y_duplicates = X[as.scalar(dupIndices[1, 1]), ];
    if (nrow(dupIndices) > 1) {
      for (i in 2:nrow(dupIndices)) {
        Y_duplicates = rbind(Y_duplicates, X[as.scalar(dupIndices[i, 1]), ]);
      }
    }
  }
  else {
    # no duplicates found: return the first row as a 1-row sentinel
    # (DML frames cannot have 0 rows; matches the original's behavior
    # in the duplicate-free case)
    Y_duplicates = X[1, ];
  }
}
124+
125+
# Computes a distributed representation (DR) matrix for the input frame:
# each tuple (row of X) is encoded as the unweighted average of the GloVe
# embedding vectors of its known words; rows with no vocabulary hits stay
# all-zero.
#
# NOTE(review): the "UtilFunctions.cleanAndTokenizeRow" transformapply usage
# is nonstandard (transformapply normally takes a spec + meta frame) --
# confirm this dispatches to the Java helper as intended.
# NOTE(review): the inner vocabulary scan makes this O(n * words * |V|);
# see the TODO below.
computeDRMatrix = function(Frame[String] X, Frame[String] vocab, Matrix[Double] gloveMatrix)
  return(Matrix[Double] V)
{
  # TODO: Vectorize this implementation with dedicated transform incode permutation matrices
  n = nrow(X)
  d = ncol(gloveMatrix)
  V = matrix(0, rows=n, cols=d) # define output matrix

  for (i in 1:n) {
    row = X[i,]
    # tokenize the row into lower-cased whitespace-separated words
    words = transformapply(row, "UtilFunctions.cleanAndTokenizeRow")

    # running sum of embeddings and count of words found in the vocabulary
    sumVec = matrix(0, rows=1, cols=d)
    count = 0

    for (k in 1:length(words)) {
      w = words[k]
      idx = -1
      found = FALSE

      # search for word in vocabulary (linear scan; `found` short-circuits
      # further matches but not further iterations)
      for (m in 1:nrow(vocab)) {
        if (!found & vocab[m,1] == w) {
          idx = m
          found = TRUE
        }
      }
      # word found: accumulate its embedding
      if (idx > 0) {
        sumVec = sumVec + gloveMatrix[idx,]
        count = count + 1
      }
    }
    # average over found words; zero vector if no word matched
    if (count > 0) {
      V[i,] = sumVec / count
    }
    else {
      V[i,] = sumVec
    }
  }
}
166+
167+
computeLSH = function(Matrix[Double] V, Matrix[Double] H)
  return(Matrix[Double] hashCodes)
{
  # Project every representation vector onto the K random hyperplanes and
  # keep only the sign of each projection: bit = 1 if the vector lies on
  # the non-negative side of the hyperplane, 0 otherwise.
  hashCodes = ((V %*% t(H)) >= 0);
}
176+
177+
formBuckets = function(Matrix[Double] hashCodes)
  return(Matrix[Double] bucketIDs)
{
  # Interprets each row of K hash bits as a binary number, yielding one
  # scalar bucket id per tuple; tuples with identical bit patterns share
  # a bucket.
  # Vectorized weight construction 2^(K-1), ..., 2^0 via seq() resolves
  # the original element-wise loop (and its TODO).
  # NOTE: bucket ids are exact only while K < 53 (double integer range).
  K = ncol(hashCodes);
  powers = 2 ^ seq(K - 1, 0, -1); # K x 1 column vector of bit weights
  bucketIDs = hashCodes %*% powers;
}
193+
194+
findCandidatePairs = function(Matrix[Double] bucketIDs)
  return(Matrix[Double] pairs)
{
  # All-pairs comparison of bucket ids: a pair (i, j), i < j, is a
  # candidate iff both tuples hash to the same bucket.
  # TODO: replace the O(n^2) scan with a sort/group-by on bucket id.
  n = nrow(bucketIDs);
  pairs = matrix(0, rows=0, cols=2);

  # BUGFIX: guard n < 2 -- in DML, 1:(n-1) with n = 1 yields the
  # decreasing sequence 1,0 and the original loop accessed row 0.
  if (n >= 2) {
    for (i in 1:(n - 1)) {
      bi = as.scalar(bucketIDs[i, 1]); # hoisted loop-invariant read
      for (j in (i + 1):n) {
        if (bi == as.scalar(bucketIDs[j, 1])) {
          # build the pair row explicitly (portable across DML versions
          # without matrix-literal syntax)
          p = matrix(0, rows=1, cols=2);
          p[1, 1] = i;
          p[1, 2] = j;
          pairs = rbind(pairs, p);
        }
      }
    }
  }
}
209+
210+
computeSimilarity = function(Matrix[Double] V, Matrix[Double] pairs, String similarityMeasure)
  return(Matrix[Double] similarities)
{
  # Computes one similarity score per candidate pair.
  # "cosine": standard cosine similarity in [-1, 1].
  # "euclidean": negated euclidean distance, so that larger = more similar,
  # consistent with the ">= threshold" duplicate test downstream.

  # BUGFIX: validate the measure once up front; the original re-checked it
  # per pair and only failed when the first pair was processed.
  if (similarityMeasure != "cosine" & similarityMeasure != "euclidean") {
    stop("Unsupported similarity measure: " + similarityMeasure);
  }

  m = nrow(pairs);
  similarities = matrix(0.0, rows=m, cols=1);

  for (k in 1:m) {
    i = as.scalar(pairs[k, 1]);
    j = as.scalar(pairs[k, 2]);

    vi = V[i, ]; # embedding of tuple i
    vj = V[j, ]; # embedding of tuple j

    if (similarityMeasure == "cosine") {
      dot = sum(vi * vj);
      denom = sqrt(sum(vi ^ 2)) * sqrt(sum(vj ^ 2));
      # BUGFIX: guard against zero vectors (tuples with no vocabulary hits
      # have all-zero embeddings); the original produced NaN via 0/0.
      if (denom > 0) {
        sim = dot / denom;
      }
      else {
        sim = 0.0;
      }
    }
    else {
      diff = vi - vj;
      sim = -1 * sqrt(sum(diff ^ 2));
    }

    similarities[k, 1] = sim;
  }
}
241+
242+
filterDuplicates = function(Matrix[Double] pairs, Matrix[Double] similarities, Double threshold)
  return(Matrix[Double] matches)
{
  # Retains exactly the candidate pairs whose similarity reaches the
  # threshold; matches is m' x 2 with (i, j) row indices into X.
  m = nrow(pairs);
  matches = matrix(0, rows=0, cols=2);

  for (i in 1:m) {
    # BUGFIX: compare as scalar -- the original used the 1x1 matrix
    # similarities[i,1] directly as the if-predicate.
    sim = as.scalar(similarities[i, 1]);
    if (sim >= threshold) {
      matches = rbind(matches, matrix(pairs[i, ], rows=1, cols=2));
    }
  }
}

src/main/java/org/apache/sysds/common/Builtins.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public enum Builtins {
115115
DECISIONTREE("decisionTree", true),
116116
DECISIONTREEPREDICT("decisionTreePredict", true),
117117
DECOMPRESS("decompress", false),
118+
DEDUP("dedup", true),
118119
DEEPWALK("deepWalk", true),
119120
DET("det", false),
120121
DETECTSCHEMA("detectSchema", false),

src/main/java/org/apache/sysds/runtime/instructions/ooc/TransposeOOCInstruction.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
3131
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
3232
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
33-
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
3433
import org.apache.sysds.runtime.util.CommonThreadPool;
3534

3635
import java.util.concurrent.ExecutorService;

src/main/java/org/apache/sysds/runtime/util/UtilFunctions.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1456,4 +1456,19 @@ public static double getWordErrorRate(String r, String h) {
14561456
//wer = number of edits / length
14571457
return (double)p[n] / Math.max(n, m);
14581458
}
1459-
}
1459+
1460+
public static String[] cleanAndTokenizeRow(String[] row) {
1461+
if (row == null || row.length == 0) {
1462+
return new String[0];
1463+
}
1464+
StringBuilder sb = new StringBuilder();
1465+
for (String s : row) {
1466+
if (s != null) {
1467+
sb.append(s).append(" ");
1468+
}
1469+
}
1470+
String joined = sb.toString().trim().toLowerCase();
1471+
1472+
return joined.split("\\s+");
1473+
}
1474+
}

0 commit comments

Comments
 (0)