-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy patheasy_wave.py
More file actions
342 lines (276 loc) · 13.7 KB
/
easy_wave.py
File metadata and controls
342 lines (276 loc) · 13.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
import numpy as np
from enum import Enum
from io import BytesIO
import ftplib
from collections import OrderedDict
# Module-wide debug switch: when True, the generator methods print a warning
# whenever they fall back to returning zeros for a channel they do not drive.
DEBUG = False
# -------------------------------------------------------------------------------------------------------
# The Waveform, AND_Waveform, Core_Pulse and Packaged_Waveform classes should be hardware independent
# -------------------------------------------------------------------------------------------------------
class ANDOverlapError(Exception):
    """Raised when waveforms combined with ``&`` share an output channel."""


class ANDLengthError(Exception):
    """Raised when waveforms combined with ``&`` do not have the same duration."""


class TimeStepError(Exception):
    """Raised when a waveform length is not an integer multiple of the time step."""


class PulseLengthError(Exception):
    """Raised when a waveform or pulse does not span the expected number of samples."""
#For us close enough means within one time step (100ps < 1/1.2e9 should be sufficient)
def is_close(a, b, tol=0.1e-9):
    """Tell whether ``a`` and ``b`` differ by strictly less than ``tol``.

    The default tolerance is 0.1e-9; a difference exactly equal to ``tol``
    is NOT considered close.
    """
    gap = abs(a - b)
    return gap < tol
def to_integer(a, error=PulseLengthError()):
    """Convert ``a`` to the nearest integer, refusing values that are not whole.

    Parameters:
        a: number expected to be (numerically) an integer.
        error: exception instance raised when ``a`` is further than 1e-5
            from its rounded value.

    Returns:
        The rounded value of ``a`` as a Python ``int``.
    """
    nearest = int(np.round(a))
    if not is_close(a, nearest, tol=1e-5):
        raise error
    return nearest
