Skip to content

Commit 28bdb54

Browse files
committed
Fix tests
1 parent 51980dc commit 28bdb54

File tree

5 files changed

+37
-35
lines changed

5 files changed

+37
-35
lines changed

Extractor/Tasks/BrowseTask.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,7 @@ public override bool CanRunNow()
164164

165165
if (result.ShouldBackgroundBrowse && batchIsInitial)
166166
{
167-
// TODO: Once the browse task is integrated into the extractor,
168-
// trigger a rebrowse if it is enabled.
169-
// extractor.ScheduleRebrowse();
167+
extractor.ScheduleRebrowse();
170168
}
171169

172170
var sourceName = "the OPC-UA server";

Extractor/UAExtractor.cs

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,13 @@ You should have received a copy of the GNU General Public License
1818
using Cognite.Extensions;
1919
using Cognite.Extractor.Common;
2020
using Cognite.Extractor.StateStorage;
21-
using Cognite.Extractor.Utils;
2221
using Cognite.Extractor.Utils.Unstable.Configuration;
2322
using Cognite.Extractor.Utils.Unstable.Tasks;
2423
using Cognite.OpcUa.Config;
2524
using Cognite.OpcUa.History;
2625
using Cognite.OpcUa.Nodes;
2726
using Cognite.OpcUa.NodeSources;
2827
using Cognite.OpcUa.PubSub;
29-
using Cognite.OpcUa.Pushers;
3028
using Cognite.OpcUa.Subscriptions;
3129
using Cognite.OpcUa.Tasks;
3230
using Cognite.OpcUa.TypeCollectors;
@@ -40,7 +38,6 @@ You should have received a copy of the GNU General Public License
4038
using Opc.Ua.Client;
4139
using Prometheus;
4240
using System;
43-
using System.Collections.Concurrent;
4441
using System.Collections.Generic;
4542
using System.Linq;
4643
using System.Reflection;
@@ -802,14 +799,8 @@ private void PushNodesFailure(
802799
/// </summary>
803800
/// <param name="input">Nodes to push</param>
804801
/// <param name="initial">True if this counts as initialization of the pusher</param>
805-
public async Task PushNodes(PusherInput? input, bool initial)
802+
private async Task PushNodesInner(PusherInput input, bool initial)
806803
{
807-
if (input == null)
808-
{
809-
log.LogWarning("No input given to pusher {Name}, not initializing", Pusher.GetType());
810-
return;
811-
}
812-
813804
var result = new FullPushResult();
814805
if (Pusher.NoInit)
815806
{
@@ -861,15 +852,19 @@ public async Task PushNodes(PusherInput? input, bool initial)
861852
/// Push given lists of nodes to pusher destinations, and fetches latest timestamp for relevant nodes.
862853
/// </summary>
863854
/// <param name="input">Nodes to push</param>
864-
private async Task PushNodes(PusherInput input)
855+
public async Task PushNodes(PusherInput? input, bool initial)
865856
{
857+
if (input == null)
858+
{
859+
log.LogWarning("No input given to pusher {Name}, not initializing", Pusher.GetType());
860+
return;
861+
}
862+
866863
var newStates = input.Variables
867864
.Select(ts => ts.Id)
868865
.Distinct()
869866
.SelectNonNull(id => State.GetNodeState(id));
870867

871-
bool initial = input.Variables.Count() + input.Objects.Count() >= State.NumActiveNodes;
872-
873868
if (initial && !input.Variables.Any() && Config.Subscriptions.DataPoints)
874869
{
875870
log.LogWarning("No variables found, the extractor can run without any variables, but will not read history. " +
@@ -878,10 +873,6 @@ private async Task PushNodes(PusherInput input)
878873
"data point subscriptions by setting `subscriptions.data-points` to false");
879874
}
880875

881-
var pushTasks = new List<Task> {
882-
PushNodes(input, initial)
883-
};
884-
885876
if (Config.DryRun)
886877
{
887878
log.LogInformation("Dry run is enabled");
@@ -927,10 +918,7 @@ private async Task PushNodes(PusherInput input)
927918
historyTask?.AddStates(newStates, State.EmitterStates);
928919
}
929920

930-
931-
pushTasks = pushTasks.ToList();
932-
log.LogInformation("Waiting for pushes on pushers");
933-
await Task.WhenAll(pushTasks);
921+
await PushNodesInner(input, initial);
934922

935923
if (initial)
936924
{

Test/Integration/DataPointTests.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -545,15 +545,15 @@ public async Task TestHistoryContinuation(bool backfill)
545545
tester.Config.History.Data = true;
546546
tester.Config.History.Backfill = backfill;
547547

548-
await using var extractor = tester.BuildExtractor(pusher, stateStore: stateStore);
549-
550548
var dataTypes = tester.Config.Extraction.DataTypes;
551549
dataTypes.AllowStringVariables = true;
552550
dataTypes.AutoIdentifyTypes = true;
553551
dataTypes.MaxArraySize = 4;
554552

555553
tester.Config.Extraction.RootNode = CommonTestUtils.ToProtoNodeId(tester.Server.Ids.Custom.Root, tester.Client);
556554

555+
await using var extractor = tester.BuildExtractor(pusher, stateStore: stateStore);
556+
557557
var now = DateTime.UtcNow;
558558

559559
tester.WipeCustomHistory();

Test/Unit/HistoryReaderTest.cs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -915,6 +915,10 @@ public async Task TestReadHistoryMaxLength()
915915
Data = true,
916916
EndTime = new TimestampWrapper(tester.HistoryStart.AddSeconds(20).ToUnixTimeMilliseconds().ToString())
917917
};
918+
cfg.MaxReadLength = "1s";
919+
cfg.DataChunk = 50;
920+
921+
tester.Config.History = cfg;
918922

919923
await using var extractor = tester.BuildExtractor();
920924

@@ -947,11 +951,6 @@ public async Task TestReadHistoryMaxLength()
947951

948952
CommonTestUtils.ResetMetricValues("opcua_frontfill_data_count", "opcua_frontfill_data_points");
949953

950-
cfg.MaxReadLength = "1s";
951-
cfg.DataChunk = 50;
952-
953-
tester.Config.History = cfg;
954-
955954
var reader = new HistoryTask(log, tester.Client, extractor, tester.Config, extractor.TriggerHistoryRestart);
956955

957956
await CommonTestUtils.RunHistory(reader, states, HistoryReadType.FrontfillData, tester.Source.Token);

Test/Unit/TasksTest.cs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public async Task TestBasicBrowseTask()
4848
{
4949
using var pusher = new DummyPusher(new DummyPusherConfig());
5050
await using var extractor = tester.BuildExtractor(pusher);
51+
await extractor.Init(tester.Source.Token);
5152

5253
var browseTask = new BrowseTask(extractor, tester.Client, tester.Config, tester.Provider);
5354
browseTask.AddNodesToBrowse(new[] { tester.Ids.Base.Root }, isFull: true);
@@ -66,6 +67,7 @@ public async Task TestBrowseTaskFromNodeSetSource()
6667
{
6768
using var pusher = new DummyPusher(new DummyPusherConfig());
6869
await using var extractor = tester.BuildExtractor(pusher);
70+
await extractor.Init(tester.Source.Token);
6971
tester.Config.Source.NodeSetSource = new NodeSetSourceConfig
7072
{
7173
NodeSets = new[]
@@ -124,6 +126,7 @@ public async Task TestBrowseTaskFromCDFNodeSource()
124126
{
125127
var (handler, pusher) = tester.GetCDFPusher();
126128
await using var extractor = tester.BuildExtractor(pusher);
129+
await extractor.Init(tester.Source.Token);
127130
tester.Config.Cognite.RawNodeBuffer = new CDFNodeSourceConfig
128131
{
129132
AssetsTable = "assets",
@@ -164,6 +167,7 @@ public async Task TestRebrowseDifferentSubset()
164167
{
165168
using var pusher = new DummyPusher(new DummyPusherConfig());
166169
await using var extractor = tester.BuildExtractor(pusher);
170+
await extractor.Init(tester.Source.Token);
167171

168172
tester.Config.Extraction.DataTypes.AllowStringVariables = true;
169173
tester.Config.Extraction.DataTypes.AutoIdentifyTypes = true;
@@ -194,6 +198,7 @@ public async Task TestBrowseWithReferences()
194198
{
195199
using var pusher = new DummyPusher(new DummyPusherConfig());
196200
await using var extractor = tester.BuildExtractor(pusher);
201+
await extractor.Init(tester.Source.Token);
197202

198203
tester.Config.Extraction.Relationships.Enabled = true;
199204
tester.Config.Extraction.Relationships.Hierarchical = true;
@@ -224,6 +229,7 @@ public async Task TestPusherTask()
224229
pusher.UniqueToNodeId["test"] = (id, -1);
225230
pusher.DataPoints[(id, -1)] = new List<UADataPoint>();
226231
await using var extractor = tester.BuildExtractor(pusher);
232+
await extractor.Init(tester.Source.Token);
227233

228234
extractor.Streamer.AllowData = true;
229235

@@ -249,6 +255,7 @@ public async Task TestPusherTaskLateInit()
249255
pusher.UniqueToNodeId["test"] = (id, -1);
250256
pusher.DataPoints[(id, -1)] = new List<UADataPoint>();
251257
await using var extractor = tester.BuildExtractor(pusher);
258+
await extractor.Init(tester.Source.Token);
252259

253260
extractor.Streamer.AllowData = true;
254261

@@ -281,6 +288,7 @@ public async Task TestSubscriptionTask()
281288
{
282289
using var pusher = new DummyPusher(new DummyPusherConfig());
283290
await using var extractor = tester.BuildExtractor(pusher);
291+
await extractor.Init(tester.Source.Token);
284292

285293
var task = new SubscriptionsTask(extractor, tester.Client, tester.Config);
286294
var reporter = new DummyIntegrationSink();
@@ -321,6 +329,7 @@ public async Task TestEventSubscriptionTask()
321329
{
322330
using var pusher = new DummyPusher(new DummyPusherConfig());
323331
await using var extractor = tester.BuildExtractor(pusher);
332+
await extractor.Init(tester.Source.Token);
324333

325334
var task = new SubscriptionsTask(extractor, tester.Client, tester.Config);
326335
var reporter = new DummyIntegrationSink();
@@ -360,6 +369,7 @@ public async Task TestAuditSubscriptionTask()
360369
{
361370
using var pusher = new DummyPusher(new DummyPusherConfig());
362371
await using var extractor = tester.BuildExtractor(pusher);
372+
await extractor.Init(tester.Source.Token);
363373

364374
var task = new SubscriptionsTask(extractor, tester.Client, tester.Config);
365375
var reporter = new DummyIntegrationSink();
@@ -379,17 +389,22 @@ public async Task TestAuditSubscriptionTask()
379389
{
380390
tester.Server.DirectGrowth(0);
381391

382-
var extraNodesToBrowse = (ConcurrentQueue<NodeId>)extractor.GetType().GetField(
383-
"extraNodesToBrowse", System.Reflection.BindingFlags.NonPublic
392+
var browseTask = (BrowseTask)extractor.GetType().GetField(
393+
"browseTask", System.Reflection.BindingFlags.NonPublic
384394
| System.Reflection.BindingFlags.Instance)
385395
.GetValue(extractor);
386396

397+
var nodesToBrowse = (HashSet<NodeId>)browseTask.GetType().GetField(
398+
"nodesToBrowse", System.Reflection.BindingFlags.NonPublic
399+
| System.Reflection.BindingFlags.Instance)
400+
.GetValue(browseTask);
401+
387402
await TestUtils.WaitForCondition(() =>
388403
{
389-
return !extraNodesToBrowse.IsEmpty;
404+
return nodesToBrowse.Count > 0;
390405
}, 10);
391406

392-
Assert.False(extraNodesToBrowse.IsEmpty);
407+
Assert.True(nodesToBrowse.Count > 0);
393408
}
394409
finally
395410
{
@@ -403,6 +418,7 @@ public async Task TestHistoryTask()
403418
using var pusher = new DummyPusher(new DummyPusherConfig());
404419
using var stateStore = new DummyStateStore();
405420
await using var extractor = tester.BuildExtractor(pusher, stateStore: stateStore);
421+
await extractor.Init(tester.Source.Token);
406422

407423
int cbCount = 0;
408424
void TriggerHistoryRestart() => cbCount++;
@@ -474,6 +490,7 @@ public async Task TestHistoryTaskEvents()
474490
tester.Config.Events.Enabled = true;
475491
tester.Config.Events.History = true;
476492
await using var extractor = tester.BuildExtractor(pusher, stateStore: stateStore);
493+
await extractor.Init(tester.Source.Token);
477494

478495
int cbCount = 0;
479496
void TriggerHistoryRestart() => cbCount++;

0 commit comments

Comments
 (0)