Skip to content

Commit 383047d

Browse files
committed
feat: add mutation api
rxMutation creates a mutation function in the store delegating to a function/method returning an observable and takes care of race conditions by internally using mergeMap (default), switchMap, concatMap, or exhaustMap. mutation also creates an async mutation function delegating to a function/method returning an observable. When the async function is awaited without throwing an exception, the caller can safely assume that it has been executed. However, here, the consumer is responsible for preventing race conditions.
1 parent dd9f381 commit 383047d

File tree

2 files changed

+252
-0
lines changed

2 files changed

+252
-0
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
2+
import { patchState, signalStore, withMethods, withState } from '@ngrx/signals';
3+
import { delay, Observable, of } from 'rxjs';
4+
import { mutation, rxMutation, withMutations } from './mutation';
5+
6+
function calcDouble(value: number): Observable<number> {
7+
return of(value * 2).pipe(delay(1000));
8+
}
9+
10+
describe('mutation', () => {
11+
it('rxMutation should update the state', fakeAsync(() => {
12+
TestBed.runInInjectionContext(() => {
13+
const Store = signalStore(
14+
withState({ counter: 3 }),
15+
withMutations(),
16+
withMethods((store) => ({
17+
increment: rxMutation(store, {
18+
operation: (value: number) => calcDouble(value),
19+
onSuccess: (_params, result) => {
20+
patchState(store, (state) => ({
21+
counter: state.counter + result,
22+
}));
23+
},
24+
}),
25+
})),
26+
);
27+
const store = new Store();
28+
29+
store.increment(2);
30+
tick(1000);
31+
expect(store.counter()).toEqual(7);
32+
});
33+
}));
34+
35+
it('rxMutation deals with race conditions', fakeAsync(() => {
36+
TestBed.runInInjectionContext(() => {
37+
const Store = signalStore(
38+
withState({ counter: 3 }),
39+
withMutations(),
40+
withMethods((store) => ({
41+
increment: rxMutation(store, {
42+
operation: (value: number) => calcDouble(value),
43+
onSuccess: (_params, result) => {
44+
patchState(store, (state) => ({
45+
counter: state.counter + result,
46+
}));
47+
},
48+
operator: 'switch',
49+
}),
50+
})),
51+
);
52+
53+
const store = new Store();
54+
55+
const successSpy = jest.fn();
56+
const errorSpy = jest.fn();
57+
58+
store.increment.success.subscribe(successSpy);
59+
store.increment.error.subscribe(errorSpy);
60+
61+
store.increment(1);
62+
tick(500);
63+
store.increment(2);
64+
tick(1000);
65+
66+
expect(store.counter()).toEqual(7);
67+
expect(successSpy).toHaveBeenCalledTimes(1);
68+
expect(errorSpy).toHaveBeenCalledTimes(0);
69+
expect(successSpy).toHaveBeenCalledWith({ params: 2, result: 4 });
70+
});
71+
}));
72+
73+
it('mutation should update the state', fakeAsync(() => {
74+
TestBed.runInInjectionContext(() => {
75+
const Store = signalStore(
76+
withState({ counter: 3 }),
77+
withMutations(),
78+
withMethods((store) => ({
79+
increment: mutation(store, {
80+
operation: (value: number) => calcDouble(value),
81+
onSuccess: (_params, result) => {
82+
patchState(store, (state) => ({
83+
counter: state.counter + result,
84+
}));
85+
},
86+
}),
87+
})),
88+
);
89+
const store = new Store();
90+
91+
store.increment(2);
92+
tick(1000);
93+
expect(store.counter()).toEqual(7);
94+
});
95+
}));
96+
});

