Skip to content

Commit 7e32784

Browse files
authored
Expand config with executor decorators (#1213)
related: - #1186
1 parent d07f8ea commit 7e32784

File tree

7 files changed

+536
-2
lines changed

7 files changed

+536
-2
lines changed

README.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ Additionally, you can customize:
106106
- grouping by key via `parallelBy(...)` / `parallelToStreamBy(...)` methods
107107
- ordered streaming via the `ordered()` configurer option (streaming collectors only)
108108
- a custom downstream `Collector` (`ParallelCollectors.parallel` only)
109+
- executor decoration via `executorDecorator()` to wrap the resolved executor
110+
- task decoration via `taskDecorator()` to wrap each individual task
109111

110112
All configuration is done via the `CollectingConfigurer` (for `parallel`/`parallelBy`) or `StreamingConfigurer` (for `parallelToStream`/`parallelToStreamBy`) passed as a `Consumer`:
111113

@@ -170,6 +172,40 @@ The `parallelBy(...)` and `parallelToStreamBy(...)` methods allow you to classif
170172

171173
The `Group<K, V>` record provides `key()` and `values()` accessors, plus a `map()` method for transforming values while preserving the grouping key.
172174

175+
#### Decorators
176+
177+
Two decorator options let you add cross-cutting behavior without replacing the executor:
178+
179+
**`executorDecorator(UnaryOperator<Executor>)`** wraps the resolved executor (the virtual-thread default or a custom one) and returns a replacement. It is invoked once per collector, before any tasks are submitted. This is a natural fit for intercepting every `execute()` call, for example to plug in a monitoring layer.
180+
181+
The returned executor must not drop or discard tasks — doing so will cause the collector to wait indefinitely for results that will never arrive.
182+
183+
list.stream()
184+
.collect(parallel(i -> foo(i), c -> c
185+
.executorDecorator(exec -> task -> {
186+
metrics.incrementAndGet();
187+
exec.execute(task);
188+
}),
189+
toList()));
190+
191+
**`taskDecorator(UnaryOperator<Runnable>)`** wraps each individual task before it is handed to the executor. Unlike the executor decorator, it runs on the worker thread and is re-applied for every element. This makes it the right tool for propagating thread-local context (MDC, OpenTelemetry spans, `SecurityContext`) into worker threads:
192+
193+
var snapshot = MDC.getCopyOfContextMap();
194+
195+
list.stream()
196+
.collect(parallel(i -> foo(i), c -> c
197+
.taskDecorator(task -> () -> {
198+
MDC.setContextMap(snapshot);
199+
try {
200+
task.run();
201+
} finally {
202+
MDC.clear();
203+
}
204+
}),
205+
toList()));
206+
207+
Both decorators can be combined and each may be specified at most once per configurer.
208+
173209
### Leveraging CompletableFuture
174210

175211
Parallel Collectors expose results wrapped in `CompletableFuture` instances, which provides great flexibility and the possibility of working with them in a non-blocking fashion:
@@ -243,6 +279,34 @@ What's more, since JDK9, [you can even provide your own timeout easily](https://
243279
.batching(),
244280
toList()));
245281

282+
##### 8. Propagate MDC context into worker threads via `taskDecorator`
283+
284+
var snapshot = MDC.getCopyOfContextMap();
285+
286+
CompletableFuture<List<String>> result = list.stream()
287+
.collect(parallel(i -> foo(i), c -> c
288+
.taskDecorator(task -> () -> {
289+
MDC.setContextMap(snapshot);
290+
try {
291+
task.run();
292+
} finally {
293+
MDC.clear();
294+
}
295+
}),
296+
toList()));
297+
298+
##### 9. Instrument every task submission via `executorDecorator`
299+
300+
var submitted = new AtomicInteger();
301+
302+
CompletableFuture<List<String>> result = list.stream()
303+
.collect(parallel(i -> foo(i), c -> c
304+
.executorDecorator(exec -> task -> {
305+
submitted.incrementAndGet();
306+
exec.execute(task);
307+
}),
308+
toList()));
309+
246310
## Rationale
247311

248312
Stream API is a great tool for collection processing, especially if you need to parallelize the execution of CPU-intensive tasks, for example:

src/main/java/com/pivovarit/collectors/CollectingConfigurer.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.util.Collections;
2020
import java.util.HashSet;
2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.Set;
2324
import java.util.concurrent.Executor;
25+
import java.util.function.UnaryOperator;
2426

