Skip to content

Commit ea78e75

Browse files
committed
Streams HeaderEnricher Doc Polishing
Clarify multi-threaded use and add example.
1 parent fef1a11 commit ea78e75

File tree

1 file changed

+26
-0
lines changed

1 file changed

+26
-0
lines changed

spring-kafka-docs/src/main/asciidoc/streams.adoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,32 @@ To use the enricher within a stream:
259259

260260
The transformer does not change the `key` or `value`; it simply adds headers.
261261

262+
IMPORTANT: If your stream is multi-threaded, you need a new instance for each record.
263+
264+
====
265+
[source, java]
266+
----
267+
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))
268+
----
269+
====
270+
271+
Here is a simple example, adding one literal header and one variable:
272+
273+
====
274+
[source, java]
275+
----
276+
Map<String, Expression> headers = new HashMap<>();
277+
headers.put("header1", new LiteralExpression("value1"));
278+
SpelExpressionParser parser = new SpelExpressionParser();
279+
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
280+
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
281+
KStream<String, String> stream = builder.stream(INPUT);
282+
stream
283+
.transform(() -> enricher)
284+
.to(OUTPUT);
285+
----
286+
====
287+
262288
[[streams-messaging]]
263289
==== `MessagingTransformer`
264290

0 commit comments

Comments
 (0)