Skip to content

Commit ff2ca27

Browse files
committed
feat(impl): Support interception of output record before serialization
Fixes #147. Introduction of a new `OutputRecordInterceptor` interface to be implemented to be able to intercept outgoing messages before their payload is serialized. Its aim is to replace the `ProducerOnSendInterceptor` that at the difference with `OutputRecordInterceptor` is called after the serialization of the payload. Henceforth `ProducerOnSendInterceptor` is deprecated. Version is bumped to 4.1 to reflect the new feature. It is backwards-compatible. The TracingProducerInterceptor has been rewritten as a TracingOutputRecordInterceptor, but kept for backwards-compatibility. It is itself marked for removal as well. Documentation updated accordingly.
1 parent e6896ca commit ff2ca27

File tree

33 files changed

+784
-31
lines changed

33 files changed

+784
-31
lines changed

.github/project.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
release:
22
current-version: "4.0.3"
3-
next-version: "4.0.0-SNAPSHOT"
3+
next-version: "4.1.0-SNAPSHOT"

api/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
1010
<name>quarkus-kafka-streams-processor-api</name>
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord;
2+
3+
import org.apache.kafka.streams.processor.api.Record;
4+
5+
/**
6+
* Interceptor that if implemented is called whenever the processor calls any of
7+
* {@link org.apache.kafka.streams.processor.api.ProcessorContext}'s forward methods.
8+
* <p>
9+
* Order of execution is guaranteed based on the integer priority returned by {@link #priority()}.
10+
* </p>
11+
* <p>
12+
* It differs from {@link io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor} in that
13+
* it is executed before any serialization of the payload to byte[] happens.
14+
* </p>
15+
*/
16+
public interface OutputRecordInterceptor {
17+
/**
18+
* By default, if not overriden, the interceptor has the following priority.
19+
*/
20+
int DEFAULT_PRIORITY = 100;
21+
22+
/**
23+
* Override this method to finely tune the order of execution of any interceptor you implement.
24+
*
25+
* @return the custom priority you want to assign. A number between 0 and {@link Integer#MAX_VALUE}.
26+
*/
27+
default int priority() {
28+
return DEFAULT_PRIORITY;
29+
}
30+
31+
/**
32+
* Intercept the record before it is eventually given to
33+
* {@link org.apache.kafka.streams.processor.api.ProcessorContext#forward(Record, String)}.
34+
*
35+
* @param record the record has the processor requested it to be forwarded
36+
* @return the new record with any modifications this interceptor wants to apply before serialization.
37+
*/
38+
Record interceptOutputRecord(Record record);
39+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*-
2+
* #%L
3+
* Quarkus Kafka Streams Processor
4+
* %%
5+
* Copyright (C) 2024 Amadeus s.a.s.
6+
* %%
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* #L%
19+
*/
20+
package io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord;
21+
22+
/**
23+
* Priorities of the output record interceptors the framework provides.
24+
*/
25+
public final class OutputRecordInterceptorPriorities {
26+
/**
27+
* Priority of the interceptor that will inject the tracing headers for propagation.
28+
*/
29+
public static final int TRACING = 100;
30+
31+
private OutputRecordInterceptorPriorities() {
32+
33+
}
34+
}

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/AbstractProcessorDecorator.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,7 @@
3939
* Class introduced in 2.0, for compatibility with Quarkus 3.8 random failure to start when using custom processor
4040
* decorators.
4141
* </p>
42-
*
43-
* @deprecated It will be removed in 3.0, with the integration of Quarkus 3.15 where we will be able to go back to pure
44-
* CDI decorators.
4542
*/
46-
@Deprecated(forRemoval = true, since = "2.0")
4743
public abstract class AbstractProcessorDecorator implements Processor {
4844
/**
4945
* The decorated processor, holding either the next decorator layer or the final processor.

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/processor/ProcessorDecoratorPriorities.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,25 +58,36 @@ public final class ProcessorDecoratorPriorities {
5858
* {@link ContextualProcessor#process(Record)} method.
5959
*/
6060
public static final int TRACING = 100;
61+
6162
/**
6263
* Priority of the decorator in charge or initializing a "request context" for the duration of the processing of the
6364
* ContextualProcessor#process(Record)} method. It is closed afterward.
6465
*/
6566
public static final int CDI_REQUEST_SCOPE = 200;
67+
68+
/**
69+
* Priority for the decorator that wraps the {@link org.apache.kafka.streams.processor.api.ProcessorContext} to
70+
* intercept calls to its <code>forward</code> methods.
71+
*/
72+
public static final int CONTEXT_FORWARD = 250;
73+
6674
/**
6775
* Priority of the decorator that will handle exception and potentially redirect the message in a dead letter queue
6876
* topic, if configured.
6977
*/
7078
public static final int DLQ = 300;
79+
7180
/**
7281
* Priority of the decorator in charge of measuring the processing time and the number of exceptions thrown.
7382
*/
7483
public static final int METRICS = 400;
84+
7585
/**
7686
* Priority of the decorator in charge of injecting all {@link DecoratedPunctuator} configured by the framework and
7787
* your custom potential additions.
7888
*/
7989
public static final int PUNCTUATOR_DECORATION = 500;
90+
8091
/**
8192
* Priority of the decorator in charge of implementing a form of fault tolerance by means of calling again the
8293
* {@link ContextualProcessor#process(Record)} method.

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerInterceptorPriorities.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121

2222
/**
2323
* Priorities of the producer interceptors the framework provides.
24+
*
25+
* @deprecated Change any producer interceptor into a
26+
* {@link io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord.OutputRecordInterceptor}.
2427
*/
28+
@Deprecated(forRemoval = true, since = "4.1")
2529
public final class ProducerInterceptorPriorities {
2630
/**
2731
* Priority of the interceptor that will inject the tracing headers for propagation.

api/src/main/java/io/quarkiverse/kafkastreamsprocessor/api/decorator/producer/ProducerOnSendInterceptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@
2626

2727
/**
2828
* Interface to extend to by able to decorate the production of the response message to the outgoing topic.
29+
*
30+
* @deprecated Change any producer interceptor into a
31+
* {@link io.quarkiverse.kafkastreamsprocessor.api.decorator.outputrecord.OutputRecordInterceptor}.
2932
*/
33+
@Deprecated(forRemoval = true, since = "4.1")
3034
public interface ProducerOnSendInterceptor extends ProducerInterceptor<byte[], byte[]> {
3135
/**
3236
* By default, if not overriden, the interceptor has the following priority.

bom/application/pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
1010
<name>quarkus-kafka-streams-processor-bom</name>
1111
<packaging>pom</packaging>
1212
<properties>
13-
<quarkus-kafka-streams-processor.version>4.0.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
13+
<quarkus-kafka-streams-processor.version>4.1.0-SNAPSHOT</quarkus-kafka-streams-processor.version>
1414
</properties>
1515
<dependencyManagement>
1616
<dependencies>

bom/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<parent>
55
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
66
<artifactId>quarkus-kafka-streams-processor-parent</artifactId>
7-
<version>4.0.0-SNAPSHOT</version>
7+
<version>4.1.0-SNAPSHOT</version>
88
</parent>
99
<artifactId>quarkus-kafka-streams-processor-bom-parent</artifactId>
1010
<name>quarkus-kafka-streams-processor-bom-parent</name>

0 commit comments

Comments
 (0)