@@ -5,14 +5,7 @@ import { parseObservableMarble } from '../marbles/parseObservableMarble';
55import { SubscriptionMarbleToken } from '../marbles/SubscriptionMarbleToken' ;
66import { TestMessage } from '../message/TestMessage' ;
77import { TestMessageValue } from '../message/TestMessage' ;
8- import {
9- AsyncAction ,
10- ColdObservable ,
11- COMPLETE_NOTIFICATION ,
12- errorNotification ,
13- HotObservable ,
14- nextNotification ,
15- } from '../utils/coreInternalImport' ;
8+ import { AsyncAction , ColdObservable , HotObservable } from '../utils/coreInternalImport' ;
169import { calculateSubscriptionFrame } from './calculateSubscriptionFrame' ;
1710
1811/**
@@ -97,11 +90,17 @@ const getCreateHotObservable = (state: SandboxState) => {
9790 * As we don't inherit virtualtimescheduler anymore, only these functions should be
9891 * used to properly flush out actions. Calling `scheduler.flush()` will not do any work.
9992 */
100- function getSchedulerFlushFunctions ( state : SandboxState , flushWithAsyncTick : true ) : {
93+ function getSchedulerFlushFunctions (
94+ state : SandboxState ,
95+ flushWithAsyncTick : true
96+ ) : {
10197 flushUntil : ( toFrame ?: number ) => Promise < void > ;
10298 advanceTo : ( toFrame ?: number ) => Promise < void > ;
10399} ;
104- function getSchedulerFlushFunctions ( state : SandboxState , flushWithAsyncTick : false ) : {
100+ function getSchedulerFlushFunctions (
101+ state : SandboxState ,
102+ flushWithAsyncTick : false
103+ ) : {
105104 flushUntil : ( toFrame ?: number ) => void ;
106105 advanceTo : ( toFrame ?: number ) => void ;
107106} ;
@@ -131,9 +130,11 @@ function getSchedulerFlushFunctions(state: SandboxState, flushWithAsyncTick: boo
131130 * For synchronous loop, it'll use plain `while` loop. In case of flushing with tick, each action
132131 * will be scheduled into promise instead.
133132 */
134- function loopActions ( loopState : SandboxState ,
135- condition : ( loopState : SandboxState ) => boolean ,
136- fn : ( loopState : SandboxState ) => Error | undefined ) : Promise < Error | undefined > | Error | undefined {
133+ function loopActions (
134+ loopState : SandboxState ,
135+ condition : ( loopState : SandboxState ) => boolean ,
136+ fn : ( loopState : SandboxState ) => Error | undefined
137+ ) : Promise < Error | undefined > | Error | undefined {
137138 if ( ! flushWithAsyncTick ) {
138139 let fnResult ;
139140 while ( condition ( loopState ) ) {
@@ -160,15 +161,19 @@ function getSchedulerFlushFunctions(state: SandboxState, flushWithAsyncTick: boo
160161
161162 // flush actions via custom loop fn, as same as
162163 // https://github.com/kwonoj/rx-sandbox/blob/c2922e5c5e2503739c64af626f2861b1e1f38159/src/scheduler/TestScheduler.ts#L166-L173
163- const loopResult = loopActions ( state , ( flushState ) => {
164- const action = flushState . scheduler . actions [ 0 ] ;
165- return ! ! action && action . delay <= toFrame ;
166- } , ( flushState ) => {
167- const action = flushState . scheduler . actions . shift ( ) ! ;
168- flushState . scheduler . frame = action . delay ;
169-
170- return action . execute ( action . state , action . delay ) ;
171- } ) ;
164+ const loopResult = loopActions (
165+ state ,
166+ ( flushState ) => {
167+ const action = flushState . scheduler . actions [ 0 ] ;
168+ return ! ! action && action . delay <= toFrame ;
169+ } ,
170+ ( flushState ) => {
171+ const action = flushState . scheduler . actions . shift ( ) ! ;
172+ flushState . scheduler . frame = action . delay ;
173+
174+ return action . execute ( action . state , action . delay ) ;
175+ }
176+ ) ;
172177
173178 const tearDown = ( error ?: Error ) => {
174179 state . flushing = false ;
@@ -212,15 +217,20 @@ function getSchedulerFlushFunctions(state: SandboxState, flushWithAsyncTick: boo
212217 }
213218
214219 const flushResult = flushUntil ( toFrame ) ;
215- const tearDown = ( ) => { state . scheduler . frame = toFrame ; } ;
220+ const tearDown = ( ) => {
221+ state . scheduler . frame = toFrame ;
222+ } ;
216223 return isPromise ( flushResult ) ? flushResult . then ( ( ) => tearDown ( ) ) : tearDown ( ) ;
217224 } ;
218225
219226 return { flushUntil, advanceTo } ;
220227}
221228
222229type getMessages = < T = string > ( observable : Observable < T > , unsubscriptionMarbles ?: string | null ) => void ;
223- type getMessagesWithTick = < T = string > ( observable : Observable < T > , unsubscriptionMarbles ?: string | null ) => Promise < void > ;
230+ type getMessagesWithTick = < T = string > (
231+ observable : Observable < T > ,
232+ unsubscriptionMarbles ?: string | null
233+ ) => Promise < void > ;
224234
225235/**
226236 * create getMessages function. Depends on flush, this'll either work asynchronously or synchronously.
@@ -235,11 +245,11 @@ function createGetMessages(state: SandboxState, flush: Function): Function {
235245 const pushMetaData = ( notification : ObservableNotification < T > ) =>
236246 innerObservableMetadata . push ( new TestMessageValue < T > ( state . scheduler . frame - outerFrame , notification ) ) ;
237247
238- observable . subscribe (
239- ( value ) => pushMetaData ( nextNotification ( value ) ) ,
240- ( err ) => pushMetaData ( errorNotification ( err ) ) ,
241- ( ) => pushMetaData ( COMPLETE_NOTIFICATION )
242- ) ;
248+ observable . subscribe ( {
249+ next : ( value ) => pushMetaData ( { kind : 'N' , value } ) ,
250+ error : ( error ) => pushMetaData ( { kind : 'E' , error } ) ,
251+ complete : ( ) => pushMetaData ( { kind : 'C' } ) ,
252+ } ) ;
243253
244254 return innerObservableMetadata ;
245255 } ;
@@ -257,16 +267,15 @@ function createGetMessages(state: SandboxState, flush: Function): Function {
257267
258268 let subscription : Subscription | null = null ;
259269 state . scheduler . schedule ( ( ) => {
260- subscription = observable . subscribe (
261- ( value : T ) =>
262- pushMetadata (
263- nextNotification (
264- value instanceof Observable ? materializeInnerObservable < T > ( value , state . scheduler . frame ) : value
265- )
266- ) ,
267- ( err : any ) => pushMetadata ( errorNotification ( err ) ) ,
268- ( ) => pushMetadata ( COMPLETE_NOTIFICATION )
269- ) ;
270+ subscription = observable . subscribe ( {
271+ next : ( value : T ) =>
272+ pushMetadata ( {
273+ kind : 'N' ,
274+ value : value instanceof Observable ? materializeInnerObservable < T > ( value , state . scheduler . frame ) : value ,
275+ } ) ,
276+ error : ( error : any ) => pushMetadata ( { kind : 'E' , error } ) ,
277+ complete : ( ) => pushMetadata ( { kind : 'C' } ) ,
278+ } ) ;
270279 } , subscribedFrame ) ;
271280
272281 if ( unsubscribedFrame !== Number . POSITIVE_INFINITY ) {
@@ -294,7 +303,7 @@ const initializeSandboxState = (autoFlush: boolean, frameTimeFactor: number, max
294303 maxFrame,
295304 frameTimeFactor,
296305 scheduler : new VirtualTimeScheduler ( VirtualAction , Number . POSITIVE_INFINITY ) ,
297- autoFlush
306+ autoFlush,
298307 } ;
299308} ;
300309
@@ -337,8 +346,8 @@ interface SchedulerInstance extends BaseSchedulerInstance {
337346
338347interface AsyncSchedulerInstance extends BaseSchedulerInstance {
339348 /**
340- * Flush out currently scheduled observables, only until reaches frame specfied.
341- */
349+ * Flush out currently scheduled observables, only until reaches frame specfied.
350+ */
342351 advanceTo : ReturnTypeWithArgs < typeof getSchedulerFlushFunctions , [ SandboxState , true ] > [ 'advanceTo' ] ;
343352 /**
344353 * Flush out currently scheduled observables, fill values returned by `getMarbles`.
@@ -355,9 +364,24 @@ interface AsyncSchedulerInstance extends BaseSchedulerInstance {
355364/**
356365 * Creates a new instance of virtualScheduler, along with utility functions for sandbox assertions.
357366 */
358- function createTestScheduler ( autoFlush : boolean , frameTimeFactor : number , maxFrameValue : number , flushWithAsyncTick : true ) : AsyncSchedulerInstance ;
359- function createTestScheduler ( autoFlush : boolean , frameTimeFactor : number , maxFrameValue : number , flushWithAsyncTick : false ) : SchedulerInstance ;
360- function createTestScheduler ( autoFlush : boolean , frameTimeFactor : number , maxFrameValue : number , flushWithAsyncTick : boolean ) : any {
367+ function createTestScheduler (
368+ autoFlush : boolean ,
369+ frameTimeFactor : number ,
370+ maxFrameValue : number ,
371+ flushWithAsyncTick : true
372+ ) : AsyncSchedulerInstance ;
373+ function createTestScheduler (
374+ autoFlush : boolean ,
375+ frameTimeFactor : number ,
376+ maxFrameValue : number ,
377+ flushWithAsyncTick : false
378+ ) : SchedulerInstance ;
379+ function createTestScheduler (
380+ autoFlush : boolean ,
381+ frameTimeFactor : number ,
382+ maxFrameValue : number ,
383+ flushWithAsyncTick : boolean
384+ ) : any {
361385 const sandboxState = initializeSandboxState ( autoFlush , frameTimeFactor , maxFrameValue ) ;
362386
363387 const { flushUntil, advanceTo } = getSchedulerFlushFunctions ( sandboxState , flushWithAsyncTick as any ) ;
0 commit comments