Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 32 additions & 5 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ endif::[]
== Phase

|===
|onRequest |onResponse
|onRequest |onResponse |onEntrypointConnect (Native Kafka)

|X
|
|X

|===

NOTE: For Native Kafka APIs, this policy executes during the `ENTRYPOINT_CONNECT` phase, which occurs at the TCP connection level before authentication.

== Description

You can use the `ip-filtering` policy to control access to your API by filtering IP addresses.
Expand All @@ -31,6 +34,18 @@ You can specify a host to be resolved and checked against the remote IP.

NOTE: When using domain name, the Gateway is performing DNS Lookup with the DNS server configured on the host by default. If you want to use a specific DNS server, you can configure it at the policy level. See <<_gateway>> for more information.

=== Native Kafka API Support

For Native Kafka APIs, this policy executes during the `ENTRYPOINT_CONNECT` phase at the TCP connection level. This enables early rejection of unauthorized connections before authentication and protocol handshake.

**Limitations for Native Kafka:**

* **DNS lookups (hostnames) are not supported** - Only IP addresses and CIDR ranges are evaluated for performance reasons.
* **`matchAllFromXForwardedFor` is not applicable** - Native Kafka uses raw TCP connections without HTTP headers.
* **Custom IP addresses with comma-separated values** - Only the first IP is used, as each TCP connection has a single source address.

If hostnames are configured in whitelist/blacklist for a Native Kafka API, they will be logged as warnings and ignored during the `ENTRYPOINT_CONNECT` phase.

== Compatibility with APIM

|===
Expand All @@ -50,28 +65,40 @@ At the policy level, you can configure the following options:

|matchAllFromXForwardedFor
|No
|If set to `true`, the `X-Forwarded-For` header is parsed to extract all IP addresses and check them against the whitelist or blacklist.
|If set to `true`, the `X-Forwarded-For` header is parsed to extract all IP addresses and check them against the whitelist or blacklist. *Not applicable to Native Kafka APIs.*
|boolean
|`false`

|whitelistIps
|No
|A list of allowed IPs with or without CIDR notation (host is allowed)
|A list of allowed IPs with or without CIDR notation. Hostnames are supported for HTTP/MESSAGE APIs but not Native Kafka APIs.
|string list
|`empty`

|blacklistIps
|No
|A list of denied IPs with or without CIDR notation (host is allowed)
|A list of denied IPs with or without CIDR notation. Hostnames are supported for HTTP/MESSAGE APIs but not Native Kafka APIs.
|string list
|`empty`

|lookupIpVersion
|No
|IP version to use to lookup host name. If you're not sure your DNS server can handle multi-question requests (both V4 and V6) specify a version.
|IP version to use to lookup host name. If you're not sure your DNS server can handle multi-question requests (both V4 and V6) specify a version. *Only applicable to HTTP/MESSAGE APIs (not Native Kafka).*
|enum [`IPV4`, `IPV6`, `ALL`]
|`ALL`

|useCustomIPAddress
|No
|Override the remote address with a custom value extracted from EL expressions. Useful for extracting IPs from headers or context attributes.
|boolean
|`false`

|customIPAddress
|No
|Custom IP address to use instead of the remote address. Supports EL expressions (e.g., `{#request.headers['X-Forwarded-For']}`). For Native Kafka, only the first IP from comma-separated values is used.
|string
|`empty`


|===

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

<properties>
<gravitee-bom.version>8.3.39</gravitee-bom.version>
<gravitee-gateway-api.version>3.13.0</gravitee-gateway-api.version>
<gravitee-gateway-api.version>4.3.0-APIM-12432-interrupt-exception-entrypoint-connect-SNAPSHOT</gravitee-gateway-api.version>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will replace after gravitee-io/gravitee-gateway-api#321 gets merged

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if it's not a BC. Because I'm not sure if the policy still works for an apim 3.13.0 for example 🤔
Since this dependency is provided, won't there be an error in the NativePolicy import, in version of apim without it ?

I'm not really sure, but I have my doubts 🤷‍♂️

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is

<gravitee-node-api.version>6.8.3</gravitee-node-api.version>

<gravitee-policy-api.version>1.11.0</gravitee-policy-api.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@
import static java.util.stream.Collectors.toList;