class Waveform(object):
    """Sequence of waveforms played one after the other.

    Attributes:
        wave_list: the member waveforms, in playback order.
        chs: union of the channels used by the members.
        t: total duration, i.e. the sum of the members' ``t``.
        repr_str: text returned by ``str()``, e.g. ``"(A + B)"``.

    Operators:
        ``a + b`` concatenates in time (returns a ``Waveform``).
        ``a & b`` combines in parallel (returns an ``AND_Waveform``).
        ``a * n`` repeats ``a`` ``n`` times.
    """
    PULSE_TYPE = None

    def __init__(self, wave_list=None, chs=None):
        """Build a sequence from ``wave_list``.

        Parameters:
            wave_list: iterable of waveform objects exposing ``chs`` and ``t``.
                Defaults to an empty sequence.
            chs: accepted for backward compatibility but ignored; the channel
                set is always derived from ``wave_list``.
        """
        # Use a fresh list per instance instead of a shared mutable default.
        wave_list = [] if wave_list is None else wave_list
        self.wave_list = wave_list
        self.chs = set().union(*[wave.chs for wave in wave_list])
        self.t = sum(wave.t for wave in wave_list)
        self.repr_str = '(' + ' + '.join(str(wave) for wave in wave_list) + ')'

    def has_ch(self, ch):
        """Tell whether this waveform uses ``ch``.

        ``ch`` is either a channel enum member (exact output) or an integer
        channel number (1, 2, 3, 4), in which case any output of that channel
        counts.
        """
        if isinstance(ch, int):
            return any(c.value[0] == ch for c in self.chs)
        return ch in self.chs

    def __str__(self):
        return self.repr_str

    def __len__(self):
        # NOTE: Python requires __len__ to return an int, so len(waveform)
        # raises TypeError whenever ``t`` is a float.
        return self.t

    def __add__(self, other):
        """Concatenate in time; an AND_Waveform operand is kept as a single element."""
        a = [self] if isinstance(self, AND_Waveform) else self.wave_list
        b = [other] if isinstance(other, AND_Waveform) else other.wave_list
        return Waveform(wave_list=a + b)

    def __and__(self, other):
        """Combine in parallel; AND_Waveform operands are flattened into one AND."""
        a = self.wave_list if isinstance(self, AND_Waveform) else [self]
        b = other.wave_list if isinstance(other, AND_Waveform) else [other]
        return AND_Waveform(wave_list=a + b)

    def __mul__(self, integer):
        return self.repeat(integer)

    def get_ts(self, rate):
        """Return the sample times ``[0, t)`` for the given sampling ``rate``.

        Raises:
            PulseLengthError: if ``t * rate`` is not an integer.
        """
        wfm_len = self.t*rate
        wfm_len = to_integer(wfm_len, error=PulseLengthError("Waveform has non-integer length ({}) with respect to rate ({})\n\tWaveform:{}".format(wfm_len, rate, str(self))))
        return np.linspace(0, self.t, wfm_len, endpoint=False)

    def generate(self, rate, ch):
        """Return ``(ts, values)`` for channel ``ch`` sampled at ``rate``."""
        ts = self.get_ts(rate)
        return ts, self.generator(ts, rate, ch)

    def blank(self):
        """Return a ``Zero`` pulse of the same duration on ``Channel.no_ch``."""
        return Zero(t=self.t, ch=Channel.no_ch)

    def repeat(self, integer):
        """Return this waveform appended ``integer`` times to an ``Empty()`` waveform.

        Raises:
            TypeError: if ``integer`` is not an int.
        """
        if not isinstance(integer, int):
            raise TypeError("Cannot repeat a waveform by something that is not an integer")
        out = Empty()
        for _ in range(integer):
            out += self
        return out

    def generator(self, ts, rate, ch):
        """Build the sample array of channel ``ch`` over the times ``ts``.

        Returns zeros (dtype from ``get_dtype(ch)``) when ``ch`` is not used
        by this waveform; otherwise the concatenation of each member's samples.

        Raises:
            TimeStepError: if a member's length is not an integer number of
                samples at ``rate``.
        """
        if ch not in self.chs:
            if DEBUG: print("WARNING:{} is returning zeros".format(str(self)))
            dtype = get_dtype(ch)
            return np.zeros(len(ts), dtype=dtype)
        # NOTE(review): ``ti`` is never advanced, so every member receives the
        # slice ts[0:n_i] (time restarting at ts[0]) rather than its own window
        # of ``ts``. The output length is correct either way. Confirm whether
        # ``ti = ti_next`` was intended before changing: pulse builders may
        # rely on the current local-time behaviour.
        ti = 0
        gen_list = list()
        for wave in self.wave_list:
            ti_next = ti + (wave.t*rate)
            ti_next = to_integer(
                ti_next,
                error=TimeStepError("Length of waveform is not an integer multiple of the time step\n\tRate: {} (dt = {})\n\tWaveform: {}\n\tWaveform length:{}".format(rate, 1/rate, str(wave), wave.t))
            )
            gen_list.append(wave.generator(ts[ti:ti_next], rate, ch))
        return np.concatenate(gen_list)
class AND_Waveform(Waveform):
    """Parallel combination of waveforms.

    All members must have the same duration (within ``is_close``) and no
    channel may be driven by more than one member.
    """

    def __init__(self, wave_list=[]):
        """Validate and store the members.

        Raises:
            Exception: if ``wave_list`` is empty.
            ANDOverlapError: if two members use the same channel.
            ANDLengthError: if a member's duration differs from the first one's.
        """
        self.wave_list = wave_list
        self.chs = set().union(*[member.chs for member in wave_list])
        self.repr_str = '(' + '&'.join(str(member) for member in wave_list) + ')'

        if len(wave_list) == 0:
            raise Exception("Trying to create an empty AND_Waveform. Not allowed. Use the base Waveform class")

        # The first member sets the reference duration.
        self.t = wave_list[0].t

        # Channels not yet claimed; each member must take its channels from here.
        unclaimed = self.chs.copy()
        for member in wave_list:
            if not (member.chs <= unclaimed):
                raise ANDOverlapError("Cannot perform a AND operation because channels overlap\n\tWaveform : {}".format(self.repr_str))
            unclaimed -= member.chs
            if not is_close(member.t, self.t):
                raise ANDLengthError("Cannot perform a AND operation because waveforms are different lenght\n\tWaveform A: {}\n\tWaveform B:{}".format(self.t, member.t))

    def generator(self, ts, rate, ch):
        """Delegate to the first member using ``ch``; zeros if none does."""
        owner = next((member for member in self.wave_list if ch in member.chs), None)
        if owner is not None:
            return owner.generator(ts, rate, ch)
        if DEBUG: print("WARNING:{} is returning zeros".format(str(self)))
        dtype = get_dtype(ch)
        return np.zeros(len(ts), dtype=dtype)
