Skip to content

Commit 66299e8

Browse files
authored
Merge branch 'main' into sslTimeouts
2 parents 0fe381e + ae93811 commit 66299e8

File tree

16 files changed

+49
-325
lines changed

16 files changed

+49
-325
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Format `<github issue/pr number>: <short description>`.
77

88
## SNAPSHOT
99

10+
* [#1657](https://github.com/kroxylicious/kroxylicious/pull/1657) Remove forwardPartialRequests feature of record validation filter
1011
* [#1635](https://github.com/kroxylicious/kroxylicious/pull/1635) Handle ApiVersions unsupported version downgrade
1112
* [#1648](https://github.com/kroxylicious/kroxylicious/pull/1648) Add test-only feature mechanism to Proxy configuration
1213
* [#1379](https://github.com/kroxylicious/kroxylicious/issues/1379) Remove Deprecated EnvelopeEncryption
@@ -22,7 +23,8 @@ Format `<github issue/pr number>: <short description>`.
2223

2324
### Changes, deprecations and removals
2425

25-
* The deprecated EnvelopeEncryption filter is now removed. Use RecordEncryption instead.
26+
* The deprecated EnvelopeEncryption filter is now removed. Use RecordEncryption instead.
27+
* The deprecated forwardPartialRequests option has been removed from the Record Validation Filter.
2628

2729
## 0.8.0
2830

docs/modules/record-validation/proc-record-validation.adoc

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,11 @@ filters:
3737
<rule definition> # <2>
3838
valueRule:
3939
<rule definition> # <3>
40-
forwardPartialRequests: false # <5>
4140
----
4241
<1> List of topic names to which the validation rules will be applied.
4342
<2> Validation rules that are applied to the record's key.
4443
<3> Validation rules that are applied to the record's value.
4544
<4> (Optional) Default rule that is applied to any topics for which there is no explict rule defined.
46-
<5> If `false`, any record failing validation will cause the entire produce request to be rejected.
47-
If `true`, only partitions within the request where all records validated successfully will be forwarded to the
48-
broker. The response the client will receive will be a mixed response with response returned from the broker
49-
for the partitions whose records passed validation and error responses for the partitions that failed validation.
50-
If the client is using transactions, this setting is *ignored*. Any failing record validation will
51-
cause the entire produce request to be rejected.
5245

5346
Replace the token `<rule definition>` in the YAML configuration with either a Schema Validation rule or a JSON Syntax Validation rule depending on your requirements.
5447

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/ProduceRequestValidatorBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ private ProduceRequestValidatorBuilder() {
3838
*/
3939
static ProduceRequestValidator build(ValidationConfig config) {
4040
RoutingProduceRequestValidator.RoutingProduceRequestValidatorBuilder builder = RoutingProduceRequestValidator.builder();
41-
config.getRules().forEach(rule -> builder.appendValidatorForTopicPattern(rule.getTopicNames(), toValidatorWithNullHandling(rule)));
42-
RecordValidationRule defaultRule = config.getDefaultRule();
41+
config.rules().forEach(rule -> builder.appendValidatorForTopicPattern(rule.getTopicNames(), toValidatorWithNullHandling(rule)));
42+
RecordValidationRule defaultRule = config.defaultRule();
4343
TopicValidator defaultValidator = defaultRule == null ? TopicValidators.allValid() : toValidatorWithNullHandling(defaultRule);
4444
builder.setDefaultValidator(defaultValidator);
4545
return builder.build();

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/RecordValidation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public ValidationConfig initialize(FilterFactoryContext context, ValidationConfi
2424
@Override
2525
public RecordValidationFilter createFilter(FilterFactoryContext context, ValidationConfig configuration) {
2626
ProduceRequestValidator validator = ProduceRequestValidatorBuilder.build(configuration);
27-
return new RecordValidationFilter(configuration.isForwardPartialRequests(), validator);
27+
return new RecordValidationFilter(validator);
2828
}
2929

3030
}

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/RecordValidationFilter.java

Lines changed: 10 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -31,37 +31,25 @@
3131
import io.kroxylicious.proxy.filter.validation.validators.topic.TopicValidationResult;
3232

3333
/**
34-
* A Filter that is intended to validate some criteria about each topic-partition, preventing
35-
* invalid data being sent on to the broker. The response will contain an {@link Errors#INVALID_RECORD}
36-
* error code and an error message for each invalid topic partition.
37-
* <p>
38-
* Optionally supports forwarding partial data. If the request is non-transactional, the filter
39-
* can be configured to forward any valid topic-partitions to the broker, filtering out invalid ones.
40-
* Then when the ProduceResponse is intercepted then it can augment in failure messages for the invalid
41-
* topic-partitions.
42-
* </p>
43-
* <p>
44-
* Note: if all the topic partitions are invalid (or the request is transactional), a response is sent
45-
* back to the client without forwarding anything upstream, with all topic-partitions failed.
46-
* </p>
34+
* The filter intercepts the produce requests and subject the records contained within them to validation. If the
35+
* validation fails, the whole produce request is rejected and the producing application receives an error
36+
* response {@link Errors#INVALID_RECORD}. The broker does not receive rejected produce requests.
4737
*/
4838
public class RecordValidationFilter implements ProduceRequestFilter, ProduceResponseFilter {
4939

5040
private static final Logger LOGGER = LoggerFactory.getLogger(RecordValidationFilter.class);
51-
private final boolean forwardPartialRequests;
5241
private final ProduceRequestValidator validator;
5342
private final Map<Integer, ProduceRequestValidationResult> correlatedResults = new HashMap<>();
5443

5544
/**
5645
* Construct a new ProduceValidationFilter
57-
* @param forwardPartialRequests whether to forward valid topic-partitions if some other topic-partition is invalid (transactional requests are never forwarded if any topic-partition invalid)
46+
*
5847
* @param validator validator to test ProduceRequests with
5948
*/
60-
public RecordValidationFilter(boolean forwardPartialRequests, ProduceRequestValidator validator) {
49+
public RecordValidationFilter(ProduceRequestValidator validator) {
6150
if (validator == null) {
6251
throw new IllegalArgumentException("validator is null");
6352
}
64-
this.forwardPartialRequests = forwardPartialRequests;
6553
this.validator = validator;
6654
}
6755

@@ -70,37 +58,19 @@ public CompletionStage<RequestFilterResult> onProduceRequest(short apiVersion, R
7058
CompletionStage<ProduceRequestValidationResult> validationStage = validator.validateRequest(request);
7159
return validationStage.thenCompose(result -> {
7260
if (result.isAnyTopicPartitionInvalid()) {
73-
return handleInvalidTopicPartitions(header, request, context, result);
61+
return handleInvalidTopicPartitions(request, context, result);
7462
}
7563
else {
7664
return context.forwardRequest(header, request);
7765
}
7866
});
7967
}
8068

81-
private CompletionStage<RequestFilterResult> handleInvalidTopicPartitions(RequestHeaderData header, ProduceRequestData request, FilterContext context,
69+
private CompletionStage<RequestFilterResult> handleInvalidTopicPartitions(ProduceRequestData request, FilterContext context,
8270
ProduceRequestValidationResult result) {
83-
if (result.isAllTopicPartitionsInvalid()) {
84-
LOGGER.debug("all topic-partitions for request contained invalid data: {}", result);
85-
ProduceResponseData response = invalidateEntireRequest(request, result);
86-
return context.requestFilterResultBuilder().shortCircuitResponse(response).completed();
87-
}
88-
// do not forward partial produce data if request is transactional because the whole produce must eventually succeed or fail together
89-
else if (request.transactionalId() == null && forwardPartialRequests) {
90-
LOGGER.debug("some topic-partitions contained invalid data: {}, forwarding valid topic-partitions", result);
91-
request.topicData().removeIf(topicProduceData -> result.isAllPartitionsInvalid(topicProduceData.name()));
92-
for (ProduceRequestData.TopicProduceData topicDatum : request.topicData()) {
93-
topicDatum.partitionData().removeIf(partitionProduceData -> !result.isPartitionValid(topicDatum.name(), partitionProduceData.index()));
94-
}
95-
correlatedResults.put(header.correlationId(), result);
96-
return context.forwardRequest(header, request);
97-
}
98-
else {
99-
LOGGER.debug("some topic-partitions for transactional request with id: {}, contained invalid data: {}, invalidation entire request",
100-
request.transactionalId(), result);
101-
ProduceResponseData response = invalidateEntireRequest(request, result);
102-
return context.requestFilterResultBuilder().shortCircuitResponse(response).completed();
103-
}
71+
LOGGER.debug("At least one topic-partitions with the request contained invalid records: {}. Produce request will be rejected.", result);
72+
ProduceResponseData response = invalidateEntireRequest(request, result);
73+
return context.requestFilterResultBuilder().shortCircuitResponse(response).completed();
10474
}
10575

10676
private static ProduceResponseData invalidateEntireRequest(ProduceRequestData request, ProduceRequestValidationResult produceRequestValidationResult) {

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/config/ValidationConfig.java

Lines changed: 7 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -7,90 +7,23 @@
77
package io.kroxylicious.proxy.filter.validation.config;
88

99
import java.util.List;
10-
import java.util.Objects;
1110

12-
import com.fasterxml.jackson.annotation.JsonCreator;
1311
import com.fasterxml.jackson.annotation.JsonProperty;
1412

1513
/**
1614
* Configuration for Produce Request validation. Contains a description of the rules for validating
17-
* the data for all topic-partitions with a ProduceRequest and how to handle partial failures (where
18-
* some topic-partitions are valid and others are invalid within a single ProduceRequest)
15+
* the data for all topic-partitions within a ProduceRequest.
16+
*
17+
* @param rules describes a list of rules, associating topics with some validation to be applied to produce data for that topic
18+
* @param defaultRule the default validation rule to be applied when no rule is matched for a topic within a ProduceRequest
1919
*/
20-
public class ValidationConfig {
21-
22-
/**
23-
* If this is enabled then the proxy will (for non-transactional requests):
24-
* 1. filter out invalid topic-partitions from the ProduceRequest
25-
* 2. forward the filtered request up the chain
26-
* 3. intercept the produce response and augment in failure details for the invalid topic-partitions
27-
*/
28-
private final boolean forwardPartialRequests;
29-
private final List<TopicMatchingRecordValidationRule> rules;
30-
private final RecordValidationRule defaultRule;
31-
32-
/**
33-
* Construct a new ValidationConfig
34-
* @param forwardPartialRequests describes whether partial ProduceRequest data should be forwarded to the broker (for non-transactional requests)
35-
* @param rules describes a list of rules, associating topics with some validation to be applied to produce data for that topic
36-
* @param defaultRule the default validation rule to be applied when no rule is matched for a topic within a ProduceRequest
37-
*/
38-
@JsonCreator
39-
public ValidationConfig(@JsonProperty(value = "forwardPartialRequests", defaultValue = "false") Boolean forwardPartialRequests,
40-
@JsonProperty("rules") List<TopicMatchingRecordValidationRule> rules,
41-
@JsonProperty("defaultRule") RecordValidationRule defaultRule) {
42-
this.forwardPartialRequests = forwardPartialRequests != null && forwardPartialRequests;
43-
this.rules = rules;
44-
this.defaultRule = defaultRule;
45-
}
46-
47-
/**
48-
* Get the rules
49-
* @return rules
50-
*/
51-
public List<TopicMatchingRecordValidationRule> getRules() {
52-
return rules;
53-
}
54-
55-
/**
56-
* is forwarding partial requests enabled?
57-
* @return true if forwarding partial requests enabled
58-
*/
59-
public boolean isForwardPartialRequests() {
60-
return forwardPartialRequests;
61-
}
62-
63-
/**
64-
* get default rule
65-
* @return default rule (not null)
66-
*/
67-
public RecordValidationRule getDefaultRule() {
68-
return defaultRule;
69-
}
70-
71-
@Override
72-
public boolean equals(Object o) {
73-
if (this == o) {
74-
return true;
75-
}
76-
if (o == null || getClass() != o.getClass()) {
77-
return false;
78-
}
79-
ValidationConfig that = (ValidationConfig) o;
80-
return forwardPartialRequests == that.forwardPartialRequests && Objects.equals(rules, that.rules) && Objects.equals(defaultRule,
81-
that.defaultRule);
82-
}
83-
84-
@Override
85-
public int hashCode() {
86-
return Objects.hash(forwardPartialRequests, rules, defaultRule);
87-
}
20+
public record ValidationConfig(@JsonProperty("rules") List<TopicMatchingRecordValidationRule> rules,
21+
@JsonProperty("defaultRule") RecordValidationRule defaultRule) {
8822

8923
@Override
9024
public String toString() {
9125
return "ValidationConfig{" +
92-
"forwardPartialRequests=" + forwardPartialRequests +
93-
", rules=" + rules +
26+
"rules=" + rules +
9427
", defaultRule=" + defaultRule +
9528
'}';
9629
}

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/validators/request/ProduceRequestValidationResult.java

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.Map;
1010
import java.util.stream.Stream;
1111

12-
import io.kroxylicious.proxy.filter.validation.validators.topic.PartitionValidationResult;
1312
import io.kroxylicious.proxy.filter.validation.validators.topic.TopicValidationResult;
1413

1514
/**
@@ -27,49 +26,6 @@ public boolean isAnyTopicPartitionInvalid() {
2726
return topicValidationResults.values().stream().anyMatch(TopicValidationResult::isAnyPartitionInvalid);
2827
}
2928

30-
/**
31-
* Are all topic partitions invalid
32-
* @return true if all topic-partitions were invalid
33-
*/
34-
public boolean isAllTopicPartitionsInvalid() {
35-
return topicValidationResults.values().stream().allMatch(TopicValidationResult::isAllPartitionsInvalid);
36-
}
37-
38-
/**
39-
* Are all topic partitions invalid for a topic
40-
* @param topicName topic name
41-
* @return true if all partitions for topicName are invalid
42-
*/
43-
public boolean isAllPartitionsInvalid(String topicName) {
44-
TopicValidationResult topicValidationResult = topicValidationResults.get(topicName);
45-
if (topicValidationResult == null) {
46-
return false;
47-
}
48-
else {
49-
return topicValidationResult.isAllPartitionsInvalid();
50-
}
51-
}
52-
53-
/**
54-
* Is a topic-partition valid
55-
* @param topicName name of the topic
56-
* @param partitionIndex index of the partition
57-
* @return true if the topic-partition is valid
58-
* @throws IllegalStateException if the topicName or partitionIndex has no recorded result, we should have some outcome for every topic-partition
59-
*/
60-
public boolean isPartitionValid(String topicName, int partitionIndex) {
61-
TopicValidationResult topicValidationResult = topicValidationResults.get(topicName);
62-
if (topicValidationResult == null) {
63-
throw new IllegalStateException("topicValidationResults should contain a result for all topics in the request, failed topic: " + topicName);
64-
}
65-
PartitionValidationResult partitionValidationResult = topicValidationResult.getPartitionResult(partitionIndex);
66-
if (partitionValidationResult == null) {
67-
throw new IllegalStateException(
68-
"partitionValidationResult should contain a result for all topics in the request, failed topic: " + topicName + ", partition" + partitionIndex);
69-
}
70-
return partitionValidationResult.allRecordsValid();
71-
}
72-
7329
/**
7430
* Get all topics with invalid partitions
7531
* @return stream of topic results containing invalid partitions

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/validators/topic/AllValidTopicValidationResult.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,6 @@ public boolean isAnyPartitionInvalid() {
1616
return false;
1717
}
1818

19-
@Override
20-
public boolean isAllPartitionsInvalid() {
21-
return false;
22-
}
23-
2419
@Override
2520
public Stream<PartitionValidationResult> invalidPartitions() {
2621
return Stream.empty();

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/validators/topic/PerPartitionTopicValidationResult.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@ public boolean isAnyPartitionInvalid() {
1616
return partitionValidationResults.values().stream().anyMatch(partitionValidationResult -> !partitionValidationResult.allRecordsValid());
1717
}
1818

19-
@Override
20-
public boolean isAllPartitionsInvalid() {
21-
if (partitionValidationResults.isEmpty()) {
22-
return false;
23-
}
24-
return partitionValidationResults.values().stream().noneMatch(PartitionValidationResult::allRecordsValid);
25-
}
26-
2719
@Override
2820
public Stream<PartitionValidationResult> invalidPartitions() {
2921
return partitionValidationResults.values().stream().filter(partitionValidationResult -> !partitionValidationResult.allRecordsValid());

kroxylicious-filters/kroxylicious-record-validation/src/main/java/io/kroxylicious/proxy/filter/validation/validators/topic/TopicValidationResult.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,6 @@ public interface TopicValidationResult {
1919
*/
2020
boolean isAnyPartitionInvalid();
2121

22-
/**
23-
* are all partitions invalid
24-
* @return true if all partitions invalid
25-
*/
26-
boolean isAllPartitionsInvalid();
27-
2822
/**
2923
* get invalid partitions
3024
* @return stream of invalid partitions

0 commit comments

Comments
 (0)