-
-
Notifications
You must be signed in to change notification settings - Fork 295
Expand file tree
/
Copy pathTransferProcess.cs
More file actions
265 lines (219 loc) · 9.31 KB
/
TransferProcess.cs
File metadata and controls
265 lines (219 loc) · 9.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// -----------------------------------------------------------------------
// <copyright file="TransferProcess.cs" company="Asynkron AB">
// Copyright (C) 2015-2024 Asynkron AB All rights reserved
// </copyright>
// -----------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using Proto;
using Proto.Persistence;
using Saga.Messages;
namespace Saga;
internal class TransferProcess : IActor
{
private readonly decimal _amount;
private readonly double _availability;
private readonly Behavior _behavior = new();
private readonly PID _from;
private readonly Persistence _persistence;
private readonly string _persistenceId;
private readonly Random _random;
private readonly PID _to;
private bool _processCompleted;
private bool _restarting;
private bool _stopping;
public TransferProcess(
PID from,
PID to,
decimal amount,
IProvider provider,
string persistenceId,
Random random,
double availability
)
{
_from = from;
_to = to;
_amount = amount;
_persistenceId = persistenceId;
_random = random;
_availability = availability;
_persistence = Persistence.WithEventSourcing(provider, persistenceId, ApplyEvent);
}
public async Task ReceiveAsync(IContext context)
{
var message = context.Message;
Console.WriteLine($"[{_persistenceId}] Receiving :{message}");
switch (message)
{
case Started:
// default to Starting behavior
_behavior.Become(Starting);
// recover state from persistence - if there are any events, the current behavior
// should change
await _persistence.RecoverStateAsync();
break;
case Stopping:
_stopping = true;
break;
case Restarting:
_restarting = true;
break;
case Stopped _ when !_processCompleted:
await _persistence.PersistEventAsync(new TransferFailed("Unknown. Transfer Process crashed"));
await _persistence.PersistEventAsync(
new EscalateTransfer("Unknown failure. Transfer Process crashed")
);
context.Send(context.Parent!, new UnknownResult(context.Self));
return;
case Terminated _ when _restarting || _stopping:
// if the TransferProcess itself is restarting or stopping due to failure, we will receive a
// Terminated message for any child actors due to them being stopped but we should not
// treat this as a failure of the saga, so return here to stop further processing
return;
default:
// simulate failures of the transfer process itself
if (Fail())
{
throw new Exception();
}
break;
}
// pass through all messages to the current behavior. Note this includes the Started message we
// may have just handled as what we should do when started depends on the current behavior
await _behavior.ReceiveAsync(context);
}
private static Props TryCredit(PID targetActor, decimal amount) =>
Props
.FromProducer(() => new AccountProxy(targetActor, sender => new ChangeBalance.Credit(amount, sender)));
private static Props TryDebit(PID targetActor, decimal amount) =>
Props
.FromProducer(() => new AccountProxy(targetActor, sender => new ChangeBalance.Debit(amount, sender)));
private void ApplyEvent(Event @event)
{
Console.WriteLine($"Applying event: {@event.Data}");
switch (@event.Data)
{
case TransferStarted:
_behavior.Become(AwaitingDebitConfirmation);
break;
case AccountDebited:
_behavior.Become(AwaitingCreditConfirmation);
break;
case CreditRefused:
_behavior.Become(RollingBackDebit);
break;
case AccountCredited:
case DebitRolledBack:
case TransferFailed:
_processCompleted = true;
break;
}
}
private bool Fail()
{
var comparison = _random.NextDouble() * 100;
return comparison > _availability;
}
private async Task Starting(IContext context)
{
if (context.Message is Started)
{
context.SpawnNamed(TryDebit(_from, -_amount), "DebitAttempt");
await _persistence.PersistEventAsync(new TransferStarted());
}
}
private async Task AwaitingDebitConfirmation(IContext context)
{
switch (context.Message)
{
case Started _:
// if we are in this state when restarted then we need to recreate the TryDebit actor
context.SpawnNamed(TryDebit(_from, -_amount), "DebitAttempt");
break;
case OK _:
// good to proceed to the credit
await _persistence.PersistEventAsync(new AccountDebited());
context.SpawnNamed(TryCredit(_to, +_amount), "CreditAttempt");
break;
case Refused _:
// the debit has been refused, and should not be retried
await _persistence.PersistEventAsync(new TransferFailed("Debit refused"));
context.Send(context.Parent!, new Result.FailedButConsistentResult(context.Self));
StopAll(context);
break;
case Terminated _:
// the actor that is trying to make the debit has failed to respond with success
// we dont know why
await _persistence.PersistEventAsync(new StatusUnknown());
StopAll(context);
break;
}
}
private async Task AwaitingCreditConfirmation(IContext context)
{
switch (context.Message)
{
case Started:
// if we are in this state when started then we need to recreate the TryCredit actor
context.SpawnNamed(TryCredit(_to, +_amount), "CreditAttempt");
break;
case OK:
var fromBalance =
await context.RequestAsync<decimal>(_from, new GetBalance(), TimeSpan.FromMilliseconds(2000));
var toBalance =
await context.RequestAsync<decimal>(_to, new GetBalance(), TimeSpan.FromMilliseconds(2000));
await _persistence.PersistEventAsync(new AccountCredited());
await _persistence.PersistEventAsync(new TransferCompleted(_from, fromBalance, _to, toBalance));
context.Send(context.Parent!, new Result.SuccessResult(context.Self));
StopAll(context);
break;
case Refused:
// sometimes a remote service might say it refuses to perform some operation.
// This is different from a failure
await _persistence.PersistEventAsync(new CreditRefused());
// we have definitely debited the _from account as it was confirmed, and we
// haven't credited to _to account, so try and rollback
context.SpawnNamed(TryCredit(_from, +_amount), "RollbackDebit");
break;
case Terminated:
// at this point, we do not know if the credit succeeded. The remote account has not
// confirmed success, but it might have succeeded then crashed, or failed to respond.
// Given that we don't know, just fail + escalate
await _persistence.PersistEventAsync(new StatusUnknown());
StopAll(context);
break;
}
}
private async Task RollingBackDebit(IContext context)
{
switch (context.Message)
{
case Started:
// if we are in this state when started then we need to recreate the TryCredit actor
context.SpawnNamed(TryCredit(_from, +_amount), "RollbackDebit");
break;
case OK:
await _persistence.PersistEventAsync(new DebitRolledBack());
await _persistence.PersistEventAsync(new TransferFailed($"Unable to rollback debit to {_to.Id}"));
context.Send(context.Parent!, new Result.FailedAndInconsistent(context.Self));
StopAll(context);
break;
case Refused: // in between making the credit and debit, the _from account has started refusing!! :O
case Terminated:
await _persistence.PersistEventAsync(
new TransferFailed($"Unable to rollback process. {_from.Id} is owed {_amount}")
);
await _persistence.PersistEventAsync(new EscalateTransfer($"{_from.Id} is owed {_amount}"));
context.Send(context.Parent!, new Result.FailedAndInconsistent(context.Self));
StopAll(context);
break;
}
}
private void StopAll(IContext context)
{
context.Stop(_from);
context.Stop(_to);
context.Stop(context.Self);
}
}