Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/guppy/extractors/csv_recording_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ def execute_import_csv(filepath, events, outputPath, numProcesses=mp.cpu_count()
extractor = CsvRecordingExtractor(folder_path=filepath)
start = time.time()
with mp.Pool(numProcesses) as p:
p.starmap(read_csv_and_save_hdf5, zip(repeat(extractor), events, repeat(outputPath)))
p.starmap(read_and_save_csv, zip(repeat(extractor), events, repeat(outputPath)))
logger.info("Time taken = {0:.5f}".format(time.time() - start))


def read_csv_and_save_hdf5(extractor, event, outputPath):
    """Read a single event's csv data via *extractor* and persist it to hdf5.

    Worker function intended for use with multiprocessing starmap.
    """
    frame = extractor.read_csv(event=event)
    extractor.save_to_hdf5(df=frame, event=event, outputPath=outputPath)
def read_and_save_csv(extractor, event, outputPath):
    """Fetch one event through the extractor's read/save API and log completion.

    Worker function intended for use with multiprocessing starmap.
    """
    records = extractor.read(events=[event], outputPath=outputPath)
    extractor.save(output_dicts=records, outputPath=outputPath)
    logger.info("Data for event {} fetched and stored.".format(event))


Expand Down Expand Up @@ -178,3 +178,18 @@ def save_to_hdf5(self, df, event, outputPath):
write_hdf5(df[key[i]].dropna(), event, outputPath, key[i].lower())

logger.info("\033[1m" + "Reading data for {} from csv file is completed.".format(event) + "\033[0m")

def read(self, events, outputPath):
    """Read csv data for each event into a list of column-keyed dicts.

    Each dict is the event's dataframe converted via ``to_dict`` plus a
    "storename" entry identifying the event. *outputPath* is accepted for
    interface symmetry with other extractors but is not used here.
    """
    def _event_record(event):
        record = self.read_csv(event=event).to_dict()
        record["storename"] = event
        return record

    return [_event_record(event) for event in events]

def save(self, output_dicts, outputPath):
    """Persist each event dict (as produced by ``read``) to hdf5.

    NOTE: pops "storename" from each dict, so the input dicts are mutated.
    """
    for record in output_dicts:
        store = record.pop("storename")
        frame = pd.DataFrame.from_dict(record)
        self.save_to_hdf5(df=frame, event=store, outputPath=outputPath)
139 changes: 88 additions & 51 deletions src/guppy/extractors/doric_recording_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,12 @@

def execute_import_doric(folder_path, storesList, outputPath):
extractor = DoricRecordingExtractor(folder_path=folder_path)
flag = extractor.check_doric(folder_path)

if flag == "doric_csv":
extractor.read_doric_csv(folder_path, storesList, outputPath)
elif flag == "doric_doric":
extractor.read_doric_doric(folder_path, storesList, outputPath)
else:
logger.error("Doric file not found or not recognized.")
raise FileNotFoundError("Doric file not found or not recognized.")
output_dicts = extractor.read(storesList=storesList)
extractor.save(output_dicts=output_dicts, outputPath=outputPath)


class DoricRecordingExtractor:
# TODO: consolidate duplicate flag logic between the `__init__` and the `check_doric` method.

def __init__(self, folder_path):
self.folder_path = folder_path
Expand Down Expand Up @@ -110,9 +104,9 @@ def separate_last_element(self, arr):
l = arr[-1]
return arr[:-1], l

def check_doric(self, filepath):
def check_doric(self):
logger.debug("Checking if doric file exists")
path = glob.glob(os.path.join(filepath, "*.csv")) + glob.glob(os.path.join(filepath, "*.doric"))
path = glob.glob(os.path.join(self.folder_path, "*.csv")) + glob.glob(os.path.join(self.folder_path, "*.doric"))

flag_arr = []
for i in range(len(path)):
Expand Down Expand Up @@ -141,44 +135,50 @@ def check_doric(self, filepath):
logger.info("Doric file found.")
return flag_arr[0]

def read_doric_csv(self, filepath, storesList, outputPath):
path = glob.glob(os.path.join(filepath, "*.csv"))
def read_doric_csv(self, storesList):
path = glob.glob(os.path.join(self.folder_path, "*.csv"))
if len(path) > 1:
logger.error("An error occurred : More than one Doric csv file present at the location")
raise Exception("More than one Doric csv file present at the location")
else:
df = pd.read_csv(path[0], header=1, index_col=False)
df = df.dropna(axis=1, how="all")
df = df.dropna(axis=0, how="any")
df["Time(s)"] = df["Time(s)"] - df["Time(s)"].to_numpy()[0]
for i in range(storesList.shape[1]):
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
timestamps = np.array(df["Time(s)"])
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
write_hdf5(sampling_rate, storesList[0, i], outputPath, "sampling_rate")
write_hdf5(df["Time(s)"].to_numpy(), storesList[0, i], outputPath, "timestamps")
write_hdf5(df[storesList[0, i]].to_numpy(), storesList[0, i], outputPath, "data")
else:
ttl = df[storesList[0, i]]
indices = np.where(ttl <= 0)[0]
diff_indices = np.where(np.diff(indices) > 1)[0]
write_hdf5(
df["Time(s)"][indices[diff_indices] + 1].to_numpy(), storesList[0, i], outputPath, "timestamps"
)

def read_doric_doric(self, filepath, storesList, outputPath):
path = glob.glob(os.path.join(filepath, "*.doric"))

df = pd.read_csv(path[0], header=1, index_col=False)
df = df.dropna(axis=1, how="all")
df = df.dropna(axis=0, how="any")
df["Time(s)"] = df["Time(s)"] - df["Time(s)"].to_numpy()[0]

output_dicts = []
for i in range(storesList.shape[1]):
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
timestamps = np.array(df["Time(s)"])
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
data = np.array(df[storesList[0, i]])
storename = storesList[0, i]
S = {"storename": storename, "sampling_rate": sampling_rate, "timestamps": timestamps, "data": data}
output_dicts.append(S)
else:
ttl = df[storesList[0, i]]
indices = np.where(ttl <= 0)[0]
diff_indices = np.where(np.diff(indices) > 1)[0]
timestamps = df["Time(s)"][indices[diff_indices] + 1].to_numpy()
storename = storesList[0, i]
S = {"storename": storename, "timestamps": timestamps}
output_dicts.append(S)

return output_dicts

def read_doric_doric(self, storesList):
path = glob.glob(os.path.join(self.folder_path, "*.doric"))
if len(path) > 1:
logger.error("An error occurred : More than one Doric file present at the location")
raise Exception("More than one Doric file present at the location")
else:
with h5py.File(path[0], "r") as f:
if "Traces" in list(f.keys()):
keys = self.access_data_doricV1(f, storesList, outputPath)
elif list(f.keys()) == ["Configurations", "DataAcquisition"]:
keys = self.access_data_doricV6(f, storesList, outputPath)
with h5py.File(path[0], "r") as f:
if "Traces" in list(f.keys()):
output_dicts = self.access_data_doricV1(f, storesList)
elif list(f.keys()) == ["Configurations", "DataAcquisition"]:
output_dicts = self.access_data_doricV6(f, storesList)
return output_dicts

def access_data_doricV6(self, doric_file, storesList, outputPath):
def access_data_doricV6(self, doric_file, storesList):
data = [doric_file["DataAcquisition"]]
res = []
while len(data) != 0:
Expand All @@ -201,6 +201,7 @@ def access_data_doricV6(self, doric_file, storesList, outputPath):
if f"{sep_values[-2]}/{sep_values[-1]}" in storesList[0, :]:
decide_path.append(element)

output_dicts = []
for i in range(storesList.shape[1]):
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
regex = re.compile("(.*?)" + str(storesList[0, i]) + "(.*?)")
Expand All @@ -212,9 +213,9 @@ def access_data_doricV6(self, doric_file, storesList, outputPath):
data = np.array(doric_file[decide_path[idx]])
timestamps = np.array(doric_file[decide_path[idx].rsplit("/", 1)[0] + "/Time"])
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
write_hdf5(sampling_rate, storesList[0, i], outputPath, "sampling_rate")
write_hdf5(timestamps, storesList[0, i], outputPath, "timestamps")
write_hdf5(data, storesList[0, i], outputPath, "data")
storename = storesList[0, i]
S = {"storename": storename, "sampling_rate": sampling_rate, "timestamps": timestamps, "data": data}
output_dicts.append(S)
else:
regex = re.compile("(.*?)" + storesList[0, i] + "$")
idx = [i for i in range(len(decide_path)) if regex.match(decide_path[i])]
Expand All @@ -226,21 +227,57 @@ def access_data_doricV6(self, doric_file, storesList, outputPath):
timestamps = np.array(doric_file[decide_path[idx].rsplit("/", 1)[0] + "/Time"])
indices = np.where(ttl <= 0)[0]
diff_indices = np.where(np.diff(indices) > 1)[0]
write_hdf5(timestamps[indices[diff_indices] + 1], storesList[0, i], outputPath, "timestamps")
timestamps = timestamps[indices[diff_indices] + 1]
storename = storesList[0, i]
S = {"storename": storename, "timestamps": timestamps}
output_dicts.append(S)

def access_data_doricV1(self, doric_file, storesList, outputPath):
return output_dicts

def access_data_doricV1(self, doric_file, storesList):
keys = list(doric_file["Traces"]["Console"].keys())
output_dicts = []
for i in range(storesList.shape[1]):
if "control" in storesList[1, i] or "signal" in storesList[1, i]:
timestamps = np.array(doric_file["Traces"]["Console"]["Time(s)"]["Console_time(s)"])
sampling_rate = np.array([1 / (timestamps[-1] - timestamps[-2])])
data = np.array(doric_file["Traces"]["Console"][storesList[0, i]][storesList[0, i]])
write_hdf5(sampling_rate, storesList[0, i], outputPath, "sampling_rate")
write_hdf5(timestamps, storesList[0, i], outputPath, "timestamps")
write_hdf5(data, storesList[0, i], outputPath, "data")
storename = storesList[0, i]
S = {"storename": storename, "sampling_rate": sampling_rate, "timestamps": timestamps, "data": data}
output_dicts.append(S)
else:
timestamps = np.array(doric_file["Traces"]["Console"]["Time(s)"]["Console_time(s)"])
ttl = np.array(doric_file["Traces"]["Console"][storesList[0, i]][storesList[0, i]])
indices = np.where(ttl <= 0)[0]
diff_indices = np.where(np.diff(indices) > 1)[0]
write_hdf5(timestamps[indices[diff_indices] + 1], storesList[0, i], outputPath, "timestamps")
timestamps = timestamps[indices[diff_indices] + 1]
storename = storesList[0, i]
S = {"storename": storename, "timestamps": timestamps}
output_dicts.append(S)

return output_dicts

def save_dict_to_hdf5(self, S, outputPath):
    """Write one extracted event dict to hdf5.

    "timestamps" is always written; "sampling_rate" and "data" are written
    only when present (they exist for control/signal stores, not ttl stores).
    """
    storename = S["storename"]
    write_hdf5(S["timestamps"], storename, outputPath, "timestamps")
    # Optional keys, written in a fixed order when available.
    for optional_key in ("sampling_rate", "data"):
        if optional_key in S:
            write_hdf5(S[optional_key], storename, outputPath, optional_key)

def read(self, storesList):
    """Detect the doric file flavor and return the extracted event dicts.

    Raises FileNotFoundError when no recognized doric file is found.
    """
    # Dispatch table keyed by the flag returned from check_doric.
    readers = {
        "doric_csv": self.read_doric_csv,
        "doric_doric": self.read_doric_doric,
    }
    reader = readers.get(self.check_doric())
    if reader is None:
        logger.error("Doric file not found or not recognized.")
        raise FileNotFoundError("Doric file not found or not recognized.")
    return reader(storesList)

def save(self, output_dicts, outputPath):
    """Write every extracted event dict to hdf5 under *outputPath*."""
    for event_dict in output_dicts:
        self.save_dict_to_hdf5(S=event_dict, outputPath=outputPath)
23 changes: 19 additions & 4 deletions src/guppy/extractors/npm_recording_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ def execute_import_npm(folder_path, num_ch, inputParameters, events, outputPath,
extractor = NpmRecordingExtractor(folder_path=folder_path, num_ch=num_ch, inputParameters=inputParameters)
start = time.time()
with mp.Pool(numProcesses) as p:
p.starmap(read_npm_and_save_hdf5, zip(repeat(extractor), events, repeat(outputPath)))
p.starmap(read_and_save_npm, zip(repeat(extractor), events, repeat(outputPath)))
logger.info("Time taken = {0:.5f}".format(time.time() - start))


def read_npm_and_save_hdf5(extractor, event, outputPath):
    """Read a single event from the npm recording and persist it to hdf5.

    Worker function intended for use with multiprocessing starmap.
    """
    frame = extractor.read_npm(event=event)
    extractor.save_to_hdf5(df=frame, event=event, outputPath=outputPath)
def read_and_save_npm(extractor, event, outputPath):
    """Fetch one npm event through the extractor's read/save API and log completion.

    Worker function intended for use with multiprocessing starmap.
    """
    records = extractor.read(events=[event], outputPath=outputPath)
    extractor.save(output_dicts=records, outputPath=outputPath)
    logger.info("Data for event {} fetched and stored.".format(event))


Expand Down Expand Up @@ -488,3 +488,18 @@ def save_to_hdf5(self, df, event, outputPath):
write_hdf5(df[key[i]].dropna(), event, outputPath, key[i].lower())

logger.info("\033[1m" + "Reading data for {} from csv file is completed.".format(event) + "\033[0m")

def read(self, events, outputPath):
    """Read npm data for each event into a list of column-keyed dicts.

    Each dict is the event's dataframe converted via ``to_dict`` plus a
    "storename" entry identifying the event. *outputPath* is accepted for
    interface symmetry with other extractors but is not used here.
    """
    def _event_record(event):
        record = self.read_npm(event=event).to_dict()
        record["storename"] = event
        return record

    return [_event_record(event) for event in events]

def save(self, output_dicts, outputPath):
    """Persist each event dict (as produced by ``read``) to hdf5.

    NOTE: pops "storename" from each dict, so the input dicts are mutated.
    """
    for record in output_dicts:
        store = record.pop("storename")
        frame = pd.DataFrame.from_dict(record)
        self.save_to_hdf5(df=frame, event=store, outputPath=outputPath)
Loading