-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpitchflow.py
More file actions
executable file
·311 lines (245 loc) · 8.81 KB
/
pitchflow.py
File metadata and controls
executable file
·311 lines (245 loc) · 8.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python
"""
pitchflow.py
Python port of the new delta-pitch-without-pitch feature
based on cross-correlating adjacent spectra.
2014-01-24 Dan Ellis dpwe@ee.columbia.edu
"""
import numpy as np
# For filter
import scipy.signal
# For SRI's wavreading code
import scipy.io.wavfile as wav
# For command line
import sys
def frame(x, window, hop):
""" Convert vector x into an array of successive window-point segments,
stepped by hop
Done with stride_tricks, no copying, but have to lose the final
part-window
"""
nframes = 1 + int( np.floor( (len(x)-window)/hop ))
shape = (nframes, window)
strides = (x.strides[0] * hop, x.strides[0])
return np.lib.stride_tricks.as_strided(x, shape=shape, strides=strides)
def stftm(signal, nfft, window, hop):
""" calculate the short-time fourier transform magnitude """
frames = frame(signal, window, hop)
# apply frame window to each frame
window = np.hanning(window)
wframes = frames * window
return np.abs(np.fft.rfft(wframes, int(nfft)))
def fft2logfmx(nfft, sr=8000, nbins=0, width=1.0, fmin=50.0, bpo=12.0):
"""
Mapping matrix to convert FFT to logf (constant-Q) spectrogram.
2014-01-16 dpwe@ee.columbia.edu after logfsgram
"""
# Ratio between adjacent frequencies in log-f axis
fratio = np.power(2.0, (1.0/bpo))
# How many bins in log-f axis
if nbins == 0:
# default goes all the way to nyquist
nbins = int(np.floor( np.log((sr/2)/fmin) / np.log(fratio) ))
else:
nbins = int(nbins)
# Freqs corresponding to each bin in FFT
nfftbins = int(nfft/2+1)
fftfrqs = np.array(range(nfftbins))*(sr/nfft)
# Freqs corresponding to each bin in log F output
frqs = fmin * np.exp(np.log(2.0)*np.array(range(nbins))/bpo)
# Bandwidths of each bin in log F
logfbws = width * frqs * (fratio - 1.0)
# .. but bandwidth cannot be less than FFT binwidth
logfbws = np.maximum(logfbws, sr/nfft)
# Controls how much overlap there is between adjacent bands
ovfctr = 0.5475 # Adjusted by hand to make sum(mx'*mx) close to 1.0
# Weighting matrix mapping energy in FFT bins to logF bins
# is a set of Gaussian profiles depending on the difference in
# frequencies, scaled by the bandwidth of that bin
freqdiff = ( (np.tile(frqs[:, np.newaxis], (1,nfftbins))
- np.tile(fftfrqs, (nbins, 1)))
/ (ovfctr*logfbws[:, np.newaxis]) )
wts = np.exp( -0.5*np.square(freqdiff) )
# Normalize rows by sqrt(E), so multiplying by mx' gets aprx orig spec back
wts = wts / np.sqrt(2*np.sum(np.square(wts)))
return wts, frqs
def convbyrow(H, X):
"""
H is a vector, each row of Y is a row of X convolved with H
"""
(nr, nc) = np.shape(X)
lh = len(H);
Y = np.zeros( (nr, nc + lh - 1) )
for i in range(nr):
Y[i,] = np.convolve(H, X[i,])
return Y
def localmvnorm(X,W):
"""
Y = localmvnorm(X,W)
Remove local mean and divide out local SD from vector X to
return Y. W is the length of the local estimation window.
For X a matrix, normalization is applied to rows.
2014-01-17 Dan Ellis dpwe@ee.columbia.edu
"""
(bins, frames) = np.shape(X)
#print 'size(X)=',bins,'bins x',frames,'frames W=',W
# Pad with reflections at ends for off-the end continuity
padpoints = np.floor(W/2)
Xpad = np.r_[ X[padpoints:0:-1,],X,X[:-padpoints-1:-1,] ]
(padbins, frames) = np.shape(Xpad)
win = np.hanning(W+2)
# strip the zeros
win = win[1:-1]
win = win / np.sum(win)
whlen = int(np.floor((W-1)/2))
MAmean = convbyrow(win, Xpad.T).T
MAmean = MAmean[whlen:whlen+padbins,:]
MAvar = convbyrow(win, np.square(Xpad-MAmean).T).T
MAmean = MAmean[padpoints:padpoints+bins,:]
MAstd = np.sqrt(MAvar[whlen+padpoints : whlen+padpoints+bins,:])
return (X-MAmean) / (MAstd + (MAstd==0))
def xcorr(a, b, halfwidth):
""" cross-correlate a and b, returning points out to +/- halfwidth """
r = np.correlate(a,b,'full')
return r[len(a)-halfwidth-1:len(a)+halfwidth]
########### main function ##############
def pitchflow(d, sr):
"""
Y = pitchflow(d,sr)
Simply calculate a spectrogram
and take cross-correlation of successive spectra
to capture systematic shift in frequency
2014-01-16 Dan Ellis dpwe@ee.columbia.edu
"""
twin = 0.032
thop = 0.010
nwin = np.round(twin * sr)
nfft = np.power(2.0, np.ceil(np.log(twin*sr)/np.log(2.0)))
nhop = np.round(thop * sr)
#print "nfft=",nfft,"twin= %0.3f s"%(nfft/float(sr))
# Calculate base spectrogram
D = stftm(d, nfft, nwin, nhop)
# Convert to log-freq axis
bpo = 24
fmin = 50
fmax = 1500
nbins = np.round(bpo * np.log(fmax/fmin)/np.log(2.0))
width = 1.0
wts, frqs = fft2logfmx(nfft, sr, nbins, width, fmin, bpo)
# log-f sgram
DL = np.dot(wts, D.T)
(nbins, nframes) = np.shape(DL)
# local mean and variance normalization on each spectrum
mvnormwin = 48
DL = localmvnorm(DL, mvnormwin)
# Record frame-on-frame xcorr
delay = 2
halfwidth = 12
mxmd = np.empty( (2*halfwidth+1, nframes) )
mxmw = np.empty( nframes )
for i in range(nframes):
# output moves with first arg (so right-shift of first arg gives
# right-shift of output)
dl0 = DL[:,i]
dl1 = DL[:, np.maximum(1, i-delay)]
mxmd[:,i] = xcorr(dl0, dl1, halfwidth)
# normalizing constant
mxmw[i] = np.sqrt(np.sum(np.square(dl0)*np.sum(np.square(dl1))))
# Keep central bin as normalizing constant
#midbin = halfwidth
#Y0 = mxmd[midbin,]
Y = mxmd/(mxmw + (mxmw==0))
return Y
def pitchflow_collapse(Y, NBIN=0):
"""
F = pitchflow_collapse(Y)
Y is a set of column feature vectors indicating pitchflow from
dpitch. Collapse these 25 (?) dimensional vectors into 2 or 3
summary dimensions - something on peakiness, something on
center of mass, something on compactness
2014-01-16 Dan Ellis dpwe@ee.columbia
"""
(nr, nc) = np.shape(Y)
maxlag = (nr - 1)/2
# Maybe chop out just middle bins
if NBIN > 0:
newmaxlag = NBIN # was 8
Y = Y[maxlag-newmaxlag : maxlag+newmaxlag+1,]
(nr, nc) = np.shape(Y)
maxlag = newmaxlag
# Some preprocessing - convolve with smoothing window
DOSMOOTH = 1
if DOSMOOTH:
smoohwin = 2
smoo = 2*smoohwin+1
smwin = np.hanning(smoo+2)
smwin = smwin[1:-1]
Ys = convbyrow(smwin, Y)
Ys = Ys[:, smoohwin:-smoohwin];
else:
Ys = Y
# Raise to a power to increase dominance of peak values
#ep = 2.0;
#Y = Ys.^ep;
# Now Y is bipolar; best to make it positive before taking moments,
# and exponentiation also emphasizes peak
escale = 1.0
Y = np.exp(Ys/escale)
# First dimension - "spectral entropy" crest factor
#p = 2
#F0 = np.power(np.mean(np.power(Y, p)), 1/p) / np.mean(Y);
# .. or just first moment
F0 = np.mean(Y, axis=0)
# Second dimension - center of mass
lags = np.array(range(-maxlag,maxlag+1))
F1 = np.mean(Y.T*lags, axis=1) / F0
# Third dimension - inertial moment
F2 = np.sqrt(np.mean(Y.T*np.square(lags), axis=1) / F0 - np.square(F1))
ftrs = np.c_[F0,F1,F2]
return ftrs
############## Provide a command-line wrapper
import HTKfile
from scikits.audiolab import Sndfile
import os
def writehtk(filename, features):
""" write array of features to HTK file """
rows, veclen = np.shape(features)
writer = HTKfile.writer(filename, veclen)
writer.writeall(features)
def readsph(filename):
""" read in audio data from a sphere file. Return d, sr """
f = Sndfile(filename, 'r')
data = f.read_frames(f.nframes, dtype=np.float32)
sr = f.samplerate
return data, sr
def readwav(filename):
""" read in audio data from a wav file. Return d, sr """
# Read in wav file
sr, wavd = wav.read(filename)
# normalize short ints to floats of -1 / 1
data = np.asfarray(wavd) / 32768.0
return data, sr
def main(argv):
""" Main routine to apply pitchflow from command line """
if len(argv) != 3:
raise NameError( ("Usage: " + argv[0] +
" inputsound.wav outftrs.htk") )
inwavfile = argv[1]
outhtkfile = argv[2]
fileName, fileExtension = os.path.splitext(inwavfile)
if fileExtension == ".wav":
data, sr = readwav(inwavfile)
elif fileExtension == ".sph":
data, sr = readsph(inwavfile)
else:
raise NameError( ("Cannot determine type of infile " +
inwavfile) )
# Apply
ftrs1 = pitchflow(data, sr)
ftrs = pitchflow_collapse(ftrs1)
# Write the htk data out
writehtk(outhtkfile, ftrs)
print "Wrote", outhtkfile
# Actually run main
if __name__ == "__main__":
main(sys.argv)