-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathBall_detect_updated.py
More file actions
243 lines (196 loc) · 6.9 KB
/
Ball_detect_updated.py
File metadata and controls
243 lines (196 loc) · 6.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#! /usr/bin/env python2
try:
    import Queue  # Python 2 module name
except ImportError:  # the module was renamed to ``queue`` in Python 3
    import queue as Queue
import collections
import signal
import sys
import time

import cv2
import imutils
import keras
import numpy as np
from keras.models import load_model
#from cv_bridge import CvBridge, CvBridgeError
#import detect_ball
#SIGINT handler
def sigint_handler(signal, frame):
    """Handle Ctrl-C: open a pdb session, then exit with status 0.

    Args:
        signal: signal number passed in by the ``signal`` module (unused).
            The name shadows the ``signal`` module inside this function only.
        frame: interrupted stack frame passed in by the ``signal`` module
            (unused).

    Raises:
        SystemExit: always, with code 0, once ``pdb.set_trace()`` returns.
    """
    # pdb is not among the imports at the top of this file, so the original
    # call raised NameError; import it here, where it is actually needed.
    import pdb

    pdb.set_trace()
    sys.exit(0)
class Ball_Detec:
    """Find a ball in a video stream with an HSV hysteresis mask and Hough circles.

    Typical use is ``open_stream()`` followed by ``read_stream()``.  Each frame
    is shrunk, thresholded in HSV with a strict and a loose colour range, the
    two masks are merged by hysteresis, every connected blob of the result is
    cropped to a 50x50 window and each window is searched for circles.
    """

    def __init__(self, debug_dir="/home/ameya/Desktop/MRT/curr"):
        """Set up thresholds and the working buffers of the hysteresis fill.

        Args:
            debug_dir: directory in which ``hough`` writes its debug images.
                The default is the path that used to be hard-coded there.
        """
        self.resource = 0  # handed to cv2.VideoCapture in open_stream
        self.debug_dir = debug_dir
        self.frame = None  # last frame read by read_stream; written by save_img
        self.max_size = 255*100000  # max size of ball allowed
        self.full_size = 255*100000  # size of ball at minimum distance
        # (low, high) bounds passed to cv2.inRange on the HSV image in get_mask.
        # "high" is the strict range that seeds regions, "low" the loose range
        # that regions may grow into; the *_sunny pair is the fallback used
        # when the strict range matches nothing.
        # Original author's note: "high and low colour boundaries in RGB 152 85.0 78.4"
        self.high_thresh = {'low': (28, 100, 120), 'high': (48, 200, 230)}
        self.low_thresh = {'low': (28, 0, 80), 'high': (57, 255, 255)}
        self.high_thresh_sunny = {'low': (20, 60, 120), 'high': (48, 200, 230)}
        self.low_thresh_sunny = {'low': (20, 0, 80), 'high': (57, 255, 255)}
        # Working state of hyst_threshold()/cluster().
        self.Visited = np.zeros((1, 1))
        self.low_mask = np.zeros((1, 1))
        # A plain FIFO is enough here: the fill runs in a single thread, so the
        # locking done by Queue.Queue on every put/get was pure overhead.
        self.clusterQ = collections.deque()

    def open_stream(self):
        """Open ``self.resource`` with cv2.VideoCapture and keep it in ``self.stream``.

        Returns:
            True if the capture reports that it is opened, False otherwise.
            (The success message used to be printed even after a failure.)
        """
        print("Trying to open resource: " + str(self.resource))
        self.stream = cv2.VideoCapture(self.resource)
        if not self.stream.isOpened():
            print("Error opening resource: " + str(self.resource))
            print("Maybe opencv VideoCapture can't open it")
            return False
        print("Correctly opened resource ID:" + str(self.resource) + ", starting to show feed.")
        return True

    def read_stream(self, unsupervised_mode="hough", save=False, debugging=True):
        """Process frames until the ball is reached, the stream ends or the user quits.

        Args:
            unsupervised_mode: detector run on every 50x50 window; only
                ``"hough"`` is implemented.
            save: when true, every frame read is written to
                ``frame_<index>.jpg`` through ``save_img``.
            debugging: when true, show the mask and the shrunk frame and print
                the shapes of the intermediate arrays.

        Returns:
            A list with one entry per window in which ``hough`` found circles
            (each entry is the array returned by ``hough``).
        """
        detections = []
        frame_index = 0
        try:
            while True:
                rval, frame = self.stream.read()
                if not rval:
                    # Without this break a dead stream spun forever.
                    print("Stream Read RVal False: Unable to get frames")
                    break
                self.frame = frame  # save_img() writes this attribute
                frame_index += 1
                if save:
                    # cv2.imwrite picks the encoder from the extension, so the
                    # old placeholder name "Filename" could not be written.
                    self.save_img("frame_%06d.jpg" % frame_index)

                image, mask = self.get_mask(frame)  # hysteresis mask
                if not np.any(mask):
                    continue
                mask_sum = np.sum(mask)
                if mask_sum > self.full_size:
                    print("ball reached\t" + str(mask_sum))
                    break

                img_array = self.getMultiWindow(image, mask)  # 50x50 crops
                if debugging:
                    cv2.imshow("mask", mask)
                    cv2.imshow("frame", image)
                    print(np.shape(img_array))

                for window in img_array:
                    gray = cv2.cvtColor(window, cv2.COLOR_BGR2GRAY)
                    if debugging:
                        print(np.shape(gray))
                    if unsupervised_mode == "hough":
                        circles = self.hough(gray)
                        if circles is not None:
                            detections.append(circles)

                # waitKey lets the HighGUI windows repaint; ESC or 'q' stops.
                key = cv2.waitKey(20) & 0xFF
                if key in (27, ord('q')):
                    break
        finally:
            # The old code destroyed a window named "preview" that was never
            # created; close whatever this run actually opened instead.
            cv2.destroyAllWindows()
        return detections

    def hough(self, array):
        """Search a single-channel image for circles and draw them onto it.

        ``array`` is modified in place.  The input image and one image per
        circle found are written to ``self.debug_dir``, and the annotated
        image is shown in the 'detected circles' window (the caller is
        expected to pump ``cv2.waitKey`` and close the windows).

        Args:
            array: image handed to ``cv2.HoughCircles``; ``read_stream``
                passes a 50x50 grayscale window.

        Returns:
            The circles rounded and cast to uint16, indexed as
            ``circles[0, k] = (x, y, radius)``, or None if nothing was found.
        """
        cv2.imwrite(self.debug_dir + '/' + "imp" + ".jpg", array)
        circles = cv2.HoughCircles(array, cv2.HOUGH_GRADIENT, 1, 20,
                                   param1=40, param2=25, minRadius=5, maxRadius=30)
        print(circles)
        if circles is None:
            return None

        circles = np.uint16(np.around(circles))
        print(circles)
        for circle in circles[0, :]:
            # Plain ints: some OpenCV builds reject numpy scalars as coordinates.
            x, y, radius = int(circle[0]), int(circle[1]), int(circle[2])
            # draw the outer circle
            cv2.circle(array, (x, y), radius, (0, 255, 0), 2)
            # draw the center of the circle
            cv2.circle(array, (x, y), 2, (0, 0, 255), 3)
            cv2.imshow('detected circles', array)
            # Named after the circle itself; str(ndarray) used to produce file
            # names containing brackets and spaces.
            cv2.imwrite(self.debug_dir + '/' + "%d_%d_%d" % (x, y, radius) + ".jpg", array)
            print("printed")
        return circles

    def getMultiWindow(self, image, mask):
        """Return one 50x50 window per connected blob of ``mask``.

        The mask is binarised at 127 first.  If nothing is left, a single
        window made from the whole image is returned.

        Args:
            image: image to crop from (same height/width as ``mask``).
            mask: single-channel mask handed to ``cv2.threshold``.

        Returns:
            numpy array stacking the windows produced by ``getWindow``.
        """
        mask = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)[1]
        if not np.any(mask):
            return np.array([self.getWindow(image, mask)])

        ret, labels = cv2.connectedComponents(mask)
        # Label 0 is the background; every other label is one blob.
        windows = [self.getWindow(image, cv2.inRange(labels, label_id, label_id))
                   for label_id in range(1, ret)]
        return np.array(windows)

    @staticmethod
    def _fit_span(start, stop, limit):
        """Shift the half-open span [start, stop) so it lies inside [0, limit).

        The span keeps its length whenever it fits and is cut at ``limit``
        otherwise.
        """
        if start < 0:
            stop -= start
            start = 0
        if stop > limit:
            start = max(0, start - (stop - limit))
            # Slices are end-exclusive, so ``limit`` itself is the right bound;
            # the old ``limit - 1`` silently dropped the last row/column.
            stop = limit
        return start, stop

    def getWindow(self, image, mask):
        """Crop a square around the non-zero area of ``mask`` and resize to 50x50.

        The square has a side of at least 40 px plus a 5 px margin on each
        side and is shifted to stay inside the image.  With an empty mask the
        whole image is resized instead.

        Args:
            image: image to crop from.
            mask: single-channel mask whose bounding rectangle selects the crop.

        Returns:
            The 50x50 window produced by ``cv2.resize``.
        """
        if np.any(mask):
            x, y, w, h = cv2.boundingRect(mask)
            side = max(w, h, 40)
            x1, x2 = self._fit_span(x - 5, x + 5 + side, image.shape[1])
            y1, y2 = self._fit_span(y - 5, y + 5 + side, image.shape[0])
            image = image[y1:y2, x1:x2]
        return cv2.resize(image, (50, 50))

    def save_img(self, filename):
        """Write the most recent frame (``self.frame``) to ``filename``.

        Returns:
            The result of ``cv2.imwrite``, or False if no frame has been read yet.
        """
        if self.frame is None:
            print("save_img: no frame has been read yet")
            return False
        return cv2.imwrite(filename, self.frame)

    def get_mask(self, frame, gaussian=False):
        """Build the hysteresis colour mask of a BGR frame.

        The frame is shrunk to a width of 600 px and converted to HSV.  A
        strict and a loose ``cv2.inRange`` mask are computed; if the strict
        one is empty both are recomputed with the "sunny" thresholds.  The two
        masks are then merged by ``hyst_threshold``.

        Args:
            frame: BGR image as returned by ``cv2.VideoCapture.read``.
            gaussian: when true, blur the HSV image with an 11x11 Gaussian
                kernel before thresholding.

        Returns:
            Tuple ``(shrink_img, hyst_mask)``: the shrunk BGR frame and the
            uint8 mask of the same height and width.
        """
        # shrink image
        shrink_img = imutils.resize(frame, width=600)
        # BGR to HSV
        hsv_image = cv2.cvtColor(shrink_img, cv2.COLOR_BGR2HSV)
        print("gauss " + str(gaussian))
        if gaussian:
            hsv_image = cv2.GaussianBlur(hsv_image, (11, 11), 0)

        masked_high = cv2.inRange(hsv_image, self.high_thresh['low'], self.high_thresh['high'])
        masked_low = cv2.inRange(hsv_image, self.low_thresh['low'], self.low_thresh['high'])
        if not np.any(masked_high):
            # Nothing passed the strict range: retry with the sunny thresholds.
            masked_high = cv2.inRange(hsv_image, self.high_thresh_sunny['low'],
                                      self.high_thresh_sunny['high'])
            masked_low = cv2.inRange(hsv_image, self.low_thresh_sunny['low'],
                                     self.low_thresh_sunny['high'])
        hyst_mask = self.hyst_threshold(masked_high, masked_low)
        return shrink_img, hyst_mask

    def hyst_threshold(self, mask_high, mask_low):
        """Keep the 4-connected regions of ``mask_low`` touched by ``mask_high``.

        Every pixel with ``mask_high > 0`` seeds a flood fill that spreads
        through the non-zero pixels of ``mask_low``.

        Args:
            mask_high: 2-D strict mask; its positive pixels are the seeds.
            mask_low: 2-D loose mask of the same shape that bounds the fill.

        Returns:
            uint8 array shaped like ``mask_low``: 255 on filled pixels, 0 elsewhere.
        """
        self.Visited = np.zeros(mask_low.shape)
        self.low_mask = mask_low
        # Visit only the seed pixels instead of scanning every pixel in Python.
        for i, j in np.argwhere(mask_high > 0).tolist():
            if self.Visited[i, j] == 0:
                self.clusterQ.append((i, j))
                while self.clusterQ:
                    self.cluster()
        return self.Visited.astype(np.uint8)

    def cluster(self):
        """Process one queued pixel of the flood fill started by ``hyst_threshold``.

        The pixel is dropped if it lies outside the image, is already marked
        or is zero in ``self.low_mask``; otherwise it is marked 255 in
        ``self.Visited`` and its up/left/right/down neighbours are queued.
        """
        (i, j) = self.clusterQ.popleft()
        if i < 0 or j < 0 or i >= self.Visited.shape[0] or j >= self.Visited.shape[1]:
            return
        if self.Visited[i, j] > 0:
            return
        if self.low_mask[i, j] == 0:
            return
        self.Visited[i, j] = 255
        self.clusterQ.extend(((i - 1, j), (i, j - 1), (i, j + 1), (i + 1, j)))
if __name__ == '__main__':
    # Print arrays without summarisation.  NumPy documents sys.maxsize for
    # this; the former threshold='nan' is rejected by current releases.
    np.set_printoptions(threshold=sys.maxsize)
    # Ctrl-C is routed to sigint_handler.
    signal.signal(signal.SIGINT, sigint_handler)
    Ball_Detec_obj = Ball_Detec()
    Ball_Detec_obj.open_stream()
    try:
        Ball_Detec_obj.read_stream()
    finally:
        # Runs on normal return and on SystemExit alike: free the capture
        # device and close any HighGUI windows still open.
        Ball_Detec_obj.stream.release()
        cv2.destroyAllWindows()
# how to get output of supervised with sync
# how to store relevant data from output of unsup and sup
# how to determine whether other node has failed
# how to get direction data with sync
# stop on reaching ball
# Can take probabilities of 5-10 images (setting frame rate high) and get the median probability and take the angle of the image at the centre of the sample of 10 images. This would help reduce noisy predictions.