class Core_Pulse(Waveform):
    """Elementary pulse living on a single channel.

    Subclasses implement ``build`` to produce the samples. If a subclass
    defines a ``sim`` method, its return value (called with the extra
    constructor arguments) is stored in ``PULSE_TYPE``.
    """

    def __init__(self, t, ch, *args, **kwargs):
        """Store the duration ``t``, the channel ``ch`` and the build arguments."""
        self.t = t
        self.chs = {ch}
        # A core pulse is its own one-element sequence.
        self.wave_list = [self]
        self.args = args
        self.kwargs = kwargs
        if hasattr(self, 'sim'):
            self.PULSE_TYPE = self.sim(*args, **kwargs)

    def __str__(self):
        return type(self).__name__

    def generator(self, ts, rate, ch):
        """Return the samples for ``ch`` over ``ts``, cast to ``get_dtype(ch)``.

        Returns zeros when ``ch`` is not this pulse's channel.

        Raises:
            PulseLengthError: if ``build`` returns an array whose length
                differs from ``len(ts)``.
        """
        out_dtype = get_dtype(ch)
        if ch not in self.chs:
            if DEBUG: print("Warning: Asked for a wrong channel ({}) in Core_Pulse {}".format(ch, str(self)))# @DEBUG
            return np.zeros(len(ts), dtype=out_dtype)

        # Delegate the actual shape to the subclass.
        samples = self.build(ts, *self.args, rate=rate, ch=ch, **self.kwargs)

        # Sanity-check what the subclass produced.
        if len(samples) != len(ts):
            raise PulseLengthError("Pulse build function did not return the correct lenght\n\tPulse: {}\n\tlen(ts)={}\n\tlen(arr)={}".format(str(self),len(ts),len(samples)))
        if samples.dtype != out_dtype:
            samples = samples.astype(out_dtype)
        return samples

    def build(self, ts, *args, **kwargs):
        """
        Hook implemented by the user; it must return a numpy array with one
        value per entry of ``ts``.
        ``kwargs`` always contains ``rate`` and ``ch``, which the
        implementation is free to use or ignore.
        """
        return None
def Packaged_Waveform(func, sim_func=None):
    """Decorator that labels the waveform returned by ``func``.

    The wrapped function calls ``func`` and, on the returned object, records
    the call arguments in ``args`` / ``kwargs`` and sets ``repr_str`` to
    ``func.__name__`` so the waveform prints under the factory's name.

    Parameters:
        func: factory returning a waveform object.
        sim_func: currently unused; kept for interface compatibility.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.
    """
    # Local import so this block does not depend on the module's import list.
    from functools import wraps

    @wraps(func)  # keep func.__name__ / __doc__ visible on the wrapper
    def wrapper(*args, **kwargs):
        w = func(*args, **kwargs)
        w.args = args
        w.kwargs = kwargs
        w.repr_str = func.__name__
        return w
    return wrapper
