|
23 | 23 | package com.microsoft.azure.toolkit.lib.common.operation; |
24 | 24 |
|
25 | 25 | import com.microsoft.azure.toolkit.lib.common.handler.AzureExceptionHandler; |
| 26 | +import com.microsoft.azuretools.azurecommons.helpers.Nullable; |
26 | 27 | import rx.Completable; |
27 | 28 | import rx.Observable; |
28 | 29 | import rx.Single; |
| 30 | +import rx.functions.Action1; |
29 | 31 |
|
30 | 32 | import java.lang.Thread.UncaughtExceptionHandler; |
31 | 33 | import java.util.*; |
32 | 34 |
|
33 | 35 | public class AzureOperationsContext { |
34 | | - static final ThreadLocal<Deque<AzureOperationRef>> operations = ThreadLocal.withInitial(ArrayDeque::new); |
35 | | - static final UncaughtExceptionHandler rxExceptionHandler = (t, e) -> AzureExceptionHandler.onRxException(e); |
36 | | - static final UncaughtExceptionHandler exceptionHandler = (t, e) -> AzureExceptionHandler.onUncaughtException(e); |
| 36 | + private static final ThreadLocal<Deque<AzureOperationRef>> operations = ThreadLocal.withInitial(ArrayDeque::new); |
| 37 | + private static final UncaughtExceptionHandler rxExceptionHandler = (t, e) -> AzureExceptionHandler.onRxException(e); |
| 38 | + private static final UncaughtExceptionHandler exceptionHandler = (t, e) -> AzureExceptionHandler.onUncaughtException(e); |
37 | 39 |
|
38 | 40 | public static List<AzureOperationRef> getOperations() { |
39 | | - final ArrayList<AzureOperationRef> ops = new ArrayList<>(operations.get()); |
| 41 | + final ArrayList<AzureOperationRef> ops = new ArrayList<>(AzureOperationsContext.operations.get()); |
40 | 42 | Collections.reverse(ops); |
41 | 43 | return Collections.unmodifiableList(ops); |
42 | 44 | } |
43 | 45 |
|
| 46 | + static void init(Deque<AzureOperationRef> closure) { |
| 47 | + AzureOperationsContext.operations.set(closure); |
| 48 | + } |
| 49 | + |
44 | 50 | static void push(final AzureOperationRef operation) { |
45 | | - operations.get().push(operation); |
| 51 | + AzureOperationsContext.operations.get().push(operation); |
| 52 | + } |
| 53 | + |
| 54 | + static AzureOperationRef pop() { |
| 55 | + return AzureOperationsContext.operations.get().pop(); |
46 | 56 | } |
47 | 57 |
|
48 | | - static void pop() { |
49 | | - operations.get().pop(); |
| 58 | + static void clear() { |
| 59 | + AzureOperationsContext.operations.get().clear(); |
| 60 | + } |
| 61 | + |
| 62 | + static void dispose() { |
| 63 | + AzureOperationsContext.operations.remove(); |
50 | 64 | } |
51 | 65 |
|
52 | 66 | public static Runnable deriveClosure(final Runnable runnable) { |
53 | 67 | final Deque<AzureOperationRef> closure = new ArrayDeque<>(AzureOperationsContext.getOperations()); |
54 | | - return () -> { |
55 | | - operations.set(closure); |
56 | | - try { |
57 | | - runnable.run(); |
58 | | - } catch (final Throwable throwable) { |
59 | | - AzureExceptionHandler.onUncaughtException(throwable); |
60 | | - } |
61 | | - }; |
| 68 | + final long parentThread = Thread.currentThread().getId(); |
| 69 | + return () -> act((none) -> runnable.run(), closure, parentThread, null); |
62 | 70 | } |
63 | 71 |
|
64 | 72 | public static Single.OnSubscribe<?> deriveClosure(final Single.OnSubscribe<?> action) { |
65 | 73 | final Deque<AzureOperationRef> closure = new ArrayDeque<>(AzureOperationsContext.getOperations()); |
66 | | - return (o) -> { |
67 | | - Thread.currentThread().setUncaughtExceptionHandler(rxExceptionHandler); |
68 | | - operations.set(closure); |
69 | | - try { |
70 | | - action.call(o); |
71 | | - } catch (final Throwable throwable) { |
72 | | - AzureExceptionHandler.onRxException(throwable); |
73 | | - } |
74 | | - }; |
| 74 | + final long parentThread = Thread.currentThread().getId(); |
| 75 | + return (o) -> act(action, closure, parentThread, o); |
75 | 76 | } |
76 | 77 |
|
77 | 78 | public static Observable.OnSubscribe<?> deriveClosure(final Observable.OnSubscribe<?> action) { |
78 | 79 | final Deque<AzureOperationRef> closure = new ArrayDeque<>(AzureOperationsContext.getOperations()); |
79 | | - return (o) -> { |
80 | | - Thread.currentThread().setUncaughtExceptionHandler(rxExceptionHandler); |
81 | | - operations.set(closure); |
82 | | - try { |
83 | | - action.call(o); |
84 | | - } catch (final Throwable throwable) { |
85 | | - AzureExceptionHandler.onRxException(throwable); |
86 | | - } |
87 | | - }; |
| 80 | + final long parentThread = Thread.currentThread().getId(); |
| 81 | + return (o) -> act(action, closure, parentThread, o); |
88 | 82 | } |
89 | 83 |
|
90 | 84 | public static Completable.OnSubscribe deriveClosure(final Completable.OnSubscribe action) { |
91 | 85 | final Deque<AzureOperationRef> closure = new ArrayDeque<>(AzureOperationsContext.getOperations()); |
92 | | - return (o) -> { |
93 | | - Thread.currentThread().setUncaughtExceptionHandler(rxExceptionHandler); |
94 | | - operations.set(closure); |
95 | | - try { |
96 | | - action.call(o); |
97 | | - } catch (final Throwable throwable) { |
98 | | - AzureExceptionHandler.onRxException(throwable); |
99 | | - } |
100 | | - }; |
| 86 | + final long parentThread = Thread.currentThread().getId(); |
| 87 | + return (o) -> act(action, closure, parentThread, o); |
101 | 88 | } |
102 | 89 |
|
103 | | - @FunctionalInterface |
104 | | - public interface OperationProceedable { |
105 | | - Object proceed() throws Throwable; |
| 90 | + private static void act(final Action1 action, final Deque<AzureOperationRef> closure, final long parentThread, @Nullable final Object subscriber) { |
| 91 | + final long currentThread = Thread.currentThread().getId(); |
| 92 | + if (!Objects.equals(currentThread, parentThread)) { |
| 93 | + Thread.currentThread().setUncaughtExceptionHandler(rxExceptionHandler); |
| 94 | + AzureOperationsContext.init(closure); |
| 95 | + } |
| 96 | + try { |
| 97 | + action.call(subscriber); |
| 98 | + } catch (final Throwable throwable) { |
| 99 | + AzureExceptionHandler.onRxException(throwable); |
| 100 | + } finally { |
| 101 | + if (!Objects.equals(currentThread, parentThread)) { |
| 102 | + AzureOperationsContext.dispose(); |
| 103 | + } |
| 104 | + } |
106 | 105 | } |
107 | 106 | } |
0 commit comments