Skip to content

Commit 163578b

Browse files
Ensure destination range is capped within source range (#942)
* Release changes for updating source range cursor
* Ensure destination range is capped within source range
* fix build
* Use two dicts
* Fix tests
* Cleaner code and better comments
* Refactor for less operations
* lint fix
* Test fix
* Test fix
* test fix

---------

Co-authored-by: Toshad Salwekar <toshad.salwekar@cognite.com>
1 parent 19c8be1 commit 163578b

File tree

3 files changed

+43
-13
lines changed

3 files changed

+43
-13
lines changed

Extractor/Streamer.cs

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ public virtual async Task EnqueueAsync(IEnumerable<UAEvent> events)
149149
if (events == null) return;
150150
await eventQueue.EnqueueAsync(events);
151151
}
152+
153+
private struct Ranges
154+
{
155+
public TimeRange sourceExtractedRange;
156+
public TimeRange queuePointsRange;
157+
}
152158
/// <summary>
153159
/// Push data points to destinations
154160
/// </summary>
@@ -161,17 +167,26 @@ public async Task PushDataPoints(IEnumerable<IPusher> passingPushers,
161167
if (!AllowData) return;
162168

163169
var dataPointList = new List<UADataPoint>();
164-
var pointRanges = new Dictionary<string, TimeRange>();
170+
171+
// Track source extracted timestamps and normal timestamps for each node in the current batch to update state later.
172+
var ranges = new Dictionary<string, Ranges>();
165173

166174
await foreach (var dp in dataPointQueue.DrainAsync(token))
167175
{
168176
dataPointList.Add(dp);
169-
if (!pointRanges.TryGetValue(dp.Id, out var range))
177+
if (!ranges.TryGetValue(dp.Id, out var range))
170178
{
171-
pointRanges[dp.Id] = new TimeRange(dp.Timestamp, dp.Timestamp);
179+
// Get source extracted range while we are draining, to make sure we don't update the state with a range that exceeds this.
180+
ranges[dp.Id] = new Ranges
181+
{
182+
// Source extracted range will always exist, so default is irrelevant.
183+
sourceExtractedRange = extractor.State.GetNodeState(dp.Id)?.SourceExtractedRange ?? new TimeRange(dp.Timestamp, dp.Timestamp),
184+
queuePointsRange = new TimeRange(dp.Timestamp, dp.Timestamp)
185+
};
172186
continue;
173187
}
174-
pointRanges[dp.Id] = range.Extend(dp.Timestamp, dp.Timestamp);
188+
range.queuePointsRange = range.queuePointsRange.Extend(dp.Timestamp, dp.Timestamp);
189+
ranges[dp.Id] = range;
175190
}
176191

177192
var results = await Task.WhenAll(passingPushers.Select(pusher => pusher.PushDataPoints(dataPointList, token)));
@@ -196,6 +211,7 @@ public async Task PushDataPoints(IEnumerable<IPusher> passingPushers,
196211
}
197212
if (config.FailureBuffer.Enabled && extractor.FailureBuffer != null)
198213
{
214+
var pointRanges = ranges.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.queuePointsRange);
199215
await extractor.FailureBuffer.WriteDatapoints(dataPointList, pointRanges, token);
200216
}
201217