##This is to allow wave.blank and wave.repeat to work
from wave_library import Zero, Empty
# -------------------------------------------------------------------------------------------------------
# This will transform the waveforms into AWG lines and then write them to file. This part is hardware dependent. (Here written for the Tektronix AWG 5014C)
# -------------------------------------------------------------------------------------------------------
class Channel(Enum):
    """Outputs addressable by a waveform.

    Each value is ``(channel number, output index)``. Output index 0 is
    written with the ANALOG_* records and indices 1 and 2 with the MARKER*
    records when the file is generated. ``no_ch`` is a placeholder that
    belongs to no real channel.
    """
    # Channel 1
    ch1_a = (1, 0)
    ch1_m1 = (1, 1)
    ch1_m2 = (1, 2)
    # Channel 2
    ch2_a = (2, 0)
    ch2_m1 = (2, 1)
    ch2_m2 = (2, 2)
    # Channel 3
    ch3_a = (3, 0)
    ch3_m1 = (3, 1)
    ch3_m2 = (3, 2)
    # Channel 4
    ch4_a = (4, 0)
    ch4_m1 = (4, 1)
    ch4_m2 = (4, 2)
    # Placeholder
    no_ch = (0, 0)
# Every channel member except the ``no_ch`` placeholder, in declaration order.
REAL_CHANNELS = [ch for ch in Channel if ch is not Channel.no_ch]

# Channel number (1..4) -> its three outputs, ordered [a, m1, m2].
CHANNEL_GROUPS = OrderedDict(
    (number, [Channel['ch{}_{}'.format(number, suffix)] for suffix in ('a', 'm1', 'm2')])
    for number in range(1, 5)
)

# Default (low, high) limits, indexed by the output index ``ch.value[1]``.
# NOTE(review): presumably volts - confirm against the AWG documentation.
DEFAULTS_LIMITS = [
    (-0.5, 0.5),
    (0.0, 2.7),
    (0.0, 2.7),
]
def get_dtype(ch):
    """Return the sample type for ``ch``: ``bool`` when its output index
    (``ch.value[1]``) is positive, ``float`` otherwise."""
    output_index = ch.value[1]
    if output_index > 0:
        return bool
    return float
# The lantz drivers are optional: without them the waveform generator code
# still works, only .awg file generation is unavailable.
try:
    from lantz.drivers.tektronix.awg5014c_tools import AWG_File_Writer
    import lantz.drivers.tektronix.awg5014c_constants as cst
except Exception:
    # Kept broad on purpose (a broken driver install should not prevent using
    # the generator code), but no longer a bare ``except:`` so that
    # KeyboardInterrupt / SystemExit are not swallowed.
    print("Warning: You do not have lantz install with the proper instrument drivers. You will not be able to generate .awg files for the AWG5014C, but you can still use the generator code")
