Skip to content

Commit 19c8be1

Browse files
authored
Update source range cursor after queueing for upload (#938)
* Update source range cursor after queueing for upload * Review comment * fix lint
1 parent e7aae93 commit 19c8be1

File tree

4 files changed

+192
-29
lines changed

4 files changed

+192
-29
lines changed

Extractor/History/HistoryScheduler.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -590,19 +590,9 @@ private async Task HistoryDataHandler(HistoryReadNode node)
590590
}
591591
}
592592

593-
node.Completed = IsNodeCompleted(node, first, last, data?.DataValues != null && data.DataValues.Count > 0);
594-
595-
if (Frontfill)
596-
{
597-
node.State.UpdateFromFrontfill(Config.IgnoreContinuationPoints ? last : goodLast, node.Completed);
598-
}
599-
else
600-
{
601-
node.State.UpdateFromBackfill(Config.IgnoreContinuationPoints ? first : goodFirst, node.Completed);
602-
}
603-
604593
int cnt = 0;
605594

595+
node.Completed = IsNodeCompleted(node, first, last, data?.DataValues != null && data.DataValues.Count > 0);
606596

607597
if (node.State is not VariableExtractionState nodeState) return;
608598

@@ -620,6 +610,15 @@ private async Task HistoryDataHandler(HistoryReadNode node)
620610
node.LastRead = cnt;
621611
node.TotalRead += cnt;
622612

613+
if (Frontfill)
614+
{
615+
node.State.UpdateFromFrontfill(Config.IgnoreContinuationPoints ? last : goodLast, node.Completed);
616+
}
617+
else
618+
{
619+
node.State.UpdateFromBackfill(Config.IgnoreContinuationPoints ? first : goodFirst, node.Completed);
620+
}
621+
623622
if (!node.Completed || !Frontfill) return;
624623

625624
var buffered = nodeState.FlushBuffer();
@@ -712,6 +711,8 @@ private async Task HistoryEventHandler(HistoryReadNode node, HistoryReadDetails
712711
}
713712
}
714713

714+
await extractor.Streamer.EnqueueAsync(createdEvents);
715+
715716
node.Completed = IsNodeCompleted(node, first, last, any);
716717

717718
if (Frontfill)
@@ -723,8 +724,6 @@ private async Task HistoryEventHandler(HistoryReadNode node, HistoryReadDetails
723724
node.State.UpdateFromBackfill(first, node.Completed);
724725
}
725726

726-
await extractor.Streamer.EnqueueAsync(createdEvents);
727-
728727
node.LastRead = createdEvents.Count;
729728
node.TotalRead += createdEvents.Count;
730729

@@ -736,9 +735,9 @@ private async Task HistoryEventHandler(HistoryReadNode node, HistoryReadDetails
736735
if (buffered.Any())
737736
{
738737
var (smin, smax) = buffered.MinMax(dp => dp.Time);
739-
emitterState.UpdateFromStream(smin, smax);
740738
log.LogDebug("Read {Count} events from buffer of state {Id}", buffered.Count(), node.State.Id);
741739
await extractor.Streamer.EnqueueAsync(buffered);
740+
emitterState.UpdateFromStream(smin, smax);
742741
}
743742
}
744743
#endregion

