Skip to content

Commit 472f078

Browse files
committed
Promises.try* family of operations doesn't enlist source as dependent promise any longer - PromiseOperations performs this task
1 parent 35d6348 commit 472f078

File tree

2 files changed

+124
-128
lines changed

2 files changed

+124
-128
lines changed

src/main/java/net/tascalate/concurrent/PromiseOperations.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,22 +76,22 @@ public static <T, F extends Promise<T>> Function<F, F> peek(Consumer<? super F>
7676

7777
public static <T, R extends AutoCloseable> Function<Promise<R>, Promise<T>>
7878
tryApply(Function<? super R, ? extends T> fn) {
79-
return resourcePromise -> Promises.tryApply(resourcePromise, fn);
79+
return p -> unwrap(Promises.tryApply(p.dependent(PromiseOrigin.ALL), fn));
8080
}
8181

8282
public static <T, R extends AsyncCloseable> Function<Promise<R>, Promise<T>>
8383
tryApplyEx(Function<? super R, ? extends T> fn) {
84-
return resourcePromise -> Promises.tryApplyEx(resourcePromise, fn);
84+
return p -> unwrap(Promises.tryApplyEx(p.dependent(PromiseOrigin.ALL), fn));
8585
}
8686

8787
public static <T, R extends AutoCloseable> Function<Promise<R>, Promise<T>>
8888
tryCompose(Function<? super R, ? extends CompletionStage<T>> fn) {
89-
return resourcePromise -> Promises.tryCompose(resourcePromise, fn);
89+
return p -> unwrap(Promises.tryCompose(p.dependent(PromiseOrigin.ALL), fn));
9090
}
9191

9292
public static <T, R extends AsyncCloseable> Function<Promise<R>, Promise<T>>
9393
tryComposeEx(Function<? super R, ? extends CompletionStage<T>> fn) {
94-
return resourcePromise -> Promises.tryComposeEx(resourcePromise, fn);
94+
return p -> unwrap(Promises.tryComposeEx(p.dependent(PromiseOrigin.ALL), fn));
9595
}
9696

9797
public static <T, A, R> Function<Promise<Iterable<T>>, Promise<R>>
@@ -139,4 +139,8 @@ public static <T, F extends Promise<T>> Function<F, F> peek(Consumer<? super F>
139139
Promises.partitioned(values, batchSize, spawner, downstream, downstreamExecutor), true)
140140
.unwrap();
141141
}
142+
143+
private static <T> Promise<T> unwrap(Promise<T> p) {
144+
return p.unwrap();
145+
}
142146
}

src/main/java/net/tascalate/concurrent/Promises.java

Lines changed: 116 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -127,153 +127,145 @@ public static <T> Promise<T> loop(T initialValue,
127127
return asyncLoop;
128128
}
129129

