@@ -913,12 +913,8 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
913913 /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
914914 /// <param name="source">The source.</param>
915915 /// <returns>An Observable of T and a release mechanism.</returns>
916- public static IObservable < ( T Value , IDisposable Sync ) > SynchronizeAsync < T > ( this IObservable < T > source ) =>
917- Observable . Create < ( T Value , IDisposable Sync ) > ( observer =>
918- {
919- var gate = new object ( ) ;
920- return source . Synchronize ( gate ) . Subscribe ( item => new Continuation ( ) . Lock ( item , observer ) . Wait ( ) ) ;
921- } ) ;
916+ public static IObservable < ( T Value , IDisposable Sync ) > SynchronizeSynchronous < T > ( this IObservable < T > source ) =>
917+ Observable . Create < ( T Value , IDisposable Sync ) > ( observer => source . Subscribe ( item => new Continuation ( ) . Lock ( item , observer ) . Wait ( ) ) ) ;
922918
923919 /// <summary>
924920 /// Subscribes to the specified source synchronously.
@@ -930,7 +926,7 @@ public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource>
930926 /// <param name="onCompleted">The on completed.</param>
931927 /// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
932928 public static IDisposable SubscribeSynchronous < T > ( this IObservable < T > source , Func < T , Task > onNext , Action < Exception > onError , Action onCompleted ) =>
933- source . SynchronizeAsync ( ) . Subscribe (
929+ source . SynchronizeSynchronous ( ) . Subscribe (
934930 async observer =>
935931 {
936932 await onNext ( observer . Value ) ;
@@ -948,7 +944,7 @@ public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Fu
948944 /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
949945 /// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
950946 public static IDisposable SubscribeSynchronous < T > ( this IObservable < T > source , Func < T , Task > onNext , Action < Exception > onError ) =>
951- source . SynchronizeAsync ( ) . Subscribe (
947+ source . SynchronizeSynchronous ( ) . Subscribe (
952948 async observer =>
953949 {
954950 await onNext ( observer . Value ) ;
@@ -966,7 +962,7 @@ public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Fu
966962 /// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
967963 /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
968964 public static IDisposable SubscribeSynchronous < T > ( this IObservable < T > source , Func < T , Task > onNext , Action onCompleted ) =>
969- source . SynchronizeAsync ( ) . Subscribe (
965+ source . SynchronizeSynchronous ( ) . Subscribe (
970966 async observer =>
971967 {
972968 await onNext ( observer . Value ) ;
@@ -982,13 +978,74 @@ public static IDisposable SubscribeSynchronous<T>(this IObservable<T> source, Fu
982978 /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
983979 /// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
984980 public static IDisposable SubscribeSynchronous < T > ( this IObservable < T > source , Func < T , Task > onNext ) =>
985- source . SynchronizeAsync ( ) . Subscribe (
981+ source . SynchronizeSynchronous ( ) . Subscribe (
986982 async observer =>
987983 {
988984 await onNext ( observer . Value ) ;
989985 observer . Sync . Dispose ( ) ;
990986 } ) ;
991987
988+ /// <summary>
989+ /// Synchronizes the asynchronous operations in downstream operations.
990+ /// Use SubscribeSynchronus instead for a simpler version.
991+ /// Call Sync.Dispose() to release the lock in the downstream methods.
992+ /// </summary>
993+ /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
994+ /// <param name="source">The source.</param>
995+ /// <returns>An Observable of T and a release mechanism.</returns>
996+ public static IObservable < ( T Value , IDisposable Sync ) > SynchronizeAsync < T > ( this IObservable < T > source ) =>
997+ Observable . Create < ( T Value , IDisposable Sync ) > ( observer => source . Select ( item => Observable . FromAsync ( ( ) => new Continuation ( ) . Lock ( item , observer ) ) ) . Concat ( ) . Subscribe ( ) ) ;
998+
999+ /// <summary>
1000+ /// Subscribes allowing asynchronous operations to be executed without blocking the source.
1001+ /// </summary>
1002+ /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1003+ /// <param name="source">Observable sequence to subscribe to.</param>
1004+ /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1005+ /// <returns><see cref="IDisposable"/> object used to unsubscribe from the observable sequence.</returns>
1006+ public static IDisposable SubscribeAsync < T > ( this IObservable < T > source , Func < T , Task > onNext ) =>
1007+ source . Select ( o => Observable . FromAsync ( ( ) => onNext ( o ) ) ) . Concat ( ) . Subscribe ( ) ;
1008+
1009+ /// <summary>
1010+ /// Subscribes allowing asynchronous operations to be executed without blocking the source.
1011+ /// </summary>
1012+ /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1013+ /// <param name="source">Observable sequence to subscribe to.</param>
1014+ /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1015+ /// <param name="onCompleted">The on completed.</param>
1016+ /// <returns>
1017+ /// <see cref="IDisposable" /> object used to unsubscribe from the observable sequence.
1018+ /// </returns>
1019+ public static IDisposable SubscribeAsync < T > ( this IObservable < T > source , Func < T , Task > onNext , Action onCompleted ) =>
1020+ source . Select ( o => Observable . FromAsync ( ( ) => onNext ( o ) ) ) . Concat ( ) . Subscribe ( _ => { } , onCompleted ) ;
1021+
1022+ /// <summary>
1023+ /// Subscribes allowing asynchronous operations to be executed without blocking the source.
1024+ /// </summary>
1025+ /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1026+ /// <param name="source">Observable sequence to subscribe to.</param>
1027+ /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1028+ /// <param name="onError">The on error.</param>
1029+ /// <returns>
1030+ /// <see cref="IDisposable" /> object used to unsubscribe from the observable sequence.
1031+ /// </returns>
1032+ public static IDisposable SubscribeAsync < T > ( this IObservable < T > source , Func < T , Task > onNext , Action < Exception > onError ) =>
1033+ source . Select ( o => Observable . FromAsync ( ( ) => onNext ( o ) ) ) . Concat ( ) . Subscribe ( _ => { } , onError ) ;
1034+
1035+ /// <summary>
1036+ /// Subscribes allowing asynchronous operations to be executed without blocking the source.
1037+ /// </summary>
1038+ /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
1039+ /// <param name="source">Observable sequence to subscribe to.</param>
1040+ /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
1041+ /// <param name="onError">The on error.</param>
1042+ /// <param name="onCompleted">The on completed.</param>
1043+ /// <returns>
1044+ /// <see cref="IDisposable" /> object used to unsubscribe from the observable sequence.
1045+ /// </returns>
1046+ public static IDisposable SubscribeAsync < T > ( this IObservable < T > source , Func < T , Task > onNext , Action < Exception > onError , Action onCompleted ) =>
1047+ source . Select ( o => Observable . FromAsync ( ( ) => onNext ( o ) ) ) . Concat ( ) . Subscribe ( _ => { } , onError , onCompleted ) ;
1048+
9921049 private static void FastForEach < T > ( IObserver < T > observer , IEnumerable < T > source )
9931050 {
9941051 if ( source is List < T > fullList )
0 commit comments