Extractor/Streamer.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ public void Enqueue(IEnumerable<UAEvent> events)
119119
/// Enqueue a datapoint, pushes if this exceeds the maximum.
120120
/// </summary>
121121
/// <param name="dp">Datapoint to enqueue.</param>
122-
public async Task EnqueueAsync(UADataPoint dp)
122+
public virtual async Task EnqueueAsync(UADataPoint dp)
123123
{
124124
await dataPointQueue.EnqueueAsync(dp);
125125
}
126126
/// <summary>
127127
/// Enqueue a list of datapoints, pushes if this exceeds the maximum.
128128
/// </summary>
129129
/// <param name="dps">Datapoints to enqueue.</param>
130-
public async Task EnqueueAsync(IEnumerable<UADataPoint> dps)
130+
public virtual async Task EnqueueAsync(IEnumerable<UADataPoint> dps)
131131
{
132132
if (dps == null) return;
133133
await dataPointQueue.EnqueueAsync(dps);
@@ -136,15 +136,15 @@ public async Task EnqueueAsync(IEnumerable<UADataPoint> dps)
136136
/// Enqueues an event, pushes if this exceeds the maximum.
137137
/// </summary>
138138
/// <param name="evt">Event to enqueue.</param>
139-
public async Task EnqueueAsync(UAEvent evt)
139+
public virtual async Task EnqueueAsync(UAEvent evt)
140140
{
141141
await eventQueue.EnqueueAsync(evt);
142142
}
143143
/// <summary>
144144
/// Enqueues a list of events, pushes if this exceeds the maximum.
145145
/// </summary>
146146
/// <param name="events">Events to enqueue.</param>
147-
public async Task EnqueueAsync(IEnumerable<UAEvent> events)
147+
public virtual async Task EnqueueAsync(IEnumerable<UAEvent> events)
148148
{
149149
if (events == null) return;
150150
await eventQueue.EnqueueAsync(events);
@@ -356,26 +356,26 @@ public void HandleStreamedDatapoint(DataValue datapoint, VariableExtractionState
356356
if (node.AsEvents)
357357
{
358358
var evt = DpAsEvent(datapoint, node);
359+
Enqueue(evt);
359360
log.LogTrace("Subscription DataPoint treated as event {Event}", node);
360361
node.UpdateFromStream(DateTime.MaxValue, datapoint.SourceTimestamp);
361-
Enqueue(evt);
362362
return;
363363
}
364364

365-
var buffDps = ToDataPoint(datapoint, node);
366-
if (StatusCode.IsGood(datapoint.StatusCode))
367-
{
368-
node.UpdateFromStream(buffDps);
369-
}
370-
371365
if ((extractor.StateStorage == null || config.StateStorage.IntervalValue.Value == Timeout.InfiniteTimeSpan)
372366
&& (node.IsFrontfilling && datapoint.SourceTimestamp > node.SourceExtractedRange.Last
373367
|| node.IsBackfilling && datapoint.SourceTimestamp < node.SourceExtractedRange.First)) return;
374368

369+
var buffDps = ToDataPoint(datapoint, node);
370+
375371
foreach (var buffDp in buffDps)
376372
{
377373
Enqueue(buffDp);
378374
}
375+
if (StatusCode.IsGood(datapoint.StatusCode))
376+
{
377+
node.UpdateFromStream(buffDps);
378+
}
379379
}
380380

381381

Test/Unit/HistoryReaderTest.cs

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using Cognite.Extractor.Common;
22
using Cognite.Extractor.Configuration;
3+
using Cognite.OpcUa;
34
using Cognite.OpcUa.Config;
45
using Cognite.OpcUa.History;
56
using Cognite.OpcUa.Nodes;
@@ -1157,5 +1158,168 @@ public async Task TestHistoryReadFailureThreshold()
11571158
var exc = await Assert.ThrowsAsync<SmartAggregateException>(() => CommonTestUtils.RunHistory(reader, states, HistoryReadType.BackfillData));
11581159
Assert.Equal("2 errors of type Opc.Ua.ServiceResultException. StatusCode: BadInvalidArgument", exc.Message);
11591160
}
1161+
1162+
/// <summary>
1163+
/// Test that if EnqueueAsync fails (simulating a crash), the state is NOT updated.
1164+
/// This verifies the fix where we enqueue BEFORE updating state to ensure at-least-once delivery.
1165+
/// </summary>
1166+
[Fact]
1167+
public async Task TestHistoryDataHandlerEnqueueFailure()
1168+
{
1169+
await using var extractor = tester.BuildExtractor();
1170+
var cfg = new HistoryConfig
1171+
{
1172+
Data = true
1173+
};
1174+
1175+
tester.Config.History = cfg;
1176+
1177+
// Inject a mock streamer that throws on EnqueueAsync
1178+
var mockStreamer = new FailingStreamer(
1179+
tester.Provider.GetRequiredService<ILogger<Streamer>>(),
1180+
extractor,
1181+
tester.Config);
1182+
var streamerField = typeof(UAExtractor).GetField("<Streamer>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
1183+
streamerField?.SetValue(extractor, mockStreamer);
1184+
1185+
using var throttler = new TaskThrottler(2, false);
1186+
var cps = new BlockingResourceCounter(1000);
1187+
var dummyState = new UAHistoryExtractionState(tester.Client, new NodeId("test", 0), true, true);
1188+
var log = tester.Provider.GetRequiredService<ILogger<HistoryReaderTest>>();
1189+
1190+
using var reader = new HistoryScheduler(log, tester.Client, extractor, extractor.TypeManager, tester.Config, HistoryReadType.FrontfillData,
1191+
throttler, cps, new[] { dummyState }, tester.Source.Token);
1192+
1193+
var dt = new UADataType(DataTypeIds.Double);
1194+
var var1 = new UAVariable(new NodeId("state1", 0), "state1", null, null, NodeId.Null, null);
1195+
var1.FullAttributes.DataType = dt;
1196+
var state1 = new VariableExtractionState(extractor, var1, true, true, true);
1197+
extractor.State.SetNodeState(state1, "state1");
1198+
state1.FinalizeRangeInit();
1199+
1200+
// Capture initial state
1201+
Assert.True(state1.IsFrontfilling);
1202+
var initialRange = state1.SourceExtractedRange;
1203+
1204+
var start = DateTime.UtcNow;
1205+
var dataValues = new DataValueCollection(Enumerable.Range(0, 10)
1206+
.Select(idx => new DataValue(idx, StatusCodes.Good, start.AddSeconds(idx))));
1207+
var historyData = new HistoryData { DataValues = dataValues };
1208+
1209+
var node = new HistoryReadNode(HistoryReadType.FrontfillData, new NodeId("state1", 0))
1210+
{
1211+
LastResult = historyData,
1212+
ContinuationPoint = null
1213+
};
1214+
1215+
var historyDataHandler = reader.GetType().GetMethod("HistoryDataHandler", BindingFlags.NonPublic | BindingFlags.Instance);
1216+
1217+
// The handler should throw because EnqueueAsync fails
1218+
var ex = await Assert.ThrowsAsync<Exception>(() => (Task)historyDataHandler!.Invoke(reader, new object[] { node })!);
1219+
Assert.Contains("Simulated enqueue failure", ex.Message, StringComparison.OrdinalIgnoreCase);
1220+
1221+
// CRITICAL: State should NOT have been updated because the exception happened before state update
1222+
Assert.Equal(initialRange.Last, state1.SourceExtractedRange.Last);
1223+
Assert.Equal(initialRange.First, state1.SourceExtractedRange.First);
1224+
Assert.True(state1.IsFrontfilling, "State should still be frontfilling since update was never called");
1225+
}
1226+
1227+
/// <summary>
1228+
/// Test that if EnqueueAsync fails for events (simulating a crash), the state is NOT updated.
1229+
/// This verifies the fix where we enqueue BEFORE updating state to ensure at-least-once delivery.
1230+
/// </summary>
1231+
[Fact]
1232+
public async Task TestHistoryEventHandlerEnqueueFailure()
1233+
{
1234+
await using var extractor = tester.BuildExtractor();
1235+
var cfg = new HistoryConfig
1236+
{
1237+
Backfill = true
1238+
};
1239+
1240+
tester.Config.History = cfg;
1241+
1242+
// Inject a mock streamer that throws on EnqueueAsync for events
1243+
var mockStreamer = new FailingStreamer(
1244+
tester.Provider.GetRequiredService<ILogger<Streamer>>(),
1245+
extractor,
1246+
tester.Config);
1247+
var streamerField = typeof(UAExtractor).GetField("<Streamer>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
1248+
streamerField?.SetValue(extractor, mockStreamer);
1249+
1250+
var log = tester.Provider.GetRequiredService<ILogger<HistoryReaderTest>>();
1251+
1252+
using var throttler = new TaskThrottler(2, false);
1253+
var cps = new BlockingResourceCounter(1000);
1254+
var dummyState = new UAHistoryExtractionState(tester.Client, new NodeId("test", 0), true, true);
1255+
1256+
using var reader = new HistoryScheduler(log, tester.Client, extractor, extractor.TypeManager, tester.Config, HistoryReadType.FrontfillEvents,
1257+
throttler, cps, new[] { dummyState }, tester.Source.Token);
1258+
1259+
var state = EventUtils.PopulateEventData(extractor, tester, false);
1260+
state.FinalizeRangeInit();
1261+
1262+
// Capture initial state
1263+
Assert.True(state.IsFrontfilling);
1264+
var initialRange = state.SourceExtractedRange;
1265+
1266+
var filter = new EventFilter { SelectClauses = EventUtils.GetSelectClause(tester) };
1267+
var details = new ReadEventDetails { Filter = filter };
1268+
1269+
var start = DateTime.UtcNow;
1270+
var frontfillEvents = new HistoryEventFieldListCollection(Enumerable.Range(0, 10)
1271+
.Select(idx => EventUtils.GetEventValues(start.AddSeconds(idx)))
1272+
.Select(values => new HistoryEventFieldList { EventFields = values }));
1273+
var historyEvents = new HistoryEvent { Events = frontfillEvents };
1274+
1275+
var node = new HistoryReadNode(HistoryReadType.FrontfillEvents, new NodeId("emitter", 0))
1276+
{
1277+
LastResult = historyEvents,
1278+
ContinuationPoint = null
1279+
};
1280+
1281+
var historyEventHandler = reader.GetType().GetMethod("HistoryEventHandler", BindingFlags.NonPublic | BindingFlags.Instance);
1282+
1283+
// The handler should throw because EnqueueAsync fails
1284+
var ex = await Assert.ThrowsAsync<Exception>(() => (Task)historyEventHandler!.Invoke(reader, new object[] { node, details })!);
1285+
Assert.Contains("Simulated enqueue failure", ex.Message, StringComparison.OrdinalIgnoreCase);
1286+
1287+
// CRITICAL: State should NOT have been updated because the exception happened before state update
1288+
Assert.Equal(initialRange.Last, state.SourceExtractedRange.Last);
1289+
Assert.Equal(initialRange.First, state.SourceExtractedRange.First);
1290+
Assert.True(state.IsFrontfilling, "State should still be frontfilling since update was never called");
1291+
}
1292+
1293+
/// <summary>
1294+
/// Mock Streamer that throws on EnqueueAsync to simulate a crash during enqueue.
1295+
/// Used to verify that state updates happen AFTER successful enqueue.
1296+
/// </summary>
1297+
private class FailingStreamer : Streamer
1298+
{
1299+
public FailingStreamer(ILogger<Streamer> log, UAExtractor extractor, FullConfig config)
1300+
: base(log, extractor, config)
1301+
{
1302+
}
1303+
1304+
public override Task EnqueueAsync(UADataPoint dp)
1305+
{
1306+
throw new Exception("Simulated enqueue failure for datapoint");
1307+
}
1308+
1309+
public override Task EnqueueAsync(IEnumerable<UADataPoint> dps)
1310+
{
1311+
throw new Exception("Simulated enqueue failure for datapoints");
1312+
}
1313+
1314+
public override Task EnqueueAsync(UAEvent evt)
1315+
{
1316+
throw new Exception("Simulated enqueue failure for event");
1317+
}
1318+
1319+
public override Task EnqueueAsync(IEnumerable<UAEvent> events)
1320+
{
1321+
throw new Exception("Simulated enqueue failure for events");
1322+
}
1323+
}
11601324
}
11611325
}

Test/Unit/StreamerTest.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,13 @@ public async Task TestPushDataPoints()
208208
true, true, true);
209209
state.InitToEmpty();
210210
state.FinalizeRangeInit();
211-
state.UpdateFromBackfill(DateTime.MaxValue, true);
212-
state.UpdateFromFrontfill(DateTime.MinValue, true);
213211

