128 changes: 128 additions & 0 deletions src/lava/lib/dl/slayer/object_detection/boundingbox/utils.py
@@ -1,6 +1,7 @@
# Copyright (C) 2023 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause

from collections import deque
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import cv2
@@ -20,6 +21,133 @@
Width = int
Height = int

# bbox_iou is used by temporal_NMS below; import it from the metrics module
# (relative import, assuming it is not already imported above)
from .metrics import bbox_iou


class temporal_NMS():
    # Temporal Non-Maximum Suppression over two consecutive frames.

    """Performs temporal non-maximal suppression of the input predictions.
    First, bounding boxes below a minimum confidence threshold are
    eliminated. Then non-maximal suppression is performed by matching bboxes
    over two successive frames. A non-maximal threshold is used to determine
    whether two bounding boxes represent the same object, above (below) which
    the likelihood of the object is increased (decreased). Batch inputs are
    supported.


import temporal_NMS as t_nms
### The class is self initialized!! no need to initialize it.
### Instances can be accessed directly calling class_name.instance
### Calling the frame predictions to be analyzed, will automatically
initialize variables, for example:
detections = [t_nms.next(predictions[...,t]) for t in T]
### to reset to zero frame data call:
t_nms.reset()

    Parameters
    ----------
    pred : List[torch.tensor]
        List of bounding box predictions per batch in
        (x_center, y_center, width, height) format.
    conf_threshold : float, optional
        Confidence threshold, by default 0.5.
    nms_threshold : float, optional
        Non maximal overlap threshold, by default 0.4.
    merge_conf : bool, optional
        Flag indicating whether to merge objectness score with classification
        confidence, by default True.
    max_iterations_temporal : int, optional
        Maximum number of temporal iterations that scale the class
        likelihood given the neighboring bboxes, by default 15.
    temporal_scaling_threshold : float, optional
        Scaling factor applied to nms_threshold when filtering out IOUs,
        by default 0.9.
    scaling_prob : List[float], optional
        Scaling of the keep ([0]) and remove ([1]) probabilities of
        neighboring bboxes, by default [1.15, 0.85].
    k_frames : int, optional
        Number of consecutive frames held in the temporal buffer,
        by default 2.

    Returns
    -------
    List[torch.tensor]
        Non-maximal filtered prediction outputs per batch.
    """

    init = 0           # set to 1 once the buffers below are initialized
    detections = []    # latest filtered detections per batch element
    dets_ = []         # per-batch deque of the last k_frames detections
    k_frames = 2
    n_batches = None

    def reset():
        # (re)initialize the per-batch frame buffers
        __class__.detections = [[] for _ in range(__class__.n_batches)]
        __class__.dets_ = [deque(maxlen=__class__.k_frames)
                           for _ in range(__class__.n_batches)]
        __class__.init = 1

    def next(pred: List[torch.tensor],
             conf_threshold=0.5,
             nms_threshold=0.4,
             merge_conf=True,
             max_iterations_temporal=15,
             temporal_scaling_threshold=.9,
             scaling_prob=[1.15, .85]) -> List[torch.tensor]:

        # housekeeping only on the first call or when the batch size changes
        if __class__.init == 0 or __class__.n_batches != pred.shape[0]:
            __class__.n_batches = pred.shape[0]
            __class__.reset()

        n_batches = __class__.n_batches
        dets_, detections = __class__.dets_, __class__.detections

        for b_n, pred_ in enumerate(pred):  # along the batch
            filtered = pred_[pred_[:, 4] > conf_threshold]
            obj_conf, labels = torch.max(filtered[:, 5:], dim=1, keepdim=True)
            if merge_conf:
                scores = filtered[:, 4:5] * obj_conf
            else:
                scores = filtered[:, 4:5]
            boxes = filtered[:, :4]
            # append the current frame to the buffer; detections0 always
            # refers to the most recent frame
            dets_[b_n].append(torch.cat([boxes, scores, labels], dim=-1))
            if len(dets_[b_n]) == 1:
                # first frame: match the frame against itself (classic NMS)
                detections0 = detections1 = dets_[b_n][-1]
            else:
                # match the current frame against the previous one
                detections0, detections1 = dets_[b_n][-1], dets_[b_n][-2]
            for k in range(max_iterations_temporal):
                if k == max_iterations_temporal - 1:
                    # last iteration is classic NMS of the frame with itself
                    detections1 = detections0
                order0 = torch.argsort(detections0[:, 4], descending=True)
                if order0.shape:
                    detections0 = detections0[order0]
                order1 = torch.argsort(detections1[:, 4], descending=True)
                if order1.shape:
                    detections1 = detections1[order1]
                ious = bbox_iou(detections1, detections0)
                label_match = (detections1[:, 5].reshape(-1, 1)
                               == detections0[:, 5].reshape(1, -1))
                # a bbox is kept if no higher-ranked bbox of the same label
                # overlaps it beyond the (scaled) NMS threshold
                keep = (
                    ious * label_match
                    > nms_threshold * temporal_scaling_threshold
                ).long().triu(1).sum(dim=0, keepdim=True).T.expand_as(
                    detections0) == 0
                detections01 = detections0[keep].reshape(-1, 6).contiguous()
                detections00 = detections0[~keep].reshape(-1, 6).contiguous()
                # rescale the confidence of overlapping bboxes of the same
                # label: boost the kept ones, attenuate the suppressed ones
                detections01[:, 4] = torch.minimum(
                    detections01[:, 4] * scaling_prob[0], torch.tensor(1.0))
                detections00[:, 4] *= scaling_prob[1]
                # suppressed bboxes are retained (with reduced confidence)
                # except on the last, classic-NMS iteration
                detections0 = torch.cat(
                    [detections01, detections00], dim=0
                ) if k < max_iterations_temporal - 1 else detections01
            detections[b_n] = detections0.clone()
        return detections[:n_batches]
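
A minimal usage sketch of temporal_NMS (illustrative only, not part of the diff; it assumes raw predictions of shape [batch, num_boxes, 5 + num_classes, T] and that bbox_iou resolves as discussed above):

    import torch
    from lava.lib.dl.slayer.object_detection.boundingbox.utils import \
        temporal_NMS

    batch, num_boxes, num_classes, T = 2, 100, 11, 8
    predictions = torch.rand(batch, num_boxes, 5 + num_classes, T)

    # no explicit initialization needed: next() self-initializes on first use
    detections = [temporal_NMS.next(predictions[..., t]) for t in range(T)]
    # detections[t][b] is an [N, 6] tensor: (x, y, w, h, confidence, label)

    # between independent sequences, clear the temporal buffers
    temporal_NMS.reset()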


def non_maximum_suppression(predictions: List[torch.tensor],
conf_threshold: float = 0.5,
28 changes: 25 additions & 3 deletions src/lava/lib/dl/slayer/object_detection/dataset/bdd100k.py
@@ -118,7 +118,8 @@ def __init__(self,
train: bool = False,
seq_len: int = 32,
randomize_seq: bool = False,
augment_prob: float = 0.0) -> None:
augment_prob: float = 0.0,
image_jitter: bool = False) -> None:
"""Berkley Deep Drive (BDD100K) dataset module. For details on the
dataset, refer to: https://bdd-data.berkeley.edu/.

@@ -141,6 +142,11 @@
augment_prob : float, optional
Augmentation probability of the frames and bounding boxes,
by default 0.0.
        image_jitter : bool, optional
            If True, the images are substituted by the difference of images
            at consecutive times, mimicking the DVS format. Additional
            parameters (single-channel/grayscale vs. multi-channel/RGB, and
            precision) are to be set inside the body of the function,
            by default False.
        """
super().__init__()
self.blur = transforms.GaussianBlur(kernel_size=5)
@@ -159,6 +165,7 @@
self.augment_prob = augment_prob
self.seq_len = seq_len
self.randomize_seq = randomize_seq
self.image_jitter = image_jitter

def __getitem__(self, index: int) -> Tuple[torch.tensor, Dict[Any, Any]]:
"""Get a sample video sequence of BDD100K dataset.
@@ -196,9 +203,24 @@ def __getitem__(self, index: int) -> Tuple[torch.tensor, Dict[Any, Any]]:
if np.random.random() < self.augment_prob:
for idx in range(len(images)):
images[idx] = self.grayscale(images[idx])

            # jitter for mimicking DVS
            if self.image_jitter:
                # (1) grayscale instead of color: moving to a single channel
                # gives a 3x size reduction
                for idx in range(len(images)):
                    images[idx] = self.grayscale(images[idx])

            images = [torch.unsqueeze(self.img_transform(img), -1)
                      for img in images]

            # jitter for mimicking DVS
            if self.image_jitter:
                n_bits = 4
                prc = 2**n_bits
                # (2) quantize every frame to 4-bit precision
                for idx in range(len(images)):
                    images[idx] = (images[idx] * prc).int() / prc
                # difference of consecutive quantized frames; the last frame
                # stays a plain quantized image
                for idx in range(len(images) - 1):
                    images[idx] = images[idx] - images[idx + 1]

            image = torch.cat(images, dim=-1)
annotations = [self.bb_transform(ann) for ann in annotations]

# [C, H, W, T], [bbox] * T
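For clarity, a self-contained sketch of the DVS-mimicking transform used above (quantize each frame to 4 bits, then difference consecutive frames); the helper name and the random input are illustrative, not part of the PR:

    import torch

    def dvs_like(frames: torch.Tensor, n_bits: int = 4) -> torch.Tensor:
        # frames: [C, H, W, T] float tensor in [0, 1]
        prc = 2 ** n_bits
        q = (frames * prc).int() / prc            # quantize to n_bits levels
        out = q.clone()
        out[..., :-1] = q[..., :-1] - q[..., 1:]  # consecutive differences
        return out                                # last frame stays quantized

    frames = torch.rand(3, 448, 448, 8)
    events = dvs_like(frames)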
161 changes: 161 additions & 0 deletions src/lava/lib/dl/slayer/object_detection/dataset/coco.py
@@ -0,0 +1,161 @@
import importlib
import os
import random
import subprocess
import sys
from typing import Any, Dict, Tuple, Optional, Callable

import numpy as np
import torch
from PIL import Image, ImageFilter, ImageTransform
from PIL.Image import Transpose
from sklearn.cluster import MiniBatchKMeans
from torch.utils.data import Dataset
from torchvision import transforms

from object_detection.dataset.utils import flip_lr, flip_ud
from object_detection.boundingbox import utils as bbutils
from object_detection.boundingbox.utils import Height, Width


try:
    from pycocotools.coco import COCO as COCOapi
except ModuleNotFoundError:
    if importlib.util.find_spec('cython') is None:
        subprocess.check_call([sys.executable,
                               '-m', 'pip', 'install', 'cython'])
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',
                           'git+https://github.com/philferriere/cocoapi.git'
                           '#egg=pycocotools&subdirectory=PythonAPI'])
    # retry the import now that the package has been installed
    from pycocotools.coco import COCO as COCOapi


def Image_Jitter(image, max_pixel_displacement_perc=0.01):
    """Mimics DVS output by subtracting a randomly shifted copy of a
    [C, H, W, 1] image tensor from the image itself."""
    x, y = (torch.tensor(image.shape[1:3])
            * max_pixel_displacement_perc).type(torch.int)
    jx, jy = random.randrange(-x, x), random.randrange(-y, y)
    # pad according to the jitter direction
    # (padding order: left, top, right, bottom)
    image_s = transforms.Pad(padding=(jx * (jx > 0),
                                      jy * (jy > 0),
                                      -jx * (jx < 0),
                                      -jy * (jy < 0)))(image.squeeze())
    # crop the shifted image back to the original height and width
    SS = image.size()[1:]
    ii = image_s[:,
                 -jy * (jy < 0):SS[0] - jy * (jy < 0),
                 -jx * (jx < 0):SS[1] - jx * (jx < 0)]
    return image - ii.unsqueeze(-1)
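
A quick illustrative check of Image_Jitter (shapes as assumed in its docstring; not part of the PR):

    import torch

    img = torch.rand(3, 448, 448, 1)   # [C, H, W, 1]
    events = Image_Jitter(img)         # difference with a shifted copy
    assert events.shape == img.shape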


def quantize_global(image, k):
    """Quantizes an image to at most k intensity levels by clustering the
    pixel values with mini-batch k-means."""
    k_means = MiniBatchKMeans(k, compute_labels=False)
    k_means.fit(image.reshape(-1, 1))
    labels = k_means.predict(image.reshape(-1, 1))
    q_img = k_means.cluster_centers_[labels]
    q_image = np.uint8(q_img.reshape(image.shape))
    return q_image
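
Illustrative use of quantize_global on a random grayscale image (values are assumptions, not part of the PR):

    import numpy as np

    img = (np.random.rand(64, 64) * 255).astype(np.uint8)
    q = quantize_global(img, 16)       # at most 16 distinct intensity levels
    assert len(np.unique(q)) <= 16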


class _COCO(Dataset):
def __init__(self,
root: str = '.',
train: bool = False) -> None:
super().__init__()

image_set = 'train' if train else 'val'
self.coco = COCOapi(root + os.sep + 'annotations' + os.sep
+ f'instances_{image_set}2017.json')
self.root = root + os.sep + f'images{os.sep}{image_set}2017{os.sep}'
self.ids = list(sorted(self.coco.imgs.keys()))
self.cat_name = [d['name']
for d in self.coco.loadCats(self.coco.getCatIds())]
self.super_cat_name = [d['supercategory']
for d in
self.coco.loadCats(self.coco.getCatIds())]
self.idx_map = {name: idx for idx, name in enumerate(self.cat_name)}

def __getitem__(self, index: int) -> Tuple[torch.tensor, Dict[Any, Any]]:
id = self.ids[index]
path = self.coco.loadImgs(id)[0]['file_name']
image = Image.open(self.root + path).convert('RGB')
width, height = image.size
size = {'height': height, 'width': width}

anns = self.coco.loadAnns(self.coco.getAnnIds(id))
objects = []
for ann in anns:
name = self.coco.cats[ann['category_id']]['name']
bndbox = {'xmin': ann['bbox'][0],
'ymin': ann['bbox'][1],
'xmax': ann['bbox'][0] + ann['bbox'][2],
'ymax': ann['bbox'][1] + ann['bbox'][3]}
objects.append({'id': self.idx_map[name],
'name': name,
'bndbox': bndbox})

annotation = {'size': size, 'object': objects}

return image, {'annotation': annotation}

def __len__(self) -> int:
return len(self.ids)


class COCO(Dataset):
def __init__(self,
root: str = './',
size: Tuple[Height, Width] = (448, 448),
train: bool = False,
augment_prob: float = 0.0,
image_jitter: bool = False) -> None:
super().__init__()
self.blur = transforms.GaussianBlur(kernel_size=5)
self.color_jitter = transforms.ColorJitter()
self.grayscale = transforms.Grayscale(num_output_channels=3)
self.img_transform = transforms.Compose([transforms.Resize(size),
transforms.ToTensor()])
self.bb_transform = transforms.Compose([
lambda x: bbutils.resize_bounding_boxes(x, size),
])

self.datasets = [_COCO(root=root, train=train)]
self.classes = self.datasets[0].cat_name
self.idx_map = self.datasets[0].idx_map
self.augment_prob = augment_prob
self.image_jitter = image_jitter

def __getitem__(self, index) -> Tuple[torch.tensor, Dict[Any, Any]]:
dataset_idx = index // len(self.datasets[0])
index = index % len(self.datasets[0])
image, annotation = self.datasets[dataset_idx][index]

# flip left right
if random.random() < self.augment_prob:
image = Image.Image.transpose(image, Transpose.FLIP_LEFT_RIGHT)
annotation = bbutils.fliplr_bounding_boxes(annotation)
# # flip up down
# if random.random() < self.augment_prob:
# image = Image.Image.transpose(image, Transpose.FLIP_TOP_BOTTOM)
# annotation = bbutils.flipud_bounding_boxes(annotation)
# blur
if random.random() < self.augment_prob:
image = self.blur(image)
# color jitter
if random.random() < self.augment_prob:
image = self.color_jitter(image)
# grayscale
if random.random() < self.augment_prob:
image = self.grayscale(image)

image = torch.unsqueeze(self.img_transform(image), -1)

# jitter for mimicking DVS
if self.image_jitter:
image = Image_Jitter(image)

annotation = self.bb_transform(annotation)

return image, [annotation] # list in time

def __len__(self) -> int:
return sum([len(dataset) for dataset in self.datasets])
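
A minimal sketch of how the dataset class above might be used (the root path and its layout, annotations/instances_val2017.json plus images/val2017/, follow the _COCO loader; values are illustrative):

    from object_detection.dataset.coco import COCO

    dataset = COCO(root='./data/coco', size=(448, 448),
                   train=False, image_jitter=True)
    image, annotations = dataset[0]
    # image: [C, H, W, 1] tensor; annotations: one bbox annotation per frame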