File tree Expand file tree Collapse file tree 1 file changed +3
-6
lines changed
Utils/azuretools-core/src/com/microsoft/azure/toolkit/lib/common/task Expand file tree Collapse file tree 1 file changed +3
-6
lines changed Original file line number Diff line number Diff line change 2525import lombok .extern .java .Log ;
2626import rx .Emitter ;
2727import rx .Observable ;
28- import rx .observables .ConnectableObservable ;
2928
3029import java .util .function .BiConsumer ;
3130import java .util .function .Supplier ;
@@ -253,8 +252,8 @@ public final <T> Observable<T> runInModalAsObservable(AzureTask<T> task) {
253252 return this .runInObservable (this ::doRunInModal , task );
254253 }
255254
256- private <T > ConnectableObservable <T > runInObservable (final BiConsumer <? super Runnable , ? super AzureTask <T >> consumer , final AzureTask <T > task ) {
257- final ConnectableObservable < T > observable = Observable .create ((Emitter <T > emitter ) -> {
255+ private <T > Observable <T > runInObservable (final BiConsumer <? super Runnable , ? super AzureTask <T >> consumer , final AzureTask <T > task ) {
256+ return Observable .create ((Emitter <T > emitter ) -> {
258257 final AzureTaskContext .Node context = AzureTaskContext .current ().derive ();
259258 task .setContext (context );
260259 context .setTask (task );
@@ -268,9 +267,7 @@ private <T> ConnectableObservable<T> runInObservable(final BiConsumer<? super Ru
268267 }
269268 }, context );
270269 consumer .accept (t , task );
271- }, Emitter .BackpressureMode .BUFFER ).publish ();
272- observable .connect ();
273- return observable ;
270+ }, Emitter .BackpressureMode .BUFFER );
274271 }
275272
276273 protected abstract void doRead (Runnable runnable , AzureTask <?> task );
You can’t perform that action at this time.
0 commit comments