import io.gravitee.common.http.HttpStatusCode;
import io.gravitee.el.TemplateEngine;
import io.gravitee.gateway.api.ExecutionContext;
import io.gravitee.gateway.api.Request;
import io.gravitee.gateway.api.http.HttpHeaderNames;
import io.gravitee.gateway.reactive.api.exception.InterruptConnectionException;
import io.gravitee.gateway.reactive.api.policy.kafka.KafkaPolicy;
import io.gravitee.policy.api.PolicyChain;
import io.gravitee.policy.api.PolicyResult;
import io.gravitee.policy.api.annotations.OnRequest;
import io.netty.handler.ipfilter.IpFilterRuleType;
import io.netty.handler.ipfilter.IpSubnetFilterRule;
import io.reactivex.rxjava3.core.Completable;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
Expand All @@ -47,7 +51,7 @@
* @author Azize ELAMRANI (azize.elamrani at graviteesource.com)
* @author GraviteeSource Team
*/
public class IPFilteringPolicy {
public class IPFilteringPolicy implements KafkaPolicy {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class IPFilteringPolicy implements KafkaPolicy {
public class IPFilteringPolicy implements NativePolicy {


private static final Logger LOGGER = LoggerFactory.getLogger(IPFilteringPolicy.class);

Expand All @@ -62,13 +66,18 @@ public IPFilteringPolicy(IPFilteringPolicyConfiguration configuration) {
this.configuration = configuration;
}

@Override
public String id() {
return "ip-filtering";
}

@OnRequest
public void onRequest(ExecutionContext executionContext, PolicyChain policyChain) {
final List<String> ips = extractIps(executionContext);
final List<Future> futures = new ArrayList<>();

var blackList = computeList(executionContext, configuration.getBlacklistIps());
var whiteList = computeList(executionContext, configuration.getWhitelistIps());
var blackList = computeList(executionContext.getTemplateEngine(), configuration.getBlacklistIps());
var whiteList = computeList(executionContext.getTemplateEngine(), configuration.getWhitelistIps());

if (!blackList.isEmpty()) {
final List<String> filteredIps = new ArrayList<>();
Expand Down Expand Up @@ -113,6 +122,84 @@ public void onRequest(ExecutionContext executionContext, PolicyChain policyChain
}
}

/**
* Executes IP filtering during the ENTRYPOINT_CONNECT phase for Native Kafka APIs.
* This happens BEFORE authentication, allowing early rejection of unauthorized IPs.
*
* Note: DNS lookups for hostnames are NOT supported in this phase for performance reasons.
* Only static IPs and CIDR ranges are evaluated. Use IPs instead of hostnames for ENTRYPOINT_CONNECT phase.
*
* @param ctx the entrypoint connect context
* @return a Completable that completes successfully if the IP is allowed, or throws InterruptConnectionException if denied
*/
@Override
public Completable onEntrypointConnect(io.gravitee.gateway.reactive.api.context.EntrypointConnectContext ctx) {
return Completable.defer(() -> {
try {
final String remoteAddress = extractIpFromNativeContext(ctx);

final Set<String> blacklistIps = computeList(ctx.getTemplateEngine(), configuration.getBlacklistIps());
final Set<String> whitelistIps = computeList(ctx.getTemplateEngine(), configuration.getWhitelistIps());

final List<String> blacklistFilteredIps = new ArrayList<>();
final List<String> blacklistFilteredHosts = new ArrayList<>();
processFilteredLists(blacklistIps, blacklistFilteredIps, blacklistFilteredHosts);

final List<String> whitelistFilteredIps = new ArrayList<>();
final List<String> whitelistFilteredHosts = new ArrayList<>();
processFilteredLists(whitelistIps, whitelistFilteredIps, whitelistFilteredHosts);

// Log warning if hostnames are configured (not supported in ENTRYPOINT_CONNECT)
if (!blacklistFilteredHosts.isEmpty() || !whitelistFilteredHosts.isEmpty()) {
LOGGER.warn(
"[IP Filtering] Hostnames in whitelist/blacklist are not supported in ENTRYPOINT_CONNECT phase. " +
"Only IP addresses and CIDR ranges will be evaluated. Configured hostnames will be ignored: blacklist={}, whitelist={}",
blacklistFilteredHosts,
whitelistFilteredHosts
);
}

if (isFiltered(remoteAddress, blacklistFilteredIps)) {
String reason = "IP " + remoteAddress + " is blacklisted and not allowed to connect";
ctx.interrupt(reason);
return Completable.error(new InterruptConnectionException(reason));
}

if (!whitelistFilteredIps.isEmpty()) {
boolean matched = isFiltered(remoteAddress, whitelistFilteredIps);
if (!matched) {
String reason = "IP " + remoteAddress + " is not whitelisted";
ctx.interrupt(reason);
return Completable.error(new InterruptConnectionException(reason));
}
}

return Completable.complete();
} catch (InterruptConnectionException e) {
return Completable.error(e);
} catch (Exception e) {
LOGGER.error("[IP Filtering] Error during ENTRYPOINT_CONNECT phase", e);
return Completable.error(e);
}
});
}

private String extractIpFromNativeContext(io.gravitee.gateway.reactive.api.context.EntrypointConnectContext ctx) {
if (configuration.isUseCustomIPAddress()) {
String customIPAddress = ctx.getTemplateEngine().getValue(configuration.getCustomIPAddress(), String.class);
if (customIPAddress == null || customIPAddress.trim().isEmpty()) {
return ctx.remoteAddress();
}
return Arrays
.stream(customIPAddress.split(","))
.map(String::trim)
.filter(ip -> !ip.isEmpty())
.findFirst() // Take first IP if comma-separated (Native Kafka = 1 connection = 1 source IP)
.orElse(ctx.remoteAddress());
}
return ctx.remoteAddress();
}

/**
* @param filteredHosts A list of hosts that should be blocked
* @param futures
Expand Down Expand Up @@ -276,13 +363,13 @@ private static boolean isIPv4(String ip) throws UnknownHostException {
}

@SuppressWarnings({ "removal" })
private static Set<String> computeList(ExecutionContext ctx, List<String> givenList) {
private static Set<String> computeList(TemplateEngine templateEngine, List<String> givenList) {
if (givenList == null) {
return Set.of();
}
return givenList
.stream()
.map(given -> ctx.getTemplateEngine().getValue(given, String.class))
.map(given -> templateEngine.getValue(given, String.class))
.map(k -> k != null && !k.isEmpty() ? k.split(",") : new String[] {})
.flatMap(Arrays::stream)
.collect(Collectors.toSet());
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/plugin.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ http_proxy=REQUEST
http_message=REQUEST
mcp_proxy=REQUEST
llm_proxy=REQUEST
native_kafka=ENTRYPOINT_CONNECT
13 changes: 8 additions & 5 deletions src/main/resources/schemas/schema-form.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
"properties": {
"matchAllFromXForwardedFor": {
"title": "Use X-Forwarded-For header",
"description": "Extract and check all IPs from X-Forwarded-For header. Not applicable to Native Kafka APIs (no HTTP headers).",
"type": "boolean",
"default": false,
"x-schema-form": {
"hidden": [
{
Expand All @@ -25,11 +27,12 @@
},
"useCustomIPAddress": {
"title": "Use custom IP address (support EL)",
"type": "boolean"
"type": "boolean",
"default": false
},
"customIPAddress": {
"title": "Custom IP Address (support comma separated list)",
"description": "Support EL (ex: {#request.headers['X-Forwarded-For']})",
"description": "Support EL (ex: {#request.headers['X-Forwarded-For']}). For Native Kafka, only the first IP from comma-separated values is used.",
"type": "string",
"x-schema-form": {
"expression-language": true,
Expand All @@ -51,7 +54,7 @@
},
"whitelistIps": {
"title": "IPs Whitelist (CIDR and hosts allowed)",
"description": "List of IPs to allow in the request. Each entry may be a comma-separated list.",
"description": "List of IPs to allow in the request. Each entry may be a comma-separated list. Hostnames are for HTTP/MESSAGE only, and not Native Kafka.",
"type": "array",
"items": {
"title": "IP / CIDR / Host",
Expand All @@ -61,7 +64,7 @@
},
"blacklistIps": {
"title": "IPs Blacklist (CIDR and hosts allowed)",
"description": "List of IPs to disallow in the request. Each entry may be a comma-separated list.",
"description": "List of IPs to disallow in the request. Each entry may be a comma-separated list. Hostnames are for HTTP/MESSAGE only, and not Native Kafka.",
"type": "array",
"items": {
"title": "IP / CIDR / Host",
Expand All @@ -71,7 +74,7 @@
},
"lookupIpVersion": {
"title": "Lookup IP version to use (default is ALL)",
"description": "If you're not sure your DNS server can handle multi-question requests (both V4 and V6) specify a version",
"description": "If you're not sure your DNS server can handle multi-question requests (both V4 and V6) specify a version. Only applicable to HTTP/MESSAGE APIs (not Native Kafka).",
"type": "string",
"enum": ["IPV4", "IPV6", "ALL"],
"default": "ALL"
Expand Down
Loading