forked from Chandra01951/Robust-Image-Copy-Detection
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_csv_hashes_proposedmethod.py
More file actions
370 lines (332 loc) · 15.9 KB
/
generate_csv_hashes_proposedmethod.py
File metadata and controls
370 lines (332 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
# generate_csv_hashes_proposedmethod.py (earlier header name: image_copy_detection_faiss.py)
# Full pipeline: autoencoder global features + Meixner/Krawtchouk/Tchebichef local features.
# Concatenate descriptors -> save to CSV -> FAISS index -> evaluate Precision@K and Recall@K.
import os
import math
import cv2
import faiss
import numpy as np
import pandas as pd
import pywt
from scipy.special import eval_genlaguerre
from scipy.special import comb
import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses
from collections import defaultdict
from tqdm import tqdm
# ------------------- ENVIRONMENT / SEED -------------------
# Seed NumPy and TensorFlow so repeated runs start from the same random state.
SEED = 42
np.random.seed(SEED)
tf.random.set_seed(SEED)
# NOTE(review): PYTHONHASHSEED is normally read at interpreter start-up, so
# setting it here presumably only affects child processes -- confirm.
os.environ['PYTHONHASHSEED'] = str(SEED)
# ------------------- SETTINGS -------------------
IMAGE_FOLDER = "C:\\Users\\chand\\OneDrive\\Desktop\\btp\\data2\\gallery" # change to your folder
HASH_CSV = "image_features_hashes.csv" # CSV file main() writes the descriptor matrix to
IMG_SIZE = (224, 224) # size images are resized to before entering the autoencoder
LATENT_DIM = 256 # width of the autoencoder's latent (global feature) layer
LOCAL_BLOCK_SIZE = 32 # side length, in pixels, of the blocks used for local moments
MEIXNER_ORDER = 15 # number of basis columns in the Meixner moment matrix
KRAWT_ORDER = 15 # number of basis columns in the Krawtchouk moment matrix
TCHEB_ORDER = 15 # number of basis columns in the Tchebichef moment matrix
TOP_K = 150 # K used for Precision@K / Recall@K in main()
TRAIN_AUTOENCODER = True # True: train the autoencoder on the dataset first; False: use untrained encoder weights
AUTOENCODER_EPOCHS = 5 # epochs passed to train_autoencoder_if_needed
BATCH_SIZE = 16 # images per training batch
# ------------------- UTILITIES -------------------
def list_images(folder):
    """Return the sorted full paths of the image entries directly inside *folder*.

    An entry is kept when its lower-cased name ends with ``.png``, ``.jpg``,
    ``.jpeg`` or ``.tif``. Only the immediate contents of *folder* are listed.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.tif')
    matches = []
    for entry in os.listdir(folder):
        if entry.lower().endswith(image_suffixes):
            matches.append(os.path.join(folder, entry))
    matches.sort()
    return matches
def load_and_preprocess_image(image_path, target_size=IMG_SIZE):
    """Load an image file as a resized RGB float32 array scaled by 1/255.

    Args:
        image_path: Path handed to ``cv2.imread``.
        target_size: Size tuple passed unchanged to ``cv2.resize``.

    Returns:
        The resized RGB image as ``float32`` with values divided by 255.

    Raises:
        ValueError: If OpenCV cannot read the file.
    """
    bgr = cv2.imread(image_path)
    if bgr is None:
        raise ValueError(f"Cannot load {image_path}")
    # OpenCV decodes to BGR; the rest of the pipeline works on RGB.
    rgb = cv2.resize(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB), target_size)
    return rgb.astype(np.float32) / 255.0
# ------------------- AUTOENCODER (GLOBAL) -------------------
def build_autoencoder(input_shape=(224,224,3), latent_dim=256):
    """Build a convolutional autoencoder and an encoder that shares its weights.

    A single graph is constructed. ``encoder`` maps the input to the latent
    layer and ``train_auto`` maps the same input to the reconstruction, so
    training ``train_auto`` also trains ``encoder``. (The previous version
    built two unrelated networks, so the returned encoder stayed untrained.)

    The latent layer is named ``'latent_shared'`` so callers that look it up
    on the training model by that name keep working.

    Args:
        input_shape: ``(height, width, channels)``. Height and width must be
            multiples of 16 because the encoder halves them four times and
            the decoder doubles them four times.
        latent_dim: Number of units in the latent layer.

    Returns:
        ``(encoder, train_auto)`` where ``train_auto`` is compiled with
        Adam(1e-3) and mean-squared-error loss.

    Raises:
        ValueError: If height or width is not a multiple of 16.
    """
    height, width, channels = input_shape
    if height % 16 or width % 16:
        raise ValueError(
            f"input height and width must be multiples of 16, got {input_shape}"
        )
    # Spatial size at the bottleneck after four 2x2 poolings.
    bottleneck_h, bottleneck_w = height // 16, width // 16

    # Encoder: four conv + pool stages, then a dense latent layer.
    inp = layers.Input(shape=input_shape)
    x = inp
    for filters in (32, 64, 128, 256):
        x = layers.Conv2D(filters, 3, padding='same', activation='relu')(x)
        x = layers.MaxPooling2D(2)(x)
    x = layers.Flatten()(x)
    latent = layers.Dense(latent_dim, activation='relu', name='latent_shared')(x)

    # Decoder: mirror of the encoder using stride-2 transposed convolutions.
    d = layers.Dense(bottleneck_h * bottleneck_w * 256, activation='relu')(latent)
    d = layers.Reshape((bottleneck_h, bottleneck_w, 256))(d)
    for filters in (128, 64, 32, 16):
        d = layers.Conv2DTranspose(filters, 3, strides=2, padding='same',
                                   activation='relu')(d)
    out = layers.Conv2D(channels, 3, padding='same', activation='sigmoid')(d)

    # Both models are views over the same layers, hence the same weights.
    encoder = models.Model(inp, latent, name='encoder')
    train_auto = models.Model(inp, out, name='autoencoder_train')
    train_auto.compile(optimizer=optimizers.Adam(1e-3), loss=losses.MeanSquaredError())
    return encoder, train_auto
def train_autoencoder_if_needed(train_model, image_paths, epochs=5, batch_size=16):
    """Fit *train_model* to reconstruct the images listed in *image_paths*.

    Does nothing when ``epochs <= 0`` or when there are no images.

    Args:
        train_model: Object exposing a Keras-style
            ``fit(generator, steps_per_epoch=..., epochs=..., verbose=...)``.
        image_paths: Image file paths. The sequence is NOT modified;
            shuffling is done on a private copy so that callers keeping
            parallel lists (such as labels) stay aligned with it.
        epochs: Number of training epochs.
        batch_size: Number of images per batch.
    """
    paths = list(image_paths)
    # An empty list would make the endless generator below spin forever.
    if epochs <= 0 or not paths:
        return

    def gen():
        # Endless generator: reshuffle, then emit (input, target) batches
        # where the target is the input itself.
        while True:
            np.random.shuffle(paths)
            for start in range(0, len(paths), batch_size):
                imgs = np.stack(
                    [load_and_preprocess_image(p)
                     for p in paths[start:start + batch_size]],
                    axis=0,
                )
                yield imgs, imgs

    steps = max(1, len(paths) // batch_size)
    train_model.fit(gen(), steps_per_epoch=steps, epochs=epochs, verbose=1)
# ------------------- LOCAL MOMENTS -------------------
# Meixner (from earlier)
def meixner_polynomial(n, x, beta=3.0, c=0.7):
    """Evaluate the Meixner-style basis function of degree *n* at *x*.

    This is an approximation, not the strict discrete Meixner polynomial:
    the generalized Laguerre polynomial ``L_n^(beta-1)(x)`` is weighted by
    ``c ** x`` and divided by ``n!``. *x* may be a scalar or an array.
    """
    laguerre_term = eval_genlaguerre(n, beta - 1, x)
    decay = c ** x
    return laguerre_term * decay / math.factorial(n)
def construct_meixner_matrix(size, order, beta=3.0, c=0.7):
    """Return the ``(size, order)`` float32 Meixner basis matrix.

    Column ``n`` holds ``meixner_polynomial(n, x, beta, c)`` sampled at
    ``x = 0 .. size - 1``.
    """
    positions = np.arange(size, dtype=np.float32)
    basis = np.zeros((size, order), dtype=np.float32)
    # Rows of the transpose are writable views onto the columns of `basis`.
    for degree, column in enumerate(basis.T):
        column[:] = meixner_polynomial(degree, positions, beta, c)
    return basis
def compute_meixner_moments(block, order=10, beta=3.0, c=0.7):
    """Project a 2-D *block* onto the Meixner basis along both axes.

    Returns the ``order x order`` moment matrix flattened in row-major order.
    """
    n_rows, n_cols = block.shape
    row_basis = construct_meixner_matrix(n_rows, order, beta, c)
    col_basis = construct_meixner_matrix(n_cols, order, beta, c)
    moments = np.matmul(np.matmul(row_basis.T, block), col_basis)
    return moments.flatten()
# Krawtchouk moments (discrete orthogonal polynomials)
def krawtchouk_polynomial(n, x, N, p):
    """Evaluate the degree-*n* Krawtchouk-style polynomial at *x*.

    Computed directly from the combinatorial sum
    ``sum_k (-1)^k C(x, k) C(N - x, n - k) p^k (1 - p)^(n - k)`` for
    ``k = 0 .. n``. The cost grows with *n*, so it is intended for low orders.
    """
    q = 1 - p
    total = 0.0
    for k in range(n + 1):
        sign = -1 if k % 2 else 1
        term = sign * comb(x, k) * comb(N - x, n - k) * (p ** k) * (q ** (n - k))
        total = total + term
    return total
def construct_krawtchouk_matrix(size, order, p=0.5):
    """Return the ``(size, order)`` float32 Krawtchouk basis matrix.

    Entry ``[i, n]`` is ``krawtchouk_polynomial(n, i, size - 1, p)``.
    """
    upper = size - 1
    basis = np.zeros((size, order), dtype=np.float32)
    # np.ndindex walks (position, degree) pairs in row-major order.
    for position, degree in np.ndindex(size, order):
        basis[position, degree] = krawtchouk_polynomial(degree, position, upper, p)
    return basis
def compute_krawtchouk_moments(block, order=10, p=0.5):
    """Project a 2-D *block* onto the Krawtchouk basis along both axes.

    Returns the ``order x order`` moment matrix flattened in row-major order.
    """
    n_rows, n_cols = block.shape
    row_basis = construct_krawtchouk_matrix(n_rows, order, p)
    col_basis = construct_krawtchouk_matrix(n_cols, order, p)
    moments = np.matmul(np.matmul(row_basis.T, block), col_basis)
    return moments.flatten()
# Tchebichef moments (discrete Chebyshev-like)
def tchebichef_poly(n, x, N):
    """Evaluate the cosine stand-in used for the Tchebichef basis.

    This is a fast approximation rather than the exact discrete Tchebichef
    polynomial: ``cos((n + 0.5) * pi * (2x + 1) / (2N))``. *x* may be a
    scalar or an array.
    """
    phase = (n + 0.5) * np.pi * (2 * x + 1) / (2 * N)
    return np.cos(phase)
def construct_tchebichef_matrix(size, order):
    """Return the ``(size, order)`` float32 Tchebichef-style basis matrix.

    Column ``n`` holds ``tchebichef_poly(n, x, size)`` sampled at
    ``x = 0 .. size - 1``.
    """
    samples = np.arange(size, dtype=np.float32)
    basis = np.zeros((size, order), dtype=np.float32)
    # Rows of the transpose are writable views onto the columns of `basis`.
    for degree, column in enumerate(basis.T):
        column[:] = tchebichef_poly(degree, samples, size)
    return basis
def compute_tchebichef_moments(block, order=10):
    """Project a 2-D *block* onto the Tchebichef-style basis along both axes.

    Returns the ``order x order`` moment matrix flattened in row-major order.
    """
    n_rows, n_cols = block.shape
    row_basis = construct_tchebichef_matrix(n_rows, order)
    col_basis = construct_tchebichef_matrix(n_cols, order)
    moments = np.matmul(np.matmul(row_basis.T, block), col_basis)
    return moments.flatten()
# Local extractor blockwise (grayscale)
def extract_all_local_features(image_path, block_size=LOCAL_BLOCK_SIZE,
                               meix_order=MEIXNER_ORDER, kraw_order=KRAWT_ORDER,
                               tcheb_order=TCHEB_ORDER, target_size=None):
    """Extract block-wise Meixner/Krawtchouk/Tchebichef features from an image.

    The image is read as grayscale, reflect-padded on the bottom/right to a
    multiple of *block_size*, and split into ``block_size x block_size``
    blocks visited in row-major order. Each block is standardised (mean
    removed; divided by its standard deviation unless that is ~0) and
    projected onto the three bases. The first 20 coefficients of each
    flattened moment matrix are kept (zero-padded if fewer exist), giving 60
    values per block.

    Args:
        image_path: Path of the image to read.
        block_size: Side length of the square blocks.
        meix_order: Number of Meixner basis columns.
        kraw_order: Number of Krawtchouk basis columns.
        tcheb_order: Number of Tchebichef basis columns.
        target_size: Optional size tuple passed to ``cv2.resize`` before
            blocking. With the default ``None`` the image keeps its native
            size, so the output length then depends on the image dimensions.

    Returns:
        1-D ``float32`` array of length ``60 * number_of_blocks``.

    Raises:
        ValueError: If OpenCV cannot read the file.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise ValueError(f"Cannot open {image_path}")
    if target_size is not None:
        img = cv2.resize(img, target_size)

    height, width = img.shape
    # Amount needed to reach the next multiple of block_size (0 if aligned).
    pad_h = -height % block_size
    pad_w = -width % block_size
    if pad_h or pad_w:
        img = np.pad(img, ((0, pad_h), (0, pad_w)), mode='reflect')
    padded_h, padded_w = img.shape

    # Every block is block_size x block_size after padding, so the bases are
    # loop-invariant: build them once instead of once per block.
    bases = (
        construct_meixner_matrix(block_size, meix_order),
        construct_krawtchouk_matrix(block_size, kraw_order),
        construct_tchebichef_matrix(block_size, tcheb_order),
    )

    def take_first(x, n=20):
        # Keep the first n coefficients, zero-padding short vectors.
        if len(x) >= n:
            return x[:n]
        return np.pad(x, (0, n - len(x)), 'constant')

    features = []
    for i in range(0, padded_h, block_size):
        for j in range(0, padded_w, block_size):
            blk = img[i:i + block_size, j:j + block_size].astype(np.float32)
            spread = np.std(blk)
            blk = blk - np.mean(blk)
            if spread > 1e-6:
                blk = blk / spread
            for basis in bases:
                moments = (basis.T @ blk @ basis).flatten()
                features.extend(take_first(moments, 20))
    return np.array(features, dtype=np.float32)
# ------------------- HASH / NORMALIZE / CONCAT -------------------
def normalize_and_quantize(vec):
    """Scale *vec* to unit L2 norm and return it as float32.

    Despite the name, no mean-centering or quantization is performed. A
    vector whose norm is below 1e-9 is returned unscaled (cast to float32)
    to avoid dividing by ~0.
    """
    magnitude = np.linalg.norm(vec)
    scaled = vec if magnitude < 1e-9 else vec / magnitude
    return scaled.astype(np.float32)
# ------------------- FAISS -------------------
USE_COSINE = True  # True: cosine similarity (inner product on unit vectors); False: L2 distance


def build_faiss_index_matrix(matrix):
    """Build a flat FAISS index over the rows of *matrix*.

    With ``USE_COSINE`` set, rows are L2-normalised and indexed with
    ``IndexFlatIP`` so inner product equals cosine similarity; otherwise an
    ``IndexFlatL2`` index is used.

    Args:
        matrix: ``n x d`` array-like of descriptors. It is coerced to a
            C-contiguous float32 array. If it already has that layout, the
            cosine normalisation is applied to the caller's array in place
            (as before); otherwise a converted copy is normalised.

    Returns:
        The populated FAISS index.

    Raises:
        ValueError: If *matrix* is not 2-D.
    """
    matrix = np.ascontiguousarray(matrix, dtype=np.float32)
    if matrix.ndim != 2:
        raise ValueError(f"expected a 2-D descriptor matrix, got shape {matrix.shape}")
    d = matrix.shape[1]
    if USE_COSINE:
        # Unit-length rows make inner product equal to cosine similarity.
        faiss.normalize_L2(matrix)
        index = faiss.IndexFlatIP(d)
    else:
        index = faiss.IndexFlatL2(d)
    index.add(matrix)
    return index
def faiss_search(index, query_vector, k=5):
    """Query *index* with a single vector and return its top-*k* results.

    The query is converted to a ``(1, d)`` float32 array before
    ``index.search`` is called.

    Returns:
        ``(distances, indices)`` for the single query, i.e. the first row of
        each array returned by ``index.search``.
    """
    query_row = np.reshape(np.asarray(query_vector).astype(np.float32), (1, -1))
    scores, neighbours = index.search(query_row, k)
    return scores[0], neighbours[0]
# ------------------- EVALUATION -------------------
def compute_precision_recall_for_query(query_idx, retrieved_indices, labels, k):
    """Compute Precision@k and Recall@k for one query.

    Args:
        query_idx: Index of the query item in *labels*.
        retrieved_indices: Ranked result indices, possibly including the
            query itself and negative placeholders (FAISS returns ``-1`` when
            fewer than the requested number of neighbours exist).
        labels: Class label for every indexed item.
        k: Cut-off rank.

    Returns:
        ``(precision, recall, retrieved)`` where ``retrieved`` is the list of
        at most *k* valid, non-self indices that were scored. Precision is
        ``hits / k`` (0.0 when ``k <= 0``); recall is ``hits`` over the number
        of other items sharing the query's label (0.0 when there are none).
    """
    query_label = labels[query_idx]
    # Drop the query itself and negative placeholders: a negative index would
    # otherwise wrap around and be scored against an unrelated label.
    retrieved = [idx for idx in retrieved_indices
                 if idx != query_idx and idx >= 0][:k]
    relevant = sum(1 for ri in retrieved if labels[ri] == query_label)
    total_relevant = sum(1 for lbl in labels if lbl == query_label) - 1  # exclude query itself
    precision = relevant / k if k > 0 else 0.0
    recall = relevant / total_relevant if total_relevant > 0 else 0.0
    return precision, recall, retrieved
# ------------------- MAIN PIPELINE -------------------
def main():
    """Run the full pipeline: features -> CSV -> FAISS index -> evaluation.

    Steps:
      1. List images in ``IMAGE_FOLDER``; the label of each image is the
         part of its file name before the first underscore.
      2. Build the autoencoder and, if ``TRAIN_AUTOENCODER`` is set, train it.
      3. For every image, concatenate the encoder's latent vector with the
         local moment features and L2-normalise the result. Images that fail
         (or whose descriptor length differs from the first successful one)
         are reported and skipped.
      4. Save the descriptor matrix to ``HASH_CSV`` and index it with FAISS.
      5. Query the index with every descriptor and print the average
         Precision@K and Recall@K for ``K = TOP_K``.

    Raises:
        SystemExit: If no images are found or no descriptor could be
            extracted.
    """
    image_paths = list_images(IMAGE_FOLDER)
    if len(image_paths) == 0:
        raise SystemExit("No images found in folder. Update IMAGE_FOLDER path.")
    n_images = len(image_paths)
    print(f"Found {n_images} images.")

    # Build or load autoencoder + encoder
    encoder, train_auto = build_autoencoder(input_shape=(IMG_SIZE[0], IMG_SIZE[1], 3), latent_dim=LATENT_DIM)
    if TRAIN_AUTOENCODER:
        print("Training autoencoder for a few epochs to produce meaningful global features...")
        # Hand over a copy: the trainer may shuffle the list it receives, and
        # the extraction order below must stay stable.
        train_autoencoder_if_needed(train_auto, list(image_paths),
                                    epochs=AUTOENCODER_EPOCHS, batch_size=BATCH_SIZE)
        # Use the trained network's latent layer as the encoder output.
        try:
            latent_layer = train_auto.get_layer('latent_shared')
        except ValueError:
            print("Could not extract latent layer from training model; using untrained encoder weights (still usable).")
        else:
            encoder = models.Model(inputs=train_auto.input, outputs=latent_layer.output)
            print("Using trained encoder for feature extraction.")

    # Extract features; paths, labels and descriptors are appended together
    # so they stay index-aligned even when some images fail.
    descriptors = []
    valid_paths = []
    valid_labels = []
    failed = []
    expected_len = None
    print("Extracting features for all images (global + local). This may take time...")
    for p in tqdm(image_paths):
        try:
            # global
            img = load_and_preprocess_image(p)
            latent = encoder.predict(np.expand_dims(img, axis=0), verbose=0)[0]  # shape (LATENT_DIM,)
            # local
            local = extract_all_local_features(p, block_size=LOCAL_BLOCK_SIZE,
                                               meix_order=MEIXNER_ORDER, kraw_order=KRAWT_ORDER,
                                               tcheb_order=TCHEB_ORDER)
            # concatenate
            descriptor = np.concatenate([latent.astype(np.float32), local.astype(np.float32)])
            descriptor = normalize_and_quantize(descriptor)
            if expected_len is None:
                expected_len = descriptor.shape[0]
            elif descriptor.shape[0] != expected_len:
                # Rows of different length cannot be stacked into one matrix.
                raise ValueError(
                    f"descriptor length {descriptor.shape[0]} differs from expected {expected_len}")
        except Exception as e:  # best-effort per image: report and skip
            print(f"Error extracting for {p}: {e}")
            failed.append(p)
            continue
        descriptors.append(descriptor)
        valid_paths.append(p)
        valid_labels.append(os.path.basename(p).split("_")[0])

    if len(failed) > 0:
        print(f"Failed on {len(failed)} images. They will be skipped in indexing.")
    if not descriptors:
        raise SystemExit("Feature extraction failed for every image; nothing to index.")

    all_descriptors = np.stack(descriptors, axis=0).astype(np.float32)
    print("Descriptor matrix shape:", all_descriptors.shape)
    valid_names = [os.path.basename(p) for p in valid_paths]

    # Save descriptors to CSV (optional)
    df = pd.DataFrame(all_descriptors)
    df.insert(0, "image_name", valid_names)
    df.to_csv(HASH_CSV, index=False)
    print(f"Saved descriptor matrix to {HASH_CSV}")

    # Build FAISS index
    index = build_faiss_index_matrix(all_descriptors)
    print("Built FAISS index.")

    # Evaluate Precision@K and Recall@K
    K = TOP_K
    # Ask for one extra hit because the query itself is normally returned,
    # but never for more neighbours than the index holds (FAISS would pad
    # the result with -1 indices).
    search_k = min(K + 1, len(valid_paths))
    precisions = []
    recalls = []
    per_query_info = []
    print(f"Searching top-{K} for each image and computing Precision/Recall...")
    for q_idx, q_vec in enumerate(all_descriptors):
        dists, idxs = faiss_search(index, q_vec, k=search_k)
        precision, recall, retrieved = compute_precision_recall_for_query(
            q_idx, idxs.tolist(), valid_labels, K)
        precisions.append(precision)
        recalls.append(recall)
        per_query_info.append({
            "query": valid_names[q_idx],
            "precision": precision, "recall": recall,
            "retrieved": [valid_names[i] for i in retrieved]
        })

    avg_p = float(np.mean(precisions))
    avg_r = float(np.mean(recalls))
    print(f"\n=== RESULTS ===\nAverage Precision@{K}: {avg_p:.4f}\nAverage Recall@{K}: {avg_r:.4f}\n")

    # Print a few per-query examples for debugging
    print("Sample per-query results (first 10):")
    for info in per_query_info[:10]:
        print(f"Query: {info['query']} Precision: {info['precision']:.3f} Recall: {info['recall']:.3f}")
        print(" Retrieved:", info['retrieved'])
        print("-----")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()