2527
/**
2628
* Fluent configuration builder for collectors that <em>collect</em> all results (i.e. non-streaming).
@@ -88,6 +90,51 @@ public CollectingConfigurer executor(Executor executor) {
8890
return this;
8991
}
9092

93+
/**
94+
* Decorates the executor used for running tasks.
95+
* <p>
96+
* The decorator receives the resolved executor (either the default virtual-thread executor or
97+
* the one provided via {@link #executor(Executor)}) and returns a wrapped replacement.
98+
* This is useful for augmenting the executor with cross-cutting concerns such as context
99+
* propagation (MDC, OpenTelemetry spans, etc.) or monitoring, without replacing the executor entirely.
100+
*
101+
* <p><b>Note:</b> The executor returned by the decorator must not <em>drop</em> tasks on rejection.
102+
* Dropping tasks will cause the collector to wait for results that will never be produced,
103+
* which can lead to deadlocks.
104+
*
105+
* @param decorator a function that wraps the resolved executor
106+
*
107+
* @return this configurer instance for fluent chaining
108+
*/
109+
public CollectingConfigurer executorDecorator(UnaryOperator<Executor> decorator) {
110+
Objects.requireNonNull(decorator, "executor decorator can't be null");
111+
112+
addOnce(new ConfigProcessor.Option.ExecutorDecorator(decorator));
113+
return this;
114+
}
115+
116+
/**
117+
* Decorates each individual task before it is submitted to the executor.
118+
* <p>
119+
* The decorator receives the {@link Runnable} representing a single unit of work and returns a
120+
* wrapped replacement that runs in its place. This is useful for propagating thread-local context
121+
* (e.g. MDC entries, OpenTelemetry spans, {@code SecurityContext}) into worker threads, or for
122+
* per-task instrumentation, without replacing the executor entirely.
123+
*
124+
* <p>Unlike {@link #executorDecorator(UnaryOperator)}, which wraps the executor as a whole,
125+
* this decorator is applied to each task individually and runs on the worker thread.
126+
*
127+
* @param decorator a function that wraps each submitted task
128+
*
129+
* @return this configurer instance for fluent chaining
130+
*/
131+
public CollectingConfigurer taskDecorator(UnaryOperator<Runnable> decorator) {
132+
Objects.requireNonNull(decorator, "task decorator can't be null");
133+
134+
addOnce(new ConfigProcessor.Option.TaskDecorator(decorator));
135+
return this;
136+
}
137+
91138
List<ConfigProcessor.Option> getConfig() {
92139
return Collections.unmodifiableList(modifiers);
93140
}

src/main/java/com/pivovarit/collectors/ConfigProcessor.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.Executor;
2121
import java.util.concurrent.ExecutorService;
2222
import java.util.concurrent.Executors;
23+
import java.util.function.UnaryOperator;
2324

2425
import static java.util.Objects.requireNonNull;
2526

@@ -29,7 +30,7 @@ final class ConfigProcessor {
2930
.name("parallel-collectors-", 0)
3031
.factory());
3132

32-
record Config(boolean ordered, boolean batching, int parallelism, Executor executor) {
33+
record Config(boolean ordered, boolean batching, int parallelism, Executor executor, UnaryOperator<Runnable> taskDecorator) {
3334
Config {
3435
Objects.requireNonNull(executor, "executor can't be null");
3536
}
@@ -42,21 +43,37 @@ static Config process(List<Option> options) {
4243
Boolean ordered = null;
4344
Integer parallelism = null;
4445
Executor executor = null;
46+
UnaryOperator<Executor> decorator = null;
47+
UnaryOperator<Runnable> taskDecorator = null;
4548

4649
for (var option : options) {
4750
switch (option) {
4851
case Option.Batched ignored -> batching = true;
4952
case Option.Parallelism(var p) -> parallelism = p;
5053
case Option.ThreadPool(var e) -> executor = e;
5154
case Option.Ordered ignored -> ordered = true;
55+
case Option.ExecutorDecorator(var d) -> decorator = d;
56+
case Option.TaskDecorator(var d) -> taskDecorator = d;
5257
}
5358
}
5459

60+
var resolvedExecutor = Objects.requireNonNullElse(executor, DEFAULT_EXECUTOR);
61+
if (decorator != null) {
62+
resolvedExecutor = decorator.apply(resolvedExecutor);
63+
Preconditions.requireValidExecutor(resolvedExecutor);
64+
}
65+
if (taskDecorator != null) {
66+
var td = taskDecorator;
67+
var delegate = resolvedExecutor;
68+
resolvedExecutor = r -> delegate.execute(td.apply(r));
69+
}
70+
5571
return new Config(
5672
Objects.requireNonNullElse(ordered, false),
5773
Objects.requireNonNullElse(batching, false),
5874
Objects.requireNonNullElse(parallelism, 0),
59-
Objects.requireNonNullElse(executor, DEFAULT_EXECUTOR));
75+
resolvedExecutor,
76+
taskDecorator);
6077
}
6178

6279
static String toHumanReadableString(Option option) {
@@ -65,6 +82,8 @@ static String toHumanReadableString(Option option) {
6582
case Option.Parallelism ignored -> "parallelism";
6683
case Option.ThreadPool ignored -> "executor";
6784
case Option.Ordered ignored -> "ordered";
85+
case Option.ExecutorDecorator ignored -> "executor decorator";
86+
case Option.TaskDecorator ignored -> "task decorator";
6887
};
6988
}
7089

@@ -89,5 +108,17 @@ record Parallelism(int parallelism) implements Option {
89108
Preconditions.requireValidParallelism(parallelism);
90109
}
91110
}
111+
112+
record ExecutorDecorator(UnaryOperator<Executor> decorator) implements Option {
113+
public ExecutorDecorator {
114+
Objects.requireNonNull(decorator, "decorator can't be null");
115+
}
116+
}
117+
118+
record TaskDecorator(UnaryOperator<Runnable> decorator) implements Option {
119+
public TaskDecorator {
120+
Objects.requireNonNull(decorator, "decorator can't be null");
121+
}
122+
}
92123
}
93124
}