class Block_Writer(object):
    """Collect sequence lines grouped into named blocks, then hand them to an AWG_Writer.

    Attributes:
        shifts: per-channel shifts forwarded to the AWG_Writer.
        blocks: OrderedDict ``block name -> OrderedDict(line name -> line)``
            where a line is ``{'waveform': ..., 'repeat': ...}``.
        cur_block_name: name of the block receiving new lines.
        block_size: number of lines of the first block; later blocks may not
            exceed it.
    """

    def __init__(self, shifts=None):
        # Use a fresh dict per instance instead of a shared mutable default.
        self.shifts = {} if shifts is None else shifts
        self.blocks = OrderedDict()
        self.cur_block_name = None
        self.block_size = None

    def new_block(self, block_name):
        """Start (or reset) the block ``block_name`` and make it current."""
        self.cur_block_name = block_name
        self.blocks[block_name] = OrderedDict()

    def add_line(self, waveform, sub_name=None, repeat=1):
        """Append a line to the current block.

        Parameters:
            waveform: waveform played by the line.
            sub_name: suffix of the line name; defaults to ``'sub<i>'`` where
                ``<i>`` is the number of lines already in the current block.
            repeat: repeat count of the line.

        Raises:
            Exception: if no block exists yet, or if the current block would
                become longer than the first block.
        """
        if not self.blocks:
            raise Exception("Must create a new block first using the AWG_Block_writer.new_block function")
        current = self.blocks[self.cur_block_name]
        cur_block_size = len(current) + 1
        if len(self.blocks) == 1:  # First block defines the block size
            self.block_size = cur_block_size
        if cur_block_size > self.block_size:
            raise Exception("Uneven block size")
        if sub_name is None:
            # The original read ``self.lines``, which this class never defines
            # (AttributeError); number the line within its block instead.
            sub_name = 'sub' + str(len(current))
        name = self.cur_block_name + '_' + sub_name
        current[name] = {'waveform': waveform, 'repeat': repeat}

    def make_awg_writer(self):
        """Return an AWG_Writer holding every line of every block.

        Lines are numbered from 1 across all blocks. Each line jumps to the
        next one, except the last line of a block which jumps back to the
        first line of that block.
        """
        writer = AWG_Writer(shifts=self.shifts)
        line_no = 1
        for lines in self.blocks.values():
            block_start_line = line_no
            last_name = list(lines.keys())[-1]
            for lname, line in lines.items():
                goto = block_start_line if lname == last_name else line_no + 1
                writer.add_line(line['waveform'], lname, repeat=line['repeat'], goto=goto)
                line_no += 1
        return writer

    def print_info(self):
        """Print the number of blocks and the line numbers at which blocks start."""
        print("This sequence contains {} blocks".format(len(self.blocks)))
        print("This sequence should be run with:\n\tCountsVsLine where lines=arange(1,{},{})".format(len(self.blocks)*self.block_size, self.block_size))

    def generate(self, rate, limits=None):
        """Build the AWG_Writer and return the result of its ``generate``."""
        limits = {} if limits is None else limits
        return self.make_awg_writer().generate(rate, limits=limits)

    def generate_and_upload(self, address, remote_filename, rate=1e9, limits=None):
        """Build the AWG_Writer and return the result of its ``generate_and_upload``."""
        limits = {} if limits is None else limits
        return self.make_awg_writer().generate_and_upload(address, remote_filename, rate=rate, limits=limits)
