Skip to content

Commit 7991ad0

Browse files
committed
Add ML Datafeed resource
1 parent ea1e9e8 commit 7991ad0

33 files changed

+3465
-285
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
2+
---
3+
# generated by https://github.com/hashicorp/terraform-plugin-docs
4+
page_title: "elasticstack_elasticsearch_ml_datafeed Resource - terraform-provider-elasticstack"
5+
subcategory: "Ml"
6+
description: |-
7+
Creates and manages Machine Learning datafeeds. Datafeeds retrieve data from Elasticsearch for analysis by an anomaly detection job. Each anomaly detection job can have only one associated datafeed. See the ML Datafeed API documentation https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html for more details.
8+
---
9+
10+
# elasticstack_elasticsearch_ml_datafeed (Resource)
11+
12+
Creates and manages Machine Learning datafeeds. Datafeeds retrieve data from Elasticsearch for analysis by an anomaly detection job. Each anomaly detection job can have only one associated datafeed. See the [ML Datafeed API documentation](https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-put-datafeed.html) for more details.
13+
14+
## Example Usage
15+
16+
```terraform
17+
# Basic ML Datafeed
18+
resource "elasticstack_elasticsearch_ml_datafeed" "basic" {
19+
datafeed_id = "my-basic-datafeed"
20+
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
21+
indices = ["log-data-*"]
22+
23+
query = jsonencode({
24+
match_all = {}
25+
})
26+
}
27+
28+
# Comprehensive ML Datafeed with all options
29+
resource "elasticstack_elasticsearch_ml_datafeed" "comprehensive" {
30+
datafeed_id = "my-comprehensive-datafeed"
31+
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
32+
indices = ["app-logs-*", "system-logs-*"]
33+
34+
query = jsonencode({
35+
bool = {
36+
must = [
37+
{
38+
range = {
39+
"@timestamp" = {
40+
gte = "now-1h"
41+
}
42+
}
43+
},
44+
{
45+
term = {
46+
"status" = "error"
47+
}
48+
}
49+
]
50+
}
51+
})
52+
53+
scroll_size = 1000
54+
frequency = "30s"
55+
query_delay = "60s"
56+
max_empty_searches = 10
57+
58+
chunking_config {
59+
mode = "manual"
60+
time_span = "30m"
61+
}
62+
63+
delayed_data_check_config {
64+
enabled = true
65+
check_window = "2h"
66+
}
67+
68+
indices_options {
69+
ignore_unavailable = true
70+
allow_no_indices = false
71+
expand_wildcards = ["open", "closed"]
72+
}
73+
74+
runtime_mappings = jsonencode({
75+
"hour_of_day" = {
76+
"type" = "long"
77+
"script" = {
78+
"source" = "emit(doc['@timestamp'].value.getHour())"
79+
}
80+
}
81+
})
82+
83+
script_fields = jsonencode({
84+
"my_script_field" = {
85+
"script" = {
86+
"source" = "_score * doc['my_field'].value"
87+
}
88+
}
89+
})
90+
}
91+
92+
# Required ML Job for the datafeed
93+
resource "elasticstack_elasticsearch_ml_anomaly_detector" "example" {
94+
job_id = "example-anomaly-job"
95+
description = "Example anomaly detection job"
96+
97+
analysis_config {
98+
bucket_span = "15m"
99+
detectors {
100+
function = "count"
101+
}
102+
}
103+
104+
data_description {
105+
time_field = "@timestamp"
106+
time_format = "epoch_ms"
107+
}
108+
}
109+
```
110+
111+
<!-- schema generated by tfplugindocs -->
112+
## Schema
113+
114+
### Required
115+
116+
- `datafeed_id` (String) A numerical character string that uniquely identifies the datafeed. This identifier can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and underscores. It must start and end with alphanumeric characters.
117+
- `indices` (List of String) An array of index names. Wildcards are supported. If any of the indices are in remote clusters, the machine learning nodes must have the `remote_cluster_client` role.
118+
- `job_id` (String) Identifier for the anomaly detection job. The job must exist before creating the datafeed.
119+
120+
### Optional
121+
122+
- `aggregations` (String) If set, the datafeed performs aggregation searches. Support for aggregations is limited and should be used only with low cardinality data. This should be a JSON object representing the aggregations to be performed.
123+
- `chunking_config` (Attributes) Datafeeds might search over long time periods, for several months or years. This search is split into time chunks in order to ensure the load on Elasticsearch is managed. Chunking configuration controls how the size of these time chunks are calculated; it is an advanced configuration option. (see [below for nested schema](#nestedatt--chunking_config))
124+
- `delayed_data_check_config` (Attributes) Specifies whether the datafeed checks for missing data and the size of the window. The datafeed can optionally search over indices that have already been read in an effort to determine whether any data has subsequently been added to the index. If missing data is found, it is a good indication that the `query_delay` is set too low and the data is being indexed after the datafeed has passed that moment in time. This check runs only on real-time datafeeds. (see [below for nested schema](#nestedatt--delayed_data_check_config))
125+
- `elasticsearch_connection` (Block List, Deprecated) Elasticsearch connection configuration block. (see [below for nested schema](#nestedblock--elasticsearch_connection))
126+
- `frequency` (String) The interval at which scheduled queries are made while the datafeed runs in real time. The default value is either the bucket span for short bucket spans, or, for longer bucket spans, a sensible fraction of the bucket span. When `frequency` is shorter than the bucket span, interim results for the last (partial) bucket are written then eventually overwritten by the full bucket results. If the datafeed uses aggregations, this value must be divisible by the interval of the date histogram aggregation.
127+
- `indices_options` (Attributes) Specifies index expansion options that are used during search. (see [below for nested schema](#nestedatt--indices_options))
128+
- `max_empty_searches` (Number) If a real-time datafeed has never seen any data (including during any initial training period), it automatically stops and closes the associated job after this many real-time searches return no documents. In other words, it stops after `frequency` times `max_empty_searches` of real-time operation. If not set, a datafeed with no end time that sees no data remains started until it is explicitly stopped.
129+
- `query` (String) The Elasticsearch query domain-specific language (DSL). This value corresponds to the query object in an Elasticsearch search POST body. All the options that are supported by Elasticsearch can be used, as this object is passed verbatim to Elasticsearch. By default uses `{"match_all": {"boost": 1}}`.
130+
- `query_delay` (String) The number of seconds behind real time that data is queried. For example, if data from 10:04 a.m. might not be searchable in Elasticsearch until 10:06 a.m., set this property to 120 seconds. The default value is randomly selected between `60s` and `120s`. This randomness improves the query performance when there are multiple jobs running on the same node.
131+
- `runtime_mappings` (String) Specifies runtime fields for the datafeed search. This should be a JSON object representing the runtime field mappings.
132+
- `script_fields` (String) Specifies scripts that evaluate custom expressions and return script fields to the datafeed. The detector configuration objects in a job can contain functions that use these script fields. This should be a JSON object representing the script fields.
133+
- `scroll_size` (Number) The size parameter that is used in Elasticsearch searches when the datafeed does not use aggregations. The maximum value is the value of `index.max_result_window`, which is 10,000 by default.
134+
135+
### Read-Only
136+
137+
- `id` (String) Internal identifier of the resource
138+
139+
<a id="nestedatt--chunking_config"></a>
140+
### Nested Schema for `chunking_config`
141+
142+
Required:
143+
144+
- `mode` (String) The chunking mode. Can be `auto`, `manual`, or `off`. In `auto` mode, the chunk size is dynamically calculated. In `manual` mode, chunking is applied according to the specified `time_span`. In `off` mode, no chunking is applied.
145+
146+
Optional:
147+
148+
- `time_span` (String) The time span for each chunk. Only applicable and required when mode is `manual`. Must be a valid duration.
149+
150+
151+
<a id="nestedatt--delayed_data_check_config"></a>
152+
### Nested Schema for `delayed_data_check_config`
153+
154+
Optional:
155+
156+
- `check_window` (String) The window of time that is searched for late data. This window of time ends with the latest finalized bucket. It defaults to null, which causes an appropriate `check_window` to be calculated when the real-time datafeed runs.
157+
- `enabled` (Boolean) Specifies whether the datafeed periodically checks for delayed data.
158+
159+
160+
<a id="nestedblock--elasticsearch_connection"></a>
161+
### Nested Schema for `elasticsearch_connection`
162+
163+
Optional:
164+
165+
- `api_key` (String, Sensitive) API Key to use for authentication to Elasticsearch
166+
- `bearer_token` (String, Sensitive) Bearer Token to use for authentication to Elasticsearch
167+
- `ca_data` (String) PEM-encoded custom Certificate Authority certificate
168+
- `ca_file` (String) Path to a custom Certificate Authority certificate
169+
- `cert_data` (String) PEM encoded certificate for client auth
170+
- `cert_file` (String) Path to a file containing the PEM encoded certificate for client auth
171+
- `endpoints` (List of String, Sensitive) A list of endpoints where the terraform provider will point to, this must include the http(s) scheme and port number.
172+
- `es_client_authentication` (String, Sensitive) ES Client Authentication field to be used with the JWT token
173+
- `headers` (Map of String, Sensitive) A map of headers to be sent with each request to Elasticsearch.
174+
- `insecure` (Boolean) Disable TLS certificate validation
175+
- `key_data` (String, Sensitive) PEM encoded private key for client auth
176+
- `key_file` (String) Path to a file containing the PEM encoded private key for client auth
177+
- `password` (String, Sensitive) Password to use for API authentication to Elasticsearch.
178+
- `username` (String) Username to use for API authentication to Elasticsearch.
179+
180+
181+
<a id="nestedatt--indices_options"></a>
182+
### Nested Schema for `indices_options`
183+
184+
Optional:
185+
186+
- `allow_no_indices` (Boolean) If true, wildcard indices expressions that resolve into no concrete indices are ignored. This includes the `_all` string or when no indices are specified.
187+
- `expand_wildcards` (List of String) Type of index that wildcard patterns can match. If the request can target data streams, this argument determines whether wildcard expressions match hidden data streams. Supports comma-separated values.
188+
- `ignore_throttled` (Boolean, Deprecated) If true, concrete, expanded, or aliased indices are ignored when frozen. This setting is deprecated.
189+
- `ignore_unavailable` (Boolean) If true, unavailable indices (missing or closed) are ignored.
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Basic ML Datafeed
2+
resource "elasticstack_elasticsearch_ml_datafeed" "basic" {
3+
datafeed_id = "my-basic-datafeed"
4+
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
5+
indices = ["log-data-*"]
6+
7+
query = jsonencode({
8+
match_all = {}
9+
})
10+
}
11+
12+
# Comprehensive ML Datafeed with all options
13+
resource "elasticstack_elasticsearch_ml_datafeed" "comprehensive" {
14+
datafeed_id = "my-comprehensive-datafeed"
15+
job_id = elasticstack_elasticsearch_ml_anomaly_detector.example.job_id
16+
indices = ["app-logs-*", "system-logs-*"]
17+
18+
query = jsonencode({
19+
bool = {
20+
must = [
21+
{
22+
range = {
23+
"@timestamp" = {
24+
gte = "now-1h"
25+
}
26+
}
27+
},
28+
{
29+
term = {
30+
"status" = "error"
31+
}
32+
}
33+
]
34+
}
35+
})
36+
37+
scroll_size = 1000
38+
frequency = "30s"
39+
query_delay = "60s"
40+
max_empty_searches = 10
41+
42+
chunking_config {
43+
mode = "manual"
44+
time_span = "30m"
45+
}
46+
47+
delayed_data_check_config {
48+
enabled = true
49+
check_window = "2h"
50+
}
51+
52+
indices_options {
53+
ignore_unavailable = true
54+
allow_no_indices = false
55+
expand_wildcards = ["open", "closed"]
56+
}
57+
58+
runtime_mappings = jsonencode({
59+
"hour_of_day" = {
60+
"type" = "long"
61+
"script" = {
62+
"source" = "emit(doc['@timestamp'].value.getHour())"
63+
}
64+
}
65+
})
66+
67+
script_fields = jsonencode({
68+
"my_script_field" = {
69+
"script" = {
70+
"source" = "_score * doc['my_field'].value"
71+
}
72+
}
73+
})
74+
}
75+
76+
# Required ML Job for the datafeed
77+
resource "elasticstack_elasticsearch_ml_anomaly_detector" "example" {
78+
job_id = "example-anomaly-job"
79+
description = "Example anomaly detection job"
80+
81+
analysis_config {
82+
bucket_span = "15m"
83+
detectors {
84+
function = "count"
85+
}
86+
}
87+
88+
data_description {
89+
time_field = "@timestamp"
90+
time_format = "epoch_ms"
91+
}
92+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package asyncutils
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/hashicorp/terraform-plugin-log/tflog"
9+
)
10+
11+
// StateChecker is a function that checks if a resource is in the desired state.
12+
// It should return true if the resource is in the desired state, false otherwise, and any error that occurred during the check.
13+
type StateChecker func(ctx context.Context) (isDesiredState bool, err error)
14+
15+
// WaitForStateTransition waits for a resource to reach the desired state by polling its current state.
16+
// It uses exponential backoff with a maximum interval to avoid overwhelming the API.
17+
func WaitForStateTransition(ctx context.Context, resourceType, resourceId string, stateChecker StateChecker) error {
18+
const pollInterval = 2 * time.Second
19+
ticker := time.NewTicker(pollInterval)
20+
defer ticker.Stop()
21+
22+
for {
23+
select {
24+
case <-ctx.Done():
25+
return ctx.Err()
26+
case <-ticker.C:
27+
isInDesiredState, err := stateChecker(ctx)
28+
if err != nil {
29+
return fmt.Errorf("failed to check state during wait: %w", err)
30+
}
31+
if isInDesiredState {
32+
return nil
33+
}
34+
35+
tflog.Debug(ctx, fmt.Sprintf("Waiting for %s %s to reach desired state...", resourceType, resourceId))
36+
}
37+
}
38+
}

0 commit comments

Comments
 (0)