Skip to content

Commit a5cf81d

Browse files
Dillon Nysdnys1
authored andcommitted
fix(core): Cross-zone completion
Fixes an issue with state machines and how errors are propagated between Zones. Since errors never cross error Zone boundaries, and because these boundaries are inserted often outside our control, all Futures we vend from EventCompleter must be created in the listening Zone.
1 parent 93e8135 commit a5cf81d

File tree

4 files changed

+146
-26
lines changed

4 files changed

+146
-26
lines changed

packages/amplify_core/lib/src/state_machine/event.dart

Lines changed: 79 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,55 +54,114 @@ final class EventCompleter<Event extends StateMachineEvent,
5454
/// here and chain it with later stack traces.
5555
final StackTrace stackTrace;
5656

57-
/// The zone in which this event was created.
57+
/// The zone in which this event was created, used to guarantee Zone values
58+
/// are present from the Zone in which the event was created.
5859
///
59-
/// Used in [run] to guarantee callbacks run in the same zone that this event
60+
/// Due to how Zones work with Streams, it cannot be guaranteed that the Zone
61+
/// in which this event is accepted (which is the zone in which the state
62+
/// machine was created) will be the same as the zone in which the event
6063
/// was created.
6164
final Zone _zone = Zone.current;
62-
final Completer<void> _acceptedCompleter = Completer();
63-
final Completer<State> _completer = Completer();
65+
66+
/// Every time [accepted] or [completed] is called, generate a new completer.
67+
/// This is because the Zone in which a [Completer] is instantiated **must**
68+
/// match the Zone in which its future is listened to, otherwise the future
69+
/// will never complete.
70+
///
71+
/// That is, running `_zone.run(completer.complete)` would still throw
72+
/// the error in the Zone where the completer was instantiated. And due
73+
/// to how Zone's work, a listener for a completer which completes in a
74+
/// different error zone will never finish.
75+
///
76+
/// The following example illustrates the problem we're trying to solve
77+
/// here:
78+
///
79+
/// ```dart
80+
/// import "dart:async";
81+
///
82+
/// void main() {
83+
/// final completer = Completer();
84+
/// runZonedGuarded(() async {
85+
/// await completer.future;
86+
/// print('never printed');
87+
/// }, (e, s) {
88+
/// print('never printed');
89+
/// });
90+
/// completer.future.catchError((e) => print('outer zone: $e'));
91+
/// completer.completeError('error');
92+
/// }
93+
/// ```
94+
///
95+
/// See this [Dart issue](https://github.com/dart-lang/sdk/issues/49457) for
96+
/// more information.
97+
final List<Completer<void>> _acceptedCompleters = [];
98+
final List<Completer<State>> _completers = [];
99+
100+
var _accepted = false;
101+
(State?, Object?, StackTrace?)? _completion;
64102

65103
/// Completes when the event is accepted by the respective state machine.
66104
///
67105
/// After this completes, intermediate changes can be listened for on the
68106
/// event's state machine.
69-
Future<void> get accepted => _acceptedCompleter.future;
107+
Future<void> get accepted {
108+
if (_accepted) {
109+
return Future.value();
110+
}
111+
final completer = Completer<void>();
112+
_acceptedCompleters.add(completer);
113+
return completer.future;
114+
}
70115

71116
/// Completes with the stopping state emitted after the full propagation
72117
/// of this event.
73-
Future<State> get completed => _completer.future;
118+
Future<State> get completed {
119+
if (_completion case (final completion?, _, _)) {
120+
return Future.value(completion);
121+
}
122+
if (_completion case (_, final error?, final stackTrace?)) {
123+
return Future.error(error, stackTrace);
124+
}
125+
final completer = Completer<State>();
126+
_completers.add(completer);
127+
return completer.future;
128+
}
74129

