diff --git a/.github/workflows/integration_test.yml b/.github/workflows/integration_test.yml index 8d67825199..f7c511b144 100644 --- a/.github/workflows/integration_test.yml +++ b/.github/workflows/integration_test.yml @@ -26,6 +26,7 @@ jobs: - ./internal/impl/cockroachdb # - ./internal/impl/couchbase - ./internal/impl/elasticsearch/v8 + - ./internal/impl/elasticsearch/v9 # - ./internal/impl/gcp - ./internal/impl/gcp/enterprise # - ./internal/impl/gcp/enterprise/changestreams diff --git a/docs/modules/components/pages/outputs/elasticsearch_v8.adoc b/docs/modules/components/pages/outputs/elasticsearch_v8.adoc index 35e4eeb645..1cd95aacda 100644 --- a/docs/modules/components/pages/outputs/elasticsearch_v8.adoc +++ b/docs/modules/components/pages/outputs/elasticsearch_v8.adoc @@ -113,7 +113,7 @@ output: processors: - mapping: | meta id = this.id - # Performs a partial update ont he document. + # Performs a partial update on the document. root.doc = this elasticsearch_v8: urls: [localhost:9200] diff --git a/docs/modules/components/pages/outputs/elasticsearch_v9.adoc b/docs/modules/components/pages/outputs/elasticsearch_v9.adoc new file mode 100644 index 0000000000..876c60921a --- /dev/null +++ b/docs/modules/components/pages/outputs/elasticsearch_v9.adoc @@ -0,0 +1,630 @@ += elasticsearch_v9 +:type: output +:status: stable +:categories: ["Services"] + + + +//// + THIS FILE IS AUTOGENERATED! + + To make changes, edit the corresponding source file under: + + https://github.com/redpanda-data/connect/tree/main/internal/impl/. + + And: + + https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl +//// + +// © 2024 Redpanda Data Inc. + + +component_type_dropdown::[] + + +Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping. + + +[tabs] +====== +Common:: ++ +-- + +```yml +# Common config fields, showing default values +output: + label: "" + elasticsearch_v9: + urls: [] # No default (required) + index: "" # No default (required) + action: "" # No default (required) + id: ${!counter()}-${!timestamp_unix()} # No default (required) + max_in_flight: 64 + batching: + count: 0 + byte_size: 0 + period: "" + check: "" +``` + +-- +Advanced:: ++ +-- + +```yml +# All config fields, showing default values +output: + label: "" + elasticsearch_v9: + urls: [] # No default (required) + index: "" # No default (required) + action: "" # No default (required) + id: ${!counter()}-${!timestamp_unix()} # No default (required) + pipeline: "" + routing: "" + retry_on_conflict: 0 + tls: + enabled: false + skip_cert_verify: false + enable_renegotiation: false + root_cas: "" + root_cas_file: "" + client_certs: [] + max_in_flight: 64 + basic_auth: + enabled: false + username: "" + password: "" + batching: + count: 0 + byte_size: 0 + period: "" + check: "" + processors: [] # No default (optional) +``` + +-- +====== + +Both the `id` and `index` fields can be dynamically set using function interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here]. When sending batched messages these interpolations are performed per message part. + +== Performance + +This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`. + +This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. 
You can find out more xref:configuration:batching.adoc[in this doc].
+
+== Examples
+
+[tabs]
+======
+Updating Documents::
++
+--
+
+When updating documents, the request body should contain a combination of `doc`, `upsert`, and/or `script` fields at the top level; this should be done via mapping processors. `doc` updates using a partial document, `script` performs an update using a scripting language such as the built-in Painless language, and `upsert` updates an existing document or inserts a new one if it doesn’t exist. For more information on the structures and behaviors of these fields, please see the https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html[Elasticsearch Update API^].
+
+```yaml
+# Partial document update
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        # Performs a partial update on the document.
+        root.doc = this
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: update
+
+# Scripted update
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        # Increments the field "counter" by 1.
+        root.script.source = "ctx._source.counter += 1"
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: update
+
+# Upsert
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        # If the product with the ID exists, its price will be updated to 100.
+        # If the product does not exist, a new document with ID 1 and a price
+        # of 50 will be inserted.
+        root.doc.product_price = 50
+        root.upsert.product_price = 100
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: update
+```
+
+--
+Indexing documents from Redpanda::
++
+--
+
+Here we read messages from a Redpanda cluster and write them to an Elasticsearch index using a field from the message as the ID for the Elasticsearch document.
+
+```yaml
+input:
+  redpanda:
+    seed_brokers: [localhost:19092]
+    topics: ["things"]
+    consumer_group: "rpcn3"
+  processors:
+    - mapping: |
+        meta id = this.id
+        root = this
+output:
+  elasticsearch_v9:
+    urls: ['http://localhost:9200']
+    index: "things"
+    action: "index"
+    id: ${! meta("id") }
+```
+
+--
+Indexing documents from S3::
++
+--
+
+Here we read messages from an AWS S3 bucket and write them to an Elasticsearch index using the S3 key as the ID for the Elasticsearch document.
+
+```yaml
+input:
+  aws_s3:
+    bucket: "my-cool-bucket"
+    prefix: "bug-facts/"
+    scanner:
+      to_the_end: {}
+output:
+  elasticsearch_v9:
+    urls: ['http://localhost:9200']
+    index: "cool-bug-facts"
+    action: "index"
+    id: ${! meta("s3_key") }
+```
+
+--
+Create Documents::
++
+--
+
+When using the `create` action, a new document will be created if the document ID does not already exist. If the document ID already exists, the operation will fail.
+
+```yaml
+output:
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! json("id") }
+    action: create
+```
+
+--
+Upserting Documents::
++
+--
+
+When using the `upsert` action, if the document ID already exists, it will be updated. If the document ID does not exist, a new document will be inserted. The request body should contain the document to be indexed.
+
+```yaml
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        root = this.doc
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: upsert
+```
+
+--
+======
+
+== Fields
+
+=== `urls`
+
+A list of URLs to connect to. If an item of the list contains commas, it will be expanded into multiple URLs.
+
+
+*Type*: `array`
+
+
+```yml
+# Examples
+
+urls:
+  - http://localhost:9200
+```
+
+=== `index`
+
+The index to place messages in.
+This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
+
+
+*Type*: `string`
+
+
+=== `action`
+
+The action to take on the document. This field must resolve to one of the following action types: `index`, `update`, `delete`, `create` or `upsert`. See the `Updating Documents` example for more on how the `update` action works and the `Create Documents` and `Upserting Documents` examples for how to use the `create` and `upsert` actions respectively.
+This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
+
+
+*Type*: `string`
+
+
+=== `id`
+
+The ID for indexed messages. Interpolation should be used in order to create a unique ID for each message.
+This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
+
+
+*Type*: `string`
+
+
+```yml
+# Examples
+
+id: ${!counter()}-${!timestamp_unix()}
+```
+
+=== `pipeline`
+
+An optional pipeline ID to preprocess incoming documents.
+This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+=== `routing`
+
+The routing key to use for the document.
+This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+=== `retry_on_conflict`
+
+Specify how many times an update operation should be retried when a conflict occurs.
+
+
+*Type*: `int`
+
+*Default*: `0`
+
+=== `tls`
+
+Custom TLS settings can be used to override system defaults.
+
+
+*Type*: `object`
+
+
+=== `tls.enabled`
+
+Whether custom TLS settings are enabled.
+
+
+*Type*: `bool`
+
+*Default*: `false`
+
+=== `tls.skip_cert_verify`
+
+Whether to skip server side certificate verification.
+
+
+*Type*: `bool`
+
+*Default*: `false`
+
+=== `tls.enable_renegotiation`
+
+Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message `local error: tls: no renegotiation`.
+
+
+*Type*: `bool`
+
+*Default*: `false`
+Requires version 3.45.0 or newer
+
+=== `tls.root_cas`
+
+An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
+[CAUTION]
+====
+This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
+====
+
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+root_cas: |-
+  -----BEGIN CERTIFICATE-----
+  ...
+  -----END CERTIFICATE-----
+```
+
+=== `tls.root_cas_file`
+
+An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate.
+
+
+*Type*: `string`
+
+*Default*: `""`
+
+```yml
+# Examples
+
+root_cas_file: ./root_cas.pem
+```
+
+=== `tls.client_certs`
+
+A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both.
+ + +*Type*: `array` + +*Default*: `[]` + +```yml +# Examples + +client_certs: + - cert: foo + key: bar + +client_certs: + - cert_file: ./example.pem + key_file: ./example.key +``` + +=== `tls.client_certs[].cert` + +A plain text certificate to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].key` + +A plain text certificate key to use. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].cert_file` + +The path of a certificate to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].key_file` + +The path of a certificate key to use. + + +*Type*: `string` + +*Default*: `""` + +=== `tls.client_certs[].password` + +A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format. + +Because the obsolete pbeWithMD5AndDES-CBC algorithm does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +password: foo + +password: ${KEY_PASSWORD} +``` + +=== `max_in_flight` + +The maximum number of messages to have in flight at a given time. Increase this to improve throughput. + + +*Type*: `int` + +*Default*: `64` + +=== `basic_auth` + +Allows you to specify basic authentication. + + +*Type*: `object` + + +=== `basic_auth.enabled` + +Whether to use basic authentication in requests. + + +*Type*: `bool` + +*Default*: `false` + +=== `basic_auth.username` + +A username to authenticate as. + + +*Type*: `string` + +*Default*: `""` + +=== `basic_auth.password` + +A password to authenticate with. +[CAUTION] +==== +This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info]. +==== + + + +*Type*: `string` + +*Default*: `""` + +=== `batching` + +Allows you to configure a xref:configuration:batching.adoc[batching policy]. + + +*Type*: `object` + + +```yml +# Examples + +batching: + byte_size: 5000 + count: 0 + period: 1s + +batching: + count: 10 + period: 1s + +batching: + check: this.contains("END BATCH") + count: 0 + period: 1m +``` + +=== `batching.count` + +A number of messages at which the batch should be flushed. If `0` disables count based batching. + + +*Type*: `int` + +*Default*: `0` + +=== `batching.byte_size` + +An amount of bytes at which the batch should be flushed. If `0` disables size based batching. + + +*Type*: `int` + +*Default*: `0` + +=== `batching.period` + +A period in which an incomplete batch should be flushed regardless of its size. + + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +period: 1s + +period: 1m + +period: 500ms +``` + +=== `batching.check` + +A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch. 
+ + +*Type*: `string` + +*Default*: `""` + +```yml +# Examples + +check: this.type == "end_of_transaction" +``` + +=== `batching.processors` + +A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op. + + +*Type*: `array` + + +```yml +# Examples + +processors: + - archive: + format: concatenate + +processors: + - archive: + format: lines + +processors: + - archive: + format: json_array +``` + + diff --git a/go.mod b/go.mod index d1dc9f8db3..7a41b2cb58 100644 --- a/go.mod +++ b/go.mod @@ -87,6 +87,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/elastic/elastic-transport-go/v8 v8.7.0 github.com/elastic/go-elasticsearch/v8 v8.19.0 + github.com/elastic/go-elasticsearch/v9 v9.2.0 github.com/generikvault/gvalstrings v0.0.0-20180926130504-471f38f0112a github.com/getsentry/sentry-go v0.35.3 github.com/go-faker/faker/v4 v4.7.0 diff --git a/go.sum b/go.sum index 1099094291..6e49419279 100644 --- a/go.sum +++ b/go.sum @@ -1193,6 +1193,8 @@ github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+P github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= github.com/elastic/go-elasticsearch/v8 v8.19.0 h1:VmfBLNRORY7RZL+9hTxBD97ehl9H8Nxf2QigDh6HuMU= github.com/elastic/go-elasticsearch/v8 v8.19.0/go.mod h1:F3j9e+BubmKvzvLjNui/1++nJuJxbkhHefbaT0kFKGY= +github.com/elastic/go-elasticsearch/v9 v9.2.0 h1:COeL/g20+ixnUbffe4Wfbu88emrHjAq/LhVfmrjqRQs= +github.com/elastic/go-elasticsearch/v9 v9.2.0/go.mod h1:2PB5YQPpY5tWbF65MRqzEXA31PZOdXCkloQSOZtU14I= github.com/elazarl/goproxy v1.7.2 h1:Y2o6urb7Eule09PjlhQRGNsqRfPmYI3KKQLFpCAV3+o= github.com/elazarl/goproxy v1.7.2/go.mod h1:82vkLNir0ALaW14Rc399OTTjyNREgmdL2cVoIbS6XaE= github.com/emicklei/proto v1.14.2 h1:wJPxPy2Xifja9cEMrcA/g08art5+7CGJNFNk35iXC1I= diff --git a/internal/impl/elasticsearch/v8/integration_test.go b/internal/impl/elasticsearch/v8/integration_test.go index c6c435d7f7..8c965b122e 100644 --- a/internal/impl/elasticsearch/v8/integration_test.go +++ b/internal/impl/elasticsearch/v8/integration_test.go @@ -39,6 +39,7 @@ func TestIntegrationElasticsearch(t *testing.T) { resource, err := pool.Run("docker.elastic.co/elasticsearch/elasticsearch", "8.17.1", []string{ "discovery.type=single-node", + "cluster.routing.allocation.disk.threshold_enabled=false", "xpack.security.enabled=false", }) require.NoError(t, err) diff --git a/internal/impl/elasticsearch/v8/output.go b/internal/impl/elasticsearch/v8/output.go index 60b2ec0eda..c302d9fd97 100644 --- a/internal/impl/elasticsearch/v8/output.go +++ b/internal/impl/elasticsearch/v8/output.go @@ -13,6 +13,15 @@ // limitations under the License. package elasticsearch +// NOTE: This implementation is intentionally duplicated in ../v9/output.go. +// The Elasticsearch TypedAPI is designed to be stable across major versions, +// differing only in import paths. This allows for: +// - Clear version boundaries for users +// - Independent deprecation of older versions +// - Dead code elimination benefits in v9+ +// +// When modifying this file, check if ../v9/output.go needs the same changes. + import ( "context" "encoding/json" @@ -178,7 +187,7 @@ output: processors: - mapping: | meta id = this.id - # Performs a partial update ont he document. 
+ # Performs a partial update on the document. root.doc = this elasticsearch_v8: urls: [localhost:9200] diff --git a/internal/impl/elasticsearch/v9/integration_test.go b/internal/impl/elasticsearch/v9/integration_test.go new file mode 100644 index 0000000000..9f42dba8f1 --- /dev/null +++ b/internal/impl/elasticsearch/v9/integration_test.go @@ -0,0 +1,186 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package elasticsearch + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/elastic/go-elasticsearch/v9" + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/require" + + _ "github.com/redpanda-data/benthos/v4/public/components/pure" + "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/benthos/v4/public/service/integration" +) + +func TestIntegrationElasticsearch(t *testing.T) { + integration.CheckSkip(t) + t.Parallel() + + ctx := t.Context() + pool, err := dockertest.NewPool("") + require.NoError(t, err) + pool.MaxWait = time.Second * 60 + + resource, err := pool.Run("docker.elastic.co/elasticsearch/elasticsearch", "9.1.7", []string{ + "discovery.type=single-node", + "cluster.routing.allocation.disk.threshold_enabled=false", + "xpack.security.enabled=false", + }) + require.NoError(t, err) + t.Cleanup(func() { + if err = pool.Purge(resource); err != nil { + t.Logf("Failed to clean up docker resource: %v", err) + } + }) + + url := fmt.Sprintf("http://127.0.0.1:%v", resource.GetPort("9200/tcp")) + + client, err := elasticsearch.NewTypedClient(elasticsearch.Config{ + Addresses: []string{url}, + }) + require.NoError(t, err) + + require.Eventually(t, func() bool { + ok, err := client.Ping().Do(ctx) + return err == nil && ok + }, time.Second*30, time.Millisecond*500) + + streamBuilder := service.NewStreamBuilder() + require.NoError(t, streamBuilder.AddOutputYAML(fmt.Sprintf(` +elasticsearch_v9: + urls: ['%s'] + index: "things" + action: ${! meta("action") } + id: ${! 
meta("id") } +`, url))) + + inFunc, err := streamBuilder.AddProducerFunc() + require.NoError(t, err) + + stream, err := streamBuilder.Build() + require.NoError(t, err) + + go func() { + require.NoError(t, stream.Run(ctx)) + }() + defer func() { + err := stream.StopWithin(time.Second * 3) + require.NoError(t, err) + }() + + t.Run("index", func(t *testing.T) { + msgBytes := []byte(`{"message":"blobfish are cool","likes":1}`) + msg := service.NewMessage(msgBytes) + msg.MetaSet("action", "index") + msg.MetaSet("id", "1") + err = inFunc(ctx, msg) + require.NoError(t, err) + + resp, err := client.Get("things", "1").Do(ctx) + require.NoError(t, err) + + require.Equal(t, string(msgBytes), string(resp.Source_)) + }) + + t.Run("update", func(t *testing.T) { + msgBytes, err := json.Marshal(map[string]any{ + "script": map[string]any{ + "source": "ctx._source.likes += 1", + "lang": "painless", + }, + }) + require.NoError(t, err) + + msg := service.NewMessage(msgBytes) + msg.MetaSet("id", "1") + msg.MetaSet("action", "update") + err = inFunc(ctx, msg) + require.NoError(t, err) + + resp, err := client.Get("things", "1").Do(ctx) + require.NoError(t, err) + + require.Equal(t, `{"message":"blobfish are cool","likes":2}`, string(resp.Source_)) + }) + + t.Run("delete", func(t *testing.T) { + msg := service.NewMessage([]byte("{}")) + msg.MetaSet("id", "1") + msg.MetaSet("action", "delete") + err = inFunc(ctx, msg) + require.NoError(t, err) + + resp, err := client.Get("things", "1").Do(ctx) + require.NoError(t, err) + require.False(t, resp.Found) + }) + + t.Run("create", func(t *testing.T) { + // Create a new document + createMsgBytes := []byte(`{"message":"mantis shrimp are epic","likes":10}`) + createMsg := service.NewMessage(createMsgBytes) + createMsg.MetaSet("action", "create") + createMsg.MetaSet("id", "2") + err = inFunc(ctx, createMsg) + require.NoError(t, err) + + resp, err := client.Get("things", "2").Do(ctx) + require.NoError(t, err) + require.True(t, resp.Found) + require.Equal(t, string(createMsgBytes), string(resp.Source_)) + + // Attempt to create the same document again (should fail) + err = inFunc(ctx, createMsg) + require.Error(t, err) // Expecting an error here + + // Verify the document was not overwritten + resp, err = client.Get("things", "2").Do(ctx) + require.NoError(t, err) + require.True(t, resp.Found) + require.Equal(t, string(createMsgBytes), string(resp.Source_)) + }) + + t.Run("upsert", func(t *testing.T) { + // Upsert a new document + upsertNewMsgBytes := []byte(`{"message":"dragonflies are ancient","likes":5}`) + upsertNewMsg := service.NewMessage(upsertNewMsgBytes) + upsertNewMsg.MetaSet("action", "upsert") + upsertNewMsg.MetaSet("id", "3") + err = inFunc(ctx, upsertNewMsg) + require.NoError(t, err) + + resp, err := client.Get("things", "3").Do(ctx) + require.NoError(t, err) + require.True(t, resp.Found) + require.Equal(t, string(upsertNewMsgBytes), string(resp.Source_)) + + // Upsert an existing document (update) + upsertUpdateMsgBytes := []byte(`{"message":"dragonflies are truly ancient","likes":6}`) + upsertUpdateMsg := service.NewMessage(upsertUpdateMsgBytes) + upsertUpdateMsg.MetaSet("action", "upsert") + upsertUpdateMsg.MetaSet("id", "3") + err = inFunc(ctx, upsertUpdateMsg) + require.NoError(t, err) + + resp, err = client.Get("things", "3").Do(ctx) + require.NoError(t, err) + require.True(t, resp.Found) + require.Equal(t, string(upsertUpdateMsgBytes), string(resp.Source_)) + }) +} diff --git a/internal/impl/elasticsearch/v9/output.go 
b/internal/impl/elasticsearch/v9/output.go new file mode 100644 index 0000000000..3c072de4f5 --- /dev/null +++ b/internal/impl/elasticsearch/v9/output.go @@ -0,0 +1,490 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package elasticsearch + +// NOTE: This implementation is intentionally duplicated from ../v8/output.go. +// The Elasticsearch TypedAPI is designed to be stable across major versions, +// differing only in import paths. This allows for: +// - Clear version boundaries for users +// - Independent deprecation of older versions +// - Dead code elimination benefits in v9+ +// +// When modifying this file, check if ../v8/output.go needs the same changes. + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "strings" + "time" + + "github.com/elastic/elastic-transport-go/v8/elastictransport" + "github.com/elastic/go-elasticsearch/v9" + "github.com/elastic/go-elasticsearch/v9/typedapi/core/bulk" + "github.com/elastic/go-elasticsearch/v9/typedapi/types" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +const ( + esFieldURLs = "urls" + esFieldID = "id" + esFieldAction = "action" + esFieldIndex = "index" + esFieldPipeline = "pipeline" + esFieldRouting = "routing" + esFieldRetryOnConflict = "retry_on_conflict" + esFieldTLS = "tls" + esFieldAuth = "basic_auth" + esFieldAuthEnabled = "enabled" + esFieldAuthUsername = "username" + esFieldAuthPassword = "password" + esFieldBatching = "batching" +) + +type esConfig struct { + clientOpts elasticsearch.Config + + action *service.InterpolatedString + id *service.InterpolatedString + index *service.InterpolatedString + pipeline *service.InterpolatedString + routing *service.InterpolatedString + retryOnConflict int +} + +func esConfigFromParsed(pConf *service.ParsedConfig) (*esConfig, error) { + conf := &esConfig{} + + if os.Getenv("REDPANDA_CONNECT_ELASTICSEARCH_DEBUG") != "" { + conf.clientOpts.Logger = &elastictransport.CurlLogger{ + Output: os.Stdout, + EnableRequestBody: true, + EnableResponseBody: true, + } + } + + urlStrs, err := pConf.FieldStringList(esFieldURLs) + if err != nil { + return nil, err + } + for _, u := range urlStrs { + for urlStr := range strings.SplitSeq(u, ",") { + if urlStr != "" { + conf.clientOpts.Addresses = append(conf.clientOpts.Addresses, urlStr) + } + } + } + + authConf := pConf.Namespace(esFieldAuth) + if enabled, _ := authConf.FieldBool(esFieldAuthEnabled); enabled { + if conf.clientOpts.Username, err = authConf.FieldString(esFieldAuthUsername); err != nil { + return nil, err + } + if conf.clientOpts.Password, err = authConf.FieldString(esFieldAuthPassword); err != nil { + return nil, err + } + } + + tlsConf, tlsEnabled, err := pConf.FieldTLSToggled(esFieldTLS) + if err != nil { + return nil, err + } + if tlsEnabled { + conf.clientOpts.Transport = &http.Transport{ + TLSClientConfig: tlsConf, + } + } + + if conf.action, err = pConf.FieldInterpolatedString(esFieldAction); err != nil { + return nil, err + } + if conf.id, err = 
pConf.FieldInterpolatedString(esFieldID); err != nil {
+		return nil, err
+	}
+	if conf.index, err = pConf.FieldInterpolatedString(esFieldIndex); err != nil {
+		return nil, err
+	}
+	if conf.pipeline, err = pConf.FieldInterpolatedString(esFieldPipeline); err != nil {
+		return nil, err
+	}
+	if conf.routing, err = pConf.FieldInterpolatedString(esFieldRouting); err != nil {
+		return nil, err
+	}
+	if conf.retryOnConflict, err = pConf.FieldInt(esFieldRetryOnConflict); err != nil {
+		return nil, err
+	}
+
+	return conf, nil
+}
+
+func elasticsearchConfigSpec() *service.ConfigSpec {
+	return service.NewConfigSpec().
+		Stable().
+		Categories("Services").
+		Summary(`Publishes messages into an Elasticsearch index. If the index does not exist then it is created with a dynamic mapping.`).
+		Description(`
+Both the `+"`id` and `index`"+` fields can be dynamically set using function interpolations described xref:configuration:interpolation.adoc#bloblang-queries[here]. When sending batched messages these interpolations are performed per message part.`+service.OutputPerformanceDocs(true, true)).
+		Fields(
+			service.NewStringListField(esFieldURLs).
+				Description("A list of URLs to connect to. If an item of the list contains commas, it will be expanded into multiple URLs.").
+				Example([]string{"http://localhost:9200"}),
+			service.NewInterpolatedStringField(esFieldIndex).
+				Description("The index to place messages in."),
+			service.NewInterpolatedStringField(esFieldAction).
+				Description("The action to take on the document. This field must resolve to one of the following action types: `index`, `update`, `delete`, `create` or `upsert`. See the `Updating Documents` example for more on how the `update` action works and the `Create Documents` and `Upserting Documents` examples for how to use the `create` and `upsert` actions respectively."),
+			service.NewInterpolatedStringField(esFieldID).
+				Description("The ID for indexed messages. Interpolation should be used in order to create a unique ID for each message.").
+				Example(`${!counter()}-${!timestamp_unix()}`),
+			service.NewInterpolatedStringField(esFieldPipeline).
+				Description("An optional pipeline ID to preprocess incoming documents.").
+				Advanced().
+				Default(""),
+			service.NewInterpolatedStringField(esFieldRouting).
+				Description("The routing key to use for the document.").
+				Advanced().
+				Default(""),
+			service.NewIntField(esFieldRetryOnConflict).
+				Description("Specify how many times an update operation should be retried when a conflict occurs.").
+				Advanced().
+				Default(0),
+			service.NewTLSToggledField(esFieldTLS),
+			service.NewOutputMaxInFlightField(),
+		).
+		Fields(
+			service.NewObjectField(esFieldAuth,
+				service.NewBoolField(esFieldAuthEnabled).
+					Description("Whether to use basic authentication in requests.").
+					Default(false),
+				service.NewStringField(esFieldAuthUsername).
+					Description("A username to authenticate as.").
+					Default(""),
+				service.NewStringField(esFieldAuthPassword).
+					Description("A password to authenticate with.").
+					Default("").Secret(),
+			).Description("Allows you to specify basic authentication.").
+				Advanced().
+				Optional(),
+			service.NewBatchPolicyField(esFieldBatching),
+		).
+		Example("Updating Documents", "When updating documents, the request body should contain a combination of `doc`, `upsert`, and/or `script` fields at the top level; this should be done via mapping processors. `doc` updates using a partial document, `script` performs an update using a scripting language such as the built-in Painless language, and `upsert` updates an existing document or inserts a new one if it doesn’t exist. For more information on the structures and behaviors of these fields, please see the https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html[Elasticsearch Update API^].", `
+# Partial document update
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        # Performs a partial update on the document.
+        root.doc = this
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: update
+
+# Scripted update
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        # Increments the field "counter" by 1.
+        root.script.source = "ctx._source.counter += 1"
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: update
+
+# Upsert
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        # If the product with the ID exists, its price will be updated to 100.
+        # If the product does not exist, a new document with ID 1 and a price
+        # of 50 will be inserted.
+        root.doc.product_price = 50
+        root.upsert.product_price = 100
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: update
+`).
+		Example("Indexing documents from Redpanda", "Here we read messages from a Redpanda cluster and write them to an Elasticsearch index using a field from the message as the ID for the Elasticsearch document.", `
+input:
+  redpanda:
+    seed_brokers: [localhost:19092]
+    topics: ["things"]
+    consumer_group: "rpcn3"
+  processors:
+    - mapping: |
+        meta id = this.id
+        root = this
+output:
+  elasticsearch_v9:
+    urls: ['http://localhost:9200']
+    index: "things"
+    action: "index"
+    id: ${! meta("id") }
+`).
+		Example("Indexing documents from S3", "Here we read messages from an AWS S3 bucket and write them to an Elasticsearch index using the S3 key as the ID for the Elasticsearch document.", `
+input:
+  aws_s3:
+    bucket: "my-cool-bucket"
+    prefix: "bug-facts/"
+    scanner:
+      to_the_end: {}
+output:
+  elasticsearch_v9:
+    urls: ['http://localhost:9200']
+    index: "cool-bug-facts"
+    action: "index"
+    id: ${! meta("s3_key") }
+`).
+		Example("Create Documents", "When using the `create` action, a new document will be created if the document ID does not already exist. If the document ID already exists, the operation will fail.", `
+output:
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! json("id") }
+    action: create
+`).
+		Example("Upserting Documents", "When using the `upsert` action, if the document ID already exists, it will be updated. If the document ID does not exist, a new document will be inserted. The request body should contain the document to be indexed.", `
+output:
+  processors:
+    - mapping: |
+        meta id = this.id
+        root = this.doc
+  elasticsearch_v9:
+    urls: [localhost:9200]
+    index: foo
+    id: ${! @id }
+    action: upsert
+`)
+}
+
+func init() {
+	service.MustRegisterBatchOutput("elasticsearch_v9", elasticsearchConfigSpec(),
+		func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) {
+			if maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
+				return
+			}
+			if batchPolicy, err = conf.FieldBatchPolicy(esFieldBatching); err != nil {
+				return
+			}
+			out, err = outputFromParsed(conf, mgr)
+			return
+		})
+}
+
+func outputFromParsed(pConf *service.ParsedConfig, mgr *service.Resources) (*esOutput, error) {
+	conf, err := esConfigFromParsed(pConf)
+	if err != nil {
+		return nil, err
+	}
+	return &esOutput{
+		log:  mgr.Logger(),
+		conf: conf,
+	}, nil
+}
+
+type esOutput struct {
+	log  *service.Logger
+	conf *esConfig
+
+	client *elasticsearch.TypedClient
+}
+
+func (e *esOutput) Connect(context.Context) error {
+	if e.client != nil {
+		return nil
+	}
+
+	client, err := elasticsearch.NewTypedClient(e.conf.clientOpts)
+	if err != nil {
+		return err
+	}
+
+	e.client = client
+	return nil
+}
+
+func (e *esOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
+	bulkWriter := e.client.Bulk()
+	batchInterpolator := e.newBatchInterpolator(batch)
+
+	for i := range batch {
+		if err := e.addOpToBatch(bulkWriter, batch, batchInterpolator, i); err != nil {
+			return fmt.Errorf("adding operation to batch: %w", err)
+		}
+	}
+
+	result, err := bulkWriter.Do(ctx)
+	if err != nil {
+		return fmt.Errorf("sending bulk request: %w", err)
+	}
+
+	if result.Errors {
+		var batchErr *service.BatchError
+		for i, item := range result.Items {
+			for _, responseItem := range item {
+				if responseItem.Error != nil {
+					err := errors.New(*responseItem.Error.Reason)
+					if batchErr == nil {
+						batchErr = service.NewBatchError(batch, err)
+					}
+					batchErr.Failed(i, err)
+				}
+			}
+		}
+		return batchErr
+	}
+
+	// result.Took is an int64 counting milliseconds
+	tookDuration := time.Duration(result.Took) * time.Millisecond
+
+	e.log.Debugf(
+		"Successfully dispatched [%d] documents in %s (%.2f docs/sec)",
+		len(result.Items),
+		tookDuration,
+		float64(len(result.Items))/tookDuration.Seconds(),
+	)
+
+	return nil
+}
+
+func (e *esOutput) newBatchInterpolator(batch service.MessageBatch) *batchInterpolator {
+	return &batchInterpolator{
+		action:   batch.InterpolationExecutor(e.conf.action),
+		index:    batch.InterpolationExecutor(e.conf.index),
+		routing:  batch.InterpolationExecutor(e.conf.routing),
+		id:       batch.InterpolationExecutor(e.conf.id),
+		pipeline: batch.InterpolationExecutor(e.conf.pipeline),
+	}
+}
+
+type batchInterpolator struct {
+	action   *service.MessageBatchInterpolationExecutor
+	index    *service.MessageBatchInterpolationExecutor
+	routing  *service.MessageBatchInterpolationExecutor
+	id       *service.MessageBatchInterpolationExecutor
+	pipeline *service.MessageBatchInterpolationExecutor
+}
+
+func (e *esOutput) addOpToBatch(bulkWriter *bulk.Bulk, batch service.MessageBatch, batchInterpolator *batchInterpolator, i int) error {
+	msg := batch[i]
+	msgBytes, err := msg.AsBytes()
+	if err != nil {
+		return fmt.Errorf("reading raw message data: %w", err)
+	}
+
+	action, err := batchInterpolator.action.TryString(i)
+	if err != nil {
+		return fmt.Errorf("interpolating action: %w", err)
+	}
+	index, err := batchInterpolator.index.TryString(i)
+	if err != nil {
+		return fmt.Errorf("interpolating index: %w", err)
+	}
+	routing, err := batchInterpolator.routing.TryString(i)
+	if err != nil {
+		return fmt.Errorf("interpolating routing: %w", err)
+	}
+	id, err := batchInterpolator.id.TryString(i)
+	if err != nil {
+		return fmt.Errorf("interpolating id: %w", err)
+	}
+	pipeline, err := batchInterpolator.pipeline.TryString(i)
+	if err != nil {
+		return fmt.Errorf("interpolating pipeline: %w", err)
+	}
+
+	switch action {
+	case "index", "upsert":
+		op := types.IndexOperation{
+			Index_:   &index,
+			Id_:      optionalStr(id),
+			Pipeline: optionalStr(pipeline),
+			Routing:  optionalStr(routing),
+		}
+		if err := bulkWriter.IndexOp(op, msgBytes); err != nil {
+			return err
+		}
+	case "create":
+		op := types.CreateOperation{
+			Index_:   &index,
+			Id_:      optionalStr(id),
+			Pipeline: optionalStr(pipeline),
+			Routing:  optionalStr(routing),
+		}
+		if err := bulkWriter.CreateOp(op, msgBytes); err != nil {
+			return err
+		}
+	case "update":
+		op := types.UpdateOperation{
+			Id_:     &id,
+			Index_:  &index,
+			Routing: optionalStr(routing),
+		}
+		if e.conf.retryOnConflict != 0 {
+			op.RetryOnConflict = &e.conf.retryOnConflict
+		}
+		// We use our own struct here so that users can't specify, intentionally or
+		// not, other fields that may alter behavior we depend on internally.
+		var update updateAction
+		if err := json.Unmarshal(msgBytes, &update); err != nil {
+			return fmt.Errorf("unmarshalling update action: %w", err)
+		}
+		err := bulkWriter.UpdateOp(op, nil, &types.UpdateAction{
+			Doc:    update.Doc,
+			Script: update.Script,
+			Upsert: update.Upsert,
+		})
+		if err != nil {
+			return err
+		}
+	case "delete":
+		op := types.DeleteOperation{
+			Id_:     &id,
+			Index_:  &index,
+			Routing: optionalStr(routing),
+		}
+		if err := bulkWriter.DeleteOp(op); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+type updateAction struct {
+	Doc    json.RawMessage `json:"doc"`
+	Script *types.Script   `json:"script"`
+	Upsert json.RawMessage `json:"upsert"`
+}
+
+func optionalStr(s string) *string {
+	if s == "" {
+		return nil
+	}
+	return &s
+}
+
+func (*esOutput) Close(context.Context) error {
+	// The client does not need to be closed, as it interacts with Elasticsearch
+	// over short-lived HTTP connections.
+ return nil +} diff --git a/internal/plugins/info.csv b/internal/plugins/info.csv index 54ab77f408..dcf89a3fb1 100644 --- a/internal/plugins/info.csv +++ b/internal/plugins/info.csv @@ -74,6 +74,7 @@ drop_on ,output ,drop_on ,0.0.0 ,certif dynamic ,input ,dynamic ,0.0.0 ,community ,n ,n ,n dynamic ,output ,dynamic ,0.0.0 ,community ,n ,n ,n elasticsearch_v8 ,output ,elasticsearch_v8 ,4.47.0 ,certified ,n ,y ,y +elasticsearch_v9 ,output ,elasticsearch_v9 ,0.0.0 ,community ,n ,n ,n fallback ,output ,fallback ,3.58.0 ,certified ,n ,y ,y ffi ,processor ,Foreign Function Interface,4.69.0 ,certified ,n ,n ,n file ,cache ,File ,0.0.0 ,certified ,n ,n ,n diff --git a/public/components/cloud/package.go b/public/components/cloud/package.go index eeadbe613b..64fef1c47f 100644 --- a/public/components/cloud/package.go +++ b/public/components/cloud/package.go @@ -24,6 +24,7 @@ import ( _ "github.com/redpanda-data/connect/v4/public/components/cyborgdb" _ "github.com/redpanda-data/connect/v4/public/components/dgraph" _ "github.com/redpanda-data/connect/v4/public/components/elasticsearch/v8" + _ "github.com/redpanda-data/connect/v4/public/components/elasticsearch/v9" _ "github.com/redpanda-data/connect/v4/public/components/gateway" _ "github.com/redpanda-data/connect/v4/public/components/gcp" _ "github.com/redpanda-data/connect/v4/public/components/gcp/enterprise" diff --git a/public/components/community/package.go b/public/components/community/package.go index 6d31d32c91..d63f1fb875 100644 --- a/public/components/community/package.go +++ b/public/components/community/package.go @@ -37,6 +37,7 @@ import ( _ "github.com/redpanda-data/connect/v4/public/components/dgraph" _ "github.com/redpanda-data/connect/v4/public/components/discord" _ "github.com/redpanda-data/connect/v4/public/components/elasticsearch/v8" + _ "github.com/redpanda-data/connect/v4/public/components/elasticsearch/v9" _ "github.com/redpanda-data/connect/v4/public/components/ffi" _ "github.com/redpanda-data/connect/v4/public/components/gcp" _ "github.com/redpanda-data/connect/v4/public/components/git" diff --git a/public/components/elasticsearch/v9/package.go b/public/components/elasticsearch/v9/package.go new file mode 100644 index 0000000000..0aa819f5d0 --- /dev/null +++ b/public/components/elasticsearch/v9/package.go @@ -0,0 +1,20 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package elasticsearch + +import ( + // Bring in the internal plugin definitions. + _ "github.com/redpanda-data/connect/v4/internal/impl/elasticsearch/v9" +)
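
For anyone trying this change out locally, a minimal smoke-test pipeline along the following lines should exercise the new output end to end. This is a sketch, not part of the diff: it assumes a local single-node Elasticsearch 9 instance on `localhost:9200` (for example, the image used in the integration test above), and the `things` index name plus the `generate` input mapping are illustrative stand-ins for a real source.

```yaml
input:
  generate:
    interval: 1s
    mapping: |
      root.id = uuid_v4()
      root.message = "hello from elasticsearch_v9"

output:
  elasticsearch_v9:
    urls: ["http://localhost:9200"]
    index: "things"
    action: "index"
    id: ${! json("id") }
```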