libs/ngrx-toolkit/src/lib/mutation.ts

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
import { computed, DestroyRef, inject, Injector } from '@angular/core';
2+
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
3+
import {
4+
patchState,
5+
signalStoreFeature,
6+
withComputed,
7+
withState,
8+
WritableStateSource,
9+
} from '@ngrx/signals';
10+
import {
11+
catchError,
12+
concatMap,
13+
exhaustMap,
14+
firstValueFrom,
15+
mergeMap,
16+
Observable,
17+
of,
18+
OperatorFunction,
19+
Subject,
20+
switchMap,
21+
tap,
22+
} from 'rxjs';
23+
24+
//
25+
// CREDITS: This implementation is highly influenced by Marko Stanimirović' prototype:
26+
// https://github.com/markostanimirovic/rx-resource-proto
27+
//
28+
// Big thanks to Marko for sharing his knowledge and for his great work!
29+
//
30+
31+
export interface MutationState {
32+
_mutationCount: number;
33+
mutationError: string | null;
34+
}
35+
36+
const incrementCounter = (state: MutationState) => ({
37+
...state,
38+
_mutationCount: state._mutationCount + 1,
39+
});
40+
const decrementCounter = (state: MutationState) => ({
41+
...state,
42+
_mutationCount: state._mutationCount - 1,
43+
});
44+
45+
export const clearMutationError = (state: MutationState) => ({
46+
...state,
47+
mutationError: null,
48+
});
49+
50+
export type FlatteningOperator = 'merge' | 'concat' | 'switch' | 'exhaust';
51+
52+
export interface MutationOptions<P, R> {
53+
operation: (params: P) => Observable<R>;
54+
onSuccess?: (params: P, result: R) => void;
55+
onError?: (params: P, error: unknown) => string | void;
56+
operator?: FlatteningOperator;
57+
injector?: Injector;
58+
}
59+
60+
function flatten<P, R>(
61+
operation: (params: P) => Observable<R>,
62+
operator: FlatteningOperator,
63+
): OperatorFunction<P, R> {
64+
switch (operator) {
65+
case 'concat':
66+
return concatMap(operation);
67+
case 'switch':
68+
return switchMap(operation);
69+
case 'exhaust':
70+
return exhaustMap(operation);
71+
case 'merge':
72+
default:
73+
return mergeMap(operation);
74+
}
75+
}
76+
77+
export function rxMutation<P, R>(
78+
store: WritableStateSource<MutationState>,
79+
options: MutationOptions<P, R>,
80+
) {
81+
const destroyRef = options.injector?.get(DestroyRef) || inject(DestroyRef);
82+
const mutationSubject = new Subject<P>();
83+
84+
const successSubject = new Subject<{ params: P; result: R }>();
85+
const errorSubject = new Subject<{ params: P; error: unknown }>();
86+
87+
const operator = options.operator || 'merge';
88+
const flatteningOp = flatten((params: P) => {
89+
return options.operation(params).pipe(
90+
tap((result) => {
91+
options.onSuccess?.(params, result);
92+
patchState(store, decrementCounter);
93+
successSubject.next({ params, result });
94+
}),
95+
catchError((error) => {
96+
console.error('Mutation error:', error);
97+
const mutationError =
98+
options.onError?.(params, error) ??
99+
error.message ??
100+
'Mutation failed';
101+
patchState(store, mutationError, decrementCounter);
102+
errorSubject.next({ params, error });
103+
return of(null);
104+
}),
105+
);
106+
}, operator);
107+
108+
mutationSubject
109+
.pipe(flatteningOp, takeUntilDestroyed(destroyRef))
110+
.subscribe();
111+
112+
const result = (params: P) => {
113+
patchState(store, incrementCounter);
114+
mutationSubject.next(params);
115+
};
116+
117+
result.success = successSubject.asObservable();
118+
result.error = errorSubject.asObservable();
119+
120+
return result;
121+
}
122+
123+
export function mutation<P, R>(
124+
store: WritableStateSource<MutationState>,
125+
options: MutationOptions<P, R>,
126+
): (params: P) => Promise<R> {
127+
return async (params: P): Promise<R> => {
128+
patchState(store, incrementCounter);
129+
try {
130+
const result = await firstValueFrom(options.operation(params));
131+
options.onSuccess?.(params, result);
132+
return result;
133+
} catch (error) {
134+
console.error('Mutation error:', error);
135+
const mutationError =
136+
options.onError?.(params, error) ??
137+
(error instanceof Error ? error.message : 'Mutation failed');
138+
patchState(store, { mutationError });
139+
throw error;
140+
} finally {
141+
patchState(store, decrementCounter);
142+
}
143+
};
144+
}
145+
146+
export function withMutations() {
147+
return signalStoreFeature(
148+
withState<MutationState>({
149+
_mutationCount: 0,
150+
mutationError: null,
151+
}),
152+
withComputed((state) => ({
153+
isProcessing: computed(() => state._mutationCount() > 0),
154+
})),
155+
);
156+
}

0 commit comments

Comments
 (0)