diff --git a/libs/ngrx-toolkit/src/lib/mutation.spec.ts b/libs/ngrx-toolkit/src/lib/mutation.spec.ts new file mode 100644 index 00000000..f6259e70 --- /dev/null +++ b/libs/ngrx-toolkit/src/lib/mutation.spec.ts @@ -0,0 +1,96 @@ +import { fakeAsync, TestBed, tick } from '@angular/core/testing'; +import { patchState, signalStore, withMethods, withState } from '@ngrx/signals'; +import { delay, Observable, of } from 'rxjs'; +import { mutation, rxMutation, withMutations } from './mutation'; + +function calcDouble(value: number): Observable { + return of(value * 2).pipe(delay(1000)); +} + +describe('mutation', () => { + it('rxMutation should update the state', fakeAsync(() => { + TestBed.runInInjectionContext(() => { + const Store = signalStore( + withState({ counter: 3 }), + withMutations(), + withMethods((store) => ({ + increment: rxMutation(store, { + operation: (value: number) => calcDouble(value), + onSuccess: (_params, result) => { + patchState(store, (state) => ({ + counter: state.counter + result, + })); + }, + }), + })), + ); + const store = new Store(); + + store.increment(2); + tick(1000); + expect(store.counter()).toEqual(7); + }); + })); + + it('rxMutation deals with race conditions', fakeAsync(() => { + TestBed.runInInjectionContext(() => { + const Store = signalStore( + withState({ counter: 3 }), + withMutations(), + withMethods((store) => ({ + increment: rxMutation(store, { + operation: (value: number) => calcDouble(value), + onSuccess: (_params, result) => { + patchState(store, (state) => ({ + counter: state.counter + result, + })); + }, + operator: 'switch', + }), + })), + ); + + const store = new Store(); + + const successSpy = jest.fn(); + const errorSpy = jest.fn(); + + store.increment.success.subscribe(successSpy); + store.increment.error.subscribe(errorSpy); + + store.increment(1); + tick(500); + store.increment(2); + tick(1000); + + expect(store.counter()).toEqual(7); + expect(successSpy).toHaveBeenCalledTimes(1); + expect(errorSpy).toHaveBeenCalledTimes(0); + expect(successSpy).toHaveBeenCalledWith({ params: 2, result: 4 }); + }); + })); + + it('mutation should update the state', fakeAsync(() => { + TestBed.runInInjectionContext(() => { + const Store = signalStore( + withState({ counter: 3 }), + withMutations(), + withMethods((store) => ({ + increment: mutation(store, { + operation: (value: number) => calcDouble(value), + onSuccess: (_params, result) => { + patchState(store, (state) => ({ + counter: state.counter + result, + })); + }, + }), + })), + ); + const store = new Store(); + + store.increment(2); + tick(1000); + expect(store.counter()).toEqual(7); + }); + })); +}); diff --git a/libs/ngrx-toolkit/src/lib/mutation.ts b/libs/ngrx-toolkit/src/lib/mutation.ts new file mode 100644 index 00000000..531729b0 --- /dev/null +++ b/libs/ngrx-toolkit/src/lib/mutation.ts @@ -0,0 +1,156 @@ +import { computed, DestroyRef, inject, Injector } from '@angular/core'; +import { takeUntilDestroyed } from '@angular/core/rxjs-interop'; +import { + patchState, + signalStoreFeature, + withComputed, + withState, + WritableStateSource, +} from '@ngrx/signals'; +import { + catchError, + concatMap, + exhaustMap, + firstValueFrom, + mergeMap, + Observable, + of, + OperatorFunction, + Subject, + switchMap, + tap, +} from 'rxjs'; + +// +// CREDITS: This implementation is highly influenced by Marko Stanimirović' prototype: +// https://github.com/markostanimirovic/rx-resource-proto +// +// Big thanks to Marko for sharing his knowledge and for his great work! +// + +export interface MutationState { + _mutationCount: number; + mutationError: string | null; +} + +const incrementCounter = (state: MutationState) => ({ + ...state, + _mutationCount: state._mutationCount + 1, +}); +const decrementCounter = (state: MutationState) => ({ + ...state, + _mutationCount: state._mutationCount - 1, +}); + +export const clearMutationError = (state: MutationState) => ({ + ...state, + mutationError: null, +}); + +export type FlatteningOperator = 'merge' | 'concat' | 'switch' | 'exhaust'; + +export interface MutationOptions { + operation: (params: P) => Observable; + onSuccess?: (params: P, result: R) => void; + onError?: (params: P, error: unknown) => string | void; + operator?: FlatteningOperator; + injector?: Injector; +} + +function flatten( + operation: (params: P) => Observable, + operator: FlatteningOperator, +): OperatorFunction { + switch (operator) { + case 'concat': + return concatMap(operation); + case 'switch': + return switchMap(operation); + case 'exhaust': + return exhaustMap(operation); + case 'merge': + default: + return mergeMap(operation); + } +} + +export function rxMutation( + store: WritableStateSource, + options: MutationOptions, +) { + const destroyRef = options.injector?.get(DestroyRef) || inject(DestroyRef); + const mutationSubject = new Subject

(); + + const successSubject = new Subject<{ params: P; result: R }>(); + const errorSubject = new Subject<{ params: P; error: unknown }>(); + + const operator = options.operator || 'merge'; + const flatteningOp = flatten((params: P) => { + return options.operation(params).pipe( + tap((result) => { + options.onSuccess?.(params, result); + patchState(store, decrementCounter); + successSubject.next({ params, result }); + }), + catchError((error) => { + console.error('Mutation error:', error); + const mutationError = + options.onError?.(params, error) ?? + error.message ?? + 'Mutation failed'; + patchState(store, mutationError, decrementCounter); + errorSubject.next({ params, error }); + return of(null); + }), + ); + }, operator); + + mutationSubject + .pipe(flatteningOp, takeUntilDestroyed(destroyRef)) + .subscribe(); + + const result = (params: P) => { + patchState(store, incrementCounter); + mutationSubject.next(params); + }; + + result.success = successSubject.asObservable(); + result.error = errorSubject.asObservable(); + + return result; +} + +export function mutation( + store: WritableStateSource, + options: MutationOptions, +): (params: P) => Promise { + return async (params: P): Promise => { + patchState(store, incrementCounter); + try { + const result = await firstValueFrom(options.operation(params)); + options.onSuccess?.(params, result); + return result; + } catch (error) { + console.error('Mutation error:', error); + const mutationError = + options.onError?.(params, error) ?? + (error instanceof Error ? error.message : 'Mutation failed'); + patchState(store, { mutationError }); + throw error; + } finally { + patchState(store, decrementCounter); + } + }; +} + +export function withMutations() { + return signalStoreFeature( + withState({ + _mutationCount: 0, + mutationError: null, + }), + withComputed((state) => ({ + isProcessing: computed(() => state._mutationCount() > 0), + })), + ); +}