Skip to content

Commit 7fbf9fa

Browse files
Copilotwandergeektobio
authored
Add reroute processor data source for Elasticsearch ingest pipelines (#1204)
* Initial plan * Add reroute processor data source implementation Co-authored-by: wandergeek <[email protected]> * Add comprehensive unit tests for reroute processor Co-authored-by: wandergeek <[email protected]> * Fix reroute processor fields and add documentation Co-authored-by: wandergeek <[email protected]> * make fmt * make docs-generate --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: wandergeek <[email protected]> Co-authored-by: Toby Brain <[email protected]>
1 parent 061e52c commit 7fbf9fa

File tree

10 files changed

+407
-0
lines changed

10 files changed

+407
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## [Unreleased]
22

33
- Add support for `timeslice_metric_indicator` in `elasticstack_kibana_slo` ([#1195](https://github.com/elastic/terraform-provider-elasticstack/pull/1195))
4+
- Add `elasticstack_elasticsearch_ingest_processor_reroute` data source ([#678](https://github.com/elastic/terraform-provider-elasticstack/issues/678))
45

56
## [0.11.16] - 2025-07-09
67

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
---
2+
subcategory: "Ingest"
3+
layout: ""
4+
page_title: "Elasticstack: elasticstack_elasticsearch_ingest_processor_reroute Data Source"
5+
description: |-
6+
Helper data source to create a processor which reroutes a document to a different data stream, index, or index alias.
7+
---
8+
9+
# Data Source: elasticstack_elasticsearch_ingest_processor_reroute
10+
11+
Reroutes a document to a different data stream, index, or index alias. This processor is useful for routing documents based on data stream routing rules.
12+
13+
See: https://www.elastic.co/guide/en/elasticsearch/reference/current/reroute-processor.html
14+
15+
16+
## Example Usage
17+
18+
```terraform
19+
provider "elasticstack" {
20+
elasticsearch {}
21+
}
22+
23+
data "elasticstack_elasticsearch_ingest_processor_reroute" "reroute" {
24+
destination = "logs-generic-default"
25+
dataset = "generic"
26+
namespace = "default"
27+
}
28+
29+
resource "elasticstack_elasticsearch_ingest_pipeline" "my_ingest_pipeline" {
30+
name = "reroute-ingest"
31+
32+
processors = [
33+
data.elasticstack_elasticsearch_ingest_processor_reroute.reroute.json
34+
]
35+
}
36+
```
37+
38+
<!-- schema generated by tfplugindocs -->
39+
## Schema
40+
41+
### Optional
42+
43+
- `dataset` (String) The destination dataset to route the document to.
44+
- `description` (String) Description of the processor.
45+
- `destination` (String) The destination data stream, index, or index alias to route the document to.
46+
- `if` (String) Conditionally execute the processor
47+
- `ignore_failure` (Boolean) Ignore failures for the processor.
48+
- `namespace` (String) The destination namespace to route the document to.
49+
- `on_failure` (List of String) Handle failures for the processor.
50+
- `tag` (String) Identifier for the processor.
51+
52+
### Read-Only
53+
54+
- `id` (String) Internal identifier of the resource.
55+
- `json` (String) JSON representation of this data source.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
provider "elasticstack" {
2+
elasticsearch {}
3+
}
4+
5+
data "elasticstack_elasticsearch_ingest_processor_reroute" "reroute" {
6+
destination = "logs-generic-default"
7+
dataset = "generic"
8+
namespace = "default"
9+
}
10+
11+
resource "elasticstack_elasticsearch_ingest_pipeline" "my_ingest_pipeline" {
12+
name = "reroute-ingest"
13+
14+
processors = [
15+
data.elasticstack_elasticsearch_ingest_processor_reroute.reroute.json
16+
]
17+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package ingest
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"strings"
7+
8+
"github.com/elastic/terraform-provider-elasticstack/internal/models"
9+
"github.com/elastic/terraform-provider-elasticstack/internal/utils"
10+
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
11+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
12+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
13+
)
14+
15+
func DataSourceProcessorReroute() *schema.Resource {
16+
processorSchema := map[string]*schema.Schema{
17+
"id": {
18+
Description: "Internal identifier of the resource.",
19+
Type: schema.TypeString,
20+
Computed: true,
21+
},
22+
"destination": {
23+
Description: "The destination data stream, index, or index alias to route the document to.",
24+
Type: schema.TypeString,
25+
Optional: true,
26+
},
27+
"dataset": {
28+
Description: "The destination dataset to route the document to.",
29+
Type: schema.TypeString,
30+
Optional: true,
31+
},
32+
"namespace": {
33+
Description: "The destination namespace to route the document to.",
34+
Type: schema.TypeString,
35+
Optional: true,
36+
},
37+
"description": {
38+
Description: "Description of the processor. ",
39+
Type: schema.TypeString,
40+
Optional: true,
41+
},
42+
"if": {
43+
Description: "Conditionally execute the processor",
44+
Type: schema.TypeString,
45+
Optional: true,
46+
},
47+
"ignore_failure": {
48+
Description: "Ignore failures for the processor. ",
49+
Type: schema.TypeBool,
50+
Optional: true,
51+
Default: false,
52+
},
53+
"on_failure": {
54+
Description: "Handle failures for the processor.",
55+
Type: schema.TypeList,
56+
Optional: true,
57+
MinItems: 1,
58+
Elem: &schema.Schema{
59+
Type: schema.TypeString,
60+
ValidateFunc: validation.StringIsJSON,
61+
DiffSuppressFunc: utils.DiffJsonSuppress,
62+
},
63+
},
64+
"tag": {
65+
Description: "Identifier for the processor.",
66+
Type: schema.TypeString,
67+
Optional: true,
68+
},
69+
"json": {
70+
Description: "JSON representation of this data source.",
71+
Type: schema.TypeString,
72+
Computed: true,
73+
},
74+
}
75+
76+
return &schema.Resource{
77+
Description: "Reroutes a document to a different data stream, index, or index alias. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/reroute-processor.html",
78+
79+
ReadContext: dataSourceProcessorRerouteRead,
80+
81+
Schema: processorSchema,
82+
}
83+
}
84+
85+
func dataSourceProcessorRerouteRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
86+
var diags diag.Diagnostics
87+
88+
processor := &models.ProcessorReroute{}
89+
90+
processor.IgnoreFailure = d.Get("ignore_failure").(bool)
91+
92+
if v, ok := d.GetOk("destination"); ok {
93+
processor.Destination = v.(string)
94+
}
95+
if v, ok := d.GetOk("dataset"); ok {
96+
processor.Dataset = v.(string)
97+
}
98+
if v, ok := d.GetOk("namespace"); ok {
99+
processor.Namespace = v.(string)
100+
}
101+
102+
if v, ok := d.GetOk("description"); ok {
103+
processor.Description = v.(string)
104+
}
105+
if v, ok := d.GetOk("if"); ok {
106+
processor.If = v.(string)
107+
}
108+
if v, ok := d.GetOk("tag"); ok {
109+
processor.Tag = v.(string)
110+
}
111+
if v, ok := d.GetOk("on_failure"); ok {
112+
onFailure := make([]map[string]interface{}, len(v.([]interface{})))
113+
for i, f := range v.([]interface{}) {
114+
item := make(map[string]interface{})
115+
if err := json.NewDecoder(strings.NewReader(f.(string))).Decode(&item); err != nil {
116+
return diag.FromErr(err)
117+
}
118+
onFailure[i] = item
119+
}
120+
processor.OnFailure = onFailure
121+
}
122+
123+
processorJson, err := json.MarshalIndent(map[string]*models.ProcessorReroute{"reroute": processor}, "", " ")
124+
if err != nil {
125+
return diag.FromErr(err)
126+
}
127+
if err := d.Set("json", string(processorJson)); err != nil {
128+
return diag.FromErr(err)
129+
}
130+
131+
hash, err := utils.StringToHash(string(processorJson))
132+
if err != nil {
133+
return diag.FromErr(err)
134+
}
135+
136+
d.SetId(*hash)
137+
138+
return diags
139+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package ingest_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/elastic/terraform-provider-elasticstack/internal/acctest"
7+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
8+
)
9+
10+
func TestAccDataSourceIngestProcessorReroute(t *testing.T) {
11+
resource.Test(t, resource.TestCase{
12+
PreCheck: func() { acctest.PreCheck(t) },
13+
ProtoV6ProviderFactories: acctest.Providers,
14+
Steps: []resource.TestStep{
15+
{
16+
Config: testAccDataSourceIngestProcessorReroute,
17+
Check: resource.ComposeTestCheckFunc(
18+
CheckResourceJson("data.elasticstack_elasticsearch_ingest_processor_reroute.test", "json", expectedJsonReroute),
19+
),
20+
},
21+
},
22+
})
23+
}
24+
25+
const expectedJsonReroute = `{
26+
"reroute": {
27+
"ignore_failure": false,
28+
"destination": "logs-generic-default"
29+
}
30+
}`
31+
32+
const testAccDataSourceIngestProcessorReroute = `
33+
provider "elasticstack" {
34+
elasticsearch {}
35+
}
36+
37+
data "elasticstack_elasticsearch_ingest_processor_reroute" "test" {
38+
destination = "logs-generic-default"
39+
}
40+
`
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package ingest
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestDataSourceProcessorReroute_Unit(t *testing.T) {
12+
// Test that we can create and call the data source
13+
resource := DataSourceProcessorReroute()
14+
15+
assert.NotNil(t, resource)
16+
assert.Contains(t, resource.Description, "reroute")
17+
assert.Contains(t, resource.Schema, "destination")
18+
assert.Contains(t, resource.Schema, "dataset")
19+
assert.Contains(t, resource.Schema, "namespace")
20+
assert.Contains(t, resource.Schema, "json")
21+
22+
// Test data source read function
23+
d := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{
24+
"destination": "target-index",
25+
"dataset": "logs",
26+
"namespace": "production",
27+
"description": "Test reroute processor",
28+
})
29+
30+
ctx := context.Background()
31+
diags := resource.ReadContext(ctx, d, nil)
32+
33+
assert.False(t, diags.HasError(), "Data source read should not have errors")
34+
assert.NotEmpty(t, d.Get("json"))
35+
assert.NotEmpty(t, d.Id())
36+
37+
jsonOutput := d.Get("json").(string)
38+
assert.Contains(t, jsonOutput, "reroute")
39+
assert.Contains(t, jsonOutput, "target-index")
40+
assert.Contains(t, jsonOutput, "logs")
41+
assert.Contains(t, jsonOutput, "production")
42+
}
43+
44+
func TestDataSourceProcessorReroute_MinimalConfig(t *testing.T) {
45+
resource := DataSourceProcessorReroute()
46+
47+
// Test with just a destination
48+
d := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{
49+
"destination": "minimal-index",
50+
})
51+
52+
ctx := context.Background()
53+
diags := resource.ReadContext(ctx, d, nil)
54+
55+
assert.False(t, diags.HasError(), "Data source read should not have errors")
56+
assert.NotEmpty(t, d.Get("json"))
57+
58+
jsonOutput := d.Get("json").(string)
59+
assert.Contains(t, jsonOutput, "minimal-index")
60+
assert.Contains(t, jsonOutput, "ignore_failure")
61+
}
62+
63+
func TestDataSourceProcessorReroute_AllFields(t *testing.T) {
64+
resource := DataSourceProcessorReroute()
65+
66+
// Test with all optional fields
67+
d := schema.TestResourceDataRaw(t, resource.Schema, map[string]interface{}{
68+
"destination": "all-fields-index",
69+
"dataset": "metrics",
70+
"namespace": "development",
71+
"description": "Full processor test",
72+
"if": "ctx.field != null",
73+
"ignore_failure": true,
74+
"tag": "reroute-tag",
75+
"on_failure": []interface{}{`{"set": {"field": "error", "value": "reroute_failed"}}`},
76+
})
77+
78+
ctx := context.Background()
79+
diags := resource.ReadContext(ctx, d, nil)
80+
81+
assert.False(t, diags.HasError(), "Data source read should not have errors")
82+
assert.NotEmpty(t, d.Get("json"))
83+
84+
jsonOutput := d.Get("json").(string)
85+
assert.Contains(t, jsonOutput, "all-fields-index")
86+
assert.Contains(t, jsonOutput, "metrics")
87+
assert.Contains(t, jsonOutput, "development")
88+
assert.Contains(t, jsonOutput, "Full processor test")
89+
assert.Contains(t, jsonOutput, "ctx.field != null")
90+
assert.Contains(t, jsonOutput, "reroute-tag")
91+
assert.Contains(t, jsonOutput, "on_failure")
92+
}

internal/models/ingest.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,14 @@ type ProcessorRename struct {
260260
ProcessortFields
261261
}
262262

263+
type ProcessorReroute struct {
264+
CommonProcessor
265+
266+
Destination string `json:"destination,omitempty"`
267+
Dataset string `json:"dataset,omitempty"`
268+
Namespace string `json:"namespace,omitempty"`
269+
}
270+
263271
type ProcessorScript struct {
264272
CommonProcessor
265273

0 commit comments

Comments
 (0)