Skip to content

Commit 8fe8694

Browse files
authored
Do not add state store restoration as an issue for new states (#892)
* Do not add state store restoration as an issue for new states If there are new states, we will trigger history based on that later. We were adding and removing a history issue during state restoration. This caused history to restart after rebrowse, even when unnecessary. Instead, I'll make two changes: - Only add new states to the history reader _after_ restoring state, if state restoration is enabled. - Add the state restoration history issue only on startup, so that we don't run history until we've done the _initial_ state restoration. This probably needs a rethink, but this is all changing a lot with the v3 refactor, and I'd prefer to wait with the big refactoring changes until then. This should fix the issue. * Try to improve test * Bump version
1 parent 23c4b91 commit 8fe8694

File tree

4 files changed

+29
-12
lines changed

4 files changed

+29
-12
lines changed

Extractor/History/HistoryReader.cs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,13 @@ public sealed class HistoryReader : IDisposable
6262
public bool CurrentHistoryRunIsBad { get; private set; }
6363

6464

65-
public HistoryReader(ILogger<HistoryReader> log,
66-
UAClient uaClient, UAExtractor extractor, TypeManager typeManager, FullConfig config, CancellationToken token)
65+
public HistoryReader(
66+
ILogger<HistoryReader> log,
67+
UAClient uaClient,
68+
UAExtractor extractor,
69+
TypeManager typeManager,
70+
FullConfig config,
71+
CancellationToken token)
6772
{
6873
this.log = log;
6974
this.config = config;
@@ -76,7 +81,7 @@ public HistoryReader(ILogger<HistoryReader> log,
7681
continuationPoints = new BlockingResourceCounter(
7782
throttling.MaxNodeParallelism > 0 ? throttling.MaxNodeParallelism : 1_000);
7883
waiter = new OperationWaiter();
79-
state = new State(config);
84+
state = new State(config, extractor.StateStorage != null && config.StateStorage.IntervalValue.Value != Timeout.InfiniteTimeSpan);
8085
}
8186

8287
private async Task RunHistoryBatch(IEnumerable<UAHistoryExtractionState> states, HistoryReadType type)
@@ -235,7 +240,7 @@ public bool RemoveIssue(StateIssue issue)
235240
return issues.Remove(issue);
236241
}
237242

238-
public State(FullConfig config)
243+
public State(FullConfig config, bool hasStateStore)
239244
{
240245
if (config.Subscriptions.Events && config.Events.Enabled && config.Events.History)
241246
{
@@ -245,6 +250,10 @@ public State(FullConfig config)
245250
{
246251
issues.Add(StateIssue.DataPointSubscription);
247252
}
253+
if (hasStateStore)
254+
{
255+
issues.Add(StateIssue.StateRestorationPending);
256+
}
248257
issues.Add(StateIssue.NodeHierarchyRead);
249258
}
250259

Extractor/UAExtractor.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,7 +1022,7 @@ private async Task PushNodes(PusherInput input)
10221022
var newStates = input.Variables
10231023
.Select(ts => ts.Id)
10241024
.Distinct()
1025-
.Select(id => State.GetNodeState(id));
1025+
.SelectNonNull(id => State.GetNodeState(id));
10261026

10271027
bool initial = input.Variables.Count() + input.Objects.Count() >= State.NumActiveNodes;
10281028

@@ -1077,6 +1077,10 @@ private async Task PushNodes(PusherInput input)
10771077
{
10781078
pushTasks = pushTasks.Append(RestoreExtractionStateWithRetry(newStates));
10791079
}
1080+
else
1081+
{
1082+
historyReader?.AddStates(newStates, State.EmitterStates);
1083+
}
10801084

10811085

10821086
pushTasks = pushTasks.ToList();
@@ -1094,7 +1098,7 @@ private async Task PushNodes(PusherInput input)
10941098
trackedTimeseres.Inc(input.Variables.Count());
10951099
}
10961100

1097-
foreach (var state in newStates.Concat<UAHistoryExtractionState?>(State.EmitterStates))
1101+
foreach (var state in newStates.Concat<UAHistoryExtractionState>(State.EmitterStates))
10981102
{
10991103
state?.FinalizeRangeInit();
11001104
}
@@ -1106,10 +1110,8 @@ private async Task PushNodes(PusherInput input)
11061110
/// </summary>
11071111
/// <param name="newStates">Variable states to restore</param>
11081112
/// <returns>Task that completes when state is restored or extractor is cancelled</returns>
1109-
private async Task RestoreExtractionStateWithRetry(IEnumerable<VariableExtractionState?> newStates)
1113+
private async Task RestoreExtractionStateWithRetry(IEnumerable<VariableExtractionState> newStates)
11101114
{
1111-
// Add issue to block history until state restoration completes
1112-
historyReader?.AddIssue(HistoryReader.StateIssue.StateRestorationPending);
11131115

11141116
// Retry state restoration forever until it succeeds
11151117
var retryConfig = new RetryUtilConfig
@@ -1138,7 +1140,7 @@ await RetryUtil.RetryAsync(
11381140
if (Streamer.AllowData)
11391141
{
11401142
tasks.Add(StateStorage!.RestoreExtractionState(
1141-
newStates.Where(state => state != null && state.FrontfillEnabled).ToDictionary(state => state?.Id!, state => state!),
1143+
newStates.Where(state => state.FrontfillEnabled).ToDictionary(state => state.Id),
11421144
Config.StateStorage.VariableStore,
11431145
false,
11441146
Source.Token));
@@ -1156,6 +1158,7 @@ await RetryUtil.RetryAsync(
11561158
);
11571159

11581160
// Successfully restored state - allow history to proceed
1161+
historyReader?.AddStates(newStates, State.EmitterStates);
11591162
historyReader?.RemoveIssue(HistoryReader.StateIssue.StateRestorationPending);
11601163
log.LogInformation("Successfully restored extraction state from state storage");
11611164
}
@@ -1209,8 +1212,6 @@ private IEnumerable<Func<CancellationToken, Task>> CreateSubscriptions(IEnumerab
12091212
{
12101213
var states = variables.Select(ts => ts.Id).Distinct().SelectNonNull(id => State.GetNodeState(id));
12111214

1212-
historyReader?.AddStates(states, State.EmitterStates);
1213-
12141215
log.LogInformation("Synchronize {NumNodesToSynch} nodes", variables.Count());
12151216
var tasks = new List<Func<CancellationToken, Task>>();
12161217
// Create tasks to subscribe to nodes, then start history read. We might lose data if history read finished before subscriptions were created.

Test/Integration/NodeExtractionTests.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1090,6 +1090,8 @@ public async Task TestRebrowseFailedWeirdState()
10901090
// This is a rebrowse, and should _not_ set the extractor to uninitialized.
10911091
Assert.True(pusher.Initialized);
10921092

1093+
await extractor.WaitForSubscription(SubscriptionName.DataPoints);
1094+
10931095
// Trigger a datapoint update.
10941096
tester.Server.UpdateNode(tester.Server.Ids.Base.DoubleVar1, 321.123);
10951097

manifest.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ schema:
6767
- "https://raw.githubusercontent.com/"
6868

6969
versions:
70+
"2.40.4":
71+
description: Fix regression causing history to restart on rebrowse if state store is enabled.
72+
changelog:
73+
fixed:
74+
- Fix regression causing history to restart on rebrowse if state store is enabled.
7075
"2.40.3":
7176
description: Set extractor version as a label value on the opcua_version metric.
7277
changelog:

0 commit comments

Comments
 (0)