diff --git a/docs/changelog/136133.yaml b/docs/changelog/136133.yaml
new file mode 100644
index 0000000000000..564aa615da610
--- /dev/null
+++ b/docs/changelog/136133.yaml
@@ -0,0 +1,5 @@
+pr: 136133
+summary: Implement `network_direction` function
+area: ES|QL
+type: enhancement
+issues: []
diff --git a/docs/reference/query-languages/esql/_snippets/functions/description/network_direction.md b/docs/reference/query-languages/esql/_snippets/functions/description/network_direction.md
new file mode 100644
index 0000000000000..8eb58c0735b86
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/description/network_direction.md
@@ -0,0 +1,6 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Description**
+
+Returns the direction type (inbound, outbound, internal, external) given a source IP address, destination IP address, and a list of internal networks.
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/examples/network_direction.md b/docs/reference/query-languages/esql/_snippets/functions/examples/network_direction.md
new file mode 100644
index 0000000000000..761db02f26f94
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/examples/network_direction.md
@@ -0,0 +1,14 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Example**
+
+```esql
+ROW ip0 = "127.0.0.1"::ip, ip1 = "5.6.7.8"::ip
+| EVAL direction = NETWORK_DIRECTION(ip0, ip1, ["loopback", "private"])
+```
+
+| ip0:ip | ip1:ip | direction:keyword |
+| --- | --- | --- |
+| 127.0.0.1 | 5.6.7.8 | outbound |
+
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/layout/network_direction.md b/docs/reference/query-languages/esql/_snippets/functions/layout/network_direction.md
new file mode 100644
index 0000000000000..c7d72dfece8d4
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/layout/network_direction.md
@@ -0,0 +1,23 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+## `NETWORK_DIRECTION` [esql-network_direction]
+
+**Syntax**
+
+:::{image} ../../../images/functions/network_direction.svg
+:alt: Embedded
+:class: text-center
+:::
+
+
+:::{include} ../parameters/network_direction.md
+:::
+
+:::{include} ../description/network_direction.md
+:::
+
+:::{include} ../types/network_direction.md
+:::
+
+:::{include} ../examples/network_direction.md
+:::
diff --git a/docs/reference/query-languages/esql/_snippets/functions/parameters/network_direction.md b/docs/reference/query-languages/esql/_snippets/functions/parameters/network_direction.md
new file mode 100644
index 0000000000000..ce78ff7b8b4a0
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/parameters/network_direction.md
@@ -0,0 +1,13 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Parameters**
+
+`source_ip`
+: Source IP address of type `ip` (both IPv4 and IPv6 are supported).
+
+`destination_ip`
+: Destination IP address of type `ip` (both IPv4 and IPv6 are supported).
+
+`internal_networks`
+: List of internal networks. Supports IPv4 and IPv6 addresses, ranges in CIDR notation, and named ranges.
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/network_direction.md b/docs/reference/query-languages/esql/_snippets/functions/types/network_direction.md
new file mode 100644
index 0000000000000..ea29dc5d04589
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/types/network_direction.md
@@ -0,0 +1,9 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Supported types**
+
+| source_ip | destination_ip | internal_networks | result |
+| --- | --- | --- | --- |
+| ip | ip | keyword | keyword |
+| ip | ip | text | keyword |
+
diff --git a/docs/reference/query-languages/esql/images/functions/network_direction.svg b/docs/reference/query-languages/esql/images/functions/network_direction.svg
new file mode 100644
index 0000000000000..9f65a55a81523
--- /dev/null
+++ b/docs/reference/query-languages/esql/images/functions/network_direction.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/network_direction.json b/docs/reference/query-languages/esql/kibana/definition/functions/network_direction.json
new file mode 100644
index 0000000000000..f921b8e32822a
--- /dev/null
+++ b/docs/reference/query-languages/esql/kibana/definition/functions/network_direction.json
@@ -0,0 +1,61 @@
+{
+ "comment" : "This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.",
+ "type" : "scalar",
+ "name" : "network_direction",
+ "description" : "Returns the direction type (inbound, outbound, internal, external) given a source IP address, destination IP address, and a list of internal networks.",
+ "signatures" : [
+ {
+ "params" : [
+ {
+ "name" : "source_ip",
+ "type" : "ip",
+ "optional" : false,
+ "description" : "Source IP address of type `ip` (both IPv4 and IPv6 are supported)."
+ },
+ {
+ "name" : "destination_ip",
+ "type" : "ip",
+ "optional" : false,
+ "description" : "Destination IP address of type `ip` (both IPv4 and IPv6 are supported)."
+ },
+ {
+ "name" : "internal_networks",
+ "type" : "keyword",
+ "optional" : false,
+ "description" : "List of internal networks. Supports IPv4 and IPv6 addresses, ranges in CIDR notation, and named ranges."
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "keyword"
+ },
+ {
+ "params" : [
+ {
+ "name" : "source_ip",
+ "type" : "ip",
+ "optional" : false,
+ "description" : "Source IP address of type `ip` (both IPv4 and IPv6 are supported)."
+ },
+ {
+ "name" : "destination_ip",
+ "type" : "ip",
+ "optional" : false,
+ "description" : "Destination IP address of type `ip` (both IPv4 and IPv6 are supported)."
+ },
+ {
+ "name" : "internal_networks",
+ "type" : "text",
+ "optional" : false,
+ "description" : "List of internal networks. Supports IPv4 and IPv6 addresses, ranges in CIDR notation, and named ranges."
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "keyword"
+ }
+ ],
+ "examples" : [
+ "ROW ip0 = \"127.0.0.1\"::ip, ip1 = \"5.6.7.8\"::ip\n| EVAL direction = NETWORK_DIRECTION(ip0, ip1, [\"loopback\", \"private\"])"
+ ],
+ "preview" : true,
+ "snapshot_only" : false
+}
diff --git a/docs/reference/query-languages/esql/kibana/docs/functions/network_direction.md b/docs/reference/query-languages/esql/kibana/docs/functions/network_direction.md
new file mode 100644
index 0000000000000..a70a9879df682
--- /dev/null
+++ b/docs/reference/query-languages/esql/kibana/docs/functions/network_direction.md
@@ -0,0 +1,9 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+### NETWORK DIRECTION
+Returns the direction type (inbound, outbound, internal, external) given a source IP address, destination IP address, and a list of internal networks.
+
+```esql
+ROW ip0 = "127.0.0.1"::ip, ip1 = "5.6.7.8"::ip
+| EVAL direction = NETWORK_DIRECTION(ip0, ip1, ["loopback", "private"])
+```
diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/NetworkDirectionProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/NetworkDirectionProcessor.java
index 7257eaf71a3cb..19cb4cf642a24 100644
--- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/NetworkDirectionProcessor.java
+++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/NetworkDirectionProcessor.java
@@ -10,8 +10,7 @@
package org.elasticsearch.ingest.common;
import org.elasticsearch.cluster.metadata.ProjectId;
-import org.elasticsearch.common.network.CIDRUtils;
-import org.elasticsearch.common.network.InetAddresses;
+import org.elasticsearch.common.network.NetworkDirectionUtils;
import org.elasticsearch.ingest.AbstractProcessor;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestDocument;
@@ -19,9 +18,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.TemplateScript;
-import java.net.InetAddress;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -29,28 +26,9 @@
import static org.elasticsearch.ingest.ConfigurationUtils.readBooleanProperty;
public class NetworkDirectionProcessor extends AbstractProcessor {
- static final byte[] UNDEFINED_IP4 = new byte[] { 0, 0, 0, 0 };
- static final byte[] UNDEFINED_IP6 = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
- static final byte[] BROADCAST_IP4 = new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff };
public static final String TYPE = "network_direction";
- public static final String DIRECTION_INTERNAL = "internal";
- public static final String DIRECTION_EXTERNAL = "external";
- public static final String DIRECTION_INBOUND = "inbound";
- public static final String DIRECTION_OUTBOUND = "outbound";
-
- private static final String LOOPBACK_NAMED_NETWORK = "loopback";
- private static final String GLOBAL_UNICAST_NAMED_NETWORK = "global_unicast";
- private static final String UNICAST_NAMED_NETWORK = "unicast";
- private static final String LINK_LOCAL_UNICAST_NAMED_NETWORK = "link_local_unicast";
- private static final String INTERFACE_LOCAL_NAMED_NETWORK = "interface_local_multicast";
- private static final String LINK_LOCAL_MULTICAST_NAMED_NETWORK = "link_local_multicast";
- private static final String MULTICAST_NAMED_NETWORK = "multicast";
- private static final String UNSPECIFIED_NAMED_NETWORK = "unspecified";
- private static final String PRIVATE_NAMED_NETWORK = "private";
- private static final String PUBLIC_NAMED_NETWORK = "public";
-
private final String sourceIpField;
private final String destinationIpField;
private final String targetField;
@@ -140,96 +118,10 @@ private String getDirection(IngestDocument d) throws Exception {
return null;
}
- boolean sourceInternal = isInternal(networks, sourceIpAddrString);
- boolean destinationInternal = isInternal(networks, destIpAddrString);
-
- if (sourceInternal && destinationInternal) {
- return DIRECTION_INTERNAL;
- }
- if (sourceInternal) {
- return DIRECTION_OUTBOUND;
- }
- if (destinationInternal) {
- return DIRECTION_INBOUND;
- }
- return DIRECTION_EXTERNAL;
- }
-
- private static boolean isInternal(List networks, String ip) {
- for (String network : networks) {
- if (inNetwork(ip, network)) {
- return true;
- }
- }
- return false;
- }
-
- private static boolean inNetwork(String ip, String network) {
- InetAddress address = InetAddresses.forString(ip);
- return switch (network) {
- case LOOPBACK_NAMED_NETWORK -> isLoopback(address);
- case GLOBAL_UNICAST_NAMED_NETWORK, UNICAST_NAMED_NETWORK -> isUnicast(address);
- case LINK_LOCAL_UNICAST_NAMED_NETWORK -> isLinkLocalUnicast(address);
- case INTERFACE_LOCAL_NAMED_NETWORK -> isInterfaceLocalMulticast(address);
- case LINK_LOCAL_MULTICAST_NAMED_NETWORK -> isLinkLocalMulticast(address);
- case MULTICAST_NAMED_NETWORK -> isMulticast(address);
- case UNSPECIFIED_NAMED_NETWORK -> isUnspecified(address);
- case PRIVATE_NAMED_NETWORK -> isPrivate(ip);
- case PUBLIC_NAMED_NETWORK -> isPublic(ip);
- default -> CIDRUtils.isInRange(ip, network);
- };
- }
-
- private static boolean isLoopback(InetAddress ip) {
- return ip.isLoopbackAddress();
- }
-
- private static boolean isUnicast(InetAddress ip) {
- return Arrays.equals(ip.getAddress(), BROADCAST_IP4) == false
- && isUnspecified(ip) == false
- && isLoopback(ip) == false
- && isMulticast(ip) == false
- && isLinkLocalUnicast(ip) == false;
- }
-
- private static boolean isLinkLocalUnicast(InetAddress ip) {
- return ip.isLinkLocalAddress();
- }
-
- private static boolean isInterfaceLocalMulticast(InetAddress ip) {
- return ip.isMCNodeLocal();
- }
-
- private static boolean isLinkLocalMulticast(InetAddress ip) {
- return ip.isMCLinkLocal();
- }
-
- private static boolean isMulticast(InetAddress ip) {
- return ip.isMulticastAddress();
- }
-
- private static boolean isUnspecified(InetAddress ip) {
- var address = ip.getAddress();
- return Arrays.equals(UNDEFINED_IP4, address) || Arrays.equals(UNDEFINED_IP6, address);
- }
-
- private static boolean isPrivate(String ip) {
- return CIDRUtils.isInRange(ip, "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fd00::/8");
- }
-
- private static boolean isPublic(String ip) {
- return isLocalOrPrivate(ip) == false;
- }
+ boolean sourceInternal = NetworkDirectionUtils.isInternal(networks, sourceIpAddrString);
+ boolean destinationInternal = NetworkDirectionUtils.isInternal(networks, destIpAddrString);
- private static boolean isLocalOrPrivate(String ip) {
- var address = InetAddresses.forString(ip);
- return isPrivate(ip)
- || isLoopback(address)
- || isUnspecified(address)
- || isLinkLocalUnicast(address)
- || isLinkLocalMulticast(address)
- || isInterfaceLocalMulticast(address)
- || Arrays.equals(address.getAddress(), BROADCAST_IP4);
+ return NetworkDirectionUtils.getDirection(sourceInternal, destinationInternal);
}
@Override
diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/NetworkDirectionProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/NetworkDirectionProcessorTests.java
index af96c287b49a2..a8c5ae0ea2867 100644
--- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/NetworkDirectionProcessorTests.java
+++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/NetworkDirectionProcessorTests.java
@@ -93,6 +93,7 @@ public void testInvalidNetwork() throws Exception {
assertThat(e.getMessage(), containsString("'invalid' is not an IP string literal."));
}
+ // These tests copy the data from the NetworkDirectionUtils tests
public void testCIDR() throws Exception {
testNetworkDirectionProcessor(buildEvent("10.0.1.1", "192.168.1.2"), new String[] { "10.0.0.0/8" }, "outbound");
testNetworkDirectionProcessor(buildEvent("192.168.1.2", "10.0.1.1"), new String[] { "10.0.0.0/8" }, "inbound");
diff --git a/server/src/main/java/org/elasticsearch/common/network/NetworkDirectionUtils.java b/server/src/main/java/org/elasticsearch/common/network/NetworkDirectionUtils.java
new file mode 100644
index 0000000000000..6ee23d3b7481d
--- /dev/null
+++ b/server/src/main/java/org/elasticsearch/common/network/NetworkDirectionUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.network;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+public class NetworkDirectionUtils {
+
+ static final byte[] UNDEFINED_IP4 = new byte[] { 0, 0, 0, 0 };
+ static final byte[] UNDEFINED_IP6 = new byte[] { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
+ static final byte[] BROADCAST_IP4 = new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff };
+
+ private static final String LOOPBACK_NAMED_NETWORK = "loopback";
+ private static final String GLOBAL_UNICAST_NAMED_NETWORK = "global_unicast";
+ private static final String UNICAST_NAMED_NETWORK = "unicast";
+ private static final String LINK_LOCAL_UNICAST_NAMED_NETWORK = "link_local_unicast";
+ private static final String INTERFACE_LOCAL_NAMED_NETWORK = "interface_local_multicast";
+ private static final String LINK_LOCAL_MULTICAST_NAMED_NETWORK = "link_local_multicast";
+ private static final String MULTICAST_NAMED_NETWORK = "multicast";
+ private static final String UNSPECIFIED_NAMED_NETWORK = "unspecified";
+ private static final String PRIVATE_NAMED_NETWORK = "private";
+ private static final String PUBLIC_NAMED_NETWORK = "public";
+
+ public static final String DIRECTION_INTERNAL = "internal";
+ public static final String DIRECTION_EXTERNAL = "external";
+ public static final String DIRECTION_INBOUND = "inbound";
+ public static final String DIRECTION_OUTBOUND = "outbound";
+
+ public static boolean isInternal(List networks, String ip) {
+ for (String network : networks) {
+ if (inNetwork(InetAddresses.forString(ip), network)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static boolean inNetwork(InetAddress address, String network) {
+ return switch (network) {
+ case LOOPBACK_NAMED_NETWORK -> isLoopback(address);
+ case GLOBAL_UNICAST_NAMED_NETWORK, UNICAST_NAMED_NETWORK -> isUnicast(address);
+ case LINK_LOCAL_UNICAST_NAMED_NETWORK -> isLinkLocalUnicast(address);
+ case INTERFACE_LOCAL_NAMED_NETWORK -> isInterfaceLocalMulticast(address);
+ case LINK_LOCAL_MULTICAST_NAMED_NETWORK -> isLinkLocalMulticast(address);
+ case MULTICAST_NAMED_NETWORK -> isMulticast(address);
+ case UNSPECIFIED_NAMED_NETWORK -> isUnspecified(address);
+ case PRIVATE_NAMED_NETWORK -> isPrivate(NetworkAddress.format(address));
+ case PUBLIC_NAMED_NETWORK -> isPublic(NetworkAddress.format(address));
+ default -> CIDRUtils.isInRange(NetworkAddress.format(address), network);
+ };
+ }
+
+ public static String getDirection(boolean sourceInternal, boolean destinationInternal) {
+ if (sourceInternal && destinationInternal) {
+ return DIRECTION_INTERNAL;
+ }
+ if (sourceInternal) {
+ return DIRECTION_OUTBOUND;
+ }
+ if (destinationInternal) {
+ return DIRECTION_INBOUND;
+ }
+ return DIRECTION_EXTERNAL;
+ }
+
+ private static boolean isLoopback(InetAddress ip) {
+ return ip.isLoopbackAddress();
+ }
+
+ private static boolean isUnicast(InetAddress ip) {
+ return Arrays.equals(ip.getAddress(), BROADCAST_IP4) == false
+ && isUnspecified(ip) == false
+ && isLoopback(ip) == false
+ && isMulticast(ip) == false
+ && isLinkLocalUnicast(ip) == false;
+ }
+
+ private static boolean isLinkLocalUnicast(InetAddress ip) {
+ return ip.isLinkLocalAddress();
+ }
+
+ private static boolean isInterfaceLocalMulticast(InetAddress ip) {
+ return ip.isMCNodeLocal();
+ }
+
+ private static boolean isLinkLocalMulticast(InetAddress ip) {
+ return ip.isMCLinkLocal();
+ }
+
+ private static boolean isMulticast(InetAddress ip) {
+ return ip.isMulticastAddress();
+ }
+
+ private static boolean isUnspecified(InetAddress ip) {
+ var address = ip.getAddress();
+ return Arrays.equals(UNDEFINED_IP4, address) || Arrays.equals(UNDEFINED_IP6, address);
+ }
+
+ private static boolean isPrivate(String ip) {
+ return CIDRUtils.isInRange(ip, "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "fd00::/8");
+ }
+
+ private static boolean isPublic(String ip) {
+ return isLocalOrPrivate(ip) == false;
+ }
+
+ private static boolean isLocalOrPrivate(String ip) {
+ var address = InetAddresses.forString(ip);
+ return isPrivate(ip)
+ || isLoopback(address)
+ || isUnspecified(address)
+ || isLinkLocalUnicast(address)
+ || isLinkLocalMulticast(address)
+ || isInterfaceLocalMulticast(address)
+ || Arrays.equals(address.getAddress(), BROADCAST_IP4);
+ }
+}
diff --git a/server/src/test/java/org/elasticsearch/common/network/NetworkDirectionUtilsTests.java b/server/src/test/java/org/elasticsearch/common/network/NetworkDirectionUtilsTests.java
new file mode 100644
index 0000000000000..38bd908920897
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/common/network/NetworkDirectionUtilsTests.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.common.network;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class NetworkDirectionUtilsTests extends ESTestCase {
+ public void testCIDR() {
+ testNetworkDirectionUtils("10.0.1.1", "192.168.1.2", List.of("10.0.0.0/8"), "outbound");
+ testNetworkDirectionUtils("192.168.1.2", "10.0.1.1", List.of("10.0.0.0/8"), "inbound");
+ }
+
+ public void testUnspecified() {
+ testNetworkDirectionUtils("0.0.0.0", "0.0.0.0", List.of("unspecified"), "internal");
+ testNetworkDirectionUtils("::", "::", List.of("unspecified"), "internal");
+ }
+
+ public void testNetworkPrivate() {
+ testNetworkDirectionUtils("192.168.1.1", "192.168.1.2", List.of("private"), "internal");
+ testNetworkDirectionUtils("10.0.1.1", "192.168.1.2", List.of("private"), "internal");
+ testNetworkDirectionUtils("192.168.1.1", "172.16.0.1", List.of("private"), "internal");
+ testNetworkDirectionUtils("192.168.1.1", "fd12:3456:789a:1::1", List.of("private"), "internal");
+ }
+
+ public void testNetworkPublic() {
+ testNetworkDirectionUtils("192.168.1.1", "192.168.1.2", List.of("public"), "external");
+ testNetworkDirectionUtils("10.0.1.1", "192.168.1.2", List.of("public"), "external");
+ testNetworkDirectionUtils("192.168.1.1", "172.16.0.1", List.of("public"), "external");
+ testNetworkDirectionUtils("192.168.1.1", "fd12:3456:789a:1::1", List.of("public"), "external");
+ }
+
+ private void testNetworkDirectionUtils(String source, String destination, List networks, String expectedDirection) {
+ boolean sourceInternal = NetworkDirectionUtils.isInternal(networks, source);
+ boolean destinationInternal = NetworkDirectionUtils.isInternal(networks, destination);
+ assertThat(expectedDirection, equalTo(NetworkDirectionUtils.getDirection(sourceInternal, destinationInternal)));
+ }
+}
diff --git a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/BlockArgument.java b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/BlockArgument.java
index 42f0a1eaf04b4..8adbd7a40c2d2 100644
--- a/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/BlockArgument.java
+++ b/x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/argument/BlockArgument.java
@@ -26,7 +26,7 @@ public TypeName dataType(boolean blockStyle) {
@Override
public String paramName(boolean blockStyle) {
- return name + (blockStyle ? "Block" : "Vector");
+ return name + "Block";
}
@Override
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
index afb5baefab7b8..6f83b54606e05 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/ip.csv-spec
@@ -729,3 +729,97 @@ row ip4 = to_ip("1.2.3.4")
ip4:ip | a:ip | b:ip | c:ip
1.2.3.4 | null | null | null
;
+
+networkDirectionSimple
+required_capability: network_direction
+
+FROM hosts
+| WHERE MV_COUNT(ip0) == 1 AND MV_COUNT(ip1) == 1
+| EVAL direction = NETWORK_DIRECTION(ip0, ip1, ["loopback"])
+| KEEP ip0, ip1, direction
+| SORT direction, ip0, ip1
+;
+
+ip0:ip | ip1:ip | direction:keyword
+fe80::cae2:65ff:fece:feb9 | fe81::cae2:65ff:fece:feb9 | external
+fe80::cae2:65ff:fece:feb9 | 127.0.0.3 | inbound
+::1 | ::1 | internal
+127.0.0.1 | ::1 | internal
+127.0.0.1 | 127.0.0.1 | internal
+127.0.0.1 | 127.0.0.2 | internal
+127.0.0.1 | 128.0.0.1 | outbound
+;
+
+networkDirectionFromRow
+required_capability: network_direction
+
+ROW ip0 = "1.2.3.4"::ip, ip1 = "5.6.7.8"::ip, networks = ["loopback", "private"]
+| EVAL direction = NETWORK_DIRECTION(ip0, ip1, networks)
+| DROP networks
+;
+
+ip0:ip | ip1:ip | direction:keyword
+1.2.3.4 | 5.6.7.8 | external
+;
+
+networkDirectionFromRowUsingAlias
+required_capability: network_direction
+
+ROW ip0 = "1.2.3.4"::ip, ip1 = "5.6.7.8"::ip, networks = ["loopback", "private"]
+| EVAL direction = NETDIR(ip0, ip1, networks)
+| DROP networks
+;
+
+ip0:ip | ip1:ip | direction:keyword
+1.2.3.4 | 5.6.7.8 | external
+;
+
+networkDirectionFromRowWithInlineNetworks
+required_capability: network_direction
+
+// tag::networkDirectionFromRowWithInlineNetworks[]
+ROW ip0 = "127.0.0.1"::ip, ip1 = "5.6.7.8"::ip
+| EVAL direction = NETWORK_DIRECTION(ip0, ip1, ["loopback", "private"])
+// end::networkDirectionFromRowWithInlineNetworks[]
+;
+
+// tag::networkDirectionFromRowWithInlineNetworks-result[]
+ip0:ip | ip1:ip | direction:keyword
+127.0.0.1 | 5.6.7.8 | outbound
+// end::networkDirectionFromRowWithInlineNetworks-result[]
+;
+
+networkDirectionFromIpPrefix
+required_capability: network_direction
+
+ROW ip0 = "192.168.1.123"::ip, ip1 = "8.8.8.8"::ip
+| EVAL direction = NETWORK_DIRECTION(IP_PREFIX(ip0, 24, 0), ip1, ["private"])
+| KEEP ip0, ip1, direction
+;
+
+ip0:ip | ip1:ip | direction:keyword
+192.168.1.123 | 8.8.8.8 | outbound
+;
+
+networkDirectionBadIp
+required_capability: network_direction
+
+ROW ip0 = null::ip, ip1 = "8.8.8.8"::ip
+| EVAL direction = NETWORK_DIRECTION(TO_IP(ip0), ip1, ["loopback"])
+;
+
+ip0:ip | ip1:ip | direction:keyword
+null | 8.8.8.8 | null
+;
+
+networkDirectionBadNetworksArray
+required_capability: network_direction
+
+ROW ip0 = "127.0.0.1"::ip, ip1 = "8.8.8.8"::ip
+| EVAL direction = NETWORK_DIRECTION(ip0, ip1, ["invalid_network"]);
+warning:Line 2:20: evaluation of [NETWORK_DIRECTION(ip0, ip1, [\"invalid_network\"])] failed, treating result as null. Only first 20 failures recorded.
+warning:Line 2:20: java.lang.IllegalArgumentException: 'invalid_network' is not an IP string literal.
+
+ip0:ip |ip1:ip |direction:keyword
+127.0.0.1 |8.8.8.8 |null
+;
diff --git a/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionEvaluator.java b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionEvaluator.java
new file mode 100644
index 0000000000000..0dea21cab25d2
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/generated/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionEvaluator.java
@@ -0,0 +1,184 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License
+// 2.0; you may not use this file except in compliance with the Elastic License
+// 2.0.
+package org.elasticsearch.xpack.esql.expression.function.scalar.ip;
+
+import java.lang.IllegalArgumentException;
+import java.lang.Override;
+import java.lang.String;
+import java.util.function.Function;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.operator.DriverContext;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.compute.operator.Warnings;
+import org.elasticsearch.core.Releasables;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+
+/**
+ * {@link EvalOperator.ExpressionEvaluator} implementation for {@link NetworkDirection}.
+ * This class is generated. Edit {@code EvaluatorImplementer} instead.
+ */
+public final class NetworkDirectionEvaluator implements EvalOperator.ExpressionEvaluator {
+ private static final long BASE_RAM_BYTES_USED = RamUsageEstimator.shallowSizeOfInstance(NetworkDirectionEvaluator.class);
+
+ private final Source source;
+
+ private final BytesRef scratch;
+
+ private final BytesRef netScratch;
+
+ private final EvalOperator.ExpressionEvaluator sourceIp;
+
+ private final EvalOperator.ExpressionEvaluator destinationIp;
+
+ private final EvalOperator.ExpressionEvaluator networks;
+
+ private final DriverContext driverContext;
+
+ private Warnings warnings;
+
+ public NetworkDirectionEvaluator(Source source, BytesRef scratch, BytesRef netScratch,
+ EvalOperator.ExpressionEvaluator sourceIp, EvalOperator.ExpressionEvaluator destinationIp,
+ EvalOperator.ExpressionEvaluator networks, DriverContext driverContext) {
+ this.source = source;
+ this.scratch = scratch;
+ this.netScratch = netScratch;
+ this.sourceIp = sourceIp;
+ this.destinationIp = destinationIp;
+ this.networks = networks;
+ this.driverContext = driverContext;
+ }
+
+ @Override
+ public Block eval(Page page) {
+ try (BytesRefBlock sourceIpBlock = (BytesRefBlock) sourceIp.eval(page)) {
+ try (BytesRefBlock destinationIpBlock = (BytesRefBlock) destinationIp.eval(page)) {
+ try (BytesRefBlock networksBlock = (BytesRefBlock) networks.eval(page)) {
+ return eval(page.getPositionCount(), sourceIpBlock, destinationIpBlock, networksBlock);
+ }
+ }
+ }
+ }
+
+ @Override
+ public long baseRamBytesUsed() {
+ long baseRamBytesUsed = BASE_RAM_BYTES_USED;
+ baseRamBytesUsed += sourceIp.baseRamBytesUsed();
+ baseRamBytesUsed += destinationIp.baseRamBytesUsed();
+ baseRamBytesUsed += networks.baseRamBytesUsed();
+ return baseRamBytesUsed;
+ }
+
+ public BytesRefBlock eval(int positionCount, BytesRefBlock sourceIpBlock,
+ BytesRefBlock destinationIpBlock, BytesRefBlock networksBlock) {
+ try(BytesRefBlock.Builder result = driverContext.blockFactory().newBytesRefBlockBuilder(positionCount)) {
+ BytesRef sourceIpScratch = new BytesRef();
+ BytesRef destinationIpScratch = new BytesRef();
+ position: for (int p = 0; p < positionCount; p++) {
+ boolean allBlocksAreNulls = true;
+ switch (sourceIpBlock.getValueCount(p)) {
+ case 0:
+ result.appendNull();
+ continue position;
+ case 1:
+ break;
+ default:
+ warnings().registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+ result.appendNull();
+ continue position;
+ }
+ switch (destinationIpBlock.getValueCount(p)) {
+ case 0:
+ result.appendNull();
+ continue position;
+ case 1:
+ break;
+ default:
+ warnings().registerException(new IllegalArgumentException("single-value function encountered multi-value"));
+ result.appendNull();
+ continue position;
+ }
+ if (!networksBlock.isNull(p)) {
+ allBlocksAreNulls = false;
+ }
+ if (allBlocksAreNulls) {
+ result.appendNull();
+ continue position;
+ }
+ BytesRef sourceIp = sourceIpBlock.getBytesRef(sourceIpBlock.getFirstValueIndex(p), sourceIpScratch);
+ BytesRef destinationIp = destinationIpBlock.getBytesRef(destinationIpBlock.getFirstValueIndex(p), destinationIpScratch);
+ try {
+ NetworkDirection.process(result, this.scratch, this.netScratch, sourceIp, destinationIp, p, networksBlock);
+ } catch (IllegalArgumentException e) {
+ warnings().registerException(e);
+ result.appendNull();
+ }
+ }
+ return result.build();
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "NetworkDirectionEvaluator[" + "sourceIp=" + sourceIp + ", destinationIp=" + destinationIp + ", networks=" + networks + "]";
+ }
+
+ @Override
+ public void close() {
+ Releasables.closeExpectNoException(sourceIp, destinationIp, networks);
+ }
+
+ private Warnings warnings() {
+ if (warnings == null) {
+ this.warnings = Warnings.createWarnings(
+ driverContext.warningsMode(),
+ source.source().getLineNumber(),
+ source.source().getColumnNumber(),
+ source.text()
+ );
+ }
+ return warnings;
+ }
+
+ static class Factory implements EvalOperator.ExpressionEvaluator.Factory {
+ private final Source source;
+
+ private final Function scratch;
+
+ private final Function netScratch;
+
+ private final EvalOperator.ExpressionEvaluator.Factory sourceIp;
+
+ private final EvalOperator.ExpressionEvaluator.Factory destinationIp;
+
+ private final EvalOperator.ExpressionEvaluator.Factory networks;
+
+ public Factory(Source source, Function scratch,
+ Function netScratch,
+ EvalOperator.ExpressionEvaluator.Factory sourceIp,
+ EvalOperator.ExpressionEvaluator.Factory destinationIp,
+ EvalOperator.ExpressionEvaluator.Factory networks) {
+ this.source = source;
+ this.scratch = scratch;
+ this.netScratch = netScratch;
+ this.sourceIp = sourceIp;
+ this.destinationIp = destinationIp;
+ this.networks = networks;
+ }
+
+ @Override
+ public NetworkDirectionEvaluator get(DriverContext context) {
+ return new NetworkDirectionEvaluator(source, scratch.apply(context), netScratch.apply(context), sourceIp.get(context), destinationIp.get(context), networks.get(context), context);
+ }
+
+ @Override
+ public String toString() {
+ return "NetworkDirectionEvaluator[" + "sourceIp=" + sourceIp + ", destinationIp=" + destinationIp + ", networks=" + networks + "]";
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 8dae1d8865efe..84a5103200a14 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -1506,6 +1506,11 @@ public enum Cap {
*/
DOTS_IN_FUSE,
+ /**
+ * Network direction function.
+ */
+ NETWORK_DIRECTION(Build.current().isSnapshot()),
+
/**
* Support for the literal {@code m} suffix as an alias for {@code minute} in temporal amounts.
*/
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
index 2f4d72338b4fc..ba30131b4220b 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
@@ -112,6 +112,7 @@
import org.elasticsearch.xpack.esql.expression.function.scalar.date.Now;
import org.elasticsearch.xpack.esql.expression.function.scalar.ip.CIDRMatch;
import org.elasticsearch.xpack.esql.expression.function.scalar.ip.IpPrefix;
+import org.elasticsearch.xpack.esql.expression.function.scalar.ip.NetworkDirection;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Abs;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Acos;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Asin;
@@ -460,6 +461,7 @@ private static FunctionDefinition[][] functions() {
// IP
new FunctionDefinition[] { def(CIDRMatch.class, CIDRMatch::new, "cidr_match") },
new FunctionDefinition[] { def(IpPrefix.class, IpPrefix::new, "ip_prefix") },
+ new FunctionDefinition[] { def(NetworkDirection.class, NetworkDirection::new, "network_direction", "netdir") },
// conversion functions
new FunctionDefinition[] {
def(FromBase64.class, FromBase64::new, "from_base64"),
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ScalarFunctionWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ScalarFunctionWritables.java
index 961d577692aa0..5087b376b9649 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ScalarFunctionWritables.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ScalarFunctionWritables.java
@@ -25,6 +25,7 @@
import org.elasticsearch.xpack.esql.expression.function.scalar.date.Now;
import org.elasticsearch.xpack.esql.expression.function.scalar.ip.CIDRMatch;
import org.elasticsearch.xpack.esql.expression.function.scalar.ip.IpPrefix;
+import org.elasticsearch.xpack.esql.expression.function.scalar.ip.NetworkDirection;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.Atan2;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.CopySign;
import org.elasticsearch.xpack.esql.expression.function.scalar.math.E;
@@ -98,6 +99,7 @@ public static List getNamedWriteables() {
entries.add(Locate.ENTRY);
entries.add(Log.ENTRY);
entries.add(Md5.ENTRY);
+ entries.add(NetworkDirection.ENTRY);
entries.add(Now.ENTRY);
entries.add(Or.ENTRY);
entries.add(Pi.ENTRY);
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirection.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirection.java
new file mode 100644
index 0000000000000..5f16967a398c5
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirection.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.ip;
+
+import org.apache.lucene.document.InetAddressPoint;
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.network.NetworkDirectionUtils;
+import org.elasticsearch.compute.ann.Evaluator;
+import org.elasticsearch.compute.ann.Fixed;
+import org.elasticsearch.compute.ann.Position;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.operator.EvalOperator;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.Example;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.expression.function.scalar.EsqlScalarFunction;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.elasticsearch.compute.ann.Fixed.Scope.THREAD_LOCAL;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.FIRST;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.THIRD;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isIPAndExact;
+import static org.elasticsearch.xpack.esql.expression.EsqlTypeResolutions.isStringAndExact;
+
+/**
+ * Returns the direction type (inbound, outbound, internal, external) given
+ * a source IP address, destination IP address, and a list of internal networks.
+ */
+public class NetworkDirection extends EsqlScalarFunction {
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+ Expression.class,
+ "NetworkDirection",
+ NetworkDirection::new
+ );
+
+ private final Expression sourceIpField;
+ private final Expression destinationIpField;
+ private final Expression internalNetworks;
+
+ @FunctionInfo(
+ returnType = "keyword",
+ preview = true,
+ description = "Returns the direction type (inbound, outbound, internal, external) given "
+ + "a source IP address, destination IP address, and a list of internal networks.",
+ examples = @Example(file = "ip", tag = "networkDirectionFromRowWithInlineNetworks")
+ )
+ public NetworkDirection(
+ Source source,
+ @Param(
+ name = "source_ip",
+ type = { "ip" },
+ description = "Source IP address of type `ip` (both IPv4 and IPv6 are supported)."
+ ) Expression sourceIpField,
+ @Param(
+ name = "destination_ip",
+ type = { "ip" },
+ description = "Destination IP address of type `ip` (both IPv4 and IPv6 are supported)."
+ ) Expression destinationIpField,
+ @Param(
+ name = "internal_networks",
+ type = { "keyword", "text" },
+ description = "List of internal networks. Supports IPv4 and IPv6 addresses, ranges in CIDR notation, and named ranges."
+ ) Expression internalNetworks
+ ) {
+ super(source, Arrays.asList(sourceIpField, destinationIpField, internalNetworks));
+ this.sourceIpField = sourceIpField;
+ this.destinationIpField = destinationIpField;
+ this.internalNetworks = internalNetworks;
+ }
+
+ private NetworkDirection(StreamInput in) throws IOException {
+ this(
+ Source.readFrom((PlanStreamInput) in),
+ in.readNamedWriteable(Expression.class),
+ in.readNamedWriteable(Expression.class),
+ in.readNamedWriteable(Expression.class)
+ );
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ source().writeTo(out);
+ out.writeNamedWriteable(sourceIpField);
+ out.writeNamedWriteable(destinationIpField);
+ out.writeNamedWriteable(internalNetworks);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ public Expression replaceChildren(List newChildren) {
+ return new NetworkDirection(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
+ }
+
+ @Override
+ protected NodeInfo extends Expression> info() {
+ return NodeInfo.create(this, NetworkDirection::new, sourceIpField, destinationIpField, internalNetworks);
+ }
+
+ @Override
+ public EvalOperator.ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
+ var sourceIpEvaluatorSupplier = toEvaluator.apply(sourceIpField);
+ var destinationIpEvaluatorSupplier = toEvaluator.apply(destinationIpField);
+ var internalNetworksEvaluatorSupplier = toEvaluator.apply(internalNetworks);
+ return new NetworkDirectionEvaluator.Factory(
+ source(),
+ context -> new BytesRef(16),
+ context -> new BytesRef(),
+ sourceIpEvaluatorSupplier,
+ destinationIpEvaluatorSupplier,
+ internalNetworksEvaluatorSupplier
+ );
+ }
+
+ @Evaluator(warnExceptions = IllegalArgumentException.class)
+ static void process(
+ BytesRefBlock.Builder builder,
+ @Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef scratch,
+ @Fixed(includeInToString = false, scope = THREAD_LOCAL) BytesRef netScratch,
+ BytesRef sourceIp,
+ BytesRef destinationIp,
+ @Position int position,
+ BytesRefBlock networks
+ ) {
+ int valueCount = networks.getValueCount(position);
+ if (valueCount == 0) {
+ builder.appendNull();
+ return;
+ }
+ int first = networks.getFirstValueIndex(position);
+
+ System.arraycopy(sourceIp.bytes, sourceIp.offset, scratch.bytes, 0, sourceIp.length);
+ InetAddress sourceIpAddress = InetAddressPoint.decode(scratch.bytes);
+ System.arraycopy(destinationIp.bytes, destinationIp.offset, scratch.bytes, 0, destinationIp.length);
+ InetAddress destinationIpAddress = InetAddressPoint.decode(scratch.bytes);
+
+ boolean sourceInternal = false;
+ boolean destinationInternal = false;
+
+ for (int i = first; i < first + valueCount; i++) {
+ if (NetworkDirectionUtils.inNetwork(sourceIpAddress, networks.getBytesRef(i, netScratch).utf8ToString())) {
+ sourceInternal = true;
+ break;
+ }
+ }
+ for (int i = first; i < first + valueCount; i++) {
+ if (NetworkDirectionUtils.inNetwork(destinationIpAddress, networks.getBytesRef(i, netScratch).utf8ToString())) {
+ destinationInternal = true;
+ break;
+ }
+ }
+
+ builder.appendBytesRef(new BytesRef(NetworkDirectionUtils.getDirection(sourceInternal, destinationInternal)));
+ }
+
+ @Override
+ public DataType dataType() {
+ return DataType.KEYWORD;
+ }
+
+ @Override
+ protected TypeResolution resolveType() {
+ if (childrenResolved() == false) {
+ return new TypeResolution("Unresolved children");
+ }
+
+ return isIPAndExact(sourceIpField, sourceText(), FIRST).and(isIPAndExact(destinationIpField, sourceText(), SECOND))
+ .and(isStringAndExact(internalNetworks, sourceText(), THIRD));
+ }
+
+ public Expression sourceIpField() {
+ return sourceIpField;
+ }
+
+ public Expression destinationIpField() {
+ return destinationIpField;
+ }
+
+ public Expression internalNetworks() {
+ return internalNetworks;
+ }
+
+ @Override
+ public boolean foldable() {
+ return sourceIpField.foldable() && destinationIpField.foldable() && internalNetworks.foldable();
+ }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionErrorTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionErrorTests.java
new file mode 100644
index 0000000000000..0ba8fff3bdb73
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionErrorTests.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.ip;
+
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.ErrorsForCasesWithoutExamplesTestCase;
+import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.hamcrest.Matcher;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class NetworkDirectionErrorTests extends ErrorsForCasesWithoutExamplesTestCase {
+ @Override
+ protected List cases() {
+ return paramsToSuppliers(NetworkDirectionTests.parameters());
+ }
+
+ @Override
+ protected Expression build(Source source, List args) {
+ return new NetworkDirection(source, args.get(0), args.get(1), args.get(2));
+ }
+
+ @Override
+ protected Matcher expectedTypeErrorMatcher(List> validPerPosition, List signature) {
+ return equalTo(typeErrorMessage(true, validPerPosition, signature, (v, p) -> switch (p) {
+ case 0, 1 -> "ip";
+ case 2 -> "string";
+ default -> "";
+ }));
+ }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionSerializationTests.java
new file mode 100644
index 0000000000000..0ef46e8f6d4ca
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionSerializationTests.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.ip;
+
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.expression.AbstractExpressionSerializationTests;
+
+import java.io.IOException;
+
+public class NetworkDirectionSerializationTests extends AbstractExpressionSerializationTests {
+
+ @Override
+ protected NetworkDirection createTestInstance() {
+ Source source = randomSource();
+ Expression sourceIpField = randomChild();
+ Expression destinationIpField = randomChild();
+ Expression internalNetworks = randomChild();
+ return new NetworkDirection(source, sourceIpField, destinationIpField, internalNetworks);
+ }
+
+ @Override
+ protected NetworkDirection mutateInstance(NetworkDirection instance) throws IOException {
+ Source source = instance.source();
+ Expression sourceIpField = instance.sourceIpField();
+ Expression destinationIpField = instance.destinationIpField();
+ Expression internalNetworks = instance.internalNetworks();
+ switch (between(0, 2)) {
+ case 0 -> sourceIpField = randomValueOtherThan(sourceIpField, AbstractExpressionSerializationTests::randomChild);
+ case 1 -> destinationIpField = randomValueOtherThan(destinationIpField, AbstractExpressionSerializationTests::randomChild);
+ case 2 -> internalNetworks = randomValueOtherThan(internalNetworks, AbstractExpressionSerializationTests::randomChild);
+ }
+
+ return new NetworkDirection(source, sourceIpField, destinationIpField, internalNetworks);
+ }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionTests.java
new file mode 100644
index 0000000000000..3c6405360a5df
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/ip/NetworkDirectionTests.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.scalar.ip;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.network.NetworkDirectionUtils;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.AbstractScalarFunctionTestCase;
+import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class NetworkDirectionTests extends AbstractScalarFunctionTestCase {
+ public NetworkDirectionTests(@Name("TestCase") Supplier testCaseSupplier) {
+ this.testCase = testCaseSupplier.get();
+ }
+
+ @ParametersFactory
+ public static Iterable