class AWG_Writer(object):
    """Accumulate sequence lines and turn them into an AWG sequence file.

    Attributes:
        lines: list of line dicts with keys ``'name'``, ``'waveform'``,
            ``'shifts'`` and ``'params'`` (the sequence-line parameters).
        default_shifts: shifts used by lines that do not provide their own.
    """

    def __init__(self, shifts=None):
        self.lines = list()
        # Use a fresh dict per instance instead of a shared mutable default.
        self.default_shifts = {} if shifts is None else shifts

    def add_line(self, waveform, name, repeat=0, goto=0, shifts=None, jump_target=0,
                 wait_for_trigger=False, use_sub_seq=False, sub_seq_name=''):
        """Append a sequence line.

        Parameters:
            waveform: waveform played by the line.
            name: line name, used as prefix of the generated waveform names.
            repeat, goto, jump_target, wait_for_trigger, use_sub_seq,
            sub_seq_name: forwarded to the file writer's ``add_sequence_line``
                as ``repeat_count``, ``goto_target``, ``jump_target``, ...
            shifts: mapping channel -> shift (multiplied by the rate at
                generation time); ``None`` or ``{}`` selects the writer's
                default shifts.
        """
        if shifts is None or shifts == {}:
            shifts = self.default_shifts
        self.lines.append({
            'name': name,
            'waveform': waveform,
            'shifts': shifts,
            'params': {
                'repeat_count': repeat,
                'goto_target': goto,
                'jump_target': jump_target,
                'wait_for_trigger': wait_for_trigger,
                'use_sub_seq': use_sub_seq,
                'sub_seq_name': sub_seq_name,
            },
        })

    def get_line(self, line):
        """Return the waveform of line number ``line`` (1-based)."""
        return self.lines[line-1]['waveform']

    def __getitem__(self, line):
        return self.get_line(line)

    def generate(self, rate, limits=None):
        """Render every line at ``rate`` and return the file as a ``BytesIO``.

        Parameters:
            rate: sampling rate used to render the waveforms.
            limits: optional mapping channel -> ``(low, high)``; channels not
                listed use ``DEFAULTS_LIMITS``.

        Raises:
            PulseLengthError: if a line's duration is not an integer number
                of samples at ``rate``.
        """
        limits = {} if limits is None else limits
        file_writer = AWG_File_Writer()
        # Lengths for which an all-zero waveform has already been written.
        zero_lengths = set()
        for line in self.lines:
            waveform = line['waveform']
            # One waveform name per channel group, in group order.
            wfm_names = list()
            for group, chs in CHANNEL_GROUPS.items():
                wfm_len = waveform.t*rate
                wfm_len = to_integer(wfm_len, error=PulseLengthError("Waveform <{}> channel group <{}> has non-integer length ({}) with respect to rate ({})".format(line['name'], group, wfm_len, rate)))
                if any(waveform.has_ch(ch) for ch in chs):
                    name = line['name'] + ' ch' + str(group)
                    # Convert each channel's shift to a whole number of samples.
                    shifts = {ch: int(round(line['shifts'][ch]*rate)) if ch in line['shifts'] else 0 for ch in chs}
                    # Generate the arrays, apply the circular shift and add to file.
                    ts = np.linspace(0, waveform.t, wfm_len, endpoint=False)
                    ws = [np.roll(waveform.generator(ts, rate, ch), shifts[ch]) for ch in chs]
                    file_writer.add_waveform(name, *ws)
                else:
                    # Unused group: share one zero waveform per distinct length.
                    name = 'zeros {}'.format(wfm_len)
                    if wfm_len not in zero_lengths:
                        file_writer.add_waveform(name, np.zeros(wfm_len, dtype=float), np.zeros(wfm_len, dtype=bool), np.zeros(wfm_len, dtype=bool))
                        zero_lengths.add(wfm_len)
                wfm_names.append(name)
            # Add the sequence line
            file_writer.add_sequence_line(wfm=wfm_names, **line['params'])

        # Add limits
        for ch in REAL_CHANNELS:
            lo, hi = limits[ch] if ch in limits else DEFAULTS_LIMITS[ch.value[1]]
            if ch.value[1] == 0:
                records = [
                    ('ANALOG_METHOD_'+str(ch.value[0]), cst.EAnalogInputMethod.AnalogInputMethod_IMHighLow, 3),
                    ('ANALOG_LOW_'+str(ch.value[0]), lo, 3),
                    ('ANALOG_HIGH_'+str(ch.value[0]), hi, 3),
                ]
            else:
                records = [
                    ('MARKER{1}_METHOD_{0}'.format(*ch.value), cst.EMarkerInputMethod.MarkerInputMethod_IMHighLow, 3),
                    ('MARKER{1}_LOW_{0}'.format(*ch.value), lo, 3),
                    ('MARKER{1}_HIGH_{0}'.format(*ch.value), hi, 3),
                ]
            for r in records:
                file_writer.add_record(*r)

        # Add a few more records
        file_writer.add_record('SAMPLING_RATE', rate, 2)
        file_writer.add_record('RUN_MODE', cst.ERunMode.RunMode_SEQUENCE, 2)

        sequence_file = BytesIO()
        sequence_file.write(file_writer.get_bytes())
        sequence_file.seek(0)
        return sequence_file

    def generate_and_upload(self, address, remote_filename, rate=1e9, limits=None):
        """Generate the sequence file and store it on the FTP server at ``address``.

        Logs in with ``ftp.login()`` (no credentials given) and uploads to
        ``remote_filename``. Returns ``self``.
        """
        sequence_file = self.generate(rate=rate, limits=limits)
        print('file generated')
        # The context manager closes the connection even if login/upload fails.
        with ftplib.FTP(address) as ftp:
            print('opening ftp at {}'.format(address))
            ftp.login()
            ftp.storbinary('STOR {}'.format(remote_filename), sequence_file, blocksize=1024)
            print('uploaded')
        return self