-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathInventoryProcessData.cs
More file actions
185 lines (142 loc) · 6.42 KB
/
InventoryProcessData.cs
File metadata and controls
185 lines (142 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using ByteSync.Interfaces.Controls.Inventories;
using ByteSync.Models.Inventories;
using ReactiveUI;
using ReactiveUI.Fody.Helpers;
namespace ByteSync.Business.Inventories;
public class InventoryProcessData : ReactiveObject
{
private readonly object _monitorDataLock = new object();
private readonly ConcurrentQueue<SkippedEntry> _skippedEntries = new();
private readonly ConcurrentDictionary<SkipReason, int> _skippedCountsByReason = new();
private int _skippedCount;
public InventoryProcessData()
{
MainStatus = new ReplaySubject<InventoryTaskStatus>(1);
GlobalMainStatus = new ReplaySubject<InventoryTaskStatus>(1);
IdentificationStatus = new ReplaySubject<InventoryTaskStatus>(1);
AnalysisStatus = new ReplaySubject<InventoryTaskStatus>(1);
AreBaseInventoriesComplete = new ReplaySubject<bool>(1);
AreFullInventoriesComplete = new ReplaySubject<bool>(1);
InventoryMonitorDataSubject = new BehaviorSubject<InventoryMonitorData>(new InventoryMonitorData());
InventoryAbortionRequested = new ReplaySubject<bool>(1);
ErrorEvent = new ReplaySubject<bool>(1);
InventoryTransferError = new ReplaySubject<bool>(1);
CancellationTokenSource = new CancellationTokenSource();
Observable.CombineLatest(MainStatus, IdentificationStatus)
.Select(l => (Main: l[0], Identification: l[1]))
.Where(v => v.Main.In(InventoryTaskStatus.Error, InventoryTaskStatus.Cancelled))
.Subscribe(v =>
{
if (v.Identification == InventoryTaskStatus.Pending)
{
IdentificationStatus.OnNext(InventoryTaskStatus.NotLaunched);
}
else if (v.Identification == InventoryTaskStatus.Running)
{
IdentificationStatus.OnNext(v.Main == InventoryTaskStatus.Error
? InventoryTaskStatus.Error
: InventoryTaskStatus.Cancelled);
}
});
Observable.CombineLatest(MainStatus, AnalysisStatus)
.Select(l => (Main: l[0], Analysis: l[1]))
.Where(v => v.Main.In(InventoryTaskStatus.Error, InventoryTaskStatus.Cancelled))
.Subscribe(v =>
{
if (v.Analysis == InventoryTaskStatus.Pending)
{
AnalysisStatus.OnNext(InventoryTaskStatus.NotLaunched);
}
else if (v.Analysis == InventoryTaskStatus.Running)
{
AnalysisStatus.OnNext(v.Main == InventoryTaskStatus.Error
? InventoryTaskStatus.Error
: InventoryTaskStatus.Cancelled);
}
});
Reset();
}
public List<IInventoryBuilder>? InventoryBuilders { get; set; }
public List<Inventory>? Inventories
{
get { return InventoryBuilders?.Select(ib => ib.Inventory).ToList(); }
}
public CancellationTokenSource CancellationTokenSource { get; private set; }
public ISubject<bool> InventoryAbortionRequested { get; }
//
public ISubject<bool> ErrorEvent { get; }
public ISubject<bool> InventoryTransferError { get; }
public ISubject<InventoryTaskStatus> MainStatus { get; set; }
// Aggregated status across all DataMembers
public ISubject<InventoryTaskStatus> GlobalMainStatus { get; set; }
public ISubject<InventoryTaskStatus> IdentificationStatus { get; set; }
public ISubject<InventoryTaskStatus> AnalysisStatus { get; set; }
public ISubject<bool> AreBaseInventoriesComplete { get; set; }
public ISubject<bool> AreFullInventoriesComplete { get; set; }
private BehaviorSubject<InventoryMonitorData> InventoryMonitorDataSubject { get; set; }
public IObservable<InventoryMonitorData> InventoryMonitorObservable => InventoryMonitorDataSubject.AsObservable();
public IReadOnlyCollection<SkippedEntry> SkippedEntries => _skippedEntries.ToArray();
public int SkippedCount => _skippedCount;
[Reactive]
public DateTimeOffset InventoryStart { get; set; }
public Exception? LastException { get; set; }
public void RequestInventoryAbort()
{
CancellationTokenSource.Cancel();
InventoryAbortionRequested.OnNext(true);
}
public void Reset()
{
MainStatus.OnNext(InventoryTaskStatus.Pending);
GlobalMainStatus.OnNext(InventoryTaskStatus.Pending);
IdentificationStatus.OnNext(InventoryTaskStatus.Pending);
AnalysisStatus.OnNext(InventoryTaskStatus.Pending);
AreBaseInventoriesComplete.OnNext(false);
AreFullInventoriesComplete.OnNext(false);
InventoryAbortionRequested.OnNext(false);
CancellationTokenSource = new CancellationTokenSource();
ErrorEvent.OnNext(false);
InventoryTransferError.OnNext(false);
LastException = null;
InventoryMonitorDataSubject.OnNext(new InventoryMonitorData());
ClearSkippedEntries();
}
public void RecordSkippedEntry(SkippedEntry entry)
{
_skippedEntries.Enqueue(entry);
_skippedCountsByReason.AddOrUpdate(entry.Reason, 1, (_, currentCount) => currentCount + 1);
Interlocked.Increment(ref _skippedCount);
}
// should be used during issue 268 implementation
public int GetSkippedCountByReason(SkipReason reason)
{
return _skippedCountsByReason.GetValueOrDefault(reason, 0);
}
public void SetError(Exception exception)
{
LastException = exception;
ErrorEvent.OnNext(true);
}
public void UpdateMonitorData(Action<InventoryMonitorData> action)
{
lock (_monitorDataLock)
{
var currentValue = InventoryMonitorDataSubject.Value;
var newValue = currentValue with { };
action.Invoke(newValue);
InventoryMonitorDataSubject.OnNext(newValue);
}
}
private void ClearSkippedEntries()
{
while (_skippedEntries.TryDequeue(out _))
{
}
_skippedCountsByReason.Clear();
Interlocked.Exchange(ref _skippedCount, 0);
}
}