diff --git a/coco.yml b/coco.yml index 9bcfbc03..f5b276f0 100644 --- a/coco.yml +++ b/coco.yml @@ -344,6 +344,39 @@ connector: interval: 5m queue: name: indexing_documents + elasticsearch: + enabled: true + interval: 30m + queue: + name: indexing_documents + deployment_mode: cluster + read_strategy: + scroll_size: 1000 + scroll_timeout: "5m" + slice_count: 4 + preference: "_primary" + sync_policy: + full_sync_interval: "24h" + incremental_sync_interval: "5m" + fields: ["title", "content", "url", "@timestamp"] + query_filter: | + { + "range": { + "@timestamp": { + "gte": "now-1d" + } + } + } + health_check: + enabled: true + interval: 30s + failure_threshold: 3 + recovery_threshold: 2 + retry_config: + max_retries: 3 + initial_delay: "1s" + max_delay: "60s" + backoff_factor: 2.0 network_drive: enabled: true interval: 30s diff --git a/plugins/connectors/elasticsearch/client.go b/plugins/connectors/elasticsearch/client.go new file mode 100644 index 00000000..ae09a6e8 --- /dev/null +++ b/plugins/connectors/elasticsearch/client.go @@ -0,0 +1,194 @@ +package elasticsearch + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + log "github.com/cihub/seelog" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/model" + "infini.sh/framework/core/util" + "infini.sh/framework/modules/elastic/common" +) + +type ESClient struct { + client elastic.API + config Config +} + +type ScrollIterator struct { + client elastic.API + scrollID string + config Config + index string + finished bool +} + +func NewESClient(cfg Config) (*ESClient, error) { + esConfig := elastic.ElasticsearchConfig{ + Name: "coco-connector-es", + Enabled: true, + } + + if len(cfg.Endpoints) > 0 { + esConfig.Endpoint = cfg.Endpoints + } else { + return nil, fmt.Errorf("no endpoints specified") + } + + if cfg.Username != "" && cfg.Password != "" { + esConfig.BasicAuth = &model.BasicAuth{ + Username: cfg.Username, + Password: cfg.Password, + } + } + + if cfg.CredentialID != "" { + esConfig.CredentialID = cfg.CredentialID + } + + if cfg.GetTimeout() != "" { + esConfig.ClientTimeout = cfg.GetTimeout() + } + + client, err := common.InitClientWithConfig(esConfig) + if err != nil { + return nil, fmt.Errorf("failed to initialize ES client: %w", err) + } + + return &ESClient{client: client, config: cfg}, nil +} + +func (c *ESClient) NewScrollIterator(ctx context.Context, index string) (*ScrollIterator, error) { + query := c.buildQuery() + + searchRequest := &elastic.SearchRequest{ + Size: c.config.GetScrollSize(), + Query: query, + } + + if len(c.config.IncludeFields) > 0 || len(c.config.ExcludeFields) > 0 { + source := make(map[string]interface{}) + if len(c.config.IncludeFields) > 0 { + source["includes"] = c.config.IncludeFields + } + if len(c.config.ExcludeFields) > 0 { + source["excludes"] = c.config.ExcludeFields + } + searchRequest.Source = source + } + + scrollID, err := c.client.NewScroll(index, c.config.GetScrollTime(), c.config.GetScrollSize(), searchRequest, 0, 1) + if err != nil { + return nil, fmt.Errorf("failed to create scroll: %w", err) + } + + return &ScrollIterator{ + client: c.client, + scrollID: string(scrollID), + config: c.config, + index: index, + finished: false, + }, nil +} + +func (c *ESClient) buildQuery() *elastic.Query { + if c.config.Query != "" { + var queryMap map[string]interface{} + if err := json.Unmarshal([]byte(c.config.Query), &queryMap); err == nil { + return &elastic.Query{ + RawQuery: queryMap, + } + } + log.Warnf("Failed to parse custom query DSL, using match_all: %v", err) 
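+ // If the custom DSL cannot be parsed, fall through to the incremental
+ // range query (when timestamp_field and last_sync_time are set) or to
+ // the match_all query below.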
+ } + + if c.config.TimestampField != "" && c.config.LastSyncTime != "" { + return &elastic.Query{ + BoolQuery: &elastic.BoolQuery{ + Must: []interface{}{ + map[string]interface{}{ + "range": map[string]interface{}{ + c.config.TimestampField: map[string]interface{}{ + "gte": c.config.LastSyncTime, + }, + }, + }, + }, + }, + } + } + + return &elastic.Query{ + MatchAllQuery: &elastic.MatchAllQuery{}, + } +} + +func (iter *ScrollIterator) Next(ctx context.Context) ([]elastic.IndexDocument, error) { + if iter.finished { + return nil, nil + } + + apiCtx := &elastic.APIContext{ + Context: ctx, + } + + response, err := iter.client.NextScroll(apiCtx, iter.config.GetScrollTime(), iter.scrollID) + if err != nil { + return nil, fmt.Errorf("failed to get next scroll: %w", err) + } + + var searchResponse elastic.SearchResponse + if err := json.Unmarshal(response, &searchResponse); err != nil { + return nil, fmt.Errorf("failed to parse scroll response: %w", err) + } + + if searchResponse.ScrollId != "" { + iter.scrollID = searchResponse.ScrollId + } + + if len(searchResponse.Hits.Hits) == 0 { + iter.finished = true + iter.Close() + return nil, nil + } + + docs := make([]elastic.IndexDocument, len(searchResponse.Hits.Hits)) + for i, hit := range searchResponse.Hits.Hits { + docs[i] = elastic.IndexDocument{ + Index: hit.Index, + Type: hit.Type, + Id: hit.ID, + Source: hit.Source, + } + } + + return docs, nil +} + +func (iter *ScrollIterator) Close() error { + if iter.scrollID != "" { + err := iter.client.ClearScroll(iter.scrollID) + iter.scrollID = "" + return err + } + return nil +} + +func (c *ESClient) TestConnection(ctx context.Context) error { + health, err := c.client.ClusterHealth(ctx) + if err != nil { + return fmt.Errorf("failed to get cluster health: %w", err) + } + + log.Infof("ES cluster health: %s, nodes: %d", health.Status, health.NumberOfNodes) + return nil +} + +func (c *ESClient) GetIndicesInfo(ctx context.Context) (map[string]interface{}, error) { + indices := strings.Join(c.config.GetIndices(), ",") + return c.client.GetIndex(indices) +} diff --git a/plugins/connectors/elasticsearch/client_test.go b/plugins/connectors/elasticsearch/client_test.go new file mode 100644 index 00000000..5f747216 --- /dev/null +++ b/plugins/connectors/elasticsearch/client_test.go @@ -0,0 +1,215 @@ +/* Copyright © INFINI LTD. All rights reserved. 
+ * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package elasticsearch + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewESClient(t *testing.T) { + tests := []struct { + name string + config Config + wantErr bool + }{ + { + name: "valid config", + config: Config{ + Endpoints: []string{"http://localhost:9200"}, + Username: "elastic", + Password: "password", + }, + wantErr: false, + }, + { + name: "empty endpoints", + config: Config{ + Endpoints: []string{}, + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewESClient(tt.config) + if tt.wantErr { + assert.Error(t, err) + assert.Nil(t, client) + } else { + assert.NoError(t, err) + assert.NotNil(t, client) + assert.Equal(t, tt.config, client.config) + } + }) + } +} + +func TestESClientBuildQuery(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + Query: `{"match_all": {}}`, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + query := client.buildQuery() + assert.NotNil(t, query) +} + +func TestESClientTestConnection(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"version":{"number":"8.0.0"}}`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := Config{ + Endpoints: []string{server.URL}, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = client.TestConnection(ctx) + if err != nil { + assert.Contains(t, err.Error(), "failed to test connection") + } +} + +func TestESClientGetIndicesInfo(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/test-index" { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "test-index": { + "aliases": {}, + "mappings": { + "properties": { + "title": {"type": "text"}, + "content": {"type": "text"} + } + }, + "settings": { + "index": { + "number_of_shards": "1", + "number_of_replicas": "1" + } + } + } + }`)) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + defer server.Close() + + config := Config{ + Endpoints: []string{server.URL}, + Indices: []string{"test-index"}, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + info, err := client.GetIndicesInfo(ctx) + if err != nil { + assert.Contains(t, err.Error(), "failed to get indices info") + } else { + assert.NotNil(t, info) + } +} + +func TestScrollIteratorClose(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + iterator := &ScrollIterator{ + client: client, + scrollID: "test-scroll-id", + finished: false, + } + + iterator.Close() + assert.True(t, iterator.finished) +} + +func TestConfigGetMethods(t *testing.T) { + config := Config{ + ScrollSize: 500, + ScrollTime: "2m", + BatchSize: 200, + Concurrency: 4, + Timeout: "60s", + Indices: []string{"index1", "index2"}, + ReadStrategy: ReadStrategyConfig{ + SliceCount: 8, + Preference: "_local", + 
}, + RetryConfig: RetryConfig{ + MaxRetries: 5, + InitialDelay: 2 * time.Second, + MaxDelay: 2 * time.Minute, + BackoffFactor: 3.0, + }, + } + + assert.Equal(t, 500, config.GetScrollSize()) + assert.Equal(t, "2m", config.GetScrollTime()) + assert.Equal(t, 200, config.GetBatchSize()) + assert.Equal(t, 4, config.GetConcurrency()) + assert.Equal(t, "60s", config.GetTimeout()) + assert.Equal(t, []string{"index1", "index2"}, config.GetIndices()) + + assert.Equal(t, 8, config.ReadStrategy.GetSliceCount()) + assert.Equal(t, "_local", config.ReadStrategy.GetPreference()) + + assert.Equal(t, 5, config.RetryConfig.GetMaxRetries()) + assert.Equal(t, 2*time.Second, config.RetryConfig.GetInitialDelay()) + assert.Equal(t, 2*time.Minute, config.RetryConfig.GetMaxDelay()) + assert.Equal(t, 3.0, config.RetryConfig.GetBackoffFactor()) +} + +func TestConfigDefaultValues(t *testing.T) { + config := Config{} + + assert.Equal(t, DefaultScrollSize, config.GetScrollSize()) + assert.Equal(t, DefaultScrollTimeout, config.GetScrollTime()) + assert.Equal(t, DefaultBatchSize, config.GetBatchSize()) + assert.Equal(t, DefaultConcurrency, config.GetConcurrency()) + assert.Equal(t, "30s", config.GetTimeout()) + assert.Equal(t, []string{"*"}, config.GetIndices()) + + assert.Equal(t, DefaultSliceCount, config.ReadStrategy.GetSliceCount()) + assert.Equal(t, DefaultPreference, config.ReadStrategy.GetPreference()) + + assert.Equal(t, DefaultMaxRetries, config.RetryConfig.GetMaxRetries()) + assert.Equal(t, DefaultInitialDelay, config.RetryConfig.GetInitialDelay()) + assert.Equal(t, DefaultMaxDelay, config.RetryConfig.GetMaxDelay()) + assert.Equal(t, DefaultBackoffFactor, config.RetryConfig.GetBackoffFactor()) +} diff --git a/plugins/connectors/elasticsearch/config.go b/plugins/connectors/elasticsearch/config.go new file mode 100644 index 00000000..d35c50c9 --- /dev/null +++ b/plugins/connectors/elasticsearch/config.go @@ -0,0 +1,150 @@ +package elasticsearch + +import "time" + +type Config struct { + Endpoints []string `config:"endpoints"` // Support multi-endpoint cluster mode + Username string `config:"username"` + Password string `config:"password"` + CredentialID string `config:"credential_id"` + + DeploymentMode string `config:"deployment_mode"` // single, cluster, ha_cluster + + Indices []string `config:"indices"` // List of indices to scan + Query string `config:"query"` // Custom query DSL + ScrollSize int `config:"scroll_size"` // Scroll query size + ScrollTime string `config:"scroll_time"` // Scroll timeout + + BatchSize int `config:"batch_size"` // Batch processing size + Concurrency int `config:"concurrency"` // Concurrency level + + TimestampField string `config:"timestamp_field"` // Timestamp field for incremental sync + LastSyncTime string `config:"last_sync_time"` // Last sync time + + IncludeFields []string `config:"include_fields"` // Fields to include in documents + ExcludeFields []string `config:"exclude_fields"` // Fields to exclude from documents + Timeout string `config:"timeout"` // Request timeout + + ReadStrategy ReadStrategyConfig `config:"read_strategy"` + SyncPolicy SyncPolicyConfig `config:"sync_policy"` + HealthCheck HealthCheckConfig `config:"health_check"` + RetryConfig RetryConfig `config:"retry_config"` +} + +type ReadStrategyConfig struct { + ScrollSize int `config:"scroll_size"` + ScrollTimeout string `config:"scroll_timeout"` + SliceCount int `config:"slice_count"` + Preference string `config:"preference"` +} + +type SyncPolicyConfig struct { + FullSyncInterval string 
`config:"full_sync_interval"` + IncrementalSyncInterval string `config:"incremental_sync_interval"` + Fields []string `config:"fields"` + QueryFilter string `config:"query_filter"` +} + +type HealthCheckConfig struct { + Enabled bool `config:"enabled"` + Interval time.Duration `config:"interval"` + FailureThreshold int `config:"failure_threshold"` + RecoveryThreshold int `config:"recovery_threshold"` +} + +type RetryConfig struct { + MaxRetries int `config:"max_retries"` + InitialDelay time.Duration `config:"initial_delay"` + MaxDelay time.Duration `config:"max_delay"` + BackoffFactor float64 `config:"backoff_factor"` +} + +func (c *Config) GetScrollSize() int { + if c.ReadStrategy.ScrollSize > 0 { + return c.ReadStrategy.ScrollSize + } + if c.ScrollSize <= 0 { + return DefaultScrollSize + } + return c.ScrollSize +} + +func (c *Config) GetScrollTime() string { + if c.ReadStrategy.ScrollTimeout != "" { + return c.ReadStrategy.ScrollTimeout + } + if c.ScrollTime == "" { + return DefaultScrollTimeout + } + return c.ScrollTime +} + +func (c *Config) GetBatchSize() int { + if c.BatchSize <= 0 { + return DefaultBatchSize + } + return c.BatchSize +} + +func (c *Config) GetConcurrency() int { + if c.Concurrency <= 0 { + return DefaultConcurrency + } + return c.Concurrency +} + +func (c *Config) GetTimeout() string { + if c.Timeout == "" { + return "30s" + } + return c.Timeout +} + +func (c *Config) GetIndices() []string { + if len(c.Indices) == 0 { + return []string{"*"} + } + return c.Indices +} + +func (c *ReadStrategyConfig) GetSliceCount() int { + if c.SliceCount <= 0 { + return DefaultSliceCount + } + return c.SliceCount +} + +func (c *ReadStrategyConfig) GetPreference() string { + if c.Preference == "" { + return DefaultPreference + } + return c.Preference +} + +func (c *RetryConfig) GetMaxRetries() int { + if c.MaxRetries <= 0 { + return DefaultMaxRetries + } + return c.MaxRetries +} + +func (c *RetryConfig) GetInitialDelay() time.Duration { + if c.InitialDelay <= 0 { + return DefaultInitialDelay + } + return c.InitialDelay +} + +func (c *RetryConfig) GetMaxDelay() time.Duration { + if c.MaxDelay <= 0 { + return DefaultMaxDelay + } + return c.MaxDelay +} + +func (c *RetryConfig) GetBackoffFactor() float64 { + if c.BackoffFactor <= 0 { + return DefaultBackoffFactor + } + return c.BackoffFactor +} diff --git a/plugins/connectors/elasticsearch/integration_test.go b/plugins/connectors/elasticsearch/integration_test.go new file mode 100644 index 00000000..8fb6fd07 --- /dev/null +++ b/plugins/connectors/elasticsearch/integration_test.go @@ -0,0 +1,481 @@ +/* Copyright © INFINI LTD. All rights reserved. 
+ * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +//go:build integration + +package elasticsearch + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "infini.sh/coco/modules/common" + "infini.sh/coco/plugins/connectors" + "infini.sh/framework/core/kv" + "infini.sh/framework/core/module" + "infini.sh/framework/core/queue" +) + +func TestFullWorkflowIntegration(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + kv.Register("indexing_documents", &mockKVStore{store: make(map[string]cache)}) + + requestCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + requestCount++ + w.Header().Set("Content-Type", "application/json") + + switch { + case r.URL.Path == "/": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"version":{"number":"8.0.0","build_flavor":"default"}}`)) + + case r.URL.Path == "/_cluster/health": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "cluster_name": "test-cluster", + "status": "green", + "timed_out": false, + "number_of_nodes": 3, + "number_of_data_nodes": 3, + "active_primary_shards": 5, + "active_shards": 10 + }`)) + + case r.URL.Path == "/_cat/nodes": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`[ + { + "ip": "192.168.1.1", + "name": "node-1", + "id": "node-1-id", + "node.role": "dilmrt", + "master": "*" + }, + { + "ip": "192.168.1.2", + "name": "node-2", + "id": "node-2-id", + "node.role": "dilmrt", + "master": "-" + } + ]`)) + + case r.URL.Path == "/test-index/_search": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "took": 5, + "timed_out": false, + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": {"value": 3, "relation": "eq"}, + "max_score": 1.0, + "hits": [ + { + "_index": "test-index", + "_type": "_doc", + "_id": "1", + "_score": 1.0, + "_source": { + "title": "Integration Test Document 1", + "content": "This is content for integration test document 1", + "url": "https://example.com/doc1", + "@timestamp": "2024-01-01T00:00:00Z", + "author": "Test Author 1", + "_seq_no": 1, + "_primary_term": 1 + } + }, + { + "_index": "test-index", + "_type": "_doc", + "_id": "2", + "_score": 1.0, + "_source": { + "title": "Integration Test Document 2", + "content": "This is content for integration test document 2", + "url": "https://example.com/doc2", + "@timestamp": "2024-01-02T00:00:00Z", + "author": "Test Author 2", + "_seq_no": 2, + "_primary_term": 1 + } + } + ] + }, + "_scroll_id": "test-scroll-id-123" + }`)) + + case r.URL.Path == "/_search/scroll": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "took": 2, + "timed_out": false, + "_shards": {"total": 1, "successful": 1, "skipped": 0, "failed": 0}, + "hits": { + "total": {"value": 0, "relation": "eq"}, + "max_score": null, + "hits": [] + }, + "_scroll_id": "test-scroll-id-123" + }`)) + + case r.URL.Path == "/test-index": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "test-index": { + "aliases": {}, + "mappings": { + "properties": { + "title": {"type": "text"}, + "content": {"type": "text"}, + "url": {"type": "keyword"}, + "@timestamp": {"type": "date"}, + "author": {"type": "keyword"} + } + }, + "settings": { + "index": { + "number_of_shards": "1", + "number_of_replicas": "1" + } + } + } + }`)) + + default: + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(`{"error":"not found"}`)) + } + })) + defer server.Close() + + testQueueName := "indexing_documents" + 
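+ // Bootstrap the connector the same way the framework would: register the
+ // plugin, resolve the queue config, then call Setup() to initialize the
+ // sync state and performance metrics before running Scan.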
plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + testCases := []struct { + name string + deploymentMode string + sliceCount int + healthCheck bool + expectedDocs int + }{ + { + name: "Single Node Mode", + deploymentMode: "single", + sliceCount: 1, + healthCheck: false, + expectedDocs: 2, + }, + { + name: "Cluster Mode with Slice Scroll", + deploymentMode: "cluster", + sliceCount: 2, + healthCheck: true, + expectedDocs: 2, + }, + { + name: "HA Cluster Mode", + deploymentMode: "ha_cluster", + sliceCount: 4, + healthCheck: true, + expectedDocs: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + theQueue[testQueueName] = [][]byte{} + + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-datasource-" + tc.deploymentMode, + Name: "Test ES Source - " + tc.name, + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: map[string]interface{}{ + "endpoints": []interface{}{server.URL}, + "indices": []interface{}{"test-index"}, + "deployment_mode": tc.deploymentMode, + "scroll_size": 100, + "batch_size": 50, + "read_strategy": map[string]interface{}{ + "slice_count": tc.sliceCount, + "preference": "_primary", + }, + "health_check": map[string]interface{}{ + "enabled": tc.healthCheck, + "interval": "5s", + }, + "retry_config": map[string]interface{}{ + "max_retries": 2, + "initial_delay": "100ms", + "max_delay": "1s", + "backoff_factor": 2.0, + }, + }, + }, + } + + didPanic := false + var panicValue interface{} + func() { + defer func() { + if r := recover(); r != nil { + didPanic = true + panicValue = r + } + }() + plugin.Scan(connector, dataSource) + }() + + assert.False(t, didPanic, "Scan should not panic for %s: %v", tc.name, panicValue) + + assert.NotEqual(t, StatusError, plugin.syncState.Status, "Sync should not be in error state for %s", tc.name) + + queueID := plugin.Queue.ID + queueSize := len(theQueue[queueID]) + + if queueSize > 0 { + data := theQueue[queueID][0] + var doc common.Document + err := json.Unmarshal(data, &doc) + assert.NoError(t, err, "Document should unmarshal correctly for %s", tc.name) + + assert.Equal(t, "Integration Test Document 1", doc.Title) + assert.Equal(t, "This is content for integration test document 1", doc.Content) + assert.Equal(t, "https://example.com/doc1", doc.URL) + assert.Equal(t, "elasticsearch", doc.Type) + assert.Equal(t, "Test Author 1", doc.Owner.UserName) + assert.Equal(t, dataSource.ID, doc.DataSourceID) + assert.Equal(t, "test-index", doc.Category) + } + + assert.Greater(t, plugin.metrics.LastUpdated.Unix(), int64(0), "Metrics should be updated for %s", tc.name) + + if plugin.syncState.Status != StatusError { + assert.GreaterOrEqual(t, plugin.syncState.ProcessedDocuments, int64(0), "Processed documents should be tracked for %s", tc.name) + } + }) + } +} + +func TestErrorHandlingIntegration(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + kv.Register("indexing_documents", &mockKVStore{store: make(map[string]cache)}) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/": + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{"error":"service unavailable"}`)) + default: + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(`{"error":"internal server error"}`)) + } + })) + 
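+ // Every request to this server fails (503 on "/", 500 elsewhere), so the
+ // connection test should exhaust its retries (one retry after roughly the
+ // 10ms initial delay configured below) and leave the plugin in StatusError.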
defer server.Close() + + testQueueName := "indexing_documents" + plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-error-datasource", + Name: "Test Error ES Source", + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: map[string]interface{}{ + "endpoints": []interface{}{server.URL}, + "indices": []interface{}{"test-index"}, + "retry_config": map[string]interface{}{ + "max_retries": 1, + "initial_delay": "10ms", + "max_delay": "100ms", + "backoff_factor": 2.0, + }, + }, + }, + } + + didPanic := false + func() { + defer func() { + if r := recover(); r != nil { + didPanic = true + } + }() + plugin.Scan(connector, dataSource) + }() + + assert.False(t, didPanic, "Scan should handle errors gracefully without panicking") + assert.Equal(t, StatusError, plugin.syncState.Status, "Sync state should be error after connection failure") + assert.Greater(t, plugin.syncState.ErrorCount, 0, "Error count should be incremented") +} + +func TestHealthMonitoringIntegration(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + kv.Register("indexing_documents", &mockKVStore{store: make(map[string]cache)}) + + healthCheckCount := 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch r.URL.Path { + case "/": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"version":{"number":"8.0.0"}}`)) + + case "/_cluster/health": + healthCheckCount++ + if healthCheckCount <= 2 { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{"error":"cluster unavailable"}`)) + } else { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "cluster_name": "test-cluster", + "status": "green", + "number_of_nodes": 2 + }`)) + } + + case "/_cat/nodes": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`[{"ip": "192.168.1.1", "name": "node-1", "id": "node-1-id"}]`)) + + case "/test-index/_search": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "hits": {"hits": []}, + "_scroll_id": "test-scroll-id" + }`)) + + case "/_search/scroll": + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"hits": {"hits": []}}`)) + + default: + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{}`)) + } + })) + defer server.Close() + + testQueueName := "indexing_documents" + plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-health-datasource", + Name: "Test Health Monitoring ES Source", + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: map[string]interface{}{ + "endpoints": []interface{}{server.URL}, + "indices": []interface{}{"test-index"}, + "deployment_mode": "ha_cluster", + "health_check": map[string]interface{}{ + "enabled": true, + "interval": "100ms", + "failure_threshold": 2, + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + plugin.ctx = ctx + plugin.Scan(connector, dataSource) + + time.Sleep(500 * time.Millisecond) + + assert.Greater(t, healthCheckCount, 0, "Health checks should have been performed") +} + +func 
TestConfigurationValidationIntegration(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + + testQueueName := "indexing_documents" + plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + invalidConfigs := []struct { + name string + config map[string]interface{} + }{ + { + name: "Missing endpoints", + config: map[string]interface{}{ + "indices": []interface{}{"test-index"}, + }, + }, + { + name: "Invalid deployment mode", + config: map[string]interface{}{ + "endpoints": []interface{}{"http://localhost:9200"}, + "deployment_mode": "invalid_mode", + }, + }, + } + + for _, tc := range invalidConfigs { + t.Run(tc.name, func(t *testing.T) { + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-invalid-config", + Name: "Test Invalid Config", + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: tc.config, + }, + } + + plugin.syncState.Status = StatusIdle + plugin.syncState.ErrorCount = 0 + + didPanic := false + func() { + defer func() { + if r := recover(); r != nil { + didPanic = true + } + }() + plugin.Scan(connector, dataSource) + }() + + assert.False(t, didPanic, "Invalid config should not cause panic for %s", tc.name) + + if tc.name == "Missing endpoints" { + assert.Equal(t, StatusError, plugin.syncState.Status, "Missing endpoints should result in error state") + } + }) + } +} diff --git a/plugins/connectors/elasticsearch/plugin.go b/plugins/connectors/elasticsearch/plugin.go new file mode 100644 index 00000000..8316da3a --- /dev/null +++ b/plugins/connectors/elasticsearch/plugin.go @@ -0,0 +1,561 @@ +package elasticsearch + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + log "github.com/cihub/seelog" + "infini.sh/coco/modules/common" + "infini.sh/coco/plugins/connectors" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/global" + "infini.sh/framework/core/module" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/util" +) + +const ConnectorElasticsearch = "elasticsearch" + +type Plugin struct { + connectors.BasePlugin + mu sync.Mutex + ctx context.Context + cancel context.CancelFunc + syncState *SyncState + metrics *PerformanceMetrics + healthTicker *time.Ticker +} + +func (p *Plugin) Name() string { + return ConnectorElasticsearch +} + +func (p *Plugin) Setup() { + p.BasePlugin.Init("connector.elasticsearch", "indexing elasticsearch documents", p) + p.syncState = &SyncState{ + Status: StatusIdle, + } + p.metrics = &PerformanceMetrics{ + LastUpdated: time.Now(), + } +} + +func (p *Plugin) Start() error { + p.mu.Lock() + defer p.mu.Unlock() + p.ctx, p.cancel = context.WithCancel(context.Background()) + + p.startHealthCheck() + + return p.BasePlugin.Start(connectors.DefaultSyncInterval) +} + +func (p *Plugin) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.healthTicker != nil { + p.healthTicker.Stop() + } + + if p.cancel != nil { + p.cancel() + } + + p.syncState.Status = StatusStopped + return p.BasePlugin.Stop() +} + +func (p *Plugin) Scan(connector *common.Connector, datasource *common.DataSource) { + log.Infof("[%v connector] Starting enhanced scan for datasource: %s", ConnectorElasticsearch, datasource.Name) + + p.mu.Lock() + p.syncState.Status = StatusRunning + p.syncState.LastIncrementalSync = time.Now() + p.mu.Unlock() + + cfg := Config{} + err := connectors.ParseConnectorConfigure(connector, 
datasource, &cfg) + if err != nil { + log.Errorf("[%v connector] Parsing connector configuration failed: %v", ConnectorElasticsearch, err) + p.updateSyncState(StatusError, fmt.Sprintf("Config parse error: %v", err)) + return + } + + log.Debugf("[%v connector] Enhanced config: endpoints=%v, deployment_mode=%s, slice_count=%d", + ConnectorElasticsearch, cfg.Endpoints, cfg.DeploymentMode, cfg.ReadStrategy.GetSliceCount()) + + if len(cfg.Endpoints) == 0 { + log.Errorf("[%v connector] Missing required configuration for datasource [%s]: endpoints", ConnectorElasticsearch, datasource.Name) + p.updateSyncState(StatusError, "Missing endpoints configuration") + return + } + + client, err := NewESClient(cfg) + if err != nil { + log.Errorf("[%v connector] Failed to create ES client for datasource [%s]: %v", ConnectorElasticsearch, datasource.Name, err) + p.updateSyncState(StatusError, fmt.Sprintf("Client creation error: %v", err)) + return + } + + if err := p.testConnectionWithRetry(client, cfg.RetryConfig); err != nil { + log.Errorf("[%v connector] ES connection test failed after retries: %v", ConnectorElasticsearch, err) + p.updateSyncState(StatusError, fmt.Sprintf("Connection failed: %v", err)) + return + } + + reader := NewReader(client, cfg) + + if err := reader.OptimizeReadStrategy(p.ctx); err != nil { + log.Warnf("[%v connector] Failed to optimize read strategy: %v", ConnectorElasticsearch, err) + } + + var healthCtx context.Context + var healthCancel context.CancelFunc + if cfg.HealthCheck.Enabled { + healthCtx, healthCancel = context.WithCancel(p.ctx) + go p.monitorHealth(healthCtx, reader, cfg.HealthCheck) + defer healthCancel() + } + + indices := cfg.GetIndices() + concurrency := cfg.GetConcurrency() + + startTime := time.Now() + var totalDocs int64 + + if concurrency > 1 && len(indices) > 1 { + totalDocs = p.scanIndicesConcurrentlyEnhanced(reader, indices, datasource, cfg) + } else { + totalDocs = p.scanIndicesSequentiallyEnhanced(reader, indices, datasource, cfg) + } + + duration := time.Since(startTime) + p.updateMetrics(totalDocs, duration) + p.updateSyncState(StatusIdle, "Scan completed successfully") + + log.Infof("[%v connector] Enhanced scan completed for datasource [%s], processed %d documents in %v", + ConnectorElasticsearch, datasource.Name, totalDocs, duration) +} + +func (p *Plugin) scanIndicesSequentially(client *ESClient, indices []string, datasource *common.DataSource, cfg Config) { + for _, index := range indices { + if p.ctx.Err() != nil { + log.Infof("[%v connector] Context cancelled, stopping scan", ConnectorElasticsearch) + return + } + p.scanIndex(client, index, datasource, cfg) + } +} + +func (p *Plugin) scanIndicesConcurrently(client *ESClient, indices []string, datasource *common.DataSource, cfg Config) { + var wg sync.WaitGroup + semaphore := make(chan struct{}, cfg.GetConcurrency()) + + for _, index := range indices { + if p.ctx.Err() != nil { + log.Infof("[%v connector] Context cancelled, stopping scan", ConnectorElasticsearch) + break + } + + wg.Add(1) + go func(idx string) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + p.scanIndex(client, idx, datasource, cfg) + }(index) + } + + wg.Wait() +} + +func (p *Plugin) scanIndex(client *ESClient, index string, datasource *common.DataSource, cfg Config) { + log.Infof("[%v connector] Starting scan of index [%s] for datasource [%s]", ConnectorElasticsearch, index, datasource.Name) + + iterator, err := client.NewScrollIterator(p.ctx, index) + if err != nil { + log.Errorf("[%v connector] 
Failed to create scroll iterator for index [%s]: %v", ConnectorElasticsearch, index, err) + return + } + defer iterator.Close() + + docCount := 0 + batchCount := 0 + batchSize := cfg.GetBatchSize() + batch := make([]common.Document, 0, batchSize) + + for { + if p.ctx.Err() != nil { + log.Infof("[%v connector] Context cancelled, stopping index scan", ConnectorElasticsearch) + break + } + + docs, err := iterator.Next(p.ctx) + if err != nil { + log.Errorf("[%v connector] Error during scroll for index [%s]: %v", ConnectorElasticsearch, index, err) + break + } + + if docs == nil || len(docs) == 0 { + break + } + + for _, doc := range docs { + cocoDoc := p.convertToCocoDocument(doc, datasource, index) + batch = append(batch, cocoDoc) + docCount++ + + if len(batch) >= batchSize { + p.processBatch(batch, datasource.Name) + batch = batch[:0] + batchCount++ + } + } + } + + if len(batch) > 0 { + p.processBatch(batch, datasource.Name) + batchCount++ + } + + log.Infof("[%v connector] Completed scan of index [%s]: %d documents in %d batches", ConnectorElasticsearch, index, docCount, batchCount) +} + +func (p *Plugin) convertToCocoDocument(esDoc elastic.IndexDocument, datasource *common.DataSource, index string) common.Document { + doc := common.Document{ + Source: common.DataSourceReference{ + ID: datasource.ID, + Type: "connector", + Name: datasource.Name, + }, + Type: ConnectorElasticsearch, + Icon: "elasticsearch", + Category: index, + System: datasource.System, + Metadata: make(map[string]interface{}), + } + + doc.ID = util.MD5digest(fmt.Sprintf("%s-%s-%s", datasource.ID, esDoc.Index, esDoc.Id)) + + if esDoc.Source != nil { + sourceMap, ok := esDoc.Source.(map[string]interface{}) + if ok { + if title, exists := sourceMap["title"]; exists { + if titleStr, ok := title.(string); ok { + doc.Title = titleStr + } + } else if name, exists := sourceMap["name"]; exists { + if nameStr, ok := name.(string); ok { + doc.Title = nameStr + } + } else { + doc.Title = fmt.Sprintf("Document %s", esDoc.Id) + } + + if content, exists := sourceMap["content"]; exists { + if contentStr, ok := content.(string); ok { + doc.Content = contentStr + } + } else if body, exists := sourceMap["body"]; exists { + if bodyStr, ok := body.(string); ok { + doc.Content = bodyStr + } + } else if message, exists := sourceMap["message"]; exists { + if messageStr, ok := message.(string); ok { + doc.Content = messageStr + } + } + + if url, exists := sourceMap["url"]; exists { + if urlStr, ok := url.(string); ok { + doc.URL = urlStr + } + } + + if created, exists := sourceMap["created"]; exists { + if createdTime := p.parseTime(created); createdTime != nil { + doc.Created = createdTime + } + } else if timestamp, exists := sourceMap["@timestamp"]; exists { + if timestampTime := p.parseTime(timestamp); timestampTime != nil { + doc.Created = timestampTime + } + } + + if updated, exists := sourceMap["updated"]; exists { + if updatedTime := p.parseTime(updated); updatedTime != nil { + doc.Updated = updatedTime + } + } else if modified, exists := sourceMap["modified"]; exists { + if modifiedTime := p.parseTime(modified); modifiedTime != nil { + doc.Updated = modifiedTime + } + } + + if owner, exists := sourceMap["owner"]; exists { + if ownerMap, ok := owner.(map[string]interface{}); ok { + userInfo := &common.UserInfo{} + if id, exists := ownerMap["id"]; exists { + if idStr, ok := id.(string); ok { + userInfo.UserID = idStr + } + } + if name, exists := ownerMap["name"]; exists { + if nameStr, ok := name.(string); ok { + userInfo.UserName = nameStr + 
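+ // Owner is attached to the document only when the source provided at
+ // least an id or a name (checked just below).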
} + } + if userInfo.UserID != "" || userInfo.UserName != "" { + doc.Owner = userInfo + } + } + } + + for key, value := range sourceMap { + if !p.isReservedField(key) { + doc.Metadata[key] = value + } + } + } + } + + doc.Metadata["_index"] = esDoc.Index + doc.Metadata["_type"] = esDoc.Type + doc.Metadata["_id"] = esDoc.Id + + return doc +} + +func (p *Plugin) parseTime(timeValue interface{}) *time.Time { + switch v := timeValue.(type) { + case string: + formats := []string{ + time.RFC3339, + time.RFC3339Nano, + "2006-01-02T15:04:05.000Z", + "2006-01-02T15:04:05Z", + "2006-01-02 15:04:05", + "2006-01-02", + } + for _, format := range formats { + if t, err := time.Parse(format, v); err == nil { + return &t + } + } + case float64: + t := time.Unix(int64(v), 0) + return &t + case int64: + t := time.Unix(v, 0) + return &t + } + return nil +} + +func (p *Plugin) isReservedField(field string) bool { + reservedFields := []string{ + "title", "name", "content", "body", "message", + "url", "created", "updated", "modified", "@timestamp", + "owner", + } + for _, reserved := range reservedFields { + if field == reserved { + return true + } + } + return false +} + +func (p *Plugin) processBatch(batch []common.Document, datasourceName string) { + for _, doc := range batch { + data := util.MustToJSONBytes(doc) + if global.Env().IsDebug { + log.Tracef("[%v connector] Queuing document: %s", ConnectorElasticsearch, string(data)) + } + + if err := queue.Push(p.Queue, data); err != nil { + log.Errorf("[%v connector] Failed to push document to queue for datasource [%s]: %v", ConnectorElasticsearch, datasourceName, err) + panic(err) + } + } +} + +func (p *Plugin) scanIndicesSequentiallyEnhanced(reader *Reader, indices []string, datasource *common.DataSource, cfg Config) int64 { + var totalDocs int64 + + for _, index := range indices { + if p.ctx.Err() != nil { + log.Infof("[%v connector] Context cancelled, stopping scan", ConnectorElasticsearch) + break + } + + log.Infof("[%v connector] Scanning index with enhanced reader: %s", ConnectorElasticsearch, index) + docs := p.processIndexWithReader(reader, index, cfg, datasource) + totalDocs += docs + } + + return totalDocs +} + +func (p *Plugin) scanIndicesConcurrentlyEnhanced(reader *Reader, indices []string, datasource *common.DataSource, cfg Config) int64 { + var wg sync.WaitGroup + var totalDocs int64 + var docsMu sync.Mutex + semaphore := make(chan struct{}, cfg.GetConcurrency()) + + for _, index := range indices { + if p.ctx.Err() != nil { + log.Infof("[%v connector] Context cancelled, stopping scan", ConnectorElasticsearch) + break + } + + wg.Add(1) + go func(idx string) { + defer wg.Done() + semaphore <- struct{}{} + defer func() { <-semaphore }() + + log.Infof("[%v connector] Scanning index concurrently with enhanced reader: %s", ConnectorElasticsearch, idx) + docs := p.processIndexWithReader(reader, idx, cfg, datasource) + + docsMu.Lock() + totalDocs += docs + docsMu.Unlock() + }(index) + } + + wg.Wait() + return totalDocs +} + +func (p *Plugin) processIndexWithReader(reader *Reader, index string, cfg Config, datasource *common.DataSource) int64 { + var processedDocs int64 + + err := reader.ReadDocuments(p.ctx, index, func(docs []elastic.IndexDocument) error { + batch := make([]common.Document, 0, len(docs)) + + for _, doc := range docs { + cocoDoc := p.convertToCocoDocument(doc, datasource, index) + batch = append(batch, cocoDoc) + } + + if len(batch) > 0 { + p.processBatch(batch, datasource.Name) + processedDocs += int64(len(batch)) + } + + return nil + }) 
+ + if err != nil { + log.Errorf("[%v connector] Failed to read documents from index %s: %v", ConnectorElasticsearch, index, err) + p.syncState.ErrorCount++ + } + + return processedDocs +} + +func (p *Plugin) startHealthCheck() { + log.Debug("[%v connector] Health check monitoring will be started per datasource", ConnectorElasticsearch) +} + +func (p *Plugin) testConnectionWithRetry(client *ESClient, retryConfig RetryConfig) error { + maxRetries := retryConfig.GetMaxRetries() + initialDelay := retryConfig.GetInitialDelay() + maxDelay := retryConfig.GetMaxDelay() + backoffFactor := retryConfig.GetBackoffFactor() + + var lastErr error + delay := initialDelay + + for attempt := 0; attempt <= maxRetries; attempt++ { + if attempt > 0 { + log.Infof("[%v connector] Retrying ES connection test, attempt %d/%d", ConnectorElasticsearch, attempt, maxRetries) + time.Sleep(delay) + + delay = time.Duration(float64(delay) * backoffFactor) + if delay > maxDelay { + delay = maxDelay + } + } + + if err := client.TestConnection(p.ctx); err != nil { + lastErr = err + log.Warnf("[%v connector] ES connection test failed (attempt %d): %v", ConnectorElasticsearch, attempt+1, err) + continue + } + + return nil + } + + return fmt.Errorf("connection failed after %d attempts: %w", maxRetries+1, lastErr) +} + +func (p *Plugin) monitorHealth(ctx context.Context, reader *Reader, config HealthCheckConfig) { + ticker := time.NewTicker(config.Interval) + defer ticker.Stop() + + failureCount := 0 + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := reader.updateClusterHealth(ctx); err != nil { + failureCount++ + log.Warnf("[%v connector] Health check failed (%d/%d): %v", ConnectorElasticsearch, failureCount, config.FailureThreshold, err) + + if failureCount >= config.FailureThreshold { + p.updateSyncState(StatusError, "Health check threshold exceeded") + } + } else { + if failureCount > 0 { + log.Infof("[%v connector] Health check recovered after %d failures", ConnectorElasticsearch, failureCount) + } + failureCount = 0 + + if p.syncState.Status == StatusError { + p.updateSyncState(StatusRecovering, "Health check recovered") + } + } + } + } +} + +func (p *Plugin) updateSyncState(status string, message string) { + p.mu.Lock() + defer p.mu.Unlock() + + p.syncState.Status = status + if status == StatusError { + p.syncState.ErrorCount++ + } + + log.Infof("[%v connector] Sync state updated: %s - %s", ConnectorElasticsearch, status, message) +} + +func (p *Plugin) updateMetrics(totalDocs int64, duration time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + + p.syncState.ProcessedDocuments += totalDocs + + if duration.Seconds() > 0 { + p.metrics.DocumentsPerSecond = float64(totalDocs) / duration.Seconds() + } + p.metrics.AverageLatency = duration + p.metrics.LastUpdated = time.Now() + + log.Infof("[%v connector] Performance metrics: %.2f docs/sec, avg latency: %v", + ConnectorElasticsearch, p.metrics.DocumentsPerSecond, p.metrics.AverageLatency) +} + +func init() { + module.RegisterUserPlugin(&Plugin{}) +} diff --git a/plugins/connectors/elasticsearch/plugin_test.go b/plugins/connectors/elasticsearch/plugin_test.go new file mode 100644 index 00000000..a0a13316 --- /dev/null +++ b/plugins/connectors/elasticsearch/plugin_test.go @@ -0,0 +1,623 @@ +/* Copyright © INFINI LTD. All rights reserved. 
+ * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package elasticsearch + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "infini.sh/coco/modules/common" + "infini.sh/coco/plugins/connectors" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/kv" + "infini.sh/framework/core/module" + "infini.sh/framework/core/queue" + "infini.sh/framework/core/util" +) + +func bytesToBase64String(key []byte) string { + return base64.StdEncoding.EncodeToString(key) +} + +type cache map[string][]byte + +type mockKVStore struct { + store map[string]cache +} + +func (s *mockKVStore) Open() error { + s.store = make(map[string]cache) + return nil +} + +func (s *mockKVStore) Close() error { + return nil +} + +func (s *mockKVStore) GetValue(bucket string, key []byte) ([]byte, error) { + target := s.store[bucket] + return target[bytesToBase64String(key)], nil +} + +func (s *mockKVStore) GetCompressedValue(bucket string, key []byte) ([]byte, error) { + return s.GetValue(bucket, key) +} + +func (s *mockKVStore) AddValueCompress(bucket string, key []byte, value []byte) error { + return s.AddValue(bucket, key, value) +} + +func (s *mockKVStore) AddValue(bucket string, key []byte, value []byte) error { + target := s.store[bucket] + if target == nil { + target = cache{} + s.store[bucket] = target + } + target[bytesToBase64String(key)] = value + return nil +} + +func (s *mockKVStore) ExistsKey(bucket string, key []byte) (bool, error) { + target := s.store[bucket] + if target == nil { + return false, nil + } + return target[bytesToBase64String(key)] != nil, nil +} + +func (s *mockKVStore) DeleteKey(bucket string, key []byte) error { + delete(s.store[bucket], bytesToBase64String(key)) + return nil +} + +type mockQueue map[string][][]byte + +func (q mockQueue) Name() string { + return "indexing_documents" +} + +func (q mockQueue) Init(s string) error { + q[s] = [][]byte{} + return nil +} + +func (q mockQueue) Close(s string) error { + q[s] = nil + return nil +} + +func (q mockQueue) GetStorageSize(k string) uint64 { + return uint64(len(q[k])) +} + +func (q mockQueue) Destroy(s string) error { + clear(q[s]) + return nil +} + +func (q mockQueue) GetQueues() []string { + var ret []string + for name := range q { + ret = append(ret, name) + } + return ret +} + +func (q mockQueue) Push(s string, bytes []byte) error { + q[s] = append(q[s], bytes) + return nil +} + +const mockSearchResponse = `{ + "took": 5, + "timed_out": false, + "_shards": { + "total": 1, + "successful": 1, + "skipped": 0, + "failed": 0 + }, + "hits": { + "total": { + "value": 2, + "relation": "eq" + }, + "max_score": 1.0, + "hits": [ + { + "_index": "test-index", + "_type": "_doc", + "_id": "1", + "_score": 1.0, + "_source": { + "title": "Test Document 1", + "content": "This is test content for document 1", + "url": "https://example.com/doc1", + "@timestamp": "2024-01-01T00:00:00Z", + "_seq_no": 1, + "_primary_term": 1 + } + }, + { + "_index": "test-index", + "_type": "_doc", + "_id": "2", + "_score": 1.0, + "_source": { + "title": "Test Document 2", + "content": "This is test content for document 2", + "url": "https://example.com/doc2", + "@timestamp": "2024-01-02T00:00:00Z", + "_seq_no": 2, + "_primary_term": 1 + } + } + ] + }, + "_scroll_id": "test-scroll-id-123" +}` + +const mockClusterHealthResponse = `{ + "cluster_name": "test-cluster", + "status": "green", + 
"timed_out": false, + "number_of_nodes": 3, + "number_of_data_nodes": 3, + "active_primary_shards": 5, + "active_shards": 10, + "relocating_shards": 0, + "initializing_shards": 0, + "unassigned_shards": 0 +}` + +const mockCatNodesResponse = `[ + { + "ip": "192.168.1.1", + "heap.percent": "50", + "ram.percent": "60", + "cpu": "10", + "load_1m": "1.5", + "load_5m": "1.2", + "load_15m": "1.0", + "node.role": "dilmrt", + "master": "*", + "name": "node-1", + "id": "node-1-id" + }, + { + "ip": "192.168.1.2", + "heap.percent": "45", + "ram.percent": "55", + "cpu": "8", + "load_1m": "1.2", + "load_5m": "1.0", + "load_15m": "0.8", + "node.role": "dilmrt", + "master": "-", + "name": "node-2", + "id": "node-2-id" + } +]` + +const mockIndicesResponse = `{ + "test-index": { + "aliases": {}, + "mappings": { + "properties": { + "title": {"type": "text"}, + "content": {"type": "text"}, + "url": {"type": "keyword"}, + "@timestamp": {"type": "date"} + } + }, + "settings": { + "index": { + "number_of_shards": "1", + "number_of_replicas": "1" + } + } + } +}` + +func createMockESServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + switch { + case r.URL.Path == "/_cluster/health": + w.WriteHeader(http.StatusOK) + w.Write([]byte(mockClusterHealthResponse)) + case r.URL.Path == "/_cat/nodes": + w.WriteHeader(http.StatusOK) + w.Write([]byte(mockCatNodesResponse)) + case strings.Contains(r.URL.Path, "/_search"): + w.WriteHeader(http.StatusOK) + w.Write([]byte(mockSearchResponse)) + case strings.Contains(r.URL.Path, "/_search/scroll"): + emptyResponse := `{"hits":{"hits":[]},"_scroll_id":"test-scroll-id-123"}` + w.WriteHeader(http.StatusOK) + w.Write([]byte(emptyResponse)) + case r.URL.Path == "/test-index": + w.WriteHeader(http.StatusOK) + w.Write([]byte(mockIndicesResponse)) + case r.URL.Path == "/": + versionResponse := `{"version":{"number":"8.0.0"}}` + w.WriteHeader(http.StatusOK) + w.Write([]byte(versionResponse)) + default: + w.WriteHeader(http.StatusNotFound) + w.Write([]byte(`{"error":"not found"}`)) + } + })) +} + +func TestConfigParsing(t *testing.T) { + tests := []struct { + name string + config map[string]interface{} + expected Config + }{ + { + name: "basic config", + config: map[string]interface{}{ + "endpoints": []interface{}{"http://localhost:9200"}, + "username": "elastic", + "password": "password", + "indices": []interface{}{"test-index"}, + }, + expected: Config{ + Endpoints: []string{"http://localhost:9200"}, + Username: "elastic", + Password: "password", + Indices: []string{"test-index"}, + }, + }, + { + name: "enhanced config with deployment mode", + config: map[string]interface{}{ + "endpoints": []interface{}{"http://localhost:9200"}, + "deployment_mode": "cluster", + "read_strategy": map[string]interface{}{ + "slice_count": 4, + "preference": "_primary", + }, + "health_check": map[string]interface{}{ + "enabled": true, + "interval": "30s", + }, + }, + expected: Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "cluster", + ReadStrategy: ReadStrategyConfig{ + SliceCount: 4, + Preference: "_primary", + }, + HealthCheck: HealthCheckConfig{ + Enabled: true, + Interval: 30 * time.Second, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + connector := &common.Connector{ID: "elasticsearch"} + datasource := &common.DataSource{ + ID: "test-ds", + Name: "Test DataSource", + Connector: common.ConnectorConfig{ + 
ConnectorID: "elasticsearch", + Config: tt.config, + }, + } + + var cfg Config + err := connectors.ParseConnectorConfigure(connector, datasource, &cfg) + assert.NoError(t, err) + + assert.Equal(t, tt.expected.Endpoints, cfg.Endpoints) + assert.Equal(t, tt.expected.Username, cfg.Username) + assert.Equal(t, tt.expected.Password, cfg.Password) + assert.Equal(t, tt.expected.DeploymentMode, cfg.DeploymentMode) + + if tt.expected.ReadStrategy.SliceCount > 0 { + assert.Equal(t, tt.expected.ReadStrategy.SliceCount, cfg.ReadStrategy.SliceCount) + } + }) + } +} + +func TestConfigDefaults(t *testing.T) { + cfg := Config{} + + assert.Equal(t, DefaultScrollSize, cfg.GetScrollSize()) + assert.Equal(t, DefaultScrollTimeout, cfg.GetScrollTime()) + assert.Equal(t, DefaultBatchSize, cfg.GetBatchSize()) + assert.Equal(t, DefaultConcurrency, cfg.GetConcurrency()) + assert.Equal(t, []string{"*"}, cfg.GetIndices()) + + assert.Equal(t, DefaultSliceCount, cfg.ReadStrategy.GetSliceCount()) + assert.Equal(t, DefaultPreference, cfg.ReadStrategy.GetPreference()) + + assert.Equal(t, DefaultMaxRetries, cfg.RetryConfig.GetMaxRetries()) + assert.Equal(t, DefaultInitialDelay, cfg.RetryConfig.GetInitialDelay()) + assert.Equal(t, DefaultMaxDelay, cfg.RetryConfig.GetMaxDelay()) + assert.Equal(t, DefaultBackoffFactor, cfg.RetryConfig.GetBackoffFactor()) +} + +func TestDeploymentModeTypes(t *testing.T) { + assert.Equal(t, "single", string(DeploymentModeSingle)) + assert.Equal(t, "cluster", string(DeploymentModeCluster)) + assert.Equal(t, "ha_cluster", string(DeploymentModeHACluster)) +} + +func TestStatusConstants(t *testing.T) { + assert.Equal(t, "idle", StatusIdle) + assert.Equal(t, "running", StatusRunning) + assert.Equal(t, "error", StatusError) + assert.Equal(t, "recovering", StatusRecovering) + assert.Equal(t, "stopped", StatusStopped) +} + +func TestPluginScanSuccess(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + + kv.Register("indexing_documents", &mockKVStore{store: make(map[string]cache)}) + + server := createMockESServer() + defer server.Close() + + testQueueName := "indexing_documents" + plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-datasource-id", + Name: "Test ES Source", + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: map[string]interface{}{ + "endpoints": []interface{}{server.URL}, + "indices": []interface{}{"test-index"}, + "deployment_mode": "single", + "scroll_size": 100, + "batch_size": 50, + }, + }, + } + + // Defensive testing + didPanic := false + var panicValue interface{} + func() { + defer func() { + if r := recover(); r != nil { + didPanic = true + panicValue = r + } + }() + plugin.Scan(connector, dataSource) + }() + + assert.False(t, didPanic, fmt.Sprintf("Scan panicked with: %v", panicValue)) + + queueID := plugin.Queue.ID + queueSize := len(theQueue[queueID]) + assert.Greater(t, queueSize, 0, "Expected documents to be pushed to the queue") + + if queueSize > 0 { + data := theQueue[queueID][0] + var doc common.Document + err := json.Unmarshal(data, &doc) + assert.NoError(t, err) + + assert.Equal(t, "Test Document 1", doc.Title) + assert.Equal(t, "This is test content for document 1", doc.Content) + assert.Equal(t, "https://example.com/doc1", doc.URL) + assert.Equal(t, "elasticsearch", 
doc.Type) + } +} + +func TestPluginScanWithClusterMode(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + + kv.Register("indexing_documents", &mockKVStore{store: make(map[string]cache)}) + + server := createMockESServer() + defer server.Close() + + testQueueName := "indexing_documents" + plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-datasource-id", + Name: "Test ES Cluster Source", + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: map[string]interface{}{ + "endpoints": []interface{}{server.URL}, + "indices": []interface{}{"test-index"}, + "deployment_mode": "cluster", + "read_strategy": map[string]interface{}{ + "slice_count": 2, + "preference": "_primary", + }, + "health_check": map[string]interface{}{ + "enabled": true, + "interval": "10s", + }, + }, + }, + } + + // Defensive testing + didPanic := false + var panicValue interface{} + func() { + defer func() { + if r := recover(); r != nil { + didPanic = true + panicValue = r + } + }() + plugin.Scan(connector, dataSource) + }() + + assert.False(t, didPanic, fmt.Sprintf("Scan with cluster mode panicked with: %v", panicValue)) + + queueID := plugin.Queue.ID + queueSize := len(theQueue[queueID]) + assert.Greater(t, queueSize, 0, "Expected documents to be pushed to the queue in cluster mode") +} + +func TestPluginScanMissingEndpoints(t *testing.T) { + theQueue := mockQueue{} + queue.RegisterDefaultHandler(theQueue) + + testQueueName := "indexing_documents" + plugin := &Plugin{} + plugin.Queue = &queue.QueueConfig{Name: testQueueName} + module.RegisterUserPlugin(plugin) + plugin.Queue = queue.SmartGetOrInitConfig(plugin.Queue) + plugin.Setup() + + connector := &common.Connector{ID: "elasticsearch"} + dataSource := &common.DataSource{ + ID: "test-datasource-id", + Name: "Test ES Source", + Connector: common.ConnectorConfig{ + ConnectorID: "elasticsearch", + Config: map[string]interface{}{ + "indices": []interface{}{"test-index"}, + }, + }, + } + + didPanic := false + func() { + defer func() { + if r := recover(); r != nil { + didPanic = true + } + }() + plugin.Scan(connector, dataSource) + }() + + assert.False(t, didPanic, "Scan should handle missing endpoints gracefully") + + assert.Equal(t, StatusError, plugin.syncState.Status) +} + +func TestRetryMechanism(t *testing.T) { + plugin := &Plugin{} + plugin.Setup() + plugin.ctx = context.Background() + + mockClient := &ESClient{} + + retryConfig := RetryConfig{ + MaxRetries: 2, + InitialDelay: 10 * time.Millisecond, + MaxDelay: 100 * time.Millisecond, + BackoffFactor: 2.0, + } + + start := time.Now() + err := plugin.testConnectionWithRetry(mockClient, retryConfig) + duration := time.Since(start) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "connection failed after") + + assert.Greater(t, duration, 10*time.Millisecond) +} + +func TestSyncStateManagement(t *testing.T) { + plugin := &Plugin{} + plugin.Setup() + + assert.Equal(t, StatusIdle, plugin.syncState.Status) + assert.Equal(t, 0, plugin.syncState.ErrorCount) + + plugin.updateSyncState(StatusRunning, "Starting scan") + assert.Equal(t, StatusRunning, plugin.syncState.Status) + + plugin.updateSyncState(StatusError, "Connection failed") + assert.Equal(t, StatusError, plugin.syncState.Status) + assert.Equal(t, 1, 
plugin.syncState.ErrorCount) + + plugin.updateSyncState(StatusError, "Another error") + assert.Equal(t, 2, plugin.syncState.ErrorCount) +} + +func TestMetricsUpdate(t *testing.T) { + plugin := &Plugin{} + plugin.Setup() + + totalDocs := int64(1000) + duration := 10 * time.Second + + plugin.updateMetrics(totalDocs, duration) + + assert.Equal(t, totalDocs, plugin.syncState.ProcessedDocuments) + assert.Equal(t, float64(100), plugin.metrics.DocumentsPerSecond) // 1000 docs / 10 seconds + assert.Equal(t, duration, plugin.metrics.AverageLatency) + assert.True(t, time.Since(plugin.metrics.LastUpdated) < time.Second) +} + +func TestDocumentConversion(t *testing.T) { + plugin := &Plugin{} + plugin.Setup() + + esDoc := elastic.IndexDocument{ + Index: "test-index", + Type: "_doc", + Id: "test-id", + Source: map[string]interface{}{ + "title": "Test Title", + "content": "Test Content", + "url": "https://example.com", + "@timestamp": "2024-01-01T00:00:00Z", + "author": "Test Author", + }, + } + + datasource := &common.DataSource{ + ID: "test-ds", + Name: "Test DataSource", + } + + doc := plugin.convertToCocoDocument(esDoc, datasource, "test-index") + + assert.Equal(t, "Test Title", doc.Title) + assert.Equal(t, "Test Content", doc.Content) + assert.Equal(t, "https://example.com", doc.URL) + assert.Equal(t, "elasticsearch", doc.Type) + assert.Equal(t, "Test Author", doc.Owner.UserName) + assert.Equal(t, "test-ds", doc.DataSourceID) + assert.Equal(t, "test-index", doc.Category) +} diff --git a/plugins/connectors/elasticsearch/reader.go b/plugins/connectors/elasticsearch/reader.go new file mode 100644 index 00000000..3cf4e1fa --- /dev/null +++ b/plugins/connectors/elasticsearch/reader.go @@ -0,0 +1,289 @@ +package elasticsearch + +import ( + "context" + "fmt" + "sync" + "time" + + log "github.com/cihub/seelog" + "infini.sh/framework/core/elastic" + "infini.sh/framework/core/util" +) + +type Reader struct { + client *ESClient + config Config + deployMode DeploymentMode + clusterStatus *ClusterStatus + mu sync.RWMutex +} + +func NewReader(client *ESClient, config Config) *Reader { + deployMode := DeploymentMode(config.DeploymentMode) + if deployMode == "" { + deployMode = DeploymentModeSingle + } + + return &Reader{ + client: client, + config: config, + deployMode: deployMode, + clusterStatus: &ClusterStatus{ + Status: "unknown", + }, + } +} + +func (r *Reader) ReadDocuments(ctx context.Context, index string, callback func([]elastic.IndexDocument) error) error { + switch r.deployMode { + case DeploymentModeSingle: + return r.readSingleNode(ctx, index, callback) + case DeploymentModeCluster: + return r.readCluster(ctx, index, callback) + case DeploymentModeHACluster: + return r.readHACluster(ctx, index, callback) + default: + return fmt.Errorf("unsupported deployment mode: %s", r.deployMode) + } +} + +func (r *Reader) readSingleNode(ctx context.Context, index string, callback func([]elastic.IndexDocument) error) error { + log.Infof("Reading from single node ES: %s", index) + + iterator, err := r.client.NewScrollIterator(ctx, index) + if err != nil { + return fmt.Errorf("failed to create scroll iterator: %w", err) + } + defer iterator.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + docs, err := iterator.Next(ctx) + if err != nil { + return fmt.Errorf("failed to get next batch: %w", err) + } + + if len(docs) == 0 { + break + } + + if err := callback(docs); err != nil { + return fmt.Errorf("callback failed: %w", err) + } + } + + return nil +} + +func (r *Reader) 
readCluster(ctx context.Context, index string, callback func([]elastic.IndexDocument) error) error { + log.Infof("Reading from ES cluster with slice scroll: %s", index) + + sliceCount := r.config.ReadStrategy.GetSliceCount() + if sliceCount <= 1 { + return r.readSingleNode(ctx, index, callback) + } + + var wg sync.WaitGroup + errChan := make(chan error, sliceCount) + + for sliceID := 0; sliceID < sliceCount; sliceID++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + if err := r.readSlice(ctx, index, id, sliceCount, callback); err != nil { + errChan <- fmt.Errorf("slice %d failed: %w", id, err) + } + }(sliceID) + } + + go func() { + wg.Wait() + close(errChan) + }() + + for err := range errChan { + if err != nil { + return err + } + } + + return nil +} + +func (r *Reader) readSlice(ctx context.Context, index string, sliceID, maxSlices int, callback func([]elastic.IndexDocument) error) error { + query := r.client.buildQuery() + + if query.RawQuery == nil { + query.RawQuery = make(map[string]interface{}) + } + + query.Set("slice", util.MapStr{ + "id": sliceID, + "max": maxSlices, + }) + + searchRequest := &elastic.SearchRequest{ + Size: r.config.GetScrollSize(), + Query: query, + } + + scrollID, err := r.client.client.NewScroll( + index, + r.config.GetScrollTime(), + r.config.GetScrollSize(), + searchRequest, + sliceID, + maxSlices, + ) + if err != nil { + return fmt.Errorf("failed to create slice scroll: %w", err) + } + + defer func() { + if err := r.client.client.ClearScroll(string(scrollID)); err != nil { + log.Warnf("Failed to clear scroll: %v", err) + } + }() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + apiCtx := &elastic.APIContext{Context: ctx} + response, err := r.client.client.NextScroll(apiCtx, r.config.GetScrollTime(), string(scrollID)) + if err != nil { + return fmt.Errorf("failed to get next scroll: %w", err) + } + + var searchResponse elastic.SearchResponse + if err := util.FromJSONBytes(response, &searchResponse); err != nil { + return fmt.Errorf("failed to parse scroll response: %w", err) + } + + if len(searchResponse.Hits.Hits) == 0 { + break + } + + docs := make([]elastic.IndexDocument, len(searchResponse.Hits.Hits)) + for i, hit := range searchResponse.Hits.Hits { + docs[i] = elastic.IndexDocument{ + Index: hit.Index, + Type: hit.Type, + Id: hit.ID, + Source: hit.Source, + } + } + + if err := callback(docs); err != nil { + return fmt.Errorf("callback failed: %w", err) + } + + if searchResponse.ScrollId != "" { + scrollID = []byte(searchResponse.ScrollId) + } + } + + return nil +} + +func (r *Reader) readHACluster(ctx context.Context, index string, callback func([]elastic.IndexDocument) error) error { + log.Infof("Reading from HA ES cluster with failover: %s", index) + + if err := r.updateClusterHealth(ctx); err != nil { + log.Warnf("Failed to check cluster health: %v", err) + } + + return r.readCluster(ctx, index, callback) +} + +func (r *Reader) updateClusterHealth(ctx context.Context) error { + r.mu.Lock() + defer r.mu.Unlock() + + health, err := r.client.client.ClusterHealth(ctx) + if err != nil { + return fmt.Errorf("failed to get cluster health: %w", err) + } + + r.clusterStatus.ClusterName = health.Name + r.clusterStatus.Status = health.Status + r.clusterStatus.LastHealthCheck = time.Now() + + nodes, err := r.client.client.CatNodes("") + if err != nil { + log.Warnf("Failed to get node information: %v", err) + return nil + } + + nodeHealths := make([]NodeHealth, len(nodes)) + activeNodes := 0 + + for i, node := range nodes { + 
nodeHealths[i] = NodeHealth{ + NodeID: node.NodeID, + NodeName: node.Name, + Host: node.IP, + Status: "active", + LastCheck: time.Now(), + Available: true, + } + activeNodes++ + } + + r.clusterStatus.Nodes = nodeHealths + r.clusterStatus.ActiveNodes = activeNodes + r.clusterStatus.TotalNodes = len(nodes) + + log.Infof("Cluster health updated: %s, active nodes: %d/%d", + r.clusterStatus.Status, r.clusterStatus.ActiveNodes, r.clusterStatus.TotalNodes) + + return nil +} + +func (r *Reader) GetClusterStatus() ClusterStatus { + r.mu.RLock() + defer r.mu.RUnlock() + return *r.clusterStatus +} + +func (r *Reader) SupportsSliceScroll() bool { + version := r.client.client.GetVersion() + return version.Major >= 5 +} + +func (r *Reader) OptimizeReadStrategy(ctx context.Context) error { + if r.deployMode == DeploymentModeSingle { + return nil + } + + if err := r.updateClusterHealth(ctx); err != nil { + return err + } + + if r.clusterStatus.ActiveNodes > 0 { + optimalSlices := r.clusterStatus.ActiveNodes * 2 + if optimalSlices > 8 { + optimalSlices = 8 + } + if optimalSlices < 2 { + optimalSlices = 2 + } + + if r.config.ReadStrategy.SliceCount != optimalSlices { + log.Infof("Optimizing slice count from %d to %d based on %d active nodes", + r.config.ReadStrategy.SliceCount, optimalSlices, r.clusterStatus.ActiveNodes) + r.config.ReadStrategy.SliceCount = optimalSlices + } + } + + return nil +} diff --git a/plugins/connectors/elasticsearch/reader_test.go b/plugins/connectors/elasticsearch/reader_test.go new file mode 100644 index 00000000..4a927a57 --- /dev/null +++ b/plugins/connectors/elasticsearch/reader_test.go @@ -0,0 +1,302 @@ +/* Copyright © INFINI LTD. All rights reserved. + * Web: https://infinilabs.com + * Email: hello#infini.ltd */ + +package elasticsearch + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "infini.sh/framework/core/elastic" +) + +func TestNewReader(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "cluster", + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + assert.NotNil(t, reader) + assert.Equal(t, DeploymentModeCluster, reader.deployMode) + assert.NotNil(t, reader.clusterStatus) + assert.Equal(t, "unknown", reader.clusterStatus.Status) +} + +func TestReaderDeploymentModeDefault(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + assert.Equal(t, DeploymentModeSingle, reader.deployMode) +} + +func TestReaderGetClusterStatus(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "ha_cluster", + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + status := reader.GetClusterStatus() + assert.Equal(t, "unknown", status.Status) + assert.Equal(t, 0, status.ActiveNodes) + assert.Equal(t, 0, status.TotalNodes) +} + +func TestReaderSupportsSliceScroll(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + supports := reader.SupportsSliceScroll() + assert.IsType(t, bool(false), supports) +} + +func TestReaderOptimizeReadStrategy(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "single", + ReadStrategy: 
ReadStrategyConfig{ + SliceCount: 2, + }, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + originalSliceCount := reader.config.ReadStrategy.SliceCount + err = reader.OptimizeReadStrategy(ctx) + + assert.NoError(t, err) + assert.Equal(t, originalSliceCount, reader.config.ReadStrategy.SliceCount) +} + +func TestReaderOptimizeReadStrategyCluster(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "cluster", + ReadStrategy: ReadStrategyConfig{ + SliceCount: 2, + }, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + reader.clusterStatus.ActiveNodes = 3 + reader.clusterStatus.TotalNodes = 3 + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err = reader.OptimizeReadStrategy(ctx) + + if err != nil { + assert.Contains(t, err.Error(), "failed to get cluster health") + } +} + +func TestReaderReadDocumentsSingleNode(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "single", + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + callbackCalled := false + callback := func(docs []elastic.IndexDocument) error { + callbackCalled = true + return nil + } + + err = reader.ReadDocuments(ctx, "test-index", callback) + + if err != nil { + assert.Contains(t, err.Error(), "failed to create scroll iterator") + } + + assert.False(t, callbackCalled) +} + +func TestReaderReadDocumentsCluster(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "cluster", + ReadStrategy: ReadStrategyConfig{ + SliceCount: 2, + }, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + callbackCalled := false + callback := func(docs []elastic.IndexDocument) error { + callbackCalled = true + return nil + } + + err = reader.ReadDocuments(ctx, "test-index", callback) + + if err != nil { + assert.NotNil(t, err) + } +} + +func TestReaderReadDocumentsHACluster(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "ha_cluster", + ReadStrategy: ReadStrategyConfig{ + SliceCount: 4, + }, + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + callbackCalled := false + callback := func(docs []elastic.IndexDocument) error { + callbackCalled = true + return nil + } + + err = reader.ReadDocuments(ctx, "test-index", callback) + + if err != nil { + assert.NotNil(t, err) + } +} + +func TestReaderUnsupportedDeploymentMode(t *testing.T) { + config := Config{ + Endpoints: []string{"http://localhost:9200"}, + DeploymentMode: "invalid_mode", + } + + client, err := NewESClient(config) + assert.NoError(t, err) + + reader := NewReader(client, config) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + callback := func(docs []elastic.IndexDocument) error { + return nil + } + + err = reader.ReadDocuments(ctx, "test-index", 
callback) + assert.Error(t, err) + assert.Contains(t, err.Error(), "unsupported deployment mode") +} + +func TestClusterStatusStructure(t *testing.T) { + status := ClusterStatus{ + ClusterName: "test-cluster", + Status: "green", + ActiveNodes: 3, + TotalNodes: 3, + LastHealthCheck: time.Now(), + Nodes: []NodeHealth{ + { + NodeID: "node-1", + NodeName: "test-node-1", + Host: "192.168.1.1", + Status: "active", + Available: true, + LastCheck: time.Now(), + }, + }, + } + + assert.Equal(t, "test-cluster", status.ClusterName) + assert.Equal(t, "green", status.Status) + assert.Equal(t, 3, status.ActiveNodes) + assert.Equal(t, 3, status.TotalNodes) + assert.Len(t, status.Nodes, 1) + assert.Equal(t, "node-1", status.Nodes[0].NodeID) + assert.True(t, status.Nodes[0].Available) +} + +func TestSyncStateStructure(t *testing.T) { + now := time.Now() + state := SyncState{ + LastFullSync: now, + LastIncrementalSync: now, + LastSeqNo: 12345, + LastPrimaryTerm: 1, + ProcessedDocuments: 1000, + ErrorCount: 2, + Status: StatusRunning, + } + + assert.Equal(t, now.Unix(), state.LastFullSync.Unix()) + assert.Equal(t, now.Unix(), state.LastIncrementalSync.Unix()) + assert.Equal(t, int64(12345), state.LastSeqNo) + assert.Equal(t, int64(1), state.LastPrimaryTerm) + assert.Equal(t, int64(1000), state.ProcessedDocuments) + assert.Equal(t, 2, state.ErrorCount) + assert.Equal(t, StatusRunning, state.Status) +} + +func TestPerformanceMetricsStructure(t *testing.T) { + now := time.Now() + metrics := PerformanceMetrics{ + DocumentsPerSecond: 150.5, + AverageLatency: 500 * time.Millisecond, + ErrorRate: 0.02, + ThroughputMB: 25.7, + LastUpdated: now, + } + + assert.Equal(t, 150.5, metrics.DocumentsPerSecond) + assert.Equal(t, 500*time.Millisecond, metrics.AverageLatency) + assert.Equal(t, 0.02, metrics.ErrorRate) + assert.Equal(t, 25.7, metrics.ThroughputMB) + assert.Equal(t, now.Unix(), metrics.LastUpdated.Unix()) +} diff --git a/plugins/connectors/elasticsearch/types.go b/plugins/connectors/elasticsearch/types.go new file mode 100644 index 00000000..ace1920b --- /dev/null +++ b/plugins/connectors/elasticsearch/types.go @@ -0,0 +1,101 @@ +package elasticsearch + +import ( + "time" +) + +type DeploymentMode string + +const ( + DeploymentModeSingle DeploymentMode = "single" + DeploymentModeCluster DeploymentMode = "cluster" + DeploymentModeHACluster DeploymentMode = "ha_cluster" +) + +type SyncPolicy struct { + FullSyncInterval string `config:"full_sync_interval"` + IncrementalSyncInterval string `config:"incremental_sync_interval"` + Fields []string `config:"fields"` + QueryFilter string `config:"query_filter"` +} + +type ReadStrategy struct { + ScrollSize int `config:"scroll_size"` + ScrollTimeout string `config:"scroll_timeout"` + SliceCount int `config:"slice_count"` + Preference string `config:"preference"` +} + +type HealthCheckConfig struct { + Enabled bool `config:"enabled"` + Interval time.Duration `config:"interval"` + FailureThreshold int `config:"failure_threshold"` + RecoveryThreshold int `config:"recovery_threshold"` +} + +type SyncState struct { + LastFullSync time.Time `json:"last_full_sync"` + LastIncrementalSync time.Time `json:"last_incremental_sync"` + LastSeqNo int64 `json:"last_seq_no"` + LastPrimaryTerm int64 `json:"last_primary_term"` + ProcessedDocuments int64 `json:"processed_documents"` + ErrorCount int `json:"error_count"` + Status string `json:"status"` +} + +type NodeHealth struct { + NodeID string `json:"node_id"` + NodeName string `json:"node_name"` + Host string `json:"host"` + Status string 
`json:"status"` + LastCheck time.Time `json:"last_check"` + Available bool `json:"available"` +} + +type ClusterStatus struct { + ClusterName string `json:"cluster_name"` + Status string `json:"status"` + Nodes []NodeHealth `json:"nodes"` + ActiveNodes int `json:"active_nodes"` + TotalNodes int `json:"total_nodes"` + LastHealthCheck time.Time `json:"last_health_check"` +} + +type RetryConfig struct { + MaxRetries int `config:"max_retries"` + InitialDelay time.Duration `config:"initial_delay"` + MaxDelay time.Duration `config:"max_delay"` + BackoffFactor float64 `config:"backoff_factor"` +} + +type PerformanceMetrics struct { + DocumentsPerSecond float64 `json:"documents_per_second"` + AverageLatency time.Duration `json:"average_latency"` + ErrorRate float64 `json:"error_rate"` + ThroughputMB float64 `json:"throughput_mb"` + LastUpdated time.Time `json:"last_updated"` +} + +const ( + DefaultScrollSize = 1000 + DefaultScrollTimeout = "5m" + DefaultSliceCount = 4 + DefaultPreference = "_primary" + DefaultBatchSize = 100 + DefaultConcurrency = 2 + DefaultMaxRetries = 3 + DefaultInitialDelay = time.Second + DefaultMaxDelay = time.Minute + DefaultBackoffFactor = 2.0 + DefaultHealthCheckInterval = 30 * time.Second + DefaultFailureThreshold = 3 + DefaultRecoveryThreshold = 2 +) + +const ( + StatusIdle = "idle" + StatusRunning = "running" + StatusError = "error" + StatusRecovering = "recovering" + StatusStopped = "stopped" +)
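
Note on the retry defaults above: DefaultInitialDelay, DefaultBackoffFactor, and DefaultMaxDelay imply an exponential backoff that doubles the wait per attempt and caps it at one minute. A minimal standalone sketch of that schedule, assuming the connector multiplies the delay by the factor once per attempt and clamps at the cap; the helper name backoffDelay is illustrative and not part of this PR:

package main

import (
	"fmt"
	"time"
)

// backoffDelay returns the wait before retry attempt `attempt` (0-based),
// growing the initial delay by backoffFactor each attempt and clamping at maxDelay.
func backoffDelay(attempt int, initial, maxDelay time.Duration, factor float64) time.Duration {
	d := float64(initial)
	for i := 0; i < attempt; i++ {
		d *= factor
	}
	if d > float64(maxDelay) {
		return maxDelay
	}
	return time.Duration(d)
}

func main() {
	// With the defaults in types.go this prints 1s, 2s, 4s, ... capped at 1m.
	for attempt := 0; attempt <= 6; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoffDelay(attempt, time.Second, time.Minute, 2.0))
	}
}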
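
Note on readSlice: the parallel read path relies on Elasticsearch's sliced scroll, where every worker sends the same query plus a "slice" clause carrying its id and the total slice count, and Elasticsearch partitions the result set across those ids. A standalone sketch of the request body each slice worker would produce, using only plain maps and encoding/json rather than the framework's elastic.SearchRequest type; the helper name sliceScrollBody and the sample range query are illustrative only:

package main

import (
	"encoding/json"
	"fmt"
)

// sliceScrollBody builds the search body for one slice worker:
// the shared query plus a "slice" clause identifying this worker.
func sliceScrollBody(sliceID, maxSlices, scrollSize int, query map[string]interface{}) ([]byte, error) {
	body := map[string]interface{}{
		"size":  scrollSize,
		"query": query,
		"slice": map[string]interface{}{
			"id":  sliceID,
			"max": maxSlices,
		},
	}
	return json.MarshalIndent(body, "", "  ")
}

func main() {
	// Example query: a range filter on @timestamp, similar in shape to an
	// incremental-sync filter. scroll_size 1000 and slice_count 4 mirror the defaults.
	query := map[string]interface{}{
		"range": map[string]interface{}{
			"@timestamp": map[string]interface{}{"gte": "now-1d"},
		},
	}
	for id := 0; id < 4; id++ {
		b, _ := sliceScrollBody(id, 4, 1000, query)
		fmt.Printf("slice %d body:\n%s\n", id, b)
	}
}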