diff --git a/docs/reference/query-languages/esql/_snippets/functions/description/first.md b/docs/reference/query-languages/esql/_snippets/functions/description/first.md
new file mode 100644
index 0000000000000..c0e07fd848d6f
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/description/first.md
@@ -0,0 +1,6 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Description**
+
+The earliest value of a field.
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/description/last.md b/docs/reference/query-languages/esql/_snippets/functions/description/last.md
new file mode 100644
index 0000000000000..23449350e80d9
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/description/last.md
@@ -0,0 +1,6 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Description**
+
+The latest value of a field.
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/examples/first.md b/docs/reference/query-languages/esql/_snippets/functions/examples/first.md
new file mode 100644
index 0000000000000..cb993f3ba8678
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/examples/first.md
@@ -0,0 +1,17 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Example**
+
+```esql
+FROM k8s
+| STATS first_bytes_in = FIRST(network.bytes_in, @timestamp) BY pod
+| SORT pod ASC
+```
+
+| first_bytes_in:long | pod:keyword |
+| --- | --- |
+| 278 | one |
+| 473 | three |
+| 699 | two |
+
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/examples/last.md b/docs/reference/query-languages/esql/_snippets/functions/examples/last.md
new file mode 100644
index 0000000000000..f98fa98687d4e
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/examples/last.md
@@ -0,0 +1,17 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Example**
+
+```esql
+FROM k8s
+| STATS last_bytes_in = LAST(network.bytes_in, @timestamp) BY pod
+| SORT pod ASC
+```
+
+| last_bytes_in:long | pod:keyword |
+| --- | --- |
+| 206 | one |
+| 972 | three |
+| 812 | two |
+
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/layout/first.md b/docs/reference/query-languages/esql/_snippets/functions/layout/first.md
new file mode 100644
index 0000000000000..c8324c342fc83
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/layout/first.md
@@ -0,0 +1,26 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+## `FIRST` [esql-first]
+```{applies_to}
+stack: unavailable
+```
+
+**Syntax**
+
+:::{image} ../../../images/functions/first.svg
+:alt: Embedded
+:class: text-center
+:::
+
+
+:::{include} ../parameters/first.md
+:::
+
+:::{include} ../description/first.md
+:::
+
+:::{include} ../types/first.md
+:::
+
+:::{include} ../examples/first.md
+:::
diff --git a/docs/reference/query-languages/esql/_snippets/functions/layout/last.md b/docs/reference/query-languages/esql/_snippets/functions/layout/last.md
new file mode 100644
index 0000000000000..319fd0cea4c4a
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/layout/last.md
@@ -0,0 +1,26 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+## `LAST` [esql-last]
+```{applies_to}
+stack: unavailable
+```
+
+**Syntax**
+
+:::{image} ../../../images/functions/last.svg
+:alt: Embedded
+:class: text-center
+:::
+
+
+:::{include} ../parameters/last.md
+:::
+
+:::{include} ../description/last.md
+:::
+
+:::{include} ../types/last.md
+:::
+
+:::{include} ../examples/last.md
+:::
diff --git a/docs/reference/query-languages/esql/_snippets/functions/parameters/first.md b/docs/reference/query-languages/esql/_snippets/functions/parameters/first.md
new file mode 100644
index 0000000000000..78c98d3923a2e
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/parameters/first.md
@@ -0,0 +1,10 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Parameters**
+
+`value`
+: Values to return
+
+`sort`
+: Sort key
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/parameters/last.md b/docs/reference/query-languages/esql/_snippets/functions/parameters/last.md
new file mode 100644
index 0000000000000..78c98d3923a2e
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/parameters/last.md
@@ -0,0 +1,10 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Parameters**
+
+`value`
+: Values to return
+
+`sort`
+: Sort key
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/first.md b/docs/reference/query-languages/esql/_snippets/functions/types/first.md
new file mode 100644
index 0000000000000..4571d108fc722
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/types/first.md
@@ -0,0 +1,13 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Supported types**
+
+| value | sort | result |
+| --- | --- | --- |
+| double | date | double |
+| double | date_nanos | double |
+| integer | date | integer |
+| integer | date_nanos | integer |
+| long | date | long |
+| long | date_nanos | long |
+
diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/last.md b/docs/reference/query-languages/esql/_snippets/functions/types/last.md
new file mode 100644
index 0000000000000..4571d108fc722
--- /dev/null
+++ b/docs/reference/query-languages/esql/_snippets/functions/types/last.md
@@ -0,0 +1,13 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+**Supported types**
+
+| value | sort | result |
+| --- | --- | --- |
+| double | date | double |
+| double | date_nanos | double |
+| integer | date | integer |
+| integer | date_nanos | integer |
+| long | date | long |
+| long | date_nanos | long |
+
diff --git a/docs/reference/query-languages/esql/images/functions/first.svg b/docs/reference/query-languages/esql/images/functions/first.svg
new file mode 100644
index 0000000000000..39054c3f02196
--- /dev/null
+++ b/docs/reference/query-languages/esql/images/functions/first.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/reference/query-languages/esql/images/functions/last.svg b/docs/reference/query-languages/esql/images/functions/last.svg
new file mode 100644
index 0000000000000..40a575dc92de3
--- /dev/null
+++ b/docs/reference/query-languages/esql/images/functions/last.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/first.json b/docs/reference/query-languages/esql/kibana/definition/functions/first.json
new file mode 100644
index 0000000000000..06e68c53c3bde
--- /dev/null
+++ b/docs/reference/query-languages/esql/kibana/definition/functions/first.json
@@ -0,0 +1,121 @@
+{
+ "comment" : "This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.",
+ "type" : "agg",
+ "name" : "first",
+ "description" : "The earliest value of a field.",
+ "signatures" : [
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "double",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "double"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "double",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date_nanos",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "double"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "integer",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "integer"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "integer",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date_nanos",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "integer"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "long",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "long"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "long",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date_nanos",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "long"
+ }
+ ],
+ "examples" : [
+ "FROM k8s\n| STATS first_bytes_in = FIRST(network.bytes_in, @timestamp) BY pod\n| SORT pod ASC"
+ ],
+ "preview" : false,
+ "snapshot_only" : true
+}
diff --git a/docs/reference/query-languages/esql/kibana/definition/functions/last.json b/docs/reference/query-languages/esql/kibana/definition/functions/last.json
new file mode 100644
index 0000000000000..b1bf80789f9a5
--- /dev/null
+++ b/docs/reference/query-languages/esql/kibana/definition/functions/last.json
@@ -0,0 +1,121 @@
+{
+ "comment" : "This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.",
+ "type" : "agg",
+ "name" : "last",
+ "description" : "The latest value of a field.",
+ "signatures" : [
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "double",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "double"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "double",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date_nanos",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "double"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "integer",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "integer"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "integer",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date_nanos",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "integer"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "long",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "long"
+ },
+ {
+ "params" : [
+ {
+ "name" : "value",
+ "type" : "long",
+ "optional" : false,
+ "description" : "Values to return"
+ },
+ {
+ "name" : "sort",
+ "type" : "date_nanos",
+ "optional" : false,
+ "description" : "Sort key"
+ }
+ ],
+ "variadic" : false,
+ "returnType" : "long"
+ }
+ ],
+ "examples" : [
+ "FROM k8s\n| STATS last_bytes_in = LAST(network.bytes_in, @timestamp) BY pod\n| SORT pod ASC"
+ ],
+ "preview" : false,
+ "snapshot_only" : true
+}
diff --git a/docs/reference/query-languages/esql/kibana/docs/functions/first.md b/docs/reference/query-languages/esql/kibana/docs/functions/first.md
new file mode 100644
index 0000000000000..65e4dcb11e3d6
--- /dev/null
+++ b/docs/reference/query-languages/esql/kibana/docs/functions/first.md
@@ -0,0 +1,10 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+### FIRST
+The earliest value of a field.
+
+```esql
+FROM k8s
+| STATS first_bytes_in = FIRST(network.bytes_in, @timestamp) BY pod
+| SORT pod ASC
+```
diff --git a/docs/reference/query-languages/esql/kibana/docs/functions/last.md b/docs/reference/query-languages/esql/kibana/docs/functions/last.md
new file mode 100644
index 0000000000000..db0e6fd8c26b5
--- /dev/null
+++ b/docs/reference/query-languages/esql/kibana/docs/functions/last.md
@@ -0,0 +1,10 @@
+% This is generated by ESQL's AbstractFunctionTestCase. Do not edit it. See ../README.md for how to regenerate it.
+
+### LAST
+The latest value of a field.
+
+```esql
+FROM k8s
+| STATS last_bytes_in = LAST(network.bytes_in, @timestamp) BY pod
+| SORT pod ASC
+```
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec
new file mode 100644
index 0000000000000..4ebdd9ea98e2f
--- /dev/null
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_first.csv-spec
@@ -0,0 +1,91 @@
+long_by_date
+required_capability: agg_first_last
+// tag::first[]
+FROM k8s
+| STATS first_bytes_in = FIRST(network.bytes_in, @timestamp) BY pod
+| SORT pod ASC
+// end::first[]
+;
+
+// tag::first-result[]
+first_bytes_in:long | pod:keyword
+ 278 | one
+ 473 | three
+ 699 | two
+// end::first-result[]
+;
+
+double_by_date
+required_capability: agg_first_last
+FROM k8s
+| STATS first_network_cost = FIRST(network.cost, @timestamp) BY pod
+| SORT pod ASC
+;
+
+first_network_cost:double | pod:keyword
+ 5.375 | one
+ 1.250 | three
+ 9.375 | two
+;
+
+by_date_nanos
+required_capability: agg_first_last
+FROM date_nanos
+| STATS first_num = FIRST(num, nanos)
+;
+
+ first_num:long
+ 0
+;
+
+
+row
+required_capability: agg_first_last
+ROW a = 100, @timestamp = "2025-01-01"::DATETIME
+| STATS FIRST(a, @timestamp)
+;
+
+FIRST(a, @timestamp):integer
+100
+;
+
+double_by_null
+required_capability: agg_first_last
+FROM k8s
+| EVAL @timestamp = @timestamp + null
+| STATS first_network_cost = FIRST(network.cost, @timestamp) BY pod
+| SORT pod ASC
+;
+
+first_network_cost:double | pod:keyword
+ null | one
+ null | three
+ null | two
+;
+
+null_by_timestamp
+required_capability: agg_first_last
+FROM k8s
+| EVAL network.cost = network.cost + null
+| STATS first_network_cost = FIRST(network.cost, @timestamp) BY pod
+| SORT pod ASC
+;
+
+first_network_cost:double | pod:keyword
+ null | one
+ null | three
+ null | two
+;
+
+skips_null
+required_capability: agg_first_last
+ROW v = ["null", "3", "2"]
+| MV_EXPAND v
+| EVAL v = CASE(v == "null", null, v::INTEGER)
+| EVAL @timestamp = CONCAT("2025-08-0", COALESCE(v::KEYWORD, "1"))::DATE
+| STATS FIRST(v, @timestamp)
+;
+
+FIRST(v, @timestamp):integer
+ 2
+;
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec
new file mode 100644
index 0000000000000..2f82bb7f83ce0
--- /dev/null
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/stats_last.csv-spec
@@ -0,0 +1,91 @@
+long_by_date
+required_capability: agg_first_last
+// tag::last[]
+FROM k8s
+| STATS last_bytes_in = LAST(network.bytes_in, @timestamp) BY pod
+| SORT pod ASC
+// end::last[]
+;
+
+// tag::last-result[]
+last_bytes_in:long | pod:keyword
+ 206 | one
+ 972 | three
+ 812 | two
+// end::last-result[]
+;
+
+double_by_date
+required_capability: agg_first_last
+FROM k8s
+| STATS last_network_cost = LAST(network.cost, @timestamp) BY pod
+| SORT pod ASC
+;
+
+last_network_cost:double | pod:keyword
+ 6.250 | one
+ 10.875 | three
+ 10.750 | two
+;
+
+by_date_nanos
+required_capability: agg_first_last
+FROM date_nanos
+| STATS last_num = LAST(num, nanos)
+;
+
+ last_num:long
+1698069301543123456
+;
+
+
+row
+required_capability: agg_first_last
+ROW a = 100, @timestamp = "2025-01-01"::DATETIME
+| STATS LAST(a, @timestamp)
+;
+
+LAST(a, @timestamp):integer
+100
+;
+
+double_by_null
+required_capability: agg_first_last
+FROM k8s
+| EVAL @timestamp = @timestamp + null
+| STATS last_network_cost = LAST(network.cost, @timestamp) BY pod
+| SORT pod ASC
+;
+
+last_network_cost:double | pod:keyword
+ null | one
+ null | three
+ null | two
+;
+
+null_by_timestamp
+required_capability: agg_first_last
+FROM k8s
+| EVAL network.cost = network.cost + null
+| STATS last_network_cost = LAST(network.cost, @timestamp) BY pod
+| SORT pod ASC
+;
+
+last_network_cost:double | pod:keyword
+ null | one
+ null | three
+ null | two
+;
+
+skips_null
+required_capability: agg_first_last
+ROW v = ["null", "1", "2"]
+| MV_EXPAND v
+| EVAL v = CASE(v == "null", null, v::INTEGER)
+| EVAL @timestamp = CONCAT("2025-08-0", COALESCE(v::KEYWORD, "3"))::DATE
+| STATS LAST(v, @timestamp)
+;
+
+LAST(v, @timestamp):integer
+ 2
+;
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
index 3b62a43b6d9d8..a09a9177203c4 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
@@ -1337,6 +1337,11 @@ public enum Cap {
*/
CATEGORIZE_OPTIONS,
+ /**
+ * FIRST and LAST aggregate functions.
+ */
+ AGG_FIRST_LAST(Build.current().isSnapshot()),
+
/**
* Support correct counting of skipped shards.
*/
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
index 959602cc4e20d..0eca67f625121 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java
@@ -25,7 +25,9 @@
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinct;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountDistinctOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.CountOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.First;
import org.elasticsearch.xpack.esql.expression.function.aggregate.FirstOverTime;
+import org.elasticsearch.xpack.esql.expression.function.aggregate.Last;
import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime;
import org.elasticsearch.xpack.esql.expression.function.aggregate.Max;
import org.elasticsearch.xpack.esql.expression.function.aggregate.MaxOverTime;
@@ -475,6 +477,8 @@ private static FunctionDefinition[][] snapshotFunctions() {
// The delay() function is for debug/snapshot environments only and should never be enabled in a non-snapshot build.
// This is an experimental function and can be removed without notice.
def(Delay.class, Delay::new, "delay"),
+ def(First.class, bi(First::new), "first"),
+ def(Last.class, bi(Last::new), "last"),
def(Rate.class, uni(Rate::new), "rate"),
def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"),
def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"),
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
index 7dfacafbc2c53..82ddc388b2f16 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateWritables.java
@@ -18,6 +18,7 @@ public static List getNamedWriteables() {
Avg.ENTRY,
Count.ENTRY,
CountDistinct.ENTRY,
+ First.ENTRY,
Max.ENTRY,
Median.ENTRY,
MedianAbsoluteDeviation.ENTRY,
@@ -34,6 +35,7 @@ public static List getNamedWriteables() {
MinOverTime.ENTRY,
MaxOverTime.ENTRY,
AvgOverTime.ENTRY,
+ Last.ENTRY,
LastOverTime.ENTRY,
FirstOverTime.ENTRY,
SumOverTime.ENTRY,
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java
new file mode 100644
index 0000000000000..95ca792634ed9
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.aggregate;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.FirstDoubleByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.FirstFloatByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.FirstIntByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.FirstLongByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.Example;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionType;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.planner.ToAggregator;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
+
+public class First extends AggregateFunction implements ToAggregator {
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "First", First::readFrom);
+
+ private final Expression sort;
+
+ // TODO: support all types of values
+ @FunctionInfo(
+ type = FunctionType.AGGREGATE,
+ returnType = { "long", "integer", "double" },
+ description = "The earliest value of a field.",
+ appliesTo = @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE),
+ examples = @Example(file = "stats_first", tag = "first")
+ )
+ public First(
+ Source source,
+ @Param(name = "value", type = { "long", "integer", "double" }, description = "Values to return") Expression field,
+ @Param(name = "sort", type = { "date", "date_nanos" }, description = "Sort key") Expression sort
+ ) {
+ this(source, field, Literal.TRUE, sort);
+ }
+
+ private First(Source source, Expression field, Expression filter, Expression sort) {
+ super(source, field, filter, List.of(sort));
+ this.sort = sort;
+ }
+
+ private static First readFrom(StreamInput in) throws IOException {
+ Source source = Source.readFrom((PlanStreamInput) in);
+ Expression field = in.readNamedWriteable(Expression.class);
+ Expression filter = in.readNamedWriteable(Expression.class);
+ List params = in.readNamedWriteableCollectionAsList(Expression.class);
+ Expression sort = params.getFirst();
+ return new First(source, field, filter, sort);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ protected NodeInfo info() {
+ return NodeInfo.create(this, First::new, field(), sort);
+ }
+
+ @Override
+ public First replaceChildren(List newChildren) {
+ return new First(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
+ }
+
+ @Override
+ public First withFilter(Expression filter) {
+ return new First(source(), field(), filter, sort);
+ }
+
+ @Override
+ public DataType dataType() {
+ return field().dataType();
+ }
+
+ @Override
+ protected TypeResolution resolveType() {
+ return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long")
+ .and(
+ isType(
+ sort,
+ dt -> dt == DataType.LONG || dt == DataType.DATETIME || dt == DataType.DATE_NANOS,
+ sourceText(),
+ SECOND,
+ "long or date_nanos or datetime"
+ )
+ );
+ }
+
+ @Override
+ public AggregatorFunctionSupplier supplier() {
+ final DataType type = field().dataType();
+ return switch (type) {
+ case LONG -> new FirstLongByTimestampAggregatorFunctionSupplier();
+ case INTEGER -> new FirstIntByTimestampAggregatorFunctionSupplier();
+ case DOUBLE -> new FirstDoubleByTimestampAggregatorFunctionSupplier();
+ case FLOAT -> new FirstFloatByTimestampAggregatorFunctionSupplier();
+ default -> throw EsqlIllegalArgumentException.illegalDataType(type);
+ };
+ }
+
+ @Override
+ public String toString() {
+ return "first(" + field() + ", " + sort + ")";
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java
new file mode 100644
index 0000000000000..d775ed089714e
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.aggregate;
+
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.compute.aggregation.AggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.LastDoubleByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.LastFloatByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.LastIntByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.compute.aggregation.LastLongByTimestampAggregatorFunctionSupplier;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.expression.Literal;
+import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.Example;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesTo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle;
+import org.elasticsearch.xpack.esql.expression.function.FunctionInfo;
+import org.elasticsearch.xpack.esql.expression.function.FunctionType;
+import org.elasticsearch.xpack.esql.expression.function.Param;
+import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
+import org.elasticsearch.xpack.esql.planner.ToAggregator;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.DEFAULT;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.ParamOrdinal.SECOND;
+import static org.elasticsearch.xpack.esql.core.expression.TypeResolutions.isType;
+
+public class Last extends AggregateFunction implements ToAggregator {
+ public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Last", Last::readFrom);
+
+ private final Expression sort;
+
+ // TODO: support all types
+ @FunctionInfo(
+ type = FunctionType.AGGREGATE,
+ returnType = { "long", "integer", "double" },
+ description = "The latest value of a field.",
+ appliesTo = @FunctionAppliesTo(lifeCycle = FunctionAppliesToLifecycle.UNAVAILABLE),
+ examples = @Example(file = "stats_last", tag = "last")
+ )
+ public Last(
+ Source source,
+ @Param(name = "value", type = { "long", "integer", "double" }, description = "Values to return") Expression field,
+ @Param(name = "sort", type = { "date", "date_nanos" }, description = "Sort key") Expression sort
+ ) {
+ this(source, field, Literal.TRUE, sort);
+ }
+
+ private Last(Source source, Expression field, Expression filter, Expression sort) {
+ super(source, field, filter, List.of(sort));
+ this.sort = sort;
+ }
+
+ private static Last readFrom(StreamInput in) throws IOException {
+ Source source = Source.readFrom((PlanStreamInput) in);
+ Expression field = in.readNamedWriteable(Expression.class);
+ Expression filter = in.readNamedWriteable(Expression.class);
+ List params = in.readNamedWriteableCollectionAsList(Expression.class);
+ Expression sort = params.getFirst();
+ return new Last(source, field, filter, sort);
+ }
+
+ @Override
+ public String getWriteableName() {
+ return ENTRY.name;
+ }
+
+ @Override
+ protected NodeInfo info() {
+ return NodeInfo.create(this, Last::new, field(), sort);
+ }
+
+ @Override
+ public Last replaceChildren(List newChildren) {
+ return new Last(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2));
+ }
+
+ @Override
+ public Last withFilter(Expression filter) {
+ return new Last(source(), field(), filter, sort);
+ }
+
+ @Override
+ public DataType dataType() {
+ return field().dataType();
+ }
+
+ @Override
+ protected TypeResolution resolveType() {
+ return isType(field(), dt -> dt.isNumeric() && dt != DataType.UNSIGNED_LONG, sourceText(), DEFAULT, "numeric except unsigned_long")
+ .and(
+ isType(
+ sort,
+ dt -> dt == DataType.LONG || dt == DataType.DATETIME || dt == DataType.DATE_NANOS,
+ sourceText(),
+ SECOND,
+ "long or date_nanos or datetime"
+ )
+ );
+ }
+
+ @Override
+ public AggregatorFunctionSupplier supplier() {
+ final DataType type = field().dataType();
+ return switch (type) {
+ case LONG -> new LastLongByTimestampAggregatorFunctionSupplier();
+ case INTEGER -> new LastIntByTimestampAggregatorFunctionSupplier();
+ case DOUBLE -> new LastDoubleByTimestampAggregatorFunctionSupplier();
+ case FLOAT -> new LastFloatByTimestampAggregatorFunctionSupplier();
+ default -> throw EsqlIllegalArgumentException.illegalDataType(type);
+ };
+ }
+
+ @Override
+ public String toString() {
+ return "last(" + field() + ", " + sort + ")";
+ }
+}
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java
index ed32fcd480fd5..3ffe2619526bb 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/MultiRowTestCaseSupplier.java
@@ -26,6 +26,7 @@
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomList;
+import static org.elasticsearch.xpack.esql.core.util.NumericUtils.UNSIGNED_LONG_MAX;
import static org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes.GEO;
import static org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier.TypedDataSupplier;
@@ -36,6 +37,22 @@ public final class MultiRowTestCaseSupplier {
private MultiRowTestCaseSupplier() {}
+ /**
+ * A {@link List} of the cases for the specified type without any limits.
+ */
+ public static List unlimitedSuppliers(DataType type, int minRows, int maxRows) {
+ return switch (type) {
+ case DATETIME -> dateCases(minRows, maxRows);
+ case DATE_NANOS -> dateNanosCases(minRows, maxRows);
+ case INTEGER -> intCases(minRows, maxRows, Integer.MIN_VALUE, Integer.MAX_VALUE, true);
+ case LONG -> longCases(minRows, maxRows, Long.MIN_VALUE, Long.MAX_VALUE, true);
+ case UNSIGNED_LONG -> ulongCases(minRows, maxRows, BigInteger.ZERO, UNSIGNED_LONG_MAX, true);
+ case DOUBLE -> doubleCases(minRows, maxRows, -Double.MAX_VALUE, Double.MAX_VALUE, true);
+ // If a type is missing here it's safe to them as you need them
+ default -> throw new IllegalArgumentException("unsupported type [" + type + "]");
+ };
+ }
+
public static List nullCases(int minRows, int maxRows) {
List cases = new ArrayList<>();
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java
new file mode 100644
index 0000000000000..1577ca2872b75
--- /dev/null
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstTests.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.expression.function.aggregate;
+
+import com.carrotsearch.randomizedtesting.annotations.Name;
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.xpack.esql.core.expression.Expression;
+import org.elasticsearch.xpack.esql.core.tree.Source;
+import org.elasticsearch.xpack.esql.core.type.DataType;
+import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase;
+import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier;
+import org.hamcrest.Matchers;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import static org.elasticsearch.xpack.esql.expression.function.MultiRowTestCaseSupplier.unlimitedSuppliers;
+import static org.hamcrest.Matchers.anyOf;
+
+public class FirstTests extends AbstractAggregationTestCase {
+ public FirstTests(@Name("TestCase") Supplier testCaseSupplier) {
+ this.testCase = testCaseSupplier.get();
+ }
+
+ @ParametersFactory
+ public static Iterable