-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreid_engine.py
More file actions
344 lines (288 loc) · 13.6 KB
/
reid_engine.py
File metadata and controls
344 lines (288 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
"""
Re-identification Engine
------------------------
Moduł odpowiedzialny za ekstrakcję cech osób przy użyciu OSNet
oraz porównywanie embeddingów z galerią znanych osób.
"""
import torch
import torch.nn.functional as F
import numpy as np
from torchvision import transforms
from torchreid import models
from typing import List, Tuple, Optional, Dict
import cv2
import time
class ReIDEngine:
"""
Silnik Re-identyfikacji wykorzystujący model OSNet.
OSNet (Omni-Scale Network) to efektywny model deep learning
zaprojektowany specjalnie do zadań person re-identification.
"""
def __init__(self, model_name: str = 'osnet_x1_0', device: str = 'cuda', use_face: bool = True,
inactive_timeout: float = 30.0):
"""
Inicjalizacja modelu OSNet.
Args:
model_name: Nazwa modelu OSNet (osnet_x1_0, osnet_x0_75, etc.)
device: Urządzenie do obliczeń ('cuda' lub 'cpu')
use_face: Czy używać dodatkowej detekcji twarzy (True = lepsze rozpoznawanie po zmianie ubrania)
inactive_timeout: Czas w sekundach po którym niewidziana osoba jest usuwana z galerii
"""
self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
self.inactive_timeout = inactive_timeout
print(f"[ReID Engine] Używam urządzenia: {self.device}")
print(f"[ReID Engine] Timeout nieaktywności: {inactive_timeout}s")
self.use_face = use_face
if self.use_face:
# Inicjalizuj detektor twarzy Haar Cascade (szybki, bez GPU)
import os
cascade_path = os.path.join(cv2.data.haarcascades, 'haarcascade_frontalface_default.xml')
if not os.path.exists(cascade_path):
print(f"[ReID Engine] UWAGA: Nie znaleziono Haar Cascade, wyłączam detekcję twarzy")
self.use_face = False
self.face_cascade = None
else:
self.face_cascade = cv2.CascadeClassifier(cascade_path)
if self.face_cascade.empty():
print(f"[ReID Engine] UWAGA: Nie udało się załadować Haar Cascade, wyłączam detekcję twarzy")
self.use_face = False
self.face_cascade = None
else:
print(f"[ReID Engine] Detekcja twarzy włączona (Haar Cascade)")
else:
self.face_cascade = None
# Ładowanie pretrenowanego modelu OSNet
print(f"[ReID Engine] Ładuję model {model_name}...")
self.model = models.build_model(
name=model_name,
num_classes=1000, # Nie ma znaczenia dla ekstrakcji cech
pretrained=True,
use_gpu=self.device.type == 'cuda'
)
# Tryb ewaluacji (wyłączenie dropout, batch norm w trybie inference)
self.model.eval()
self.model.to(self.device)
# Transformacje obrazu dla OSNet
# WAŻNE: OSNet wymaga konkretnych przekształceń:
# 1. Resize do 256x128 (wysokość x szerokość)
# 2. Normalizacja według statystyk ImageNet
self.transform = transforms.Compose([
transforms.ToPILImage(),
transforms.Resize((256, 128)), # OSNet standardowy rozmiar
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406], # ImageNet mean
std=[0.229, 0.224, 0.225] # ImageNet std
)
])
# Galeria przechowująca embeddingi znanych osób
# Każda osoba = {'id': int, 'embedding': tuple, 'last_seen': float, 'appearances': int}
self.gallery = []
self.next_id = 1
self.removed_ids = set() # ID które zostały usunięte - nie używaj ponownie
print("[ReID Engine] Inicjalizacja zakończona!")
def extract_face_embedding(self, image: np.ndarray) -> Optional[np.ndarray]:
"""
Ekstrakcja prostego face embedding (jeśli twarz wykryta).
Używamy Haar Cascade + histogram HOG jako prosty deskryptor.
Args:
image: Obraz osoby BGR (H, W, 3)
Returns:
Face embedding (128-d) lub None jeśli nie wykryto twarzy
"""
if not self.use_face:
return None
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=4, minSize=(30, 30))
if len(faces) == 0:
return None
# Weź największą twarz (najbliższą kamerze)
(x, y, w, h) = max(faces, key=lambda f: f[2]*f[3])
face_crop = gray[y:y+h, x:x+w]
# Resize do standardowego rozmiaru i oblicz prosty deskryptor
face_resized = cv2.resize(face_crop, (64, 64))
# Prosty deskryptor: znormalizowany histogram pikseli
hist = cv2.calcHist([face_resized], [0], None, [128], [0, 256])
hist = hist.flatten()
hist = hist / (np.linalg.norm(hist) + 1e-7)
return hist
def extract_features(self, image: np.ndarray) -> Tuple[np.ndarray, Optional[np.ndarray]]:
"""
Ekstrakcja wektora cech (embedding) z obrazu osoby + opcjonalnie face embedding.
Args:
image: Obraz osoby w formacie BGR (OpenCV), shape (H, W, 3)
Returns:
Tuple (body_embedding, face_embedding)
- body_embedding: Znormalizowany wektor cech, shape (512,) dla osnet_x1_0
- face_embedding: Znormalizowany face embedding (128,) lub None
"""
# Konwersja BGR -> RGB (OpenCV używa BGR, PIL/PyTorch oczekuje RGB)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Aplikacja transformacji (resize + normalizacja)
image_tensor = self.transform(image_rgb)
# Dodanie wymiaru batch: (3, 256, 128) -> (1, 3, 256, 128)
image_tensor = image_tensor.unsqueeze(0).to(self.device)
# Ekstrakcja cech bez obliczania gradientów
with torch.no_grad():
features = self.model(image_tensor)
# Konwersja do numpy i normalizacja L2
features = features.cpu().numpy().flatten()
# Normalizacja L2: ||features|| = 1
# To kluczowe dla dokładnego porównywania cosine similarity
features = features / np.linalg.norm(features)
# Ekstrakcja face embedding
face_emb = self.extract_face_embedding(image)
return features, face_emb
def compute_similarity(self, feat1: Tuple[np.ndarray, Optional[np.ndarray]],
feat2: Tuple[np.ndarray, Optional[np.ndarray]],
body_weight: float = 0.6,
face_weight: float = 0.4) -> float:
"""
Obliczanie podobieństwa między dwoma embeddingami (body + face).
Weighted similarity = body_weight * body_sim + face_weight * face_sim
Jeśli brak face embedding, używamy tylko body similarity.
Args:
feat1: Tuple (body_emb, face_emb or None)
feat2: Tuple (body_emb, face_emb or None)
body_weight: Waga dla body similarity
face_weight: Waga dla face similarity
Returns:
Podobieństwo w zakresie [0, 1], gdzie 1 = identyczne
"""
body1, face1 = feat1
body2, face2 = feat2
body_sim = float(np.dot(body1, body2))
# Jeśli oba mają face embedding, uwzględnij face similarity
if face1 is not None and face2 is not None:
face_sim = float(np.dot(face1, face2))
total_sim = body_weight * body_sim + face_weight * face_sim
else:
# Brak twarzy - używamy tylko body
total_sim = body_sim
return total_sim
def identify_person(self, embedding: Tuple[np.ndarray, Optional[np.ndarray]], threshold: float = 0.5) -> int:
"""
Identyfikacja osoby poprzez porównanie z galerią.
Strategia:
1. Porównaj embedding z wszystkimi osobami w galerii
2. Jeśli najwyższe podobieństwo > threshold, zwróć istniejące ID
3. W przeciwnym razie, dodaj jako nową osobę
Args:
embedding: Tuple (body_emb, face_emb or None) osoby do zidentyfikowania
threshold: Próg podobieństwa (0.5 = 50% podobieństwa, niższy = łatwiejsza reidentyfikacja)
Returns:
ID osoby (istniejące lub nowo utworzone)
"""
if len(self.gallery) == 0:
# Pierwsza osoba - dodaj do galerii
person_id = self.next_id
self.gallery.append({
'id': person_id,
'embedding': embedding,
'last_seen': time.time(),
'appearances': 1
})
self.next_id += 1
return person_id
# Porównaj z wszystkimi osobami w galerii
max_similarity = -1
best_match_id = None
for person in self.gallery:
similarity = self.compute_similarity(embedding, person['embedding'])
if similarity > max_similarity:
max_similarity = similarity
best_match_id = person['id']
# Decyzja: ta sama osoba czy nowa?
if max_similarity >= threshold:
# Znaleziono dopasowanie - zaktualizuj last_seen i zwróć istniejące ID
for person in self.gallery:
if person['id'] == best_match_id:
person['last_seen'] = time.time()
person['appearances'] += 1
break
return best_match_id
else:
# Nowa osoba - dodaj do galerii
person_id = self.next_id
self.gallery.append({
'id': person_id,
'embedding': embedding,
'last_seen': time.time(),
'appearances': 1
})
self.next_id += 1
return person_id
def get_similarities(self, embedding: Tuple[np.ndarray, Optional[np.ndarray]]) -> List[Tuple[int, float]]:
"""
Zwraca listę krotek (id, similarity) posortowaną malejąco względem podobieństwa.
Nie modyfikuje galerii.
"""
sims = []
for person in self.gallery:
sim = self.compute_similarity(embedding, person['embedding'])
sims.append((person['id'], sim))
# Sortuj malejąco
sims.sort(key=lambda x: x[1], reverse=True)
return sims
def add_new_person(self, embedding: Tuple[np.ndarray, Optional[np.ndarray]]) -> int:
"""Dodaje nową osobę do galerii i zwraca przypisane ID."""
person_id = self.next_id
self.gallery.append({
'id': person_id,
'embedding': embedding,
'last_seen': time.time(),
'appearances': 1
})
self.next_id += 1
return person_id
def update_person(self, person_id: int, embedding: Tuple[np.ndarray, Optional[np.ndarray]], alpha: float = 0.3) -> None:
"""
Aktualizuje embedding istniejącej osoby w galerii przez uśrednienie wykładnicze:
new = alpha * embedding + (1-alpha) * old
Dzięki temu galeria adaptuje się do nowych wyglądów tej samej osoby.
Alpha=0.3 oznacza wolniejszą adaptację (bardziej stabilne ID, lepsza reidentyfikacja).
"""
for person in self.gallery:
if person['id'] == person_id:
old_body, old_face = person['embedding']
new_body, new_face = embedding
# Aktualizuj body embedding
updated_body = alpha * new_body + (1.0 - alpha) * old_body
updated_body = updated_body / np.linalg.norm(updated_body)
# Aktualizuj face embedding (jeśli oba są dostępne)
if old_face is not None and new_face is not None:
updated_face = alpha * new_face + (1.0 - alpha) * old_face
updated_face = updated_face / np.linalg.norm(updated_face)
elif new_face is not None:
# Nowy face embedding pojawił się
updated_face = new_face
else:
# Pozostaw stary lub None
updated_face = old_face
person['embedding'] = (updated_body, updated_face)
person['last_seen'] = time.time()
person['appearances'] += 1
return
def clean_inactive_persons(self, current_time: float) -> int:
"""
Usuwa osoby, które nie były widziane przez dłuższy czas.
Zapobiega rozrostowi galerii i pomyłkom przy przypisywaniu ID.
Returns:
Liczba usuniętych osób
"""
initial_size = len(self.gallery)
self.gallery = [p for p in self.gallery
if (current_time - p['last_seen']) < self.inactive_timeout]
removed = initial_size - len(self.gallery)
if removed > 0:
print(f"[ReID Engine] Usunięto {removed} nieaktywnych osób z galerii")
return removed
def get_gallery_size(self) -> int:
"""Zwraca liczbę unikalnych osób w galerii."""
return len(self.gallery)
def reset_gallery(self):
"""Resetuje galerię (usuwa wszystkie zapisane osoby)."""
self.gallery = []
self.next_id = 1
self.removed_ids = set()
print("[ReID Engine] Galeria została zresetowana")