214212
extractor.State.SetNodeState(state, "id");
215213
var toPush = Enumerable.Range(0, 1000).Select(idx => new UADataPoint(start.AddMilliseconds(idx), "id", idx, StatusCodes.Good)).ToList();
216214
await extractor.Streamer.EnqueueAsync(toPush);
217215
await extractor.Streamer.PushDataPoints(new[] { pusher, pusher2 }, Enumerable.Empty<IPusher>(), tester.Source.Token);
216+
state.UpdateFromBackfill(DateTime.MaxValue, true);
217+
state.UpdateFromFrontfill(DateTime.MinValue, true);
218218
Assert.Equal(1000, dps.Count);
219219
Assert.Equal(1000, dps2.Count);
220220

@@ -258,11 +258,11 @@ public async Task TestPushEvents()
258258
var state = new EventExtractionState(tester.Client, id, true, true, true);
259259
state.InitToEmpty();
260260
state.FinalizeRangeInit();
261-
state.UpdateFromBackfill(DateTime.MaxValue, true);
262-
state.UpdateFromFrontfill(DateTime.MinValue, true);
263261

264262
extractor.State.SetEmitterState(state);
265263
var toPush = Enumerable.Range(0, 1000).Select(idx => new UAEvent { Time = start.AddMilliseconds(idx), EmittingNode = id }).ToList();
264+
state.UpdateFromBackfill(DateTime.MaxValue, true);
265+
state.UpdateFromFrontfill(DateTime.MinValue, true);
266266
await extractor.Streamer.EnqueueAsync(toPush);
267267
await extractor.Streamer.PushEvents(new[] { pusher, pusher2 }, Enumerable.Empty<IPusher>(), tester.Source.Token);
268268

0 commit comments

Comments
 (0)