diff --git a/changelog/fragments/1774301037-id6318-director-httpjson.yaml b/changelog/fragments/1774301037-id6318-director-httpjson.yaml new file mode 100644 index 000000000000..35c38bb0ad30 --- /dev/null +++ b/changelog/fragments/1774301037-id6318-director-httpjson.yaml @@ -0,0 +1,3 @@ +kind: enhancement +summary: Allow HTTP JSON input redirection to the CEL input. +component: filebeat diff --git a/docs/reference/filebeat/filebeat-input-httpjson.md b/docs/reference/filebeat/filebeat-input-httpjson.md index de792d79c4a4..06c4675dd3ce 100644 --- a/docs/reference/filebeat/filebeat-input-httpjson.md +++ b/docs/reference/filebeat/filebeat-input-httpjson.md @@ -12,7 +12,7 @@ applies_to: Use the `httpjson` input to read messages from an HTTP API with JSON payloads. -If you are starting development of a new custom HTTP API input, we recommend that you use the [Common Expression Language input](/reference/filebeat/filebeat-input-cel.md) which provides greater flexibility and an improved developer experience. +If you are starting development of a new custom HTTP API input, we recommend that you use the [Common Expression Language input](/reference/filebeat/filebeat-input-cel.md) which provides greater flexibility and an improved developer experience. Existing `httpjson` inputs can be migrated to CEL using the [`run_as_cel`](#run-as-cel) option. This input supports: @@ -1499,7 +1499,7 @@ Example: } ``` - This behaviour of targeted fixed pattern replacement in the url helps solve various use cases. + This behavior of targeted fixed pattern replacement in the url helps solve various use cases. **Some useful points to remember:** @@ -1729,6 +1729,123 @@ filebeat.inputs: ``` +### `run_as_cel` [run-as-cel] + +```{applies_to} +stack: ga 9.4.0 +``` + +When set to `true`, the input is transparently redirected to the [Common Expression Language input](/reference/filebeat/filebeat-input-cel.md). This allows an existing `httpjson` configuration to run under the CEL engine without changing the input `type`. A `cel.program` must be provided when this option is enabled. + +Shared configuration fields are automatically translated to their CEL equivalents: + +| httpjson field | CEL field | +| --- | --- | +| `interval` | `interval` | +| `id` | `id` | +| `request.url` | `resource.url` | +| `request.timeout` | `resource.timeout` | +| `request.ssl` | `resource.ssl` | +| `request.proxy_url` | `resource.proxy_url` | +| `request.proxy_headers` | `resource.proxy_headers` | +| `request.proxy_disable` | `resource.proxy_disable` | +| `request.idle_connection_timeout` | `resource.idle_connection_timeout` | +| `request.keep_alive` | `resource.keep_alive` | +| `request.retry` | `resource.retry` | +| `request.redirect` | `resource.redirect` | +| `request.tracer` | `resource.tracer` | +| `auth` | `auth` | + +Fields that are specific to httpjson (such as `request.transforms`, `response.transforms`, `response.split`, `response.pagination`, and `chain`) are not transferred and have no effect when `run_as_cel` is enabled. The CEL program is responsible for equivalent logic. + +If the httpjson input has existing cursor state, it is automatically carried over to the CEL input on the first run. After that first run the CEL input writes its own cursor, and all subsequent runs, whether the next interval or after a restart, read from the CEL cursor. The original httpjson cursor is not modified, so removing `run_as_cel` restores the original httpjson behavior with its last cursor intact. + +Default: `false`. + + +### `cel.program` [cel-program] + +```{applies_to} +stack: ga 9.4.0 +``` + +The CEL program to execute when [`run_as_cel`](#run-as-cel) is enabled. This is the same program format used by the [CEL input's `program` field](/reference/filebeat/filebeat-input-cel.md). Required when `run_as_cel` is `true`. + + +### `cel.state` [cel-state] + +```{applies_to} +stack: ga 9.4.0 +``` + +Initial state for the CEL program, equivalent to the [CEL input's `state` field](/reference/filebeat/filebeat-input-cel.md). May include an initial `cursor` object that is used as the bootstrap value on the first execution when no stored cursor exists. + + +### `cel.max_executions` [cel-max-executions] + +```{applies_to} +stack: ga 9.4.0 +``` + +The maximum number of CEL program executions per interval. Equivalent to the [CEL input's `max_executions` field](/reference/filebeat/filebeat-input-cel.md). Default: `1000`. + + +### `cel.regexp` [cel-regexp] + +```{applies_to} +stack: ga 9.4.0 +``` + +A map of named regular expression patterns available to the CEL program. Equivalent to the [CEL input's `regexp` field](/reference/filebeat/filebeat-input-cel.md). + + +### `cel.xsd` [cel-xsd] + +```{applies_to} +stack: ga 9.4.0 +``` + +A map of named XSD schemas for XML decoding in the CEL program. Equivalent to the [CEL input's `xsd` field](/reference/filebeat/filebeat-input-cel.md). + + +### `cel.redact` [cel-redact] + +```{applies_to} +stack: ga 9.4.0 +``` + +Redaction configuration for the CEL program. Equivalent to the [CEL input's `redact` field](/reference/filebeat/filebeat-input-cel.md). + +::::{admonition} Example: migrating an httpjson input to CEL +```yaml +filebeat.inputs: +- type: httpjson + id: my-api-input + interval: 60s + run_as_cel: true + request.url: https://api.example.com/events + auth.oauth2: + client.id: my-client-id + client.secret: my-client-secret + token_url: https://auth.example.com/oauth2/token + request.ssl.verification_mode: full + request.timeout: 30s + cel.program: | + state.url.with({ + "Header": {"Accept": ["application/json"]}, + }).as(req, request("GET", req).as(resp, + bytes(resp.Body).decode_json().as(body, { + "events": body.items.map(e, {"message": e.encode_json()}), + "cursor": {"since": body.items[body.items.size()-1].updated_at}, + }) + )) + cel.state: + cursor: + since: "2024-01-01T00:00:00Z" +``` +:::: + + ## Request life cycle [_request_life_cycle] ![Request lifecycle](images/input-httpjson-lifecycle.png "") diff --git a/x-pack/filebeat/input/httpjson/redirect.go b/x-pack/filebeat/input/httpjson/redirect.go new file mode 100644 index 000000000000..8c6b942b4f7d --- /dev/null +++ b/x-pack/filebeat/input/httpjson/redirect.go @@ -0,0 +1,273 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "errors" + "fmt" + "net/url" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + conf "github.com/elastic/elastic-agent-libs/config" +) + +var _ v2.Redirector = InputManager{} + +// Redirect implements v2.Redirector. When run_as_cel is true and a +// cel.program is present, it builds a cel input config from the +// httpjson config's shared and cel-specific fields. +func (m InputManager) Redirect(cfg *conf.C) (string, *conf.C, error) { + has, err := cfg.Has("run_as_cel", -1) + if err != nil || !has { + return "", nil, err + } + runAsCel, err := cfg.Bool("run_as_cel", -1) + if err != nil || !runAsCel { + return "", nil, err + } + has, err = cfg.Has("cel.program", -1) + if err != nil { + return "", nil, err + } + if !has { + return "", nil, errors.New("run_as_cel requires cel.program") + } + newCfg, err := convertHttpjsonToCel(cfg) + if err != nil { + return "", nil, err + } + m.migrateCursor(cfg, newCfg) + return "cel", newCfg, nil +} + +// migrateCursor reads the httpjson cursor from the persistent store and +// injects it into the translated cel config's state.cursor so that the +// cel input continues from where httpjson left off. If the httpjson input +// was stateless or the store is unavailable, this is a no-op. +func (m InputManager) migrateCursor(src, dst *conf.C) { + id, _ := src.String("id", -1) // Missing id is fine; cursorKey handles empty. + rawURL, err := src.String("request.url", -1) + if err != nil { + return + } + // Parse and re-serialize to match source.Name() normalization. + u, err := url.Parse(rawURL) + if err != nil { + return + } + store, err := m.cursor.StateStore.StoreFor("httpjson") + if err != nil { + m.cursor.Logger.Warnw("cursor migration: cannot open store", "error", err) + return + } + defer store.Close() + + key := cursorKey("httpjson", id, u.String()) + var entry map[string]interface{} + if err := store.Get(key, &entry); err != nil { + return + } + cursor, ok := entry["cursor"] + if !ok || cursor == nil { + return + } + cursorCfg, err := conf.NewConfigFrom(cursor) + if err != nil { + m.cursor.Logger.Warnw("cursor migration: cannot create config from cursor", "error", err) + return + } + + // Ensure the state sub-config exists before injecting. + has, err := dst.Has("state", -1) + if err != nil { + return + } + if !has { + if err := dst.SetChild("state", -1, conf.NewConfig()); err != nil { + return + } + } + if err := dst.SetChild("state.cursor", -1, cursorCfg); err != nil { + m.cursor.Logger.Warnw("cursor migration: cannot inject cursor into config", "error", err) + } +} + +func cursorKey(typ, id, url string) string { + if id != "" { + return fmt.Sprintf("%s::%s::%s", typ, id, url) + } + return fmt.Sprintf("%s::%s", typ, url) +} + +// convertHttpjsonToCel builds a cel input config from an httpjson config +// by extracting shared fields and cel-namespaced fields. +func convertHttpjsonToCel(cfg *conf.C) (*conf.C, error) { + out := conf.NewConfig() + + if err := out.SetString("type", -1, "cel"); err != nil { + return nil, fmt.Errorf("cannot set type: %w", err) + } + + // Copy shared string fields that map directly or with a rename. + // Durations and URLs are stored as strings in the config. + scalars := []struct{ src, dst string }{ + {"interval", "interval"}, + {"request.url", "resource.url"}, + {"request.timeout", "resource.timeout"}, + {"request.proxy_url", "resource.proxy_url"}, + {"request.idle_connection_timeout", "resource.idle_connection_timeout"}, + } + for _, f := range scalars { + has, err := cfg.Has(f.src, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", f.src, err) + } + if !has { + continue + } + v, err := cfg.String(f.src, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", f.src, err) + } + if err := out.SetString(f.dst, -1, v); err != nil { + return nil, fmt.Errorf("setting %q: %w", f.dst, err) + } + } + + // Copy sub-configs that transfer as a block. + blocks := []struct{ src, dst string }{ + {"auth", "auth"}, + {"request.retry", "resource.retry"}, + {"request.redirect", "resource.redirect"}, + {"request.keep_alive", "resource.keep_alive"}, + {"request.tracer", "resource.tracer"}, + {"request.ssl", "resource.ssl"}, + {"request.proxy_headers", "resource.proxy_headers"}, + } + for _, b := range blocks { + has, err := cfg.Has(b.src, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", b.src, err) + } + if !has { + continue + } + sub, err := cfg.Child(b.src, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", b.src, err) + } + if err := out.SetChild(b.dst, -1, sub); err != nil { + return nil, fmt.Errorf("setting %q: %w", b.dst, err) + } + } + + // Copy shared boolean fields. + bools := []struct{ src, dst string }{ + {"request.proxy_disable", "resource.proxy_disable"}, + } + for _, f := range bools { + has, err := cfg.Has(f.src, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", f.src, err) + } + if !has { + continue + } + v, err := cfg.Bool(f.src, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", f.src, err) + } + if err := out.SetBool(f.dst, -1, v); err != nil { + return nil, fmt.Errorf("setting %q: %w", f.dst, err) + } + } + + // Extract cel-namespaced string fields into their top-level equivalents. + celStrings := []struct{ src, dst string }{ + {"cel.program", "program"}, + } + for _, f := range celStrings { + has, err := cfg.Has(f.src, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", f.src, err) + } + if !has { + continue + } + v, err := cfg.String(f.src, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", f.src, err) + } + if err := out.SetString(f.dst, -1, v); err != nil { + return nil, fmt.Errorf("setting %q: %w", f.dst, err) + } + } + + // Extract cel-namespaced integer fields. + celInts := []struct{ src, dst string }{ + {"cel.max_executions", "max_executions"}, + } + for _, f := range celInts { + has, err := cfg.Has(f.src, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", f.src, err) + } + if !has { + continue + } + v, err := cfg.Int(f.src, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", f.src, err) + } + if err := out.SetInt(f.dst, -1, v); err != nil { + return nil, fmt.Errorf("setting %q: %w", f.dst, err) + } + } + + // Extract cel sub-configs. + celBlocks := []struct{ src, dst string }{ + {"cel.state", "state"}, + {"cel.regexp", "regexp"}, + {"cel.xsd", "xsd"}, + {"cel.redact", "redact"}, + } + for _, b := range celBlocks { + has, err := cfg.Has(b.src, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", b.src, err) + } + if !has { + continue + } + sub, err := cfg.Child(b.src, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", b.src, err) + } + if err := out.SetChild(b.dst, -1, sub); err != nil { + return nil, fmt.Errorf("setting %q: %w", b.dst, err) + } + } + + // Copy passthrough fields. + passthrough := []string{"id"} + for _, key := range passthrough { + has, err := cfg.Has(key, -1) + if err != nil { + return nil, fmt.Errorf("checking %q: %w", key, err) + } + if !has { + continue + } + v, err := cfg.String(key, -1) + if err != nil { + return nil, fmt.Errorf("reading %q: %w", key, err) + } + if err := out.SetString(key, -1, v); err != nil { + return nil, fmt.Errorf("setting %q: %w", key, err) + } + } + + return out, nil +} diff --git a/x-pack/filebeat/input/httpjson/redirect_test.go b/x-pack/filebeat/input/httpjson/redirect_test.go new file mode 100644 index 000000000000..ac3caef93525 --- /dev/null +++ b/x-pack/filebeat/input/httpjson/redirect_test.go @@ -0,0 +1,742 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package httpjson + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" + "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestRedirect_EndToEnd(t *testing.T) { + log := logp.NewNopLogger() + store := newTestStore() + + httpjsonPlugin := v2.Plugin{ + Name: "httpjson", + Stability: feature.Stable, + Manager: NewInputManager(log, store), + } + celPlugin := cel.Plugin(log, store) + + loader, err := v2.NewLoader(log, []v2.Plugin{httpjsonPlugin, celPlugin}, "type", "") + require.NoError(t, err) + + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "run_as_cel": true, + "request.url": "https://api.example.com/events", + "cel.program": `{"events":[{"message":"Hello, World!"}]}`, + "cel.state": map[string]interface{}{}, + }) + + input, err := loader.Configure(cfg) + require.NoError(t, err) + require.NotNil(t, input) + require.Equal(t, "cel", input.Name()) +} + +func TestRedirect_NoRedirectWhenFlagAbsent(t *testing.T) { + log := logp.NewNopLogger() + store := newTestStore() + + httpjsonPlugin := v2.Plugin{ + Name: "httpjson", + Stability: feature.Stable, + Manager: NewInputManager(log, store), + } + + loader, err := v2.NewLoader(log, []v2.Plugin{httpjsonPlugin}, "type", "") + require.NoError(t, err) + + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + }) + + input, err := loader.Configure(cfg) + require.NoError(t, err) + require.NotNil(t, input) +} + +func TestRedirect_ErrorWithoutProgram(t *testing.T) { + log := logp.NewNopLogger() + store := newTestStore() + + httpjsonPlugin := v2.Plugin{ + Name: "httpjson", + Stability: feature.Stable, + Manager: NewInputManager(log, store), + } + + loader, err := v2.NewLoader(log, []v2.Plugin{httpjsonPlugin}, "type", "") + require.NoError(t, err) + + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "run_as_cel": true, + }) + + _, err = loader.Configure(cfg) + require.Error(t, err) +} + +func TestConvertHttpjsonToCel(t *testing.T) { + t.Run("minimal", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `bytes(resp.Body).decode_json()`, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + typ, err := out.String("type", -1) + require.NoError(t, err) + require.Equal(t, "cel", typ) + + url, err := out.String("resource.url", -1) + require.NoError(t, err) + require.Equal(t, "https://api.example.com/events", url) + + interval, err := out.String("interval", -1) + require.NoError(t, err) + require.Equal(t, "60s", interval) + + program, err := out.String("program", -1) + require.NoError(t, err) + require.Equal(t, `bytes(resp.Body).decode_json()`, program) + }) + + t.Run("passthrough_id", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "id": "my-input", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + id, err := out.String("id", -1) + require.NoError(t, err) + require.Equal(t, "my-input", id) + }) + + t.Run("auth_block", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "auth.basic.user": "testuser", + "auth.basic.password": "testpass", + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + user, err := out.String("auth.basic.user", -1) + require.NoError(t, err) + require.Equal(t, "testuser", user) + + pass, err := out.String("auth.basic.password", -1) + require.NoError(t, err) + require.Equal(t, "testpass", pass) + }) + + t.Run("retry_block", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.retry.max_attempts": 3, + "request.retry.wait_min": "1s", + "request.retry.wait_max": "30s", + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("resource.retry", -1) + require.NoError(t, err) + require.True(t, has) + + sub, err := out.Child("resource.retry", -1) + require.NoError(t, err) + + v, err := sub.Int("max_attempts", -1) + require.NoError(t, err) + require.Equal(t, int64(3), v) + }) + + t.Run("redirect_block", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.redirect.forward_headers": true, + "request.redirect.max_redirects": 5, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("resource.redirect", -1) + require.NoError(t, err) + require.True(t, has) + + sub, err := out.Child("resource.redirect", -1) + require.NoError(t, err) + + fwd, err := sub.Bool("forward_headers", -1) + require.NoError(t, err) + require.True(t, fwd) + }) + + t.Run("keep_alive_block", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.keep_alive.max_idle_connections": 10, + "request.keep_alive.max_idle_connections_per_host": 2, + "request.keep_alive.idle_connection_timeout": "30s", + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("resource.keep_alive", -1) + require.NoError(t, err) + require.True(t, has) + + sub, err := out.Child("resource.keep_alive", -1) + require.NoError(t, err) + + v, err := sub.Int("max_idle_connections", -1) + require.NoError(t, err) + require.Equal(t, int64(10), v) + }) + + t.Run("tracer_block", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.tracer.filename": "/tmp/trace.ndjson", + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("resource.tracer", -1) + require.NoError(t, err) + require.True(t, has) + + v, err := out.String("resource.tracer.filename", -1) + require.NoError(t, err) + require.Equal(t, "/tmp/trace.ndjson", v) + }) + + t.Run("transport_ssl", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.ssl.verification_mode": "none", + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("resource.ssl", -1) + require.NoError(t, err) + require.True(t, has) + + v, err := out.String("resource.ssl.verification_mode", -1) + require.NoError(t, err) + require.Equal(t, "none", v) + }) + + t.Run("transport_timeout", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.timeout": "45s", + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + v, err := out.String("resource.timeout", -1) + require.NoError(t, err) + require.Equal(t, "45s", v) + }) + + t.Run("transport_proxy", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "request.proxy_url": "http://proxy.example.com:8080", + "request.proxy_disable": true, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + v, err := out.String("resource.proxy_url", -1) + require.NoError(t, err) + require.Equal(t, "http://proxy.example.com:8080", v) + + b, err := out.Bool("resource.proxy_disable", -1) + require.NoError(t, err) + require.True(t, b) + }) + + t.Run("cel_max_executions", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.max_executions": 500, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + v, err := out.Int("max_executions", -1) + require.NoError(t, err) + require.Equal(t, int64(500), v) + }) + + t.Run("cel_state", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.state": map[string]interface{}{"cursor": map[string]interface{}{"ts": "2024-01-01T00:00:00Z"}}, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("state", -1) + require.NoError(t, err) + require.True(t, has) + + v, err := out.String("state.cursor.ts", -1) + require.NoError(t, err) + require.Equal(t, "2024-01-01T00:00:00Z", v) + }) + + t.Run("cel_regexp", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.regexp": map[string]interface{}{"link_next": `<([^>]+)>;\s*rel="next"`}, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + v, err := out.String("regexp.link_next", -1) + require.NoError(t, err) + require.Equal(t, `<([^>]+)>;\s*rel="next"`, v) + }) + + t.Run("cel_xsd", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.xsd": map[string]interface{}{"evt": ""}, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + v, err := out.String("xsd.evt", -1) + require.NoError(t, err) + require.Equal(t, "", v) + }) + + t.Run("cel_redact", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.redact": map[string]interface{}{"fields": []string{"auth_token"}, "delete": true}, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("redact", -1) + require.NoError(t, err) + require.True(t, has) + + b, err := out.Bool("redact.delete", -1) + require.NoError(t, err) + require.True(t, b) + }) + + t.Run("httpjson_only_fields_excluded", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "request.url": "https://api.example.com/events", + "request.method": "POST", + "cel.program": `true`, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + has, err := out.Has("request", -1) + require.NoError(t, err) + require.False(t, has) + + has, err = out.Has("cel", -1) + require.NoError(t, err) + require.False(t, has) + }) + + t.Run("realistic_full_config", func(t *testing.T) { + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "id": "okta-system-log", + "interval": "120s", + "request.url": "https://dev-123456.okta.com/api/v1/logs", + + "auth.oauth2.client.id": "0oa1234567890abcdef", + "auth.oauth2.client.secret": "client-secret-value", + "auth.oauth2.token_url": "https://dev-123456.okta.com/oauth2/v1/token", + "auth.oauth2.scopes": []string{"okta.logs.read"}, + + "request.retry.max_attempts": 5, + "request.retry.wait_min": "2s", + "request.retry.wait_max": "60s", + + "request.redirect.forward_headers": true, + "request.redirect.max_redirects": 3, + + "request.keep_alive.max_idle_connections": 5, + + "request.tracer.filename": "/tmp/okta-trace.ndjson", + + "request.ssl.verification_mode": "full", + "request.timeout": "30s", + "request.proxy_url": "http://corp-proxy:3128", + + "cel.program": ` +state.url.with({ + "Header": {"Accept": ["application/json"]}, +}).as(req, request("GET", req).as(resp, + bytes(resp.Body).decode_json().as(body, { + "events": body.map(e, {"message": e.encode_json()}), + "cursor": {"after": body[body.size()-1].published}, + }) +))`, + "cel.max_executions": 100, + "cel.state": map[string]interface{}{"cursor": map[string]interface{}{"after": ""}}, + "cel.regexp": map[string]interface{}{"link": `<([^>]+)>;\s*rel="next"`}, + "cel.redact": map[string]interface{}{"fields": []string{"auth.oauth2.client.secret"}}, + }) + + out, err := convertHttpjsonToCel(cfg) + require.NoError(t, err) + + typ, err := out.String("type", -1) + require.NoError(t, err) + require.Equal(t, "cel", typ) + + id, err := out.String("id", -1) + require.NoError(t, err) + require.Equal(t, "okta-system-log", id) + + interval, err := out.String("interval", -1) + require.NoError(t, err) + require.Equal(t, "120s", interval) + + url, err := out.String("resource.url", -1) + require.NoError(t, err) + require.Equal(t, "https://dev-123456.okta.com/api/v1/logs", url) + + // Auth transferred + clientID, err := out.String("auth.oauth2.client.id", -1) + require.NoError(t, err) + require.Equal(t, "0oa1234567890abcdef", clientID) + + // Retry transferred + retrySub, err := out.Child("resource.retry", -1) + require.NoError(t, err) + maxAttempts, err := retrySub.Int("max_attempts", -1) + require.NoError(t, err) + require.Equal(t, int64(5), maxAttempts) + + // Redirect transferred + has, err := out.Has("resource.redirect", -1) + require.NoError(t, err) + require.True(t, has) + + // Keep alive transferred + has, err = out.Has("resource.keep_alive", -1) + require.NoError(t, err) + require.True(t, has) + + // Tracer transferred + tracerFile, err := out.String("resource.tracer.filename", -1) + require.NoError(t, err) + require.Equal(t, "/tmp/okta-trace.ndjson", tracerFile) + + // Transport transferred + sslMode, err := out.String("resource.ssl.verification_mode", -1) + require.NoError(t, err) + require.Equal(t, "full", sslMode) + + timeout, err := out.String("resource.timeout", -1) + require.NoError(t, err) + require.Equal(t, "30s", timeout) + + proxyURL, err := out.String("resource.proxy_url", -1) + require.NoError(t, err) + require.Equal(t, "http://corp-proxy:3128", proxyURL) + + // CEL fields transferred + program, err := out.String("program", -1) + require.NoError(t, err) + require.Contains(t, program, "state.url.with") + + maxExec, err := out.Int("max_executions", -1) + require.NoError(t, err) + require.Equal(t, int64(100), maxExec) + + has, err = out.Has("state", -1) + require.NoError(t, err) + require.True(t, has) + + has, err = out.Has("regexp", -1) + require.NoError(t, err) + require.True(t, has) + + has, err = out.Has("redact", -1) + require.NoError(t, err) + require.True(t, has) + + // httpjson-only fields absent + has, err = out.Has("request", -1) + require.NoError(t, err) + require.False(t, has) + + has, err = out.Has("cel", -1) + require.NoError(t, err) + require.False(t, has) + }) +} + +func TestMigrateCursor(t *testing.T) { + t.Run("injects_stored_cursor", func(t *testing.T) { + store := newTestStore() + s, err := store.StoreFor("httpjson") + require.NoError(t, err) + err = s.Set("httpjson::my-input::https://api.example.com/events", map[string]interface{}{ + "ttl": 0, + "updated": time.Now(), + "cursor": map[string]interface{}{"timestamp": "2025-06-15T10:30:00Z"}, + }) + require.NoError(t, err) + s.Close() + + mgr := NewInputManager(logp.NewNopLogger(), store) + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "id": "my-input", + "interval": "60s", + "run_as_cel": true, + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.state": map[string]interface{}{"cursor": map[string]interface{}{"timestamp": ""}}, + }) + + _, newCfg, err := mgr.Redirect(cfg) + require.NoError(t, err) + + v, err := newCfg.String("state.cursor.timestamp", -1) + require.NoError(t, err) + require.Equal(t, "2025-06-15T10:30:00Z", v) + }) + + t.Run("no_entry", func(t *testing.T) { + store := newTestStore() + mgr := NewInputManager(logp.NewNopLogger(), store) + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "id": "my-input", + "interval": "60s", + "run_as_cel": true, + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.state": map[string]interface{}{"cursor": map[string]interface{}{"timestamp": "default"}}, + }) + + _, newCfg, err := mgr.Redirect(cfg) + require.NoError(t, err) + + v, err := newCfg.String("state.cursor.timestamp", -1) + require.NoError(t, err) + require.Equal(t, "default", v) + }) + + t.Run("no_id", func(t *testing.T) { + store := newTestStore() + s, err := store.StoreFor("httpjson") + require.NoError(t, err) + err = s.Set("httpjson::https://api.example.com/events", map[string]interface{}{ + "ttl": 0, + "updated": time.Now(), + "cursor": map[string]interface{}{"page": "42"}, + }) + require.NoError(t, err) + s.Close() + + mgr := NewInputManager(logp.NewNopLogger(), store) + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "interval": "60s", + "run_as_cel": true, + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.state": map[string]interface{}{}, + }) + + _, newCfg, err := mgr.Redirect(cfg) + require.NoError(t, err) + + v, err := newCfg.String("state.cursor.page", -1) + require.NoError(t, err) + require.Equal(t, "42", v) + }) + + t.Run("no_state_in_config", func(t *testing.T) { + store := newTestStore() + s, err := store.StoreFor("httpjson") + require.NoError(t, err) + err = s.Set("httpjson::no-state::https://api.example.com/events", map[string]interface{}{ + "ttl": 0, + "updated": time.Now(), + "cursor": map[string]interface{}{"offset": "100"}, + }) + require.NoError(t, err) + s.Close() + + mgr := NewInputManager(logp.NewNopLogger(), store) + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "id": "no-state", + "interval": "60s", + "run_as_cel": true, + "request.url": "https://api.example.com/events", + "cel.program": `true`, + }) + + _, newCfg, err := mgr.Redirect(cfg) + require.NoError(t, err) + + v, err := newCfg.String("state.cursor.offset", -1) + require.NoError(t, err) + require.Equal(t, "100", v) + }) + + t.Run("idempotent", func(t *testing.T) { + store := newTestStore() + s, err := store.StoreFor("httpjson") + require.NoError(t, err) + err = s.Set("httpjson::idem::https://api.example.com/events", map[string]interface{}{ + "ttl": 0, + "updated": time.Now(), + "cursor": map[string]interface{}{"seq": "99"}, + }) + require.NoError(t, err) + s.Close() + + mgr := NewInputManager(logp.NewNopLogger(), store) + cfg := conf.MustNewConfigFrom(map[string]interface{}{ + "type": "httpjson", + "id": "idem", + "interval": "60s", + "run_as_cel": true, + "request.url": "https://api.example.com/events", + "cel.program": `true`, + "cel.state": map[string]interface{}{}, + }) + + _, first, err := mgr.Redirect(cfg) + require.NoError(t, err) + + _, second, err := mgr.Redirect(cfg) + require.NoError(t, err) + + v1, err := first.String("state.cursor.seq", -1) + require.NoError(t, err) + v2, err := second.String("state.cursor.seq", -1) + require.NoError(t, err) + require.Equal(t, v1, v2) + }) +} + +func TestCursorKey(t *testing.T) { + require.Equal(t, "httpjson::my-id::https://example.com", cursorKey("httpjson", "my-id", "https://example.com")) + require.Equal(t, "cel::https://example.com", cursorKey("cel", "", "https://example.com")) + require.Equal(t, "httpjson::https://example.com/path", cursorKey("httpjson", "", "https://example.com/path")) +} + +var _ statestore.States = (*testStore)(nil) + +type testStore struct { + registry *statestore.Registry +} + +func newTestStore() *testStore { + return &testStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testStore) Close() { s.registry.Close() } +func (s *testStore) StoreFor(string) (*statestore.Store, error) { return s.registry.Get("filebeat") } +func (s *testStore) CleanupInterval() time.Duration { return 0 }