@@ -217,10 +233,12 @@ public async Task PushDataPoints(IEnumerable<IPusher> passingPushers,
217233
{
218234
await extractor.FailureBuffer.ReadDatapoints(passingPushers, token);
219235
}
220-
foreach ((string id, var range) in pointRanges)
236+
foreach ((string id, var range) in ranges)
221237
{
222238
var state = extractor.State.GetNodeState(id);
223-
if (state != null && (extractor.AllowUpdateState || !state.FrontfillEnabled && !state.BackfillEnabled)) state.UpdateDestinationRange(range.First, range.Last);
239+
// Make sure we don't update the destination range beyond the source extracted range at the time of reading from queue.
240+
var pointRange = range.queuePointsRange.Contract(range.sourceExtractedRange);
241+
if (state != null && (extractor.AllowUpdateState || !state.FrontfillEnabled && !state.BackfillEnabled)) state.UpdateDestinationRange(pointRange.First, pointRange.Last);
224242
}
225243
}
226244
/// <summary>
@@ -235,18 +253,19 @@ public async Task PushEvents(IEnumerable<IPusher> passingPushers,
235253
if (!AllowEvents) return;
236254

237255
var eventList = new List<UAEvent>();
238-
var eventRanges = new Dictionary<NodeId, TimeRange>();
256+
var ranges = new Dictionary<NodeId, Ranges>();
239257

240258
await foreach (var evt in eventQueue.DrainAsync(token))
241259
{
242260
eventList.Add(evt);
243-
if (!eventRanges.TryGetValue(evt.EmittingNode, out var range))
261+
if (!ranges.TryGetValue(evt.EmittingNode, out var range))
244262
{
245-
eventRanges[evt.EmittingNode] = new TimeRange(evt.Time, evt.Time);
263+
var sourceRange = extractor.State.GetEmitterState(evt.EmittingNode)?.SourceExtractedRange ?? new TimeRange(evt.Time, evt.Time);
264+
ranges[evt.EmittingNode] = new Ranges { sourceExtractedRange = sourceRange, queuePointsRange = new TimeRange(evt.Time, evt.Time) };
246265
continue;
247266
}
248-
249-
eventRanges[evt.EmittingNode] = range.Extend(evt.Time, evt.Time);
267+
range.queuePointsRange = range.queuePointsRange.Extend(evt.Time, evt.Time);
268+
ranges[evt.EmittingNode] = range;
250269
}
251270

252271
var results = await Task.WhenAll(passingPushers.Select(pusher => pusher.PushEvents(eventList, token)));
@@ -292,10 +311,12 @@ public async Task PushEvents(IEnumerable<IPusher> passingPushers,
292311
{
293312
await extractor.FailureBuffer.ReadEvents(passingPushers, token);
294313
}
295-
foreach (var (id, range) in eventRanges)
314+
foreach (var (id, range) in ranges)
296315
{
297316
var state = extractor.State.GetEmitterState(id);
298-
if (state != null && (extractor.AllowUpdateState || !state.FrontfillEnabled && !state.BackfillEnabled)) state?.UpdateDestinationRange(range.First, range.Last);
317+
// Make sure we don't update the destination range beyond the source extracted range at the time of reading from queue.
318+
var eventRange = range.queuePointsRange.Contract(range.sourceExtractedRange);
319+
if (state != null && (extractor.AllowUpdateState || !state.FrontfillEnabled && !state.BackfillEnabled)) state?.UpdateDestinationRange(eventRange.First, eventRange.Last);
299320
}
300321
}
301322
/// <summary>

Test/Unit/StreamerTest.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ void DummyLooper(CancellationToken token)
6868
Assert.False(evt.WaitOne(100));
6969

7070
await extractor.Streamer.EnqueueAsync(new UADataPoint(start, "id", -1, StatusCodes.Good));
71+
state.UpdateFromStream(start, start);
7172
Assert.Equal(1, queue.Count);
7273
await extractor.Streamer.PushDataPoints(new[] { pusher }, Enumerable.Empty<IPusher>(), tester.Source.Token);
7374

@@ -100,6 +101,7 @@ void DummyLooper(CancellationToken token)
100101
Assert.Equal(999_999, queue.Count);
101102

102103
await extractor.Streamer.EnqueueAsync(new UADataPoint(start.AddMilliseconds(3000000), "id", 300, StatusCodes.Good));
104+
state.UpdateFromStream(start.AddMilliseconds(3000000), start.AddMilliseconds(3000000));
103105
Assert.True(evt.WaitOne(10000));
104106
Assert.Equal(1_000_000, queue.Count);
105107

@@ -141,6 +143,7 @@ void DummyLooper(CancellationToken token)
141143
Assert.False(evt.WaitOne(100));
142144

143145
await extractor.Streamer.EnqueueAsync(new UAEvent { EmittingNode = id, Time = start });
146+
state.UpdateFromStream(start, start);
144147
Assert.Equal(1, queue.Count);
145148
await extractor.Streamer.PushEvents(new[] { pusher }, Enumerable.Empty<IPusher>(), tester.Source.Token);
146149

@@ -175,6 +178,7 @@ await extractor.Streamer.EnqueueAsync(Enumerable.Range(200000, 99999).Select(idx
175178
Assert.Equal(99999, queue.Count);
176179

177180
await extractor.Streamer.EnqueueAsync(new UAEvent { EmittingNode = id, Time = start.AddMilliseconds(300000) });
181+
state.UpdateFromStream(start.AddMilliseconds(300000), start.AddMilliseconds(300000));
178182
Assert.True(evt.WaitOne(10000));
179183
Assert.Equal(100000, queue.Count);
180184

manifest.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ schema:
6767
- "https://raw.githubusercontent.com/"
6868

6969
versions:
70+
"2.40.8":
71+
description: Fix rare race condition allowing history range read to increment without pushing events to CDF.
72+
changelog:
73+
fixed:
74+
- Fix rare race condition allowing history range read to increment without pushing events to CDF.
7075
"2.40.7":
7176
description: Fix issue causing error handling to fail when encountering mismatched timeseries against data modelling.
7277
changelog:

0 commit comments

Comments
 (0)