forked from bonsai-rx/spikeglx
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFetch.cs
More file actions
199 lines (180 loc) · 8.94 KB
/
Fetch.cs
File metadata and controls
199 lines (180 loc) · 8.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
using System;
using System.ComponentModel;
using System.Reactive.Linq;
using OpenCV.Net;
using System.Threading;
using System.Threading.Tasks;
using PrecisionTiming;
namespace Bonsai.SpikeGLX
{
/// <summary>
/// Represents an operator that generates a sequence of data buffers from a SpikeGLX data stream.
/// </summary>
[Combinator(MethodName = nameof(Generate))]
[WorkflowElementCategory(ElementCategory.Source)]
[Description("Streams buffers of data from a SpikeGLX data stream.")]
public class Fetch : Source<Mat>
{
/// <summary>
/// Gets or sets the IP address of the SpikeGLX command server
/// </summary>
[Category("Command Server")]
[Description("IP address of the SpikeGLX command server." +
"\"localhost\" evaluates to 127.0.0.1.")]
public string Host { get; set; } = "localhost";
/// <summary>
/// Gets or sets the port of the SpikeGLX command server.
/// </summary>
[Category("Command Server")]
[Description("Port of the SpikeGLX command server.")]
public int Port { get; set; } = 4142;
/// <summary>
/// Gets or sets the duration of fetched data buffers, in ms.
/// </summary>
[Description("Duration of streamed data buffers, in ms.")]
public int BufferLength { get; set; } = 1000;
/// <summary>
/// Gets or sets a value specifying the type of stream to fetch from.
/// </summary>
[Description("The type of stream to fetch from.")]
public StreamType StreamType { get; set; } = StreamType.Daq;
/// <summary>
/// Gets or sets the substream (0 for NIDAQ, probe number for IMEC Probe).
/// </summary>
[Description("Substream (0 for NIDAQ, probe number for IMEC Probe).")]
public int Substream { get; set; } = 0;
/// <summary>
/// Gets or sets the array of channels to fetch data from.
/// </summary>
/// <remarks>
/// Channels may be provided as an array of integers, or as comma separated ranges of
/// channels with an optional step size, e.g. "0:10,20:5:100". These ranges include both
/// end points.
/// </remarks>
[Description("Array of channels to fetch data from.")]
[TypeConverter(typeof(ChannelRangeTypeConverter))]
public int[] Channels { get; set; } = { 0 };
/// <summary>
/// Gets or sets the factor by which streamed data is downsampled.
/// </summary>
[Description("Factor by which streamed data is downsampled.")]
public int Downsample { get; set; } = 1;
/// <summary>
/// Gets or sets the flag to convert the streamed data from a unitless quantity
/// to a voltage, in volts.
/// </summary>
[Description("Flag to convert the streamed data from a unitless quantity to a voltage, in volts.")]
public bool ConvertToVoltage { get; set; } = false;
/// <summary>
/// Gets or sets the flag to use a high resolution timer (resolution ~1ms vs. ~15ms).
/// </summary>
/// <remarks>
/// Using the high resolution timer allows streaming data at a higher rate, at the
/// cost of more computational load due to increased polling of the SpikeGLX command server.
/// </remarks>
[Description("Flag to use a high resolution timer (resolution ~1ms vs. ~15ms).")]
public bool HighResolutionTimer { get; set; } = false;
/// <summary>
/// Generates an observable sequence of buffers of data streamed from a SpikeGLX data stream.
/// </summary>
/// <returns>
/// A sequence of <see cref="Mat"/> objects representing buffers of streamed data from a
/// SpikeGLX data stream.
/// </returns>
public override IObservable<Mat> Generate()
{
return Observable.Create<Mat>((observer, cancellationToken) =>
{
return Task.Factory.StartNew(() =>
{
// Establish connection to SpikeGLX command server.
using SpikeGLXDataStream connection = new(Host, Port, (int)StreamType, Substream, Channels);
// Get the sample rate of the stream and use it to convert the buffer length,
// in ms, to a buffer size, in number of elements.
double sampleRate = connection.GetStreamSampleRate();
ulong bufferSize = (ulong)Math.Ceiling(sampleRate * BufferLength / 1000);
// Create a timer that triggers this task to poll SpikeGLX for new data
// at regular intervals. The interval is set to be half the length of the
// buffer. If a high resolution timer has been requested, the resolution of
// the timer is set to its minimum possible value. Otherwise it is set to
// 15ms.
using AutoResetEvent pollSignal = new(false);
using PrecisionTimer pollTimer = new();
int pollPeriod = (int)Math.Floor((double)BufferLength / 2);
pollTimer.SetInterval(() => pollSignal.Set(),
pollPeriod,
false);
pollTimer.Start(HighResolutionTimer ? 0 : 15);
// Get the current sample count from the data stream
ulong tailCount = connection.GetStreamSampleCount();
// While cancellation has not been requested, see if there are enough
// available samples to fill a buffer. If so, fetch and emit them. Then
// update the most recently fetched sample number. If not, wait for the
// timer to signal it is time to poll again.
while (!cancellationToken.IsCancellationRequested)
{
while ((connection.GetStreamSampleCount() - tailCount) >= bufferSize)
{
ulong headCount = connection.Fetch(out Mat data, tailCount,
(int)bufferSize, Downsample, ConvertToVoltage);
tailCount = headCount + bufferSize;
observer.OnNext(data);
}
pollSignal.WaitOne();
}
// If cancellation has been requested, stop the timer
pollTimer.Stop();
},
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
})
.PublishReconnectable()
.RefCount();
}
/// <summary>
/// Generates an observable sequence of buffers of data fetched from a SpikeGLX data stream.
/// </summary>
/// <typeparam name="TSource">The type of the input sequence.</typeparam>
/// <param name="source">Input sequence used to trigger fetching. A new buffer is fetched every time the input sequence emits a notification.</param>
/// <returns>A sequence of <see cref="Mat"/> objects representing buffers of fetched data from a SpikeGLX data stream.</returns>
public IObservable<Mat> Generate<TSource>(IObservable<TSource> source)
{
// Create a disposable data stream connection using the provided host, port, stream type, substream, and channels.
return Observable
.Using<Mat, SpikeGLXDataStream>(() => new SpikeGLXDataStream(Host, Port, (int)StreamType, Substream, Channels),
// Use the data stream connection to fetch the latest data for each input notification.
connection => source
.Select(input =>
{
// Calculate the maximum number of samples to fetch based on the buffer length and stream sample rate.
int maxSamples = (int)(BufferLength * connection.GetStreamSampleRate() / 1000);
// Fetch the latest data from the data stream, downsampling and converting to voltage as needed.
connection.FetchLatest(out Mat data, maxSamples, Downsample, ConvertToVoltage);
return data;
}))
// Publish the observable sequence and reconnect if it is disconnected.
.PublishReconnectable()
// Reference count the observable sequence to manage its lifetime.
.RefCount();
}
}
/// <summary>
/// Specifies possible values for the SpikeGLX stream.
/// </summary>
public enum StreamType
{
/// <summary>
/// NIDAQ
/// </summary>
Daq = 0,
/// <summary>
/// Onebox
/// </summary>
OneBox = 1,
/// <summary>
/// IMEC probe
/// </summary>
Probe = 2
}
}