75130
/// Accepts the event by a state machine.
76131
void accept() {
77-
if (!_acceptedCompleter.isCompleted) {
78-
_acceptedCompleter.complete();
132+
_accepted = true;
133+
for (final completer in _acceptedCompleters) {
134+
if (!completer.isCompleted) {
135+
completer.complete();
136+
}
79137
}
138+
_acceptedCompleters.clear();
80139
}
81140

82141
/// Completes the event propagation with its stopping state.
83142
void complete(State state) {
84-
if (!_completer.isCompleted) {
85-
_completer.complete(state);
143+
_completion ??= (state, null, null);
144+
for (final completer in _completers) {
145+
if (!completer.isCompleted) {
146+
completer.complete(state);
147+
}
86148
}
149+
_completers.clear();
87150
}
88151

89152
/// Completes the event propagation with an error, if the event failed to
90153
/// resolve to a meaningful stopping state.
91154
void completeError(Object error, StackTrace stackTrace) {
92-
if (!_completer.isCompleted) {
93-
_completer.completeError(error, stackTrace);
155+
_completion ??= (null, error, stackTrace);
156+
for (final completer in _completers) {
157+
if (!completer.isCompleted) {
158+
completer.completeError(error, stackTrace);
159+
}
94160
}
161+
_completers.clear();
95162
}
96163

97164
/// Runs [body] in the [Zone] which this event was created.
98-
///
99-
/// Due to how Zones work in Flutter, it cannot be guaranteed that the Zone
100-
/// in which this event is accepted (which is the zone in which the state
101-
/// machine was created) will be the same as the zone in which the _event_
102-
/// was created.
103-
///
104-
/// Since events are created in the same zone as the user's call, we should
105-
/// default to using this zone for running state machine actions.
106165
R run<R>(R Function() body) => _zone.run(body);
107166

108167
/// Ignores the result of the event completer.

packages/amplify_core/lib/src/state_machine/state_machine.dart

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ abstract class StateMachineManager<
6868
E extends StateMachineEvent,
6969
S extends StateMachineState,
7070
Manager extends StateMachineManager<E, S, Manager>>
71-
with Dispatcher<E, S>
71+
with Dispatcher<E, S>, AWSDebuggable, AWSLoggerMixin
7272
implements DependencyManager, Closeable {
7373
/// {@macro amplify_core.state_machinedispatcher}
7474
StateMachineManager(
@@ -102,10 +102,17 @@ abstract class StateMachineManager<
102102

103103
Future<void> _listenForEvents() async {
104104
await for (final completer in _eventController.stream) {
105-
await dispatch(completer.event, completer).completed;
105+
try {
106+
await dispatch(completer.event, completer).completed;
107+
} on Object {
108+
continue;
109+
}
106110
}
107111
}
108112

113+
@override
114+
String get runtimeTypeName => 'StateMachineManager';
115+
109116
@override
110117
void addBuilder<T extends Object>(
111118
DependencyBuilder<T> builder, [
@@ -288,8 +295,8 @@ abstract class StateMachine<
288295
// Chain the stack trace of [_currentEvent]'s creation and the state machine
289296
// error to create a full picture of the error's lifecycle.
290297
final eventTrace = Trace.from(_currentCompleter.stackTrace);
291-
final stateMachineTrace = Trace.from(stackTrace);
292-
stackTrace = Chain([stateMachineTrace, eventTrace]);
298+
final stateMachineTrace = Chain.forTrace(stackTrace);
299+
stackTrace = Chain([...stateMachineTrace.traces, eventTrace]);
293300

294301
logger.debug('Emitted error', error, stackTrace);
295302

@@ -298,8 +305,9 @@ abstract class StateMachine<
298305
// Add the error to the state stream if it cannot be resolved to a new
299306
// state internally.
300307
if (resolution == null) {
301-
_currentCompleter.completeError(error, stackTrace);
302308
_stateController.addError(error, stackTrace);
309+
manager._stateController.addError(error, stackTrace);
310+
_currentCompleter.completeError(error, stackTrace);
303311
return;
304312
}
305313

packages/amplify_core/test/state_machine/my_state_machine.dart

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ final _builders = <StateMachineToken,
1212
WorkerMachine.type: WorkerMachine.new,
1313
};
1414

15-
enum MyType { initial, doWork, tryWork, delegateWork, success, error }
15+
enum MyType { initial, doWork, tryWork, delegateWork, failHard, success, error }
1616

1717
class MyPreconditionException implements PreconditionException {
1818
const MyPreconditionException(this.precondition);
@@ -105,6 +105,8 @@ class MyStateMachine extends StateMachine<MyEvent, MyState, StateMachineEvent,
105105
case MyType.delegateWork:
106106
await manager.delegateWork();
107107
emit(const MyState(MyType.success));
108+
case MyType.failHard:
109+
await Future<void>.error(StateError('Worker crashed'));
108110
}
109111
}
110112

@@ -240,4 +242,7 @@ class MyStateMachineManager extends StateMachineManager<StateMachineEvent,
240242
}
241243
throw ArgumentError('Invalid event: $event');
242244
}
245+
246+
@override
247+
String get runtimeTypeName => 'MyStateMachineManager';
243248
}

packages/amplify_core/test/state_machine/state_machine_test.dart

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
import 'dart:async';
5+
46
import 'package:amplify_core/amplify_core.dart';
57
import 'package:stack_trace/stack_trace.dart';
68
import 'package:test/test.dart';
@@ -123,6 +125,52 @@ void main() {
123125
});
124126
});
125127

128+
group('cross-zone', () {
129+
test('success', () {
130+
final completer = stateMachine.accept(const MyEvent(MyType.doWork));
131+
runZoned(() {
132+
expect(
133+
completer.completed,
134+
completes,
135+
reason: 'Should complete in forked zone',
136+
);
137+
});
138+
runZonedGuarded(
139+
() {
140+
expect(
141+
completer.completed,
142+
completes,
143+
reason: 'Should complete with different error zone',
144+
);
145+
},
146+
(e, st) {},
147+
);
148+
expect(completer.completed, completes);
149+
});
150+
151+
test('errors', () {
152+
final completer = stateMachine.accept(const MyEvent(MyType.failHard));
153+
runZoned(() {
154+
expect(
155+
completer.completed,
156+
throwsA(isA<Error>()),
157+
reason: 'Should complete in forked zone',
158+
);
159+
});
160+
runZonedGuarded(
161+
() {
162+
expect(
163+
completer.completed,
164+
throwsA(isA<Error>()),
165+
reason: 'Should complete with different error zone',
166+
);
167+
},
168+
(e, st) {},
169+
);
170+
expect(completer.completed, throwsA(isA<Error>()));
171+
});
172+
});
173+
126174
group('subscribeTo', () {
127175
test('can listen to other machines', () {
128176
stateMachine.accept(const MyEvent(MyType.delegateWork)).ignore();

0 commit comments

Comments
 (0)