src/main/java/com/pivovarit/collectors/StreamingConfigurer.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import java.util.Collections;
2020
import java.util.HashSet;
2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.Set;
2324
import java.util.concurrent.Executor;
25+
import java.util.function.UnaryOperator;
2426

2527
/**
2628
* Fluent configuration builder for collectors that <em>stream</em> results.
@@ -103,6 +105,51 @@ public StreamingConfigurer executor(Executor executor) {
103105
return this;
104106
}
105107

108+
/**
109+
* Decorates the executor used for running tasks.
110+
* <p>
111+
* The decorator receives the resolved executor (either the default virtual-thread executor or
112+
* the one provided via {@link #executor(Executor)}) and returns a wrapped replacement.
113+
* This is useful for augmenting the executor with cross-cutting concerns such as context
114+
* propagation (MDC, OpenTelemetry spans, etc.) or monitoring, without replacing the executor entirely.
115+
*
116+
* <p><b>Note:</b> The executor returned by the decorator must not <em>drop</em> tasks on rejection.
117+
* Dropping tasks will cause the stream to wait for results that will never be produced,
118+
* which can lead to deadlocks.
119+
*
120+
* @param decorator a function that wraps the resolved executor
121+
*
122+
* @return this configurer instance for fluent chaining
123+
*/
124+
public StreamingConfigurer executorDecorator(UnaryOperator<Executor> decorator) {
125+
Objects.requireNonNull(decorator, "executor decorator can't be null");
126+
127+
addOnce(new ConfigProcessor.Option.ExecutorDecorator(decorator));
128+
return this;
129+
}
130+
131+
/**
132+
* Decorates each individual task before it is submitted to the executor.
133+
* <p>
134+
* The decorator receives the {@link Runnable} representing a single unit of work and returns a
135+
* wrapped replacement that runs in its place. This is useful for propagating thread-local context
136+
* (e.g. MDC entries, OpenTelemetry spans, {@code SecurityContext}) into worker threads, or for
137+
* per-task instrumentation, without replacing the executor entirely.
138+
*
139+
* <p>Unlike {@link #executorDecorator(UnaryOperator)}, which wraps the executor as a whole,
140+
* this decorator is applied to each task individually and runs on the worker thread.
141+
*
142+
* @param decorator a function that wraps each submitted task
143+
*
144+
* @return this configurer instance for fluent chaining
145+
*/
146+
public StreamingConfigurer taskDecorator(UnaryOperator<Runnable> decorator) {
147+
Objects.requireNonNull(decorator, "task decorator can't be null");
148+
149+
addOnce(new ConfigProcessor.Option.TaskDecorator(decorator));
150+
return this;
151+
}
152+
106153
List<ConfigProcessor.Option> getConfig() {
107154
return Collections.unmodifiableList(modifiers);
108155
}

src/test/java/com/pivovarit/collectors/OptionTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,34 @@ void shouldThrowOnDuplicateOrdered() {
8686
.hasMessageContaining("'ordered' can only be configured once");
8787
}
8888

89+
@Test
90+
void shouldThrowOnNullExecutorDecorator() {
91+
assertThatThrownBy(() -> new CollectingConfigurer().executorDecorator(null))
92+
.isInstanceOf(NullPointerException.class);
93+
}
94+
95+
@Test
96+
void shouldThrowOnNullExecutorDecoratorStreaming() {
97+
assertThatThrownBy(() -> new StreamingConfigurer().executorDecorator(null))
98+
.isInstanceOf(NullPointerException.class);
99+
}
100+
101+
@Test
102+
void shouldThrowOnDuplicateExecutorDecorator() {
103+
var configurer = new CollectingConfigurer();
104+
configurer.executorDecorator(e -> e);
105+
assertThatThrownBy(() -> configurer.executorDecorator(e -> e))
106+
.isInstanceOf(IllegalArgumentException.class)
107+
.hasMessageContaining("'executor decorator' can only be configured once");
108+
}
109+
110+
@Test
111+
void shouldThrowOnDuplicateExecutorDecoratorStreaming() {
112+
var configurer = new StreamingConfigurer();
113+
configurer.executorDecorator(e -> e);
114+
assertThatThrownBy(() -> configurer.executorDecorator(e -> e))
115+
.isInstanceOf(IllegalArgumentException.class)
116+
.hasMessageContaining("'executor decorator' can only be configured once");
117+
}
118+
89119
}

0 commit comments

Comments
 (0)