Skip to content

Commit 27acc6c

Browse files
authored
Standardize read and save (#188)
1 parent 03ffd54 commit 27acc6c

File tree

3 files changed

+126
-59
lines changed

3 files changed

+126
-59
lines changed

src/guppy/extractors/csv_recording_extractor.py

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,13 @@ def execute_import_csv(filepath, events, outputPath, numProcesses=mp.cpu_count()
1919
extractor = CsvRecordingExtractor(folder_path=filepath)
2020
start = time.time()
2121
with mp.Pool(numProcesses) as p:
22-
p.starmap(read_csv_and_save_hdf5, zip(repeat(extractor), events, repeat(outputPath)))
22+
p.starmap(read_and_save_csv, zip(repeat(extractor), events, repeat(outputPath)))
2323
logger.info("Time taken = {0:.5f}".format(time.time() - start))
2424

2525

26-
def read_csv_and_save_hdf5(extractor, event, outputPath):
27-
df = extractor.read_csv(event=event)
28-
extractor.save_to_hdf5(df=df, event=event, outputPath=outputPath)
26+
def read_and_save_csv(extractor, event, outputPath):
27+
output_dicts = extractor.read(events=[event], outputPath=outputPath)
28+
extractor.save(output_dicts=output_dicts, outputPath=outputPath)
2929
logger.info("Data for event {} fetched and stored.".format(event))
3030

3131

@@ -178,3 +178,18 @@ def save_to_hdf5(self, df, event, outputPath):
178178
write_hdf5(df[key[i]].dropna(), event, outputPath, key[i].lower())
179179

180180
logger.info("\033[1m" + "Reading data for {} from csv file is completed.".format(event) + "\033[0m")
181+
182+
def read(self, events, outputPath):
183+
output_dicts = []
184+
for event in events:
185+
df = self.read_csv(event=event)
186+
S = df.to_dict()
187+
S["storename"] = event
188+
output_dicts.append(S)
189+
return output_dicts
190+
191+
def save(self, output_dicts, outputPath):
192+
for S in output_dicts:
193+
event = S.pop("storename")
194+
df = pd.DataFrame.from_dict(S)
195+
self.save_to_hdf5(df=df, event=event, outputPath=outputPath)

src/guppy/extractors/doric_recording_extractor.py

Lines changed: 88 additions & 51 deletions
Original file line number | Diff line number | Diff line change
@@ -15,18 +15,12 @@
1515

1616
def execute_import_doric(folder_path, storesList, outputPath):
1717
extractor = DoricRecordingExtractor(folder_path=folder_path)
18-
flag = extractor.check_doric(folder_path)
19-
20-
if flag == "doric_csv":
21-
extractor.read_doric_csv(folder_path, storesList, outputPath)
22-
elif flag == "doric_doric":
23-
extractor.read_doric_doric(folder_path, storesList, outputPath)
24-
else:
25-
logger.error("Doric file not found or not recognized.")
26-
raise FileNotFoundError("Doric file not found or not recognized.")
18+
output_dicts = extractor.read(storesList=storesList)
19+
extractor.save(output_dicts=output_dicts, outputPath=outputPath)
2720

2821

2922
class DoricRecordingExtractor:
23+
# TODO: consolidate duplicate flag logic between the `__init__` and the `check_doric` method.
3024

3125
def __init__(self, folder_path):
3226
self.folder_path = folder_path
@@ -110,9 +104,9 @@ def separate_last_element(self, arr):
110104
l = arr[-1]
111105
return arr[:-1], l
112106

113-
def check_doric(self, filepath):
107+
def check_doric(self):
114108
logger.debug("Checking if doric file exists")
115-
path = glob.glob(os.path.join(filepath, "*.csv")) + glob.glob(os.path.join(filepath, "*.doric"))
109+
path = glob.glob(os.path.join(self.folder_path, "*.csv")) + glob.glob(os.path.join(self.folder_path, "*.doric"))
116110

117111
flag_arr = []
118112
for i in range(len(path)):
@@ -141,44 +135,50 @@ def check_doric(self, filepath):
141135
logger.info("Doric file found.")
142136
return flag_arr[0]
143137

144-
def read_doric_csv(self, filepath, storesList, outputPath):
145-
path = glob.glob(os.path.join(filepath, "*.csv"))
138+
def read_doric_csv(self, storesList):
139+
path = glob.glob(os.path.join(self.folder_path, "*.csv"))
146140
if len(path) > 1:
147141
logger.error("An error occurred : More than one Doric csv file present at the location")
148142
raise Exception("More than one Doric csv file present at the location")
149-
else:
150-
df = pd.read_csv(path[0], header=1, index_col=False)
151-
df = df.dropna(axis=1, how="all")
152-
df = df.dropna(axis=0, how="any")
153-
df["Time(s)"] = df["Time(s)"] - df["Time(s)"].to_numpy()[0]
154-
for i in range(storesList.shape[1]):
155-
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
156-
timestamps = np.array(df["Time(s)"])
157-
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
158-
write_hdf5(sampling_rate, storesList[0, i], outputPath, "sampling_rate")
159-
write_hdf5(df["Time(s)"].to_numpy(), storesList[0, i], outputPath, "timestamps")
160-
write_hdf5(df[storesList[0, i]].to_numpy(), storesList[0, i], outputPath, "data")
161-
else:
162-
ttl = df[storesList[0, i]]
163-
indices = np.where(ttl <= 0)[0]
164-
diff_indices = np.where(np.diff(indices) > 1)[0]
165-
write_hdf5(
166-
df["Time(s)"][indices[diff_indices] + 1].to_numpy(), storesList[0, i], outputPath, "timestamps"
167-
)
168-
169-
def read_doric_doric(self, filepath, storesList, outputPath):
170-
path = glob.glob(os.path.join(filepath, "*.doric"))
143+
144+
df = pd.read_csv(path[0], header=1, index_col=False)
145+
df = df.dropna(axis=1, how="all")
146+
df = df.dropna(axis=0, how="any")
147+
df["Time(s)"] = df["Time(s)"] - df["Time(s)"].to_numpy()[0]
148+
149+
output_dicts = []
150+
for i in range(storesList.shape[1]):
151+
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
152+
timestamps = np.array(df["Time(s)"])
153+
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
154+
data = np.array(df[storesList[0, i]])
155+
storename = storesList[0, i]
156+
S = {"storename": storename, "sampling_rate": sampling_rate, "timestamps": timestamps, "data": data}
157+
output_dicts.append(S)
158+
else:
159+
ttl = df[storesList[0, i]]
160+
indices = np.where(ttl <= 0)[0]
161+
diff_indices = np.where(np.diff(indices) > 1)[0]
162+
timestamps = df["Time(s)"][indices[diff_indices] + 1].to_numpy()
163+
storename = storesList[0, i]
164+
S = {"storename": storename, "timestamps": timestamps}
165+
output_dicts.append(S)
166+
167+
return output_dicts
168+
169+
def read_doric_doric(self, storesList):
170+
path = glob.glob(os.path.join(self.folder_path, "*.doric"))
171171
if len(path) > 1:
172172
logger.error("An error occurred : More than one Doric file present at the location")
173173
raise Exception("More than one Doric file present at the location")
174-
else:
175-
with h5py.File(path[0], "r") as f:
176-
if "Traces" in list(f.keys()):
177-
keys = self.access_data_doricV1(f, storesList, outputPath)
178-
elif list(f.keys()) == ["Configurations", "DataAcquisition"]:
179-
keys = self.access_data_doricV6(f, storesList, outputPath)
174+
with h5py.File(path[0], "r") as f:
175+
if "Traces" in list(f.keys()):
176+
output_dicts = self.access_data_doricV1(f, storesList)
177+
elif list(f.keys()) == ["Configurations", "DataAcquisition"]:
178+
output_dicts = self.access_data_doricV6(f, storesList)
179+
return output_dicts
180180

181-
def access_data_doricV6(self, doric_file, storesList, outputPath):
181+
def access_data_doricV6(self, doric_file, storesList):
182182
data = [doric_file["DataAcquisition"]]
183183
res = []
184184
while len(data) != 0:
@@ -201,6 +201,7 @@ def access_data_doricV6(self, doric_file, storesList, outputPath):
201201
if f"{sep_values[-2]}/{sep_values[-1]}" in storesList[0, :]:
202202
decide_path.append(element)
203203

204+
output_dicts = []
204205
for i in range(storesList.shape[1]):
205206
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
206207
regex = re.compile("(.*?)" + str(storesList[0, i]) + "(.*?)")
@@ -212,9 +213,9 @@ def access_data_doricV6(self, doric_file, storesList, outputPath):
212213
data = np.array(doric_file[decide_path[idx]])
213214
timestamps = np.array(doric_file[decide_path[idx].rsplit("/", 1)[0] + "/Time"])
214215
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
215-
write_hdf5(sampling_rate, storesList[0, i], outputPath, "sampling_rate")
216-
write_hdf5(timestamps, storesList[0, i], outputPath, "timestamps")
217-
write_hdf5(data, storesList[0, i], outputPath, "data")
216+
storename = storesList[0, i]
217+
S = {"storename": storename, "sampling_rate": sampling_rate, "timestamps": timestamps, "data": data}
218+
output_dicts.append(S)
218219
else:
219220
regex = re.compile("(.*?)" + storesList[0, i] + "$")
220221
idx = [i for i in range(len(decide_path)) if regex.match(decide_path[i])]
@@ -226,21 +227,57 @@ def access_data_doricV6(self, doric_file, storesList, outputPath):
226227
timestamps = np.array(doric_file[decide_path[idx].rsplit("/", 1)[0] + "/Time"])
227228
indices = np.where(ttl <= 0)[0]
228229
diff_indices = np.where(np.diff(indices) > 1)[0]
229-
write_hdf5(timestamps[indices[diff_indices] + 1], storesList[0, i], outputPath, "timestamps")
230+
timestamps = timestamps[indices[diff_indices] + 1]
231+
storename = storesList[0, i]
232+
S = {"storename": storename, "timestamps": timestamps}
233+
output_dicts.append(S)
230234

231-
def access_data_doricV1(self, doric_file, storesList, outputPath):
235+
return output_dicts
236+
237+
def access_data_doricV1(self, doric_file, storesList):
232238
keys = list(doric_file["Traces"]["Console"].keys())
239+
output_dicts = []
233240
for i in range(storesList.shape[1]):
234241
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
235242
timestamps = np.array(doric_file["Traces"]["Console"]["Time(s)"]["Console_time(s)"])
236243
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
237244
data = np.array(doric_file["Traces"]["Console"][storesList[0, i]][storesList[0, i]])
238-
write_hdf5(sampling_rate, storesList[0, i], outputPath, "sampling_rate")
239-
write_hdf5(timestamps, storesList[0, i], outputPath, "timestamps")
240-
write_hdf5(data, storesList[0, i], outputPath, "data")
245+
storename = storesList[0, i]
246+
S = {"storename": storename, "sampling_rate": sampling_rate, "timestamps": timestamps, "data": data}
247+
output_dicts.append(S)
241248
else:
242249
timestamps = np.array(doric_file["Traces"]["Console"]["Time(s)"]["Console_time(s)"])
243250
ttl = np.array(doric_file["Traces"]["Console"][storesList[0, i]][storesList[0, i]])
244251
indices = np.where(ttl <= 0)[0]
245252
diff_indices = np.where(np.diff(indices) > 1)[0]
246-
write_hdf5(timestamps[indices[diff_indices] + 1], storesList[0, i], outputPath, "timestamps")
253+
timestamps = timestamps[indices[diff_indices] + 1]
254+
storename = storesList[0, i]
255+
S = {"storename": storename, "timestamps": timestamps}
256+
output_dicts.append(S)
257+
258+
return output_dicts
259+
260+
def save_dict_to_hdf5(self, S, outputPath):
261+
event = S["storename"]
262+
write_hdf5(S["timestamps"], event, outputPath, "timestamps")
263+
264+
if "sampling_rate" in S:
265+
write_hdf5(S["sampling_rate"], event, outputPath, "sampling_rate")
266+
if "data" in S:
267+
write_hdf5(S["data"], event, outputPath, "data")
268+
269+
def read(self, storesList):
270+
flag = self.check_doric()
271+
if flag == "doric_csv":
272+
output_dicts = self.read_doric_csv(storesList)
273+
elif flag == "doric_doric":
274+
output_dicts = self.read_doric_doric(storesList)
275+
else:
276+
logger.error("Doric file not found or not recognized.")
277+
raise FileNotFoundError("Doric file not found or not recognized.")
278+
279+
return output_dicts
280+
281+
def save(self, output_dicts, outputPath):
282+
for S in output_dicts:
283+
self.save_dict_to_hdf5(S=S, outputPath=outputPath)

src/guppy/extractors/npm_recording_extractor.py

Lines changed: 19 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,13 +24,13 @@ def execute_import_npm(folder_path, num_ch, inputParameters, events, outputPath,
2424
extractor = NpmRecordingExtractor(folder_path=folder_path, num_ch=num_ch, inputParameters=inputParameters)
2525
start = time.time()
2626
with mp.Pool(numProcesses) as p:
27-
p.starmap(read_npm_and_save_hdf5, zip(repeat(extractor), events, repeat(outputPath)))
27+
p.starmap(read_and_save_npm, zip(repeat(extractor), events, repeat(outputPath)))
2828
logger.info("Time taken = {0:.5f}".format(time.time() - start))
2929

3030

31-
def read_npm_and_save_hdf5(extractor, event, outputPath):
32-
df = extractor.read_npm(event=event)
33-
extractor.save_to_hdf5(df=df, event=event, outputPath=outputPath)
31+
def read_and_save_npm(extractor, event, outputPath):
32+
output_dicts = extractor.read(events=[event], outputPath=outputPath)
33+
extractor.save(output_dicts=output_dicts, outputPath=outputPath)
3434
logger.info("Data for event {} fetched and stored.".format(event))
3535

3636

@@ -488,3 +488,18 @@ def save_to_hdf5(self, df, event, outputPath):
488488
write_hdf5(df[key[i]].dropna(), event, outputPath, key[i].lower())
489489

490490
logger.info("\033[1m" + "Reading data for {} from csv file is completed.".format(event) + "\033[0m")
491+
492+
def read(self, events, outputPath):
493+
output_dicts = []
494+
for event in events:
495+
df = self.read_npm(event=event)
496+
S = df.to_dict()
497+
S["storename"] = event
498+
output_dicts.append(S)
499+
return output_dicts
500+
501+
def save(self, output_dicts, outputPath):
502+
for S in output_dicts:
503+
event = S.pop("storename")
504+
df = pd.DataFrame.from_dict(S)
505+
self.save_to_hdf5(df=df, event=event, outputPath=outputPath)

0 commit comments

Comments (0)