130-
public static <T, R extends AutoCloseable> Promise<T> tryApply(CompletionStage<R> resourcePromise,
130+
public static <T, R extends AutoCloseable> Promise<T> tryApply(CompletionStage<R> stage,
131131
Function<? super R, ? extends T> fn) {
132-
return tryApply(from(resourcePromise), fn);
132+
return tryApply(from(stage), fn);
133133
}
134134

135-
public static <T, R extends AutoCloseable> Promise<T> tryApply(Promise<R> resourcePromise,
135+
public static <T, R extends AutoCloseable> Promise<T> tryApply(Promise<R> p,
136136
Function<? super R, ? extends T> fn) {
137-
return
138-
resourcePromise.dependent()
139-
.thenApply(r -> {
140-
try (R resource = r) {
141-
return (T)fn.apply(resource);
142-
} catch (RuntimeException | Error rte) {
143-
throw rte;
144-
} catch (Throwable ex) {
145-
throw new CompletionException(ex);
146-
}
147-
}, true)
148-
.unwrap();
137+
return p.thenApply(r -> {
138+
try (R resource = r) {
139+
return (T)fn.apply(resource);
140+
} catch (RuntimeException | Error rte) {
141+
throw rte;
142+
} catch (Throwable ex) {
143+
throw new CompletionException(ex);
144+
}
145+
});
149146
}
150147

151-
public static <T, R extends AsyncCloseable> Promise<T> tryApplyEx(CompletionStage<R> resourcePromise,
148+
public static <T, R extends AsyncCloseable> Promise<T> tryApplyEx(CompletionStage<R> stage,
152149
Function<? super R, ? extends T> fn) {
153-
return tryApplyEx(from(resourcePromise), fn);
150+
return tryApplyEx(from(stage), fn);
154151
}
155152

156-
public static <T, R extends AsyncCloseable> Promise<T> tryApplyEx(Promise<R> resourcePromise,
153+
public static <T, R extends AsyncCloseable> Promise<T> tryApplyEx(Promise<R> p,
157154
Function<? super R, ? extends T> fn) {
158-
return
159-
resourcePromise.dependent()
160-
.thenCompose(resource -> {
161-
T result;
162-
try {
163-
result = fn.apply(resource);
164-
} catch (Throwable actionException) {
165-
try {
166-
// Use dependent here?
167-
return resource.close().thenCompose(__ -> failure(actionException));
168-
} catch (Throwable onClose) {
169-
actionException.addSuppressed(onClose);
170-
return failure(onClose);
171-
}
172-
}
155+
return p.thenCompose(resource -> {
156+
T result;
157+
try {
158+
result = fn.apply(resource);
159+
} catch (Throwable actionException) {
160+
try {
161+
// Use dependent here?
162+
return resource.close().thenCompose(__ -> failure(actionException));
163+
} catch (Throwable onClose) {
164+
actionException.addSuppressed(onClose);
165+
return failure(onClose);
166+
}
167+
}
173168

174-
try {
175-
// Use dependent here?
176-
return resource.close().thenApply(__ -> result);
177-
} catch (Throwable onClose) {
178-
return failure(onClose);
179-
}
180-
181-
}, true)
182-
.unwrap();
169+
try {
170+
// Use dependent here?
171+
return resource.close().thenApply(__ -> result);
172+
} catch (Throwable onClose) {
173+
return failure(onClose);
174+
}
175+
});
183176
}
184177

185-
public static <T, R extends AutoCloseable> Promise<T> tryCompose(CompletionStage<R> resourcePromise,
178+
public static <T, R extends AutoCloseable> Promise<T> tryCompose(CompletionStage<R> stage,
186179
Function<? super R, ? extends CompletionStage<T>> fn) {
187-
return tryCompose(from(resourcePromise), fn);
180+
return tryCompose(from(stage), fn);
188181
}
189182

190-
public static <T, R extends AutoCloseable> Promise<T> tryCompose(Promise<R> resourcePromise,
183+
public static <T, R extends AutoCloseable> Promise<T> tryCompose(Promise<R> p,
191184
Function<? super R, ? extends CompletionStage<T>> fn) {
192-
return
193-
resourcePromise.dependent()
194-
.thenCompose(resource -> {
195-
CompletionStage<T> action;
196-
try {
197-
action = fn.apply(resource);
198-
} catch (Throwable composeException) {
199-
try {
200-
resource.close();
201-
} catch (Exception onClose) {
202-
composeException.addSuppressed(onClose);
203-
}
204-
return failure(composeException);
205-
}
185+
return p.thenCompose(resource -> {
186+
CompletionStage<T> action;
187+
try {
188+
action = fn.apply(resource);
189+
} catch (Throwable composeException) {
190+
try {
191+
resource.close();
192+
} catch (Exception onClose) {
193+
composeException.addSuppressed(onClose);
194+
}
195+
return failure(composeException);
196+
}
206197

207-
CompletableFutureWrapper<T> result = new CompletableFutureWrapper<>();
208-
action.whenComplete((actionResult, actionException) -> {
209-
try {
210-
resource.close();
211-
} catch (Throwable onClose) {
212-
if (null != actionException) {
213-
actionException.addSuppressed(onClose);
214-
result.failure(actionException);
215-
} else {
216-
result.failure(onClose);
217-
}
218-
// DONE WITH ERROR ON CLOSE
219-
return;
220-
}
221-
// CLOSE OK
222-
result.complete(actionResult, actionException);
223-
});
224-
return result.onCancel(() -> cancelPromise(action, true));
225-
}, true)
226-
.unwrap();
198+
CompletableFutureWrapper<T> result = new CompletableFutureWrapper<>();
199+
action.whenComplete((actionResult, actionException) -> {
200+
try {
201+
resource.close();
202+
} catch (Throwable onClose) {
203+
if (null != actionException) {
204+
actionException.addSuppressed(onClose);
205+
result.failure(actionException);
206+
} else {
207+
result.failure(onClose);
208+
}
209+
// DONE WITH ERROR ON CLOSE
210+
return;
211+
}
212+
// CLOSE OK
213+
result.complete(actionResult, actionException);
214+
});
215+
return result.onCancel(() -> cancelPromise(action, true));
216+
});
227217
}
228218

229-
public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> resourcePromise,
219+
public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(CompletionStage<R> stage,
230220
Function<? super R, ? extends CompletionStage<T>> fn) {
231-
return
232-
resourcePromise.dependent()
233-
.thenCompose(resource -> {
234-
CompletionStage<T> action;
235-
try {
236-
action = fn.apply(resource);
237-
} catch (Throwable composeException) {
238-
try {
239-
// Use dependent here?
240-
return resource.close().thenCompose(__ -> failure(composeException));
241-
} catch (Throwable onClose) {
242-
composeException.addSuppressed(onClose);
243-
return failure(onClose);
244-
}
245-
}
221+
return tryComposeEx(from(stage), fn);
222+
}
223+
224+
public static <T, R extends AsyncCloseable> Promise<T> tryComposeEx(Promise<R> p,
225+
Function<? super R, ? extends CompletionStage<T>> fn) {
226+
return p.thenCompose(resource -> {
227+
CompletionStage<T> action;
228+
try {
229+
action = fn.apply(resource);
230+
} catch (Throwable composeException) {
231+
try {
232+
// Use dependent here?
233+
return resource.close().thenCompose(__ -> failure(composeException));
234+
} catch (Throwable onClose) {
235+
composeException.addSuppressed(onClose);
236+
return failure(onClose);
237+
}
238+
}
246239

247-
CompletableFutureWrapper<T> result = new CompletableFutureWrapper<>();
248-
action.whenComplete((actionResult, actionException) -> {
249-
CompletionStage<?> afterClose;
250-
try {
251-
afterClose = resource.close();
252-
} catch (Throwable onClose) {
253-
if (null != actionException) {
254-
actionException.addSuppressed(onClose);
255-
result.failure(actionException);
256-
} else {
257-
result.failure(onClose);
258-
}
259-
// DONE WITH ERROR ON ASYNC CLOSE
260-
return;
240+
CompletableFutureWrapper<T> result = new CompletableFutureWrapper<>();
241+
action.whenComplete((actionResult, actionException) -> {
242+
CompletionStage<?> afterClose;
243+
try {
244+
afterClose = resource.close();
245+
} catch (Throwable onClose) {
246+
if (null != actionException) {
247+
actionException.addSuppressed(onClose);
248+
result.failure(actionException);
249+
} else {
250+
result.failure(onClose);
251+
}
252+
// DONE WITH ERROR ON ASYNC CLOSE
253+
return;
254+
}
255+
// ASYNC CLOSE INVOKE OK
256+
afterClose.whenComplete((__, onClose) -> {
257+
if (null != actionException) {
258+
if (null != onClose) {
259+
actionException.addSuppressed(onClose);
261260
}
262-
// ASYNC CLOSE INVOKE OK
263-
afterClose.whenComplete((__, onClose) -> {
264-
if (null != actionException) {
265-
if (null != onClose) {
266-
actionException.addSuppressed(onClose);
267-
}
268-
result.failure(actionException);
269-
} else {
270-
result.complete(actionResult, onClose);
271-
}
272-
});
273-
});
274-
return result.onCancel(() -> cancelPromise(action, true));
275-
}, true)
276-
.unwrap();
261+
result.failure(actionException);
262+
} else {
263+
result.complete(actionResult, onClose);
264+
}
265+
});
266+
});
267+
return result.onCancel(() -> cancelPromise(action, true));
268+
});
277269
}
278270

279271
public static <T, A, R> Promise<R> partitioned(Iterable<? extends T> values,

0 commit comments

Comments
 (0)