|
| 1 | +""" |
| 2 | +Tests of neo.io.exampleio |
| 3 | +""" |
| 4 | + |
| 5 | +import unittest |
| 6 | + |
| 7 | +from neo.io.phyio import PhyIO # , HAVE_SCIPY |
| 8 | +from neo.test.iotest.common_io_test import BaseTestIO |
| 9 | +from neo.io.proxyobjects import (AnalogSignalProxy, |
| 10 | + SpikeTrainProxy, EventProxy, EpochProxy) |
| 11 | +from neo import (AnalogSignal, SpikeTrain) |
| 12 | + |
| 13 | +import quantities as pq |
| 14 | +import numpy as np |
| 15 | + |
| 16 | +import tempfile |
| 17 | +from pathlib import Path |
| 18 | + |
| 19 | +# This run standart tests, this is mandatory for all IO |
| 20 | + |
| 21 | + |
| 22 | +class TestPhyIO(BaseTestIO, unittest.TestCase): |
| 23 | + ioclass = PhyIO |
| 24 | + files_to_test = ['phy_example_0'] |
| 25 | + files_to_download = [ |
| 26 | + 'phy_example_0/spike_times.npy', |
| 27 | + 'phy_example_0/spike_templates.npy', |
| 28 | + 'phy_example_0/spike_clusters.npy', |
| 29 | + 'phy_example_0/params.py', |
| 30 | + 'phy_example_0/cluster_KSLabel.tsv', |
| 31 | + 'phy_example_0/cluster_ContamPct.tsv', |
| 32 | + 'phy_example_0/cluster_Amplitude.tsv', |
| 33 | + 'phy_example_0/cluster_group.tsv' |
| 34 | + ] |
| 35 | + |
| 36 | + |
| 37 | +class SpecificTestPhyIO(unittest.TestCase): |
| 38 | + def setUp(self): |
| 39 | + temp_directory = Path(tempfile.gettempdir()) |
| 40 | + self.temp_folder = temp_directory.joinpath( |
| 41 | + 'files_for_testing_neo/phy/phy_example_0') |
| 42 | + |
| 43 | + def test_read_segment_lazy(self): |
| 44 | + r = PhyIO(dirname=self.temp_folder) |
| 45 | + seg = r.read_segment(lazy=True) |
| 46 | + for ana in seg.analogsignals: |
| 47 | + assert isinstance(ana, AnalogSignalProxy) |
| 48 | + ana = ana.load() |
| 49 | + assert isinstance(ana, AnalogSignal) |
| 50 | + for st in seg.spiketrains: |
| 51 | + assert isinstance(st, SpikeTrainProxy) |
| 52 | + st = st.load() |
| 53 | + assert isinstance(st, SpikeTrain) |
| 54 | + |
| 55 | + seg = r.read_segment(lazy=False) |
| 56 | + for anasig in seg.analogsignals: |
| 57 | + assert isinstance(ana, AnalogSignal) |
| 58 | + self.assertNotEqual(anasig.size, 0) |
| 59 | + for st in seg.spiketrains: |
| 60 | + assert isinstance(st, SpikeTrain) |
| 61 | + self.assertNotEqual(st.size, 0) |
| 62 | + |
| 63 | + # annotations |
| 64 | + assert 'seg_extra_info' in seg.annotations |
| 65 | + assert seg.name == 'Seg #0 Block #0' |
| 66 | + for anasig in seg.analogsignals: |
| 67 | + assert anasig.name is not None |
| 68 | + for st in seg.spiketrains: |
| 69 | + assert st.name is not None |
| 70 | + for ev in seg.events: |
| 71 | + assert ev.name is not None |
| 72 | + for ep in seg.epochs: |
| 73 | + assert ep.name is not None |
| 74 | + |
| 75 | + def test_read_block(self): |
| 76 | + r = PhyIO(dirname=self.temp_folder) |
| 77 | + bl = r.read_block(lazy=True) |
| 78 | + |
| 79 | + def test_read_segment_with_time_slice(self): |
| 80 | + r = PhyIO(dirname=self.temp_folder) |
| 81 | + seg = r.read_segment(time_slice=None) |
| 82 | + spikes_full = seg.spiketrains[0] |
| 83 | + |
| 84 | + t_start, t_stop = 260 * pq.ms, 1.854 * pq.s |
| 85 | + seg = r.read_segment(time_slice=(t_start, t_stop)) |
| 86 | + spikes_slice = seg.spiketrains[0] |
| 87 | + |
| 88 | + assert spikes_full.size > spikes_slice.size |
| 89 | + assert np.all(spikes_slice >= t_start) |
| 90 | + assert np.all(spikes_slice <= t_stop) |
| 91 | + assert spikes_slice.t_start == t_start |
| 92 | + assert spikes_slice.t_stop == t_stop |
| 93 | + |
| 94 | + |
| 95 | +if __name__ == "__main__": |
| 96 | + unittest.main() |
0 commit comments