diff --git a/maintnotifications/e2e/DATABASE_MANAGEMENT.md b/maintnotifications/e2e/DATABASE_MANAGEMENT.md
new file mode 100644
index 000000000..02dffe762
--- /dev/null
+++ b/maintnotifications/e2e/DATABASE_MANAGEMENT.md
@@ -0,0 +1,363 @@
+# Database Management with Fault Injector
+
+This document describes how to use the fault injector's database management endpoints to create and delete Redis databases during E2E testing.
+
+## Overview
+
+The fault injector now supports two new endpoints for database management:
+
+1. **CREATE_DATABASE** - Create a new Redis database with custom configuration
+2. **DELETE_DATABASE** - Delete an existing Redis database
+
+These endpoints are useful for E2E tests that need to dynamically create and destroy databases as part of their test scenarios.
+
+## Action Types
+
+### CREATE_DATABASE
+
+Creates a new Redis database with the specified configuration.
+
+**Parameters:**
+- `cluster_index` (int): The index of the cluster where the database should be created
+- `database_config` (object): The database configuration (see structure below)
+
+**Raises:**
+- `CreateDatabaseException`: When database creation fails
+
+### DELETE_DATABASE
+
+Deletes an existing Redis database.
+
+**Parameters:**
+- `cluster_index` (int): The index of the cluster containing the database
+- `bdb_id` (int): The database ID to delete
+
+**Raises:**
+- `DeleteDatabaseException`: When database deletion fails
+
+## Database Configuration Structure
+
+The `database_config` object supports the following fields:
+
+```go
+type DatabaseConfig struct {
+	Name                         string                 `json:"name"`
+	Port                         int                    `json:"port"`
+	MemorySize                   int64                  `json:"memory_size"`
+	Replication                  bool                   `json:"replication"`
+	EvictionPolicy               string                 `json:"eviction_policy"`
+	Sharding                     bool                   `json:"sharding"`
+	AutoUpgrade                  bool                   `json:"auto_upgrade"`
+	ShardsCount                  int                    `json:"shards_count"`
+	ModuleList                   []DatabaseModule       `json:"module_list,omitempty"`
+	OSSCluster                   bool                   `json:"oss_cluster"`
+	OSSClusterAPIPreferredIPType string                 `json:"oss_cluster_api_preferred_ip_type,omitempty"`
+	ProxyPolicy                  string                 `json:"proxy_policy,omitempty"`
+	ShardsPlacement              string                 `json:"shards_placement,omitempty"`
+	ShardKeyRegex                []ShardKeyRegexPattern `json:"shard_key_regex,omitempty"`
+}
+
+type DatabaseModule struct {
+	ModuleArgs string `json:"module_args"`
+	ModuleName string `json:"module_name"`
+}
+
+type ShardKeyRegexPattern struct {
+	Regex string `json:"regex"`
+}
+```
+
+### Example Configurations
+
+#### Simple Database
+
+```json
+{
+  "name": "simple-db",
+  "port": 12000,
+  "memory_size": 268435456,
+  "replication": false,
+  "eviction_policy": "noeviction",
+  "sharding": false,
+  "auto_upgrade": true,
+  "shards_count": 1,
+  "oss_cluster": false
+}
+```
+
+#### Clustered Database with Modules
+
+```json
+{
+  "name": "ioredis-cluster",
+  "port": 11112,
+  "memory_size": 1273741824,
+  "replication": true,
+  "eviction_policy": "noeviction",
+  "sharding": true,
+  "auto_upgrade": true,
+  "shards_count": 3,
+  "module_list": [
+    {
+      "module_args": "",
+      "module_name": "ReJSON"
+    },
+    {
+      "module_args": "",
+      "module_name": "search"
+    },
+    {
+      "module_args": "",
+      "module_name": "timeseries"
+    },
+    {
+      "module_args": "",
+      "module_name": "bf"
+    }
+  ],
+  "oss_cluster": true,
+  "oss_cluster_api_preferred_ip_type": "external",
+  "proxy_policy": "all-master-shards",
+  "shards_placement": "sparse",
+  "shard_key_regex": [
+    {
+      "regex": ".*\\{(?<tag>.*)\\}.*"
+    },
+    {
+      "regex": "(?<tag>.*)"
+    }
+  ]
+}
+```
+
+## Usage Examples
+
+### Example 1: Create a Simple Database
+
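+Under the hood, each helper posts an action request to the fault injector's `/action` endpoint. A sketch of the `CREATE_DATABASE` payload (the `type`/`parameters` field names assume the usual lowercase JSON tags on `ActionRequest`):
+
+```json
+{
+  "type": "create_database",
+  "parameters": {
+    "cluster_index": 0,
+    "database_config": {
+      "name": "test-db",
+      "port": 12000,
+      "memory_size": 268435456
+    }
+  }
+}
+```
+
+The Go client wraps this request as follows:
+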
+```go +ctx := context.Background() +faultInjector := NewFaultInjectorClient("http://127.0.0.1:20324") + +dbConfig := DatabaseConfig{ + Name: "test-db", + Port: 12000, + MemorySize: 268435456, // 256MB + Replication: false, + EvictionPolicy: "noeviction", + Sharding: false, + AutoUpgrade: true, + ShardsCount: 1, + OSSCluster: false, +} + +resp, err := faultInjector.CreateDatabase(ctx, 0, dbConfig) +if err != nil { + log.Fatalf("Failed to create database: %v", err) +} + +// Wait for creation to complete +status, err := faultInjector.WaitForAction(ctx, resp.ActionID, + WithMaxWaitTime(5*time.Minute)) +if err != nil { + log.Fatalf("Failed to wait for action: %v", err) +} + +if status.Status == StatusSuccess { + log.Println("Database created successfully!") +} +``` + +### Example 2: Create a Database with Modules + +```go +dbConfig := DatabaseConfig{ + Name: "modules-db", + Port: 12001, + MemorySize: 536870912, // 512MB + Replication: true, + EvictionPolicy: "noeviction", + Sharding: true, + AutoUpgrade: true, + ShardsCount: 3, + ModuleList: []DatabaseModule{ + {ModuleArgs: "", ModuleName: "ReJSON"}, + {ModuleArgs: "", ModuleName: "search"}, + }, + OSSCluster: true, + OSSClusterAPIPreferredIPType: "external", + ProxyPolicy: "all-master-shards", + ShardsPlacement: "sparse", +} + +resp, err := faultInjector.CreateDatabase(ctx, 0, dbConfig) +// ... handle response +``` + +### Example 3: Create Database Using a Map + +```go +dbConfigMap := map[string]interface{}{ + "name": "map-db", + "port": 12002, + "memory_size": 268435456, + "replication": false, + "eviction_policy": "volatile-lru", + "sharding": false, + "auto_upgrade": true, + "shards_count": 1, + "oss_cluster": false, +} + +resp, err := faultInjector.CreateDatabaseFromMap(ctx, 0, dbConfigMap) +// ... handle response +``` + +### Example 4: Delete a Database + +```go +clusterIndex := 0 +bdbID := 1 + +resp, err := faultInjector.DeleteDatabase(ctx, clusterIndex, bdbID) +if err != nil { + log.Fatalf("Failed to delete database: %v", err) +} + +status, err := faultInjector.WaitForAction(ctx, resp.ActionID, + WithMaxWaitTime(2*time.Minute)) +if err != nil { + log.Fatalf("Failed to wait for action: %v", err) +} + +if status.Status == StatusSuccess { + log.Println("Database deleted successfully!") +} +``` + +### Example 5: Complete Lifecycle (Create and Delete) + +```go +// Create database +dbConfig := DatabaseConfig{ + Name: "temp-db", + Port: 13000, + MemorySize: 268435456, + Replication: false, + EvictionPolicy: "noeviction", + Sharding: false, + AutoUpgrade: true, + ShardsCount: 1, + OSSCluster: false, +} + +createResp, err := faultInjector.CreateDatabase(ctx, 0, dbConfig) +if err != nil { + log.Fatalf("Failed to create database: %v", err) +} + +createStatus, err := faultInjector.WaitForAction(ctx, createResp.ActionID, + WithMaxWaitTime(5*time.Minute)) +if err != nil || createStatus.Status != StatusSuccess { + log.Fatalf("Database creation failed") +} + +// Extract bdb_id from output +var bdbID int +if id, ok := createStatus.Output["bdb_id"].(float64); ok { + bdbID = int(id) +} + +// Use the database for testing... 
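+// (a real test would run its workload against the new database here;
+// the sleep below just stands in for that activity)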
+time.Sleep(10 * time.Second)
+
+// Delete the database
+deleteResp, err := faultInjector.DeleteDatabase(ctx, 0, bdbID)
+if err != nil {
+	log.Fatalf("Failed to delete database: %v", err)
+}
+
+deleteStatus, err := faultInjector.WaitForAction(ctx, deleteResp.ActionID,
+	WithMaxWaitTime(2*time.Minute))
+if err != nil || deleteStatus.Status != StatusSuccess {
+	log.Fatalf("Database deletion failed")
+}
+
+log.Println("Database lifecycle completed successfully!")
+```
+
+## Available Methods
+
+The `FaultInjectorClient` provides the following methods for database management:
+
+### CreateDatabase
+
+```go
+func (c *FaultInjectorClient) CreateDatabase(
+	ctx context.Context,
+	clusterIndex int,
+	databaseConfig DatabaseConfig,
+) (*ActionResponse, error)
+```
+
+Creates a new database using a structured `DatabaseConfig` object.
+
+### CreateDatabaseFromMap
+
+```go
+func (c *FaultInjectorClient) CreateDatabaseFromMap(
+	ctx context.Context,
+	clusterIndex int,
+	databaseConfig map[string]interface{},
+) (*ActionResponse, error)
+```
+
+Creates a new database using a flexible map configuration. Useful when you need to pass custom or dynamic configurations.
+
+### DeleteDatabase
+
+```go
+func (c *FaultInjectorClient) DeleteDatabase(
+	ctx context.Context,
+	clusterIndex int,
+	bdbID int,
+) (*ActionResponse, error)
+```
+
+Deletes an existing database by its ID.
+
+## Testing
+
+To run the database management E2E tests:
+
+```bash
+# Run all database management tests
+go test -tags=e2e -v ./maintnotifications/e2e/ -run TestDatabase
+
+# Run specific test
+go test -tags=e2e -v ./maintnotifications/e2e/ -run TestDatabaseLifecycle
+```
+
+## Notes
+
+- Database creation can take several minutes depending on the configuration
+- Always use `WaitForAction` to ensure the operation completes before proceeding
+- The `bdb_id` returned in the creation output should be used for deletion
+- Deleting a non-existent database will result in a failed action status
+- Memory sizes are specified in bytes (e.g., 268435456 = 256MB)
+- Port numbers should be unique and not conflict with existing databases
+
+## Common Eviction Policies
+
+- `noeviction` - Return errors when memory limit is reached
+- `allkeys-lru` - Evict any key using LRU algorithm
+- `volatile-lru` - Evict keys with TTL using LRU algorithm
+- `allkeys-random` - Evict random keys
+- `volatile-random` - Evict random keys with TTL
+- `volatile-ttl` - Evict keys with TTL, shortest TTL first
+
+## Common Proxy Policies
+
+- `all-master-shards` - Route to all master shards
+- `all-nodes` - Route to all nodes
+- `single` - Route all traffic through a single proxy
+
diff --git a/maintnotifications/e2e/README_SCENARIOS.md b/maintnotifications/e2e/README_SCENARIOS.md
index 5b778d32b..a9b18de2e 100644
--- a/maintnotifications/e2e/README_SCENARIOS.md
+++ b/maintnotifications/e2e/README_SCENARIOS.md
@@ -44,7 +44,22 @@ there are three environment variables that need to be set before running the tes
 - Notification delivery consistency
 - Handoff behavior per endpoint type
 
-### 3. Timeout Configurations Scenario (`scenario_timeout_configs_test.go`)
+### 3. 
Database Management Scenario (`scenario_database_management_test.go`) +**Dynamic database creation and deletion** +- **Purpose**: Test database lifecycle management via fault injector +- **Features Tested**: CREATE_DATABASE, DELETE_DATABASE endpoints +- **Configuration**: Various database configurations (simple, with modules, clustered) +- **Duration**: ~10 minutes +- **Key Validations**: + - Database creation with different configurations + - Database creation with Redis modules (ReJSON, search, timeseries, bf) + - Database deletion + - Complete lifecycle (create → use → delete) + - Configuration validation + +See [DATABASE_MANAGEMENT.md](DATABASE_MANAGEMENT.md) for detailed documentation on database management endpoints. + +### 4. Timeout Configurations Scenario (`scenario_timeout_configs_test.go`) **Various timeout strategies** - **Purpose**: Test different timeout configurations and their impact - **Features Tested**: Conservative, Aggressive, HighLatency timeouts @@ -58,7 +73,7 @@ there are three environment variables that need to be set before running the tes - Recovery times appropriate for each strategy - Error rates correlate with timeout aggressiveness -### 4. TLS Configurations Scenario (`scenario_tls_configs_test.go`) +### 5. TLS Configurations Scenario (`scenario_tls_configs_test.go`) **Security and encryption testing framework** - **Purpose**: Test push notifications with different TLS configurations - **Features Tested**: NoTLS, TLSInsecure, TLSSecure, TLSMinimal, TLSStrict @@ -71,7 +86,7 @@ there are three environment variables that need to be set before running the tes - Security compliance - **Note**: TLS configuration is handled at the Redis connection config level, not client options level -### 5. Stress Test Scenario (`scenario_stress_test.go`) +### 6. 
Stress Test Scenario (`scenario_stress_test.go`) **Extreme load and concurrent operations** - **Purpose**: Test system limits and behavior under extreme stress - **Features Tested**: Maximum concurrent operations, multiple clients diff --git a/maintnotifications/e2e/command_runner_test.go b/maintnotifications/e2e/command_runner_test.go index 7974016a9..b80a434bb 100644 --- a/maintnotifications/e2e/command_runner_test.go +++ b/maintnotifications/e2e/command_runner_test.go @@ -3,6 +3,7 @@ package e2e import ( "context" "fmt" + "strings" "sync" "sync/atomic" "time" @@ -88,6 +89,15 @@ func (cr *CommandRunner) FireCommandsUntilStop(ctx context.Context) { cr.operationCount.Add(1) if err != nil { + if err == redis.ErrClosed || strings.Contains(err.Error(), "client is closed") { + select { + case <-cr.stopCh: + return + default: + } + return + } + fmt.Printf("Error: %v\n", err) cr.errorCount.Add(1) diff --git a/maintnotifications/e2e/config_parser_test.go b/maintnotifications/e2e/config_parser_test.go index e8e795a44..6f1bc3252 100644 --- a/maintnotifications/e2e/config_parser_test.go +++ b/maintnotifications/e2e/config_parser_test.go @@ -1,9 +1,11 @@ package e2e import ( + "context" "crypto/tls" "encoding/json" "fmt" + "math/rand" "net/url" "os" "strconv" @@ -28,9 +30,9 @@ type DatabaseEndpoint struct { UID string `json:"uid"` } -// DatabaseConfig represents the configuration for a single database -type DatabaseConfig struct { - BdbID int `json:"bdb_id,omitempty"` +// EnvDatabaseConfig represents the configuration for a single database +type EnvDatabaseConfig struct { + BdbID interface{} `json:"bdb_id,omitempty"` Username string `json:"username,omitempty"` Password string `json:"password,omitempty"` TLS bool `json:"tls"` @@ -39,8 +41,8 @@ type DatabaseConfig struct { Endpoints []string `json:"endpoints"` } -// DatabasesConfig represents the complete configuration file structure -type DatabasesConfig map[string]DatabaseConfig +// EnvDatabasesConfig represents the complete configuration file structure +type EnvDatabasesConfig map[string]EnvDatabaseConfig // EnvConfig represents environment configuration for test scenarios type EnvConfig struct { @@ -80,13 +82,13 @@ func GetEnvConfig() (*EnvConfig, error) { } // GetDatabaseConfigFromEnv reads database configuration from a file -func GetDatabaseConfigFromEnv(filePath string) (DatabasesConfig, error) { +func GetDatabaseConfigFromEnv(filePath string) (EnvDatabasesConfig, error) { fileContent, err := os.ReadFile(filePath) if err != nil { return nil, fmt.Errorf("failed to read database config from %s: %w", filePath, err) } - var config DatabasesConfig + var config EnvDatabasesConfig if err := json.Unmarshal(fileContent, &config); err != nil { return nil, fmt.Errorf("failed to parse database config from %s: %w", filePath, err) } @@ -95,8 +97,8 @@ func GetDatabaseConfigFromEnv(filePath string) (DatabasesConfig, error) { } // GetDatabaseConfig gets Redis connection parameters for a specific database -func GetDatabaseConfig(databasesConfig DatabasesConfig, databaseName string) (*RedisConnectionConfig, error) { - var dbConfig DatabaseConfig +func GetDatabaseConfig(databasesConfig EnvDatabasesConfig, databaseName string) (*RedisConnectionConfig, error) { + var dbConfig EnvDatabaseConfig var exists bool if databaseName == "" { @@ -157,13 +159,90 @@ func GetDatabaseConfig(databasesConfig DatabasesConfig, databaseName string) (*R return nil, fmt.Errorf("no endpoints found in database configuration") } + var bdbId int + switch (dbConfig.BdbID).(type) { + case int: 
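+		// bdb_id arrived as a native int (e.g., a config built in Go code)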
+ bdbId = dbConfig.BdbID.(int) + case float64: + bdbId = int(dbConfig.BdbID.(float64)) + case string: + bdbId, _ = strconv.Atoi(dbConfig.BdbID.(string)) + } + return &RedisConnectionConfig{ Host: host, Port: port, Username: dbConfig.Username, Password: dbConfig.Password, TLS: dbConfig.TLS, - BdbID: dbConfig.BdbID, + BdbID: bdbId, + CertificatesLocation: dbConfig.CertificatesLocation, + Endpoints: dbConfig.Endpoints, + }, nil +} + +// ConvertEnvDatabaseConfigToRedisConnectionConfig converts EnvDatabaseConfig to RedisConnectionConfig +func ConvertEnvDatabaseConfigToRedisConnectionConfig(dbConfig EnvDatabaseConfig) (*RedisConnectionConfig, error) { + // Parse connection details from endpoints or raw_endpoints + var host string + var port int + + if len(dbConfig.RawEndpoints) > 0 { + // Use raw_endpoints if available (for more complex configurations) + endpoint := dbConfig.RawEndpoints[0] // Use the first endpoint + host = endpoint.DNSName + port = endpoint.Port + } else if len(dbConfig.Endpoints) > 0 { + // Parse from endpoints URLs + endpointURL, err := url.Parse(dbConfig.Endpoints[0]) + if err != nil { + return nil, fmt.Errorf("failed to parse endpoint URL %s: %w", dbConfig.Endpoints[0], err) + } + + host = endpointURL.Hostname() + portStr := endpointURL.Port() + if portStr == "" { + // Default ports based on scheme + switch endpointURL.Scheme { + case "redis": + port = 6379 + case "rediss": + port = 6380 + default: + port = 6379 + } + } else { + port, err = strconv.Atoi(portStr) + if err != nil { + return nil, fmt.Errorf("invalid port in endpoint URL %s: %w", dbConfig.Endpoints[0], err) + } + } + + // Override TLS setting based on scheme if not explicitly set + if endpointURL.Scheme == "rediss" { + dbConfig.TLS = true + } + } else { + return nil, fmt.Errorf("no endpoints found in database configuration") + } + + var bdbId int + switch dbConfig.BdbID.(type) { + case int: + bdbId = dbConfig.BdbID.(int) + case float64: + bdbId = int(dbConfig.BdbID.(float64)) + case string: + bdbId, _ = strconv.Atoi(dbConfig.BdbID.(string)) + } + + return &RedisConnectionConfig{ + Host: host, + Port: port, + Username: dbConfig.Username, + Password: dbConfig.Password, + TLS: dbConfig.TLS, + BdbID: bdbId, CertificatesLocation: dbConfig.CertificatesLocation, Endpoints: dbConfig.Endpoints, }, nil @@ -300,6 +379,7 @@ func (cf *ClientFactory) Create(key string, options *CreateClientOptions) (redis } } + fmt.Printf("Creating single client with options: %+v\n", clientOptions) client = redis.NewClient(clientOptions) } @@ -437,6 +517,30 @@ func CreateTestClientFactory(databaseName string) (*ClientFactory, error) { return NewClientFactory(dbConfig), nil } +// CreateTestClientFactoryWithBdbID creates a client factory using a specific bdb_id +// This is useful when you've created a fresh database and want to connect to it +func CreateTestClientFactoryWithBdbID(databaseName string, bdbID int) (*ClientFactory, error) { + envConfig, err := GetEnvConfig() + if err != nil { + return nil, fmt.Errorf("failed to get environment config: %w", err) + } + + databasesConfig, err := GetDatabaseConfigFromEnv(envConfig.RedisEndpointsConfigPath) + if err != nil { + return nil, fmt.Errorf("failed to get database config: %w", err) + } + + dbConfig, err := GetDatabaseConfig(databasesConfig, databaseName) + if err != nil { + return nil, fmt.Errorf("failed to get database config for %s: %w", databaseName, err) + } + + // Override the bdb_id with the newly created database ID + dbConfig.BdbID = bdbID + + return NewClientFactory(dbConfig), nil +} 
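+
+// Example usage (a sketch; "standalone" is one of the database names used by these tests):
+//
+//	bdbID, cleanup := SetupTestDatabaseFromEnv(t, ctx, "standalone")
+//	defer cleanup()
+//	factory, err := CreateTestClientFactoryWithBdbID("standalone", bdbID)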
+
 // CreateTestFaultInjector creates a fault injector client from environment configuration
 func CreateTestFaultInjector() (*FaultInjectorClient, error) {
 	envConfig, err := GetEnvConfig()
@@ -461,3 +565,576 @@ func GetAvailableDatabases(configPath string) ([]string, error) {
 
 	return databases, nil
 }
+
+// ConvertEnvDatabaseConfigToFaultInjectorConfig converts EnvDatabaseConfig to fault injector DatabaseConfig
+func ConvertEnvDatabaseConfigToFaultInjectorConfig(envConfig EnvDatabaseConfig, name string) (DatabaseConfig, error) {
+	var port int
+
+	// Extract port and DNS name from raw_endpoints or endpoints
+	if len(envConfig.RawEndpoints) > 0 {
+		endpoint := envConfig.RawEndpoints[0]
+		port = endpoint.Port
+	} else if len(envConfig.Endpoints) > 0 {
+		endpointURL, err := url.Parse(envConfig.Endpoints[0])
+		if err != nil {
+			return DatabaseConfig{}, fmt.Errorf("failed to parse endpoint URL: %w", err)
+		}
+		portStr := endpointURL.Port()
+		if portStr != "" {
+			port, err = strconv.Atoi(portStr)
+			if err != nil {
+				return DatabaseConfig{}, fmt.Errorf("invalid port: %w", err)
+			}
+		} else {
+			port = 6379 * 2 // default*2
+		}
+	} else {
+		return DatabaseConfig{}, fmt.Errorf("no endpoints found in configuration")
+	}
+
+	randomPortOffset := 1 + rand.Intn(10) // Random port offset to avoid conflicts
+
+	// Build the database config for fault injector
+	// TODO: Make this configurable
+	// These are the defaults for a sharded database at the moment
+	dbConfig := DatabaseConfig{
+		Name:           name,
+		Port:           port + randomPortOffset,
+		MemorySize:     268435456, // 256MB default
+		Replication:    true,
+		EvictionPolicy: "noeviction",
+		ProxyPolicy:    "single",
+		AutoUpgrade:    true,
+		Sharding:       true,
+		ShardsCount:    2,
+		ShardKeyRegex: []ShardKeyRegexPattern{
+			{Regex: ".*\\{(?<tag>.*)\\}.*"},
+			{Regex: "(?<tag>.*)"},
+		},
+		ShardsPlacement: "dense",
+		ModuleList: []DatabaseModule{
+			{ModuleArgs: "", ModuleName: "ReJSON"},
+			{ModuleArgs: "", ModuleName: "search"},
+			{ModuleArgs: "", ModuleName: "timeseries"},
+			{ModuleArgs: "", ModuleName: "bf"},
+		},
+		OSSCluster: false,
+	}
+
+	// If we have raw_endpoints with cluster info, configure for cluster
+	if len(envConfig.RawEndpoints) > 0 {
+		endpoint := envConfig.RawEndpoints[0]
+
+		// Check if this is a cluster configuration
+		if endpoint.ProxyPolicy != "" && endpoint.ProxyPolicy != "single" {
+			dbConfig.OSSCluster = true
+			dbConfig.Sharding = true
+			dbConfig.ShardsCount = 3 // default for cluster
+			dbConfig.ProxyPolicy = endpoint.ProxyPolicy
+			dbConfig.Replication = true
+		}
+
+		if endpoint.OSSClusterAPIPreferredIPType != "" {
+			dbConfig.OSSClusterAPIPreferredIPType = endpoint.OSSClusterAPIPreferredIPType
+		}
+	}
+
+	return dbConfig, nil
+}
+
+// TestDatabaseManager manages database lifecycle for tests
+type TestDatabaseManager struct {
+	faultInjector *FaultInjectorClient
+	clusterIndex  int
+	createdBdbID  int
+	dbConfig      DatabaseConfig
+	t             *testing.T
+}
+
+// NewTestDatabaseManager creates a new test database manager
+func NewTestDatabaseManager(t *testing.T, faultInjector *FaultInjectorClient, clusterIndex int) *TestDatabaseManager {
+	return &TestDatabaseManager{
+		faultInjector: faultInjector,
+		clusterIndex:  clusterIndex,
+		t:             t,
+	}
+}
+
+// CreateDatabaseFromEnvConfig creates a database using EnvDatabaseConfig
+func (m *TestDatabaseManager) CreateDatabaseFromEnvConfig(ctx context.Context, envConfig EnvDatabaseConfig, name string) (int, error) {
+	// Convert EnvDatabaseConfig to DatabaseConfig
+	dbConfig, err := ConvertEnvDatabaseConfigToFaultInjectorConfig(envConfig, name)
+	if err 
!= nil { + return 0, fmt.Errorf("failed to convert config: %w", err) + } + + m.dbConfig = dbConfig + return m.CreateDatabase(ctx, dbConfig) +} + +// CreateDatabase creates a database and waits for it to be ready +// Returns the bdb_id of the created database +func (m *TestDatabaseManager) CreateDatabase(ctx context.Context, dbConfig DatabaseConfig) (int, error) { + m.t.Logf("Creating database '%s' on port %d...", dbConfig.Name, dbConfig.Port) + + resp, err := m.faultInjector.CreateDatabase(ctx, m.clusterIndex, dbConfig) + if err != nil { + return 0, fmt.Errorf("failed to trigger database creation: %w", err) + } + + m.t.Logf("Database creation triggered. Action ID: %s", resp.ActionID) + + // Wait for creation to complete + status, err := m.faultInjector.WaitForAction(ctx, resp.ActionID, + WithMaxWaitTime(5*time.Minute), + WithPollInterval(5*time.Second)) + if err != nil { + return 0, fmt.Errorf("failed to wait for database creation: %w", err) + } + + if status.Status != StatusSuccess { + return 0, fmt.Errorf("database creation failed: %v", status.Error) + } + + // Extract bdb_id from output + var bdbID int + if status.Output != nil { + if id, ok := status.Output["bdb_id"].(float64); ok { + bdbID = int(id) + } else if resultMap, ok := status.Output["result"].(map[string]interface{}); ok { + if id, ok := resultMap["bdb_id"].(float64); ok { + bdbID = int(id) + } + } + } + + if bdbID == 0 { + return 0, fmt.Errorf("failed to extract bdb_id from creation output") + } + + m.createdBdbID = bdbID + m.t.Logf("Database created successfully with bdb_id: %d", bdbID) + + return bdbID, nil +} + +// CreateDatabaseAndGetConfig creates a database and returns both the bdb_id and the full connection config from the fault injector response +// This includes endpoints, username, password, TLS settings, and raw_endpoints +func (m *TestDatabaseManager) CreateDatabaseAndGetConfig(ctx context.Context, dbConfig DatabaseConfig) (int, EnvDatabaseConfig, error) { + m.t.Logf("Creating database '%s' on port %d...", dbConfig.Name, dbConfig.Port) + + resp, err := m.faultInjector.CreateDatabase(ctx, m.clusterIndex, dbConfig) + if err != nil { + return 0, EnvDatabaseConfig{}, fmt.Errorf("failed to trigger database creation: %w", err) + } + + m.t.Logf("Database creation triggered. 
Action ID: %s", resp.ActionID) + + // Wait for creation to complete + status, err := m.faultInjector.WaitForAction(ctx, resp.ActionID, + WithMaxWaitTime(5*time.Minute), + WithPollInterval(5*time.Second)) + if err != nil { + return 0, EnvDatabaseConfig{}, fmt.Errorf("failed to wait for database creation: %w", err) + } + + if status.Status != StatusSuccess { + return 0, EnvDatabaseConfig{}, fmt.Errorf("database creation failed: %v", status.Error) + } + + // Extract database configuration from output + var envConfig EnvDatabaseConfig + if status.Output == nil { + return 0, EnvDatabaseConfig{}, fmt.Errorf("no output in creation response") + } + + // Extract bdb_id + var bdbID int + if id, ok := status.Output["bdb_id"].(float64); ok { + bdbID = int(id) + envConfig.BdbID = bdbID + } else { + return 0, EnvDatabaseConfig{}, fmt.Errorf("failed to extract bdb_id from creation output") + } + + // Extract username + if username, ok := status.Output["username"].(string); ok { + envConfig.Username = username + } + + // Extract password + if password, ok := status.Output["password"].(string); ok { + envConfig.Password = password + } + + // Extract TLS setting + if tls, ok := status.Output["tls"].(bool); ok { + envConfig.TLS = tls + } + + // Extract endpoints + if endpoints, ok := status.Output["endpoints"].([]interface{}); ok { + envConfig.Endpoints = make([]string, 0, len(endpoints)) + for _, ep := range endpoints { + if epStr, ok := ep.(string); ok { + envConfig.Endpoints = append(envConfig.Endpoints, epStr) + } + } + } + + // Extract raw_endpoints + if rawEndpoints, ok := status.Output["raw_endpoints"].([]interface{}); ok { + envConfig.RawEndpoints = make([]DatabaseEndpoint, 0, len(rawEndpoints)) + for _, rawEp := range rawEndpoints { + if rawEpMap, ok := rawEp.(map[string]interface{}); ok { + var dbEndpoint DatabaseEndpoint + + // Extract addr + if addr, ok := rawEpMap["addr"].([]interface{}); ok { + dbEndpoint.Addr = make([]string, 0, len(addr)) + for _, a := range addr { + if aStr, ok := a.(string); ok { + dbEndpoint.Addr = append(dbEndpoint.Addr, aStr) + } + } + } + + // Extract other fields + if addrType, ok := rawEpMap["addr_type"].(string); ok { + dbEndpoint.AddrType = addrType + } + if dnsName, ok := rawEpMap["dns_name"].(string); ok { + dbEndpoint.DNSName = dnsName + } + if preferredEndpointType, ok := rawEpMap["oss_cluster_api_preferred_endpoint_type"].(string); ok { + dbEndpoint.OSSClusterAPIPreferredEndpointType = preferredEndpointType + } + if preferredIPType, ok := rawEpMap["oss_cluster_api_preferred_ip_type"].(string); ok { + dbEndpoint.OSSClusterAPIPreferredIPType = preferredIPType + } + if port, ok := rawEpMap["port"].(float64); ok { + dbEndpoint.Port = int(port) + } + if proxyPolicy, ok := rawEpMap["proxy_policy"].(string); ok { + dbEndpoint.ProxyPolicy = proxyPolicy + } + if uid, ok := rawEpMap["uid"].(string); ok { + dbEndpoint.UID = uid + } + + envConfig.RawEndpoints = append(envConfig.RawEndpoints, dbEndpoint) + } + } + } + + m.createdBdbID = bdbID + m.t.Logf("Database created successfully with bdb_id: %d", bdbID) + m.t.Logf("Database endpoints: %v", envConfig.Endpoints) + + return bdbID, envConfig, nil +} + +// DeleteDatabase deletes the created database +func (m *TestDatabaseManager) DeleteDatabase(ctx context.Context) error { + if m.createdBdbID == 0 { + return fmt.Errorf("no database to delete (bdb_id is 0)") + } + + m.t.Logf("Deleting database with bdb_id: %d...", m.createdBdbID) + + resp, err := m.faultInjector.DeleteDatabase(ctx, m.clusterIndex, m.createdBdbID) + if err != 
nil { + return fmt.Errorf("failed to trigger database deletion: %w", err) + } + + m.t.Logf("Database deletion triggered. Action ID: %s", resp.ActionID) + + // Wait for deletion to complete + status, err := m.faultInjector.WaitForAction(ctx, resp.ActionID, + WithMaxWaitTime(2*time.Minute), + WithPollInterval(3*time.Second)) + if err != nil { + return fmt.Errorf("failed to wait for database deletion: %w", err) + } + + if status.Status != StatusSuccess { + return fmt.Errorf("database deletion failed: %v", status.Error) + } + + m.t.Logf("Database deleted successfully") + m.createdBdbID = 0 + + return nil +} + +// GetBdbID returns the created database ID +func (m *TestDatabaseManager) GetBdbID() int { + return m.createdBdbID +} + +// Cleanup ensures the database is deleted (safe to call multiple times) +func (m *TestDatabaseManager) Cleanup(ctx context.Context) { + if m.createdBdbID != 0 { + if err := m.DeleteDatabase(ctx); err != nil { + m.t.Logf("Warning: Failed to cleanup database: %v", err) + } + } +} + +// SetupTestDatabaseFromEnv creates a database from environment config and returns a cleanup function +// Usage: +// +// cleanup := SetupTestDatabaseFromEnv(t, ctx, "my-test-db") +// defer cleanup() +func SetupTestDatabaseFromEnv(t *testing.T, ctx context.Context, databaseName string) (bdbID int, cleanup func()) { + // Get environment config + envConfig, err := GetEnvConfig() + if err != nil { + t.Fatalf("Failed to get environment config: %v", err) + } + + // Get database config from environment + databasesConfig, err := GetDatabaseConfigFromEnv(envConfig.RedisEndpointsConfigPath) + if err != nil { + t.Fatalf("Failed to get database config: %v", err) + } + + // Get the specific database config + var envDbConfig EnvDatabaseConfig + var exists bool + if databaseName == "" { + // Get first database if no name provided + for _, config := range databasesConfig { + envDbConfig = config + exists = true + break + } + } else { + envDbConfig, exists = databasesConfig[databaseName] + } + + if !exists { + t.Fatalf("Database %s not found in configuration", databaseName) + } + + // Create fault injector + faultInjector, err := CreateTestFaultInjector() + if err != nil { + t.Fatalf("Failed to create fault injector: %v", err) + } + + // Create database manager + dbManager := NewTestDatabaseManager(t, faultInjector, 0) + + // Create the database + testDBName := fmt.Sprintf("e2e-test-%s-%d", databaseName, time.Now().Unix()) + bdbID, err = dbManager.CreateDatabaseFromEnvConfig(ctx, envDbConfig, testDBName) + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + + // Return cleanup function + cleanup = func() { + dbManager.Cleanup(ctx) + } + + return bdbID, cleanup +} + +// SetupTestDatabaseWithConfig creates a database with custom config and returns a cleanup function +// Usage: +// +// bdbID, cleanup := SetupTestDatabaseWithConfig(t, ctx, dbConfig) +// defer cleanup() +func SetupTestDatabaseWithConfig(t *testing.T, ctx context.Context, dbConfig DatabaseConfig) (bdbID int, cleanup func()) { + // Create fault injector + faultInjector, err := CreateTestFaultInjector() + if err != nil { + t.Fatalf("Failed to create fault injector: %v", err) + } + + // Create database manager + dbManager := NewTestDatabaseManager(t, faultInjector, 0) + + // Create the database + bdbID, err = dbManager.CreateDatabase(ctx, dbConfig) + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + + // Return cleanup function + cleanup = func() { + dbManager.Cleanup(ctx) + } + + return bdbID, 
cleanup +} + +// SetupTestDatabaseAndFactory creates a database from environment config and returns both bdbID, factory, and cleanup function +// This is the recommended way to setup tests as it ensures the client factory connects to the newly created database +// Usage: +// +// bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone") +// defer cleanup() +func SetupTestDatabaseAndFactory(t *testing.T, ctx context.Context, databaseName string) (bdbID int, factory *ClientFactory, cleanup func()) { + // Get environment config + envConfig, err := GetEnvConfig() + if err != nil { + t.Fatalf("Failed to get environment config: %v", err) + } + + // Get database config from environment + databasesConfig, err := GetDatabaseConfigFromEnv(envConfig.RedisEndpointsConfigPath) + if err != nil { + t.Fatalf("Failed to get database config: %v", err) + } + + // Get the specific database config + var envDbConfig EnvDatabaseConfig + var exists bool + if databaseName == "" { + // Get first database if no name provided + for _, config := range databasesConfig { + envDbConfig = config + exists = true + break + } + } else { + envDbConfig, exists = databasesConfig[databaseName] + } + + if !exists { + t.Fatalf("Database %s not found in configuration", databaseName) + } + + // Convert to DatabaseConfig + dbConfig, err := ConvertEnvDatabaseConfigToFaultInjectorConfig(envDbConfig, fmt.Sprintf("e2e-test-%s-%d", databaseName, time.Now().Unix())) + if err != nil { + t.Fatalf("Failed to convert config: %v", err) + } + + // Create fault injector + faultInjector, err := CreateTestFaultInjector() + if err != nil { + t.Fatalf("Failed to create fault injector: %v", err) + } + + // Create database manager + dbManager := NewTestDatabaseManager(t, faultInjector, 0) + + // Create the database and get the actual connection config from fault injector + bdbID, newEnvConfig, err := dbManager.CreateDatabaseAndGetConfig(ctx, dbConfig) + if err != nil { + t.Fatalf("Failed to create test database: %v", err) + } + + t.Logf("Database created successfully:") + t.Logf(" bdb_id: %d", bdbID) + t.Logf(" endpoints: %v", newEnvConfig.Endpoints) + t.Logf(" username: %s", newEnvConfig.Username) + t.Logf(" TLS: %v", newEnvConfig.TLS) + + // Use certificate location from original config if not provided by fault injector + if newEnvConfig.CertificatesLocation == "" && envDbConfig.CertificatesLocation != "" { + newEnvConfig.CertificatesLocation = envDbConfig.CertificatesLocation + } + + // Convert EnvDatabaseConfig to RedisConnectionConfig + redisConfig, err := ConvertEnvDatabaseConfigToRedisConnectionConfig(newEnvConfig) + if err != nil { + dbManager.Cleanup(ctx) + t.Fatalf("Failed to convert database config: %v", err) + } + + // Create client factory with the actual config from fault injector + factory = NewClientFactory(redisConfig) + + // Combined cleanup function + cleanup = func() { + factory.DestroyAll() + dbManager.Cleanup(ctx) + } + + return bdbID, factory, cleanup +} + +// SetupTestDatabaseAndFactoryWithConfig creates a database with custom config and returns both bdbID, factory, and cleanup function +// Usage: +// +// bdbID, factory, cleanup := SetupTestDatabaseAndFactoryWithConfig(t, ctx, "standalone", dbConfig) +// defer cleanup() +func SetupTestDatabaseAndFactoryWithConfig(t *testing.T, ctx context.Context, databaseName string, dbConfig DatabaseConfig) (bdbID int, factory *ClientFactory, cleanup func()) { + // Get environment config to use as template for connection details + envConfig, err := GetEnvConfig() + if err != 
nil {
+		t.Fatalf("Failed to get environment config: %v", err)
+	}
+
+	// Get database config from environment
+	databasesConfig, err := GetDatabaseConfigFromEnv(envConfig.RedisEndpointsConfigPath)
+	if err != nil {
+		t.Fatalf("Failed to get database config: %v", err)
+	}
+
+	// Get the specific database config as template
+	var envDbConfig EnvDatabaseConfig
+	var exists bool
+	if databaseName == "" {
+		// Get first database if no name provided
+		for _, config := range databasesConfig {
+			envDbConfig = config
+			exists = true
+			break
+		}
+	} else {
+		envDbConfig, exists = databasesConfig[databaseName]
+	}
+
+	if !exists {
+		t.Fatalf("Database %s not found in configuration", databaseName)
+	}
+
+	// Create fault injector
+	faultInjector, err := CreateTestFaultInjector()
+	if err != nil {
+		t.Fatalf("Failed to create fault injector: %v", err)
+	}
+
+	// Create database manager
+	dbManager := NewTestDatabaseManager(t, faultInjector, 0)
+
+	// Create the database and get the actual connection config from fault injector
+	bdbID, newEnvConfig, err := dbManager.CreateDatabaseAndGetConfig(ctx, dbConfig)
+	if err != nil {
+		t.Fatalf("Failed to create test database: %v", err)
+	}
+
+	t.Logf("Database created successfully:")
+	t.Logf("  bdb_id: %d", bdbID)
+	t.Logf("  endpoints: %v", newEnvConfig.Endpoints)
+	t.Logf("  username: %s", newEnvConfig.Username)
+	t.Logf("  TLS: %v", newEnvConfig.TLS)
+
+	// Use certificate location from original config if not provided by fault injector
+	if newEnvConfig.CertificatesLocation == "" && envDbConfig.CertificatesLocation != "" {
+		newEnvConfig.CertificatesLocation = envDbConfig.CertificatesLocation
+	}
+
+	// Convert EnvDatabaseConfig to RedisConnectionConfig
+	redisConfig, err := ConvertEnvDatabaseConfigToRedisConnectionConfig(newEnvConfig)
+	if err != nil {
+		dbManager.Cleanup(ctx)
+		t.Fatalf("Failed to convert database config: %v", err)
+	}
+
+	// Create client factory with the actual config from fault injector
+	factory = NewClientFactory(redisConfig)
+
+	// Combined cleanup function
+	cleanup = func() {
+		factory.DestroyAll()
+		dbManager.Cleanup(ctx)
+	}
+
+	return bdbID, factory, cleanup
+}
diff --git a/maintnotifications/e2e/fault_injector_test.go b/maintnotifications/e2e/fault_injector_test.go
index b1ac92985..fbb5d4a5f 100644
--- a/maintnotifications/e2e/fault_injector_test.go
+++ b/maintnotifications/e2e/fault_injector_test.go
@@ -44,6 +44,10 @@ const (
 	// Sequence and complex actions
 	ActionSequence       ActionType = "sequence_of_actions"
 	ActionExecuteCommand ActionType = "execute_command"
+
+	// Database management actions
+	ActionDeleteDatabase ActionType = "delete_database"
+	ActionCreateDatabase ActionType = "create_database"
 )
 
 // ActionStatus represents the status of an action
@@ -120,6 +124,7 @@ func (c *FaultInjectorClient) ListActions(ctx context.Context) ([]ActionType, er
 // TriggerAction triggers a specific action
 func (c *FaultInjectorClient) TriggerAction(ctx context.Context, action ActionRequest) (*ActionResponse, error) {
 	var response ActionResponse
+	fmt.Printf("[FI] Triggering action: %+v\n", action)
 	err := c.request(ctx, "POST", "/action", action, &response)
 	return &response, err
 }
@@ -350,6 +355,80 @@ func (c *FaultInjectorClient) DisableMaintenanceMode(ctx context.Context, nodeID
 	})
 }
 
+// Database Management Actions
+
+// DatabaseConfig represents the configuration for creating a database
+type DatabaseConfig struct {
+	Name                         string                 `json:"name"`
+	Port                         int                    `json:"port"`
+	MemorySize                   int64                  `json:"memory_size"`
+	Replication                  bool                   
`json:"replication"` + EvictionPolicy string `json:"eviction_policy"` + Sharding bool `json:"sharding"` + AutoUpgrade bool `json:"auto_upgrade"` + ShardsCount int `json:"shards_count"` + ModuleList []DatabaseModule `json:"module_list,omitempty"` + OSSCluster bool `json:"oss_cluster"` + OSSClusterAPIPreferredIPType string `json:"oss_cluster_api_preferred_ip_type,omitempty"` + ProxyPolicy string `json:"proxy_policy,omitempty"` + ShardsPlacement string `json:"shards_placement,omitempty"` + ShardKeyRegex []ShardKeyRegexPattern `json:"shard_key_regex,omitempty"` +} + +// DatabaseModule represents a Redis module configuration +type DatabaseModule struct { + ModuleArgs string `json:"module_args"` + ModuleName string `json:"module_name"` +} + +// ShardKeyRegexPattern represents a shard key regex pattern +type ShardKeyRegexPattern struct { + Regex string `json:"regex"` +} + +// DeleteDatabase deletes a database +// Parameters: +// - clusterIndex: The index of the cluster +// - bdbID: The database ID to delete +func (c *FaultInjectorClient) DeleteDatabase(ctx context.Context, clusterIndex int, bdbID int) (*ActionResponse, error) { + return c.TriggerAction(ctx, ActionRequest{ + Type: ActionDeleteDatabase, + Parameters: map[string]interface{}{ + "cluster_index": clusterIndex, + "bdb_id": bdbID, + }, + }) +} + +// CreateDatabase creates a new database +// Parameters: +// - clusterIndex: The index of the cluster +// - databaseConfig: The database configuration +func (c *FaultInjectorClient) CreateDatabase(ctx context.Context, clusterIndex int, databaseConfig DatabaseConfig) (*ActionResponse, error) { + return c.TriggerAction(ctx, ActionRequest{ + Type: ActionCreateDatabase, + Parameters: map[string]interface{}{ + "cluster_index": clusterIndex, + "database_config": databaseConfig, + }, + }) +} + +// CreateDatabaseFromMap creates a new database using a map for configuration +// This is useful when you want to pass a raw configuration map +// Parameters: +// - clusterIndex: The index of the cluster +// - databaseConfig: The database configuration as a map +func (c *FaultInjectorClient) CreateDatabaseFromMap(ctx context.Context, clusterIndex int, databaseConfig map[string]interface{}) (*ActionResponse, error) { + return c.TriggerAction(ctx, ActionRequest{ + Type: ActionCreateDatabase, + Parameters: map[string]interface{}{ + "cluster_index": clusterIndex, + "database_config": databaseConfig, + }, + }) +} + // Complex Actions // ExecuteSequence executes a sequence of actions diff --git a/maintnotifications/e2e/notiftracker_test.go b/maintnotifications/e2e/notiftracker_test.go index f2a97286f..b35378dad 100644 --- a/maintnotifications/e2e/notiftracker_test.go +++ b/maintnotifications/e2e/notiftracker_test.go @@ -81,6 +81,37 @@ func (tnh *TrackingNotificationsHook) Clear() { tnh.migratedCount.Store(0) tnh.failingOverCount.Store(0) } +// wait for notification in prehook +func (tnh *TrackingNotificationsHook) FindOrWaitForNotification(notificationType string, timeout time.Duration) (notification []interface{}, found bool) { + if notification, found := tnh.FindNotification(notificationType); found { + return notification, true + } + + // wait for notification + timeoutCh := time.After(timeout) + ticker := time.NewTicker(100 * time.Millisecond) + for { + select { + case <-timeoutCh: + return nil, false + case <-ticker.C: + if notification, found := tnh.FindNotification(notificationType); found { + return notification, true + } + } + } +} + +func (tnh *TrackingNotificationsHook) FindNotification(notificationType 
string) (notification []interface{}, found bool) { + tnh.mutex.RLock() + defer tnh.mutex.RUnlock() + for _, event := range tnh.diagnosticsLog { + if event.Type == notificationType { + return event.Details["notification"].([]interface{}), true + } + } + return nil, false +} // PreHook captures timeout-related events before processing func (tnh *TrackingNotificationsHook) PreHook(_ context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) { diff --git a/maintnotifications/e2e/scenario_endpoint_types_test.go b/maintnotifications/e2e/scenario_endpoint_types_test.go index d1ff4f823..57bd9439f 100644 --- a/maintnotifications/e2e/scenario_endpoint_types_test.go +++ b/maintnotifications/e2e/scenario_endpoint_types_test.go @@ -21,17 +21,11 @@ func TestEndpointTypesPushNotifications(t *testing.T) { t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true") } - ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 25*time.Minute) defer cancel() var dump = true var errorsDetected = false - var p = func(format string, args ...interface{}) { - format = "[%s][ENDPOINT-TYPES] " + format - ts := time.Now().Format("15:04:05.000") - args = append([]interface{}{ts}, args...) - t.Logf(format, args...) - } // Test different endpoint types endpointTypes := []struct { @@ -60,49 +54,51 @@ func TestEndpointTypesPushNotifications(t *testing.T) { logCollector.Clear() }() - // Create client factory from configuration - factory, err := CreateTestClientFactory("standalone") - if err != nil { - t.Skipf("Enterprise cluster not available, skipping endpoint types test: %v", err) - } - endpointConfig := factory.GetConfig() + // Test each endpoint type with its own fresh database + for _, endpointTest := range endpointTypes { + t.Run(endpointTest.name, func(t *testing.T) { + // Setup: Create fresh database and client factory for THIS endpoint type test + bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone") + defer cleanup() + t.Logf("[ENDPOINT-TYPES-%s] Created test database with bdb_id: %d", endpointTest.name, bdbID) - // Create fault injector - faultInjector, err := CreateTestFaultInjector() - if err != nil { - t.Fatalf("Failed to create fault injector: %v", err) - } + // Create fault injector + faultInjector, err := CreateTestFaultInjector() + if err != nil { + t.Fatalf("[ERROR] Failed to create fault injector: %v", err) + } - defer func() { - if dump { - p("Pool stats:") - factory.PrintPoolStats(t) - } - factory.DestroyAll() - }() + // Get endpoint config from factory (now connected to new database) + endpointConfig := factory.GetConfig() - // Test each endpoint type - for _, endpointTest := range endpointTypes { - t.Run(endpointTest.name, func(t *testing.T) { + defer func() { + if dump { + fmt.Println("Pool stats:") + factory.PrintPoolStats(t) + } + }() // Clear logs between endpoint type tests logCollector.Clear() - dump = true // reset dump flag + // reset errors detected flag + errorsDetected = false + // reset dump flag + dump = true // redefine p and e for each test to get // proper test name in logs and proper test failures var p = func(format string, args ...interface{}) { - format = "[%s][ENDPOINT-TYPES] " + format - ts := time.Now().Format("15:04:05.000") - args = append([]interface{}{ts}, args...) - t.Logf(format, args...) + printLog("ENDPOINT-TYPES", false, format, args...) 
 			}
 			var e = func(format string, args ...interface{}) {
 				errorsDetected = true
-				format = "[%s][ENDPOINT-TYPES][ERROR] " + format
-				ts := time.Now().Format("15:04:05.000")
-				args = append([]interface{}{ts}, args...)
-				t.Errorf(format, args...)
+				printLog("ENDPOINT-TYPES", true, format, args...)
 			}
+
+			var ef = func(format string, args ...interface{}) {
+				printLog("ENDPOINT-TYPES", true, format, args...)
+				t.FailNow()
+			}
+
 			p("Testing endpoint type: %s - %s", endpointTest.name, endpointTest.description)
 
 			minIdleConns := 3
@@ -126,7 +122,7 @@ func TestEndpointTypesPushNotifications(t *testing.T) {
 				ClientName: fmt.Sprintf("endpoint-test-%s", endpointTest.name),
 			})
 			if err != nil {
-				t.Fatalf("Failed to create client for %s: %v", endpointTest.name, err)
+				ef("Failed to create client for %s: %v", endpointTest.name, err)
 			}
 
 			// Create timeout tracker
@@ -134,17 +130,13 @@ func TestEndpointTypesPushNotifications(t *testing.T) {
 			logger := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
 			setupNotificationHooks(client, tracker, logger)
 			defer func() {
-				if dump {
-					p("Tracker analysis for %s:", endpointTest.name)
-					tracker.GetAnalysis().Print(t)
-				}
 				tracker.Clear()
 			}()
 
 			// Verify initial connectivity
 			err = client.Ping(ctx).Err()
 			if err != nil {
-				t.Fatalf("Failed to ping Redis with %s endpoint type: %v", endpointTest.name, err)
+				ef("Failed to ping Redis with %s endpoint type: %v", endpointTest.name, err)
 			}
 
 			p("Client connected successfully with %s endpoint type", endpointTest.name)
@@ -160,16 +152,15 @@ func TestEndpointTypesPushNotifications(t *testing.T) {
 			}()
 
 			// Test failover with this endpoint type
-			p("Testing failover with %s endpoint type...", endpointTest.name)
+			p("Testing failover with %s endpoint type on database [bdb_id:%d]...", endpointTest.name, endpointConfig.BdbID)
 			failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 				Type: "failover",
 				Parameters: map[string]interface{}{
-					"cluster_index": "0",
-					"bdb_id":        endpointConfig.BdbID,
+					"bdb_id": endpointConfig.BdbID,
 				},
 			})
 			if err != nil {
-				t.Fatalf("Failed to trigger failover action for %s: %v", endpointTest.name, err)
+				ef("Failed to trigger failover action for %s: %v", endpointTest.name, err)
 			}
 
 			// Start command traffic
@@ -177,12 +168,22 @@ func TestEndpointTypesPushNotifications(t *testing.T) {
 				commandsRunner.FireCommandsUntilStop(ctx)
 			}()
 
+			// Wait for failover to complete
+			status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID,
+				WithMaxWaitTime(240*time.Second),
+				WithPollInterval(2*time.Second),
+			)
+			if err != nil {
+				ef("[FI] Failover action failed for %s: %v", endpointTest.name, err)
+			}
+			p("[FI] Failover action completed for %s: %s %s", endpointTest.name, status.Status, actionOutputIfFailed(status))
+
 			// Wait for FAILING_OVER notification
 			match, found := logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 				return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "FAILING_OVER")
-			}, 2*time.Minute)
+			}, 3*time.Minute)
 			if !found {
-				t.Fatalf("FAILING_OVER notification was not received for %s endpoint type", endpointTest.name)
+				ef("FAILING_OVER notification was not received for %s endpoint type", endpointTest.name)
 			}
 			failingOverData := logs2.ExtractDataFromLogMessage(match)
 			p("FAILING_OVER notification received for %s. 
%v", endpointTest.name, failingOverData) @@ -192,63 +193,53 @@ func TestEndpointTypesPushNotifications(t *testing.T) { connIDToObserve := uint64(failingOverData["connID"].(float64)) match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { return notificationType(s, "FAILED_OVER") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1) - }, 2*time.Minute) + }, 3*time.Minute) if !found { - t.Fatalf("FAILED_OVER notification was not received for %s endpoint type", endpointTest.name) + ef("FAILED_OVER notification was not received for %s endpoint type", endpointTest.name) } failedOverData := logs2.ExtractDataFromLogMessage(match) p("FAILED_OVER notification received for %s. %v", endpointTest.name, failedOverData) - // Wait for failover to complete - status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID, - WithMaxWaitTime(120*time.Second), - WithPollInterval(1*time.Second), - ) - if err != nil { - t.Fatalf("[FI] Failover action failed for %s: %v", endpointTest.name, err) - } - p("[FI] Failover action completed for %s: %s", endpointTest.name, status.Status) - // Test migration with this endpoint type p("Testing migration with %s endpoint type...", endpointTest.name) migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ Type: "migrate", Parameters: map[string]interface{}{ - "cluster_index": "0", + "bdb_id": endpointConfig.BdbID, }, }) if err != nil { - t.Fatalf("Failed to trigger migrate action for %s: %v", endpointTest.name, err) + ef("Failed to trigger migrate action for %s: %v", endpointTest.name, err) + } + + // Wait for migration to complete + status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, + WithMaxWaitTime(240*time.Second), + WithPollInterval(2*time.Second), + ) + if err != nil { + ef("[FI] Migrate action failed for %s: %v", endpointTest.name, err) } + p("[FI] Migrate action completed for %s: %s %s", endpointTest.name, status.Status, actionOutputIfFailed(status)) // Wait for MIGRATING notification - match, found = logCollector.WaitForLogMatchFunc(func(s string) bool { + match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { return strings.Contains(s, logs2.ProcessingNotificationMessage) && strings.Contains(s, "MIGRATING") - }, 30*time.Second) + }, 60*time.Second) if !found { - t.Fatalf("MIGRATING notification was not received for %s endpoint type", endpointTest.name) + ef("MIGRATING notification was not received for %s endpoint type", endpointTest.name) } migrateData := logs2.ExtractDataFromLogMessage(match) p("MIGRATING notification received for %s: %v", endpointTest.name, migrateData) - // Wait for migration to complete - status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID, - WithMaxWaitTime(120*time.Second), - WithPollInterval(1*time.Second), - ) - if err != nil { - t.Fatalf("[FI] Migrate action failed for %s: %v", endpointTest.name, err) - } - p("[FI] Migrate action completed for %s: %s", endpointTest.name, status.Status) - // Wait for MIGRATED notification seqIDToObserve = int64(migrateData["seqID"].(float64)) connIDToObserve = uint64(migrateData["connID"].(float64)) match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { return notificationType(s, "MIGRATED") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1) - }, 2*time.Minute) + }, 3*time.Minute) if !found { - t.Fatalf("MIGRATED notification was not received for %s endpoint type", endpointTest.name) + ef("MIGRATED notification was not received for %s endpoint type", endpointTest.name) } 
migratedData := logs2.ExtractDataFromLogMessage(match) p("MIGRATED notification received for %s. %v", endpointTest.name, migratedData) @@ -257,20 +248,19 @@ func TestEndpointTypesPushNotifications(t *testing.T) { bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{ Type: "bind", Parameters: map[string]interface{}{ - "cluster_index": "0", - "bdb_id": endpointConfig.BdbID, + "bdb_id": endpointConfig.BdbID, }, }) if err != nil { - t.Fatalf("Failed to trigger bind action for %s: %v", endpointTest.name, err) + ef("Failed to trigger bind action for %s: %v", endpointTest.name, err) } // Wait for MOVING notification match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool { return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING") - }, 2*time.Minute) + }, 3*time.Minute) if !found { - t.Fatalf("MOVING notification was not received for %s endpoint type", endpointTest.name) + ef("MOVING notification was not received for %s endpoint type", endpointTest.name) } movingData := logs2.ExtractDataFromLogMessage(match) p("MOVING notification received for %s. %v", endpointTest.name, movingData) @@ -319,12 +309,12 @@ func TestEndpointTypesPushNotifications(t *testing.T) { // Wait for bind to complete bindStatus, err := faultInjector.WaitForAction(ctx, bindResp.ActionID, - WithMaxWaitTime(120*time.Second), + WithMaxWaitTime(240*time.Second), WithPollInterval(2*time.Second)) if err != nil { - t.Fatalf("Bind action failed for %s: %v", endpointTest.name, err) + ef("Bind action failed for %s: %v", endpointTest.name, err) } - p("Bind action completed for %s: %s", endpointTest.name, bindStatus.Status) + p("Bind action completed for %s: %s %s", endpointTest.name, bindStatus.Status, actionOutputIfFailed(bindStatus)) // Continue traffic for analysis time.Sleep(30 * time.Second) @@ -357,14 +347,21 @@ func TestEndpointTypesPushNotifications(t *testing.T) { e("Expected MOVING notifications with %s endpoint type, got none", endpointTest.name) } + logAnalysis := logCollector.GetAnalysis() + if logAnalysis.TotalHandoffCount == 0 { + e("Expected at least one handoff with %s endpoint type, got none", endpointTest.name) + } + if logAnalysis.TotalHandoffCount != logAnalysis.SucceededHandoffCount { + e("Expected all handoffs to succeed with %s endpoint type, got %d failed", endpointTest.name, logAnalysis.FailedHandoffCount) + } + if errorsDetected { logCollector.DumpLogs() trackerAnalysis.Print(t) logCollector.Clear() tracker.Clear() - t.Fatalf("[FAIL] Errors detected with %s endpoint type", endpointTest.name) + ef("[FAIL] Errors detected with %s endpoint type", endpointTest.name) } - dump = false p("Endpoint type %s test completed successfully", endpointTest.name) logCollector.GetAnalysis().Print(t) trackerAnalysis.Print(t) @@ -373,5 +370,5 @@ func TestEndpointTypesPushNotifications(t *testing.T) { }) } - p("All endpoint types tested successfully") + t.Log("All endpoint types tested successfully") } diff --git a/maintnotifications/e2e/scenario_push_notifications_test.go b/maintnotifications/e2e/scenario_push_notifications_test.go index 74d0a894b..7666ed0c4 100644 --- a/maintnotifications/e2e/scenario_push_notifications_test.go +++ b/maintnotifications/e2e/scenario_push_notifications_test.go @@ -19,9 +19,17 @@ func TestPushNotifications(t *testing.T) { t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true") } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) defer 
cancel()
+	// Setup: Create fresh database and client factory for this test
+	bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone")
+	defer cleanup()
+	t.Logf("[PUSH-NOTIFICATIONS] Created test database with bdb_id: %d", bdbID)
+
+	// Wait for database to be fully ready
+	time.Sleep(10 * time.Second)
+
 	var dump = true
 	var seqIDToObserve int64
 	var connIDToObserve uint64
@@ -30,45 +38,34 @@ func TestPushNotifications(t *testing.T) {
 	var found bool
 	var status *ActionStatusResponse
 
+	var errorsDetected = false
 	var p = func(format string, args ...interface{}) {
-		format = "[%s] " + format
-		ts := time.Now().Format("15:04:05.000")
-		args = append([]interface{}{ts}, args...)
-		t.Logf(format, args...)
+		printLog("PUSH-NOTIFICATIONS", false, format, args...)
 	}
 
-	var errorsDetected = false
 	var e = func(format string, args ...interface{}) {
 		errorsDetected = true
-		format = "[%s][ERROR] " + format
-		ts := time.Now().Format("15:04:05.000")
-		args = append([]interface{}{ts}, args...)
-		t.Errorf(format, args...)
+		printLog("PUSH-NOTIFICATIONS", true, format, args...)
+	}
+
+	var ef = func(format string, args ...interface{}) {
+		printLog("PUSH-NOTIFICATIONS", true, format, args...)
+		t.FailNow()
 	}
 
 	logCollector.ClearLogs()
 	defer func() {
-		if dump {
-			p("Dumping logs...")
-			logCollector.DumpLogs()
-			p("Log Analysis:")
-			logCollector.GetAnalysis().Print(t)
-		}
 		logCollector.Clear()
 	}()
 
-	// Create client factory from configuration
-	factory, err := CreateTestClientFactory("standalone")
-	if err != nil {
-		t.Skipf("Enterprise cluster not available, skipping push notification tests: %v", err)
-	}
+	// Get endpoint config from factory (now connected to new database)
 	endpointConfig := factory.GetConfig()
 
 	// Create fault injector
 	faultInjector, err := CreateTestFaultInjector()
 	if err != nil {
-		t.Fatalf("Failed to create fault injector: %v", err)
+		ef("Failed to create fault injector: %v", err)
 	}
 
 	minIdleConns := 5
@@ -91,14 +88,10 @@ func TestPushNotifications(t *testing.T) {
 		ClientName: "push-notification-test-client",
 	})
 	if err != nil {
-		t.Fatalf("Failed to create client: %v", err)
+		ef("Failed to create client: %v", err)
 	}
 
 	defer func() {
-		if dump {
-			p("Pool stats:")
-			factory.PrintPoolStats(t)
-		}
 		factory.DestroyAll()
 	}()
 
@@ -107,16 +100,13 @@ func TestPushNotifications(t *testing.T) {
 	logger := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
 	setupNotificationHooks(client, tracker, logger)
 	defer func() {
-		if dump {
-			tracker.GetAnalysis().Print(t)
-		}
 		tracker.Clear()
 	}()
 
 	// Verify initial connectivity
 	err = client.Ping(ctx).Err()
 	if err != nil {
-		t.Fatalf("Failed to ping Redis: %v", err)
+		ef("Failed to ping Redis: %v", err)
 	}
 	p("Client connected successfully, starting push notification test")
 
@@ -138,23 +128,22 @@ func TestPushNotifications(t *testing.T) {
 	failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 		Type: "failover",
 		Parameters: map[string]interface{}{
-			"cluster_index": "0",
-			"bdb_id":        endpointConfig.BdbID,
+			"bdb_id": endpointConfig.BdbID,
 		},
 	})
 	if err != nil {
-		t.Fatalf("Failed to trigger failover action: %v", err)
+		ef("Failed to trigger failover action: %v", err)
 	}
 
 	go func() {
 		p("Waiting for FAILING_OVER notification")
 		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 			return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "FAILING_OVER")
-		}, 2*time.Minute)
+		}, 3*time.Minute)
 		commandsRunner.Stop()
 	}()
 	commandsRunner.FireCommandsUntilStop(ctx)
 	if !found {
-		t.Fatal("FAILING_OVER notification was not received within 2 minutes")
+		ef("FAILING_OVER notification was not received within 3 minutes")
 	}
 	failingOverData := logs2.ExtractDataFromLogMessage(match)
 	p("FAILING_OVER notification received. %v", failingOverData)
@@ -164,24 +153,24 @@ func TestPushNotifications(t *testing.T) {
 		p("Waiting for FAILED_OVER notification on conn %d with seqID %d...", connIDToObserve, seqIDToObserve+1)
 		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 			return notificationType(s, "FAILED_OVER") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1)
-		}, 2*time.Minute)
+		}, 3*time.Minute)
 		commandsRunner.Stop()
 	}()
 	commandsRunner.FireCommandsUntilStop(ctx)
 	if !found {
-		t.Fatal("FAILED_OVER notification was not received within 2 minutes")
+		ef("FAILED_OVER notification was not received within 3 minutes")
 	}
 	failedOverData := logs2.ExtractDataFromLogMessage(match)
 	p("FAILED_OVER notification received. %v", failedOverData)
 
 	status, err = faultInjector.WaitForAction(ctx, failoverResp.ActionID,
-		WithMaxWaitTime(120*time.Second),
-		WithPollInterval(1*time.Second),
+		WithMaxWaitTime(240*time.Second),
+		WithPollInterval(2*time.Second),
 	)
 	if err != nil {
-		t.Fatalf("[FI] Failover action failed: %v", err)
+		ef("[FI] Failover action failed: %v", err)
 	}
-	fmt.Printf("[FI] Failover action completed: %s\n", status.Status)
+	p("[FI] Failover action completed: %s %s", status.Status, actionOutputIfFailed(status))
 
 	p("FAILING_OVER / FAILED_OVER notifications test completed successfully")
 
@@ -190,21 +179,29 @@ func TestPushNotifications(t *testing.T) {
 	migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 		Type: "migrate",
 		Parameters: map[string]interface{}{
-			"cluster_index": "0",
+			"bdb_id": endpointConfig.BdbID,
 		},
 	})
 	if err != nil {
-		t.Fatalf("Failed to trigger migrate action: %v", err)
+		ef("Failed to trigger migrate action: %v", err)
 	}
 
 	go func() {
-		match, found = logCollector.WaitForLogMatchFunc(func(s string) bool {
+		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 			return strings.Contains(s, logs2.ProcessingNotificationMessage) && strings.Contains(s, "MIGRATING")
-		}, 20*time.Second)
+		}, 60*time.Second)
 		commandsRunner.Stop()
 	}()
 	commandsRunner.FireCommandsUntilStop(ctx)
 	if !found {
-		t.Fatal("MIGRATING notification for migrate action was not received within 20 seconds")
+		status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
+			WithMaxWaitTime(240*time.Second),
+			WithPollInterval(2*time.Second),
+		)
+		if err != nil {
+			ef("[FI] Migrate action failed: %v", err)
+		}
+		p("[FI] Migrate action completed: %s %s", status.Status, actionOutputIfFailed(status))
+		ef("MIGRATING notification for migrate action was not received within 60 seconds")
 	}
 	migrateData := logs2.ExtractDataFromLogMessage(match)
 	seqIDToObserve = int64(migrateData["seqID"].(float64))
@@ -212,24 +209,24 @@ func TestPushNotifications(t *testing.T) {
 	p("MIGRATING notification received: seqID: %d, connID: %d", seqIDToObserve, connIDToObserve)
 
 	status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
-		WithMaxWaitTime(120*time.Second),
-		WithPollInterval(1*time.Second),
+		WithMaxWaitTime(240*time.Second),
+		WithPollInterval(2*time.Second),
 	)
 	if err != nil {
-		t.Fatalf("[FI] Migrate action failed: %v", err)
+		ef("[FI] Migrate action failed: %v", err)
 	}
-	fmt.Printf("[FI] Migrate action completed: %s\n", status.Status)
+	p("[FI] Migrate action completed: %s %s", status.Status, actionOutputIfFailed(status))
 
 	go func() {
 		p("Waiting for MIGRATED notification on conn %d with seqID %d...", connIDToObserve, seqIDToObserve+1)
 		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 			return notificationType(s, "MIGRATED") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1)
-		}, 2*time.Minute)
+		}, 3*time.Minute)
 		commandsRunner.Stop()
 	}()
 	commandsRunner.FireCommandsUntilStop(ctx)
 	if !found {
-		t.Fatal("MIGRATED notification was not received within 2 minutes")
+		ef("MIGRATED notification was not received within 3 minutes")
 	}
 	migratedData := logs2.ExtractDataFromLogMessage(match)
 	p("MIGRATED notification received. %v", migratedData)
@@ -242,12 +239,11 @@ func TestPushNotifications(t *testing.T) {
 	bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 		Type: "bind",
 		Parameters: map[string]interface{}{
-			"cluster_index": "0",
-			"bdb_id":        endpointConfig.BdbID,
+			"bdb_id": endpointConfig.BdbID,
 		},
 	})
 	if err != nil {
-		t.Fatalf("Failed to trigger bind action: %v", err)
+		ef("Failed to trigger bind action: %v", err)
 	}
 
 	// start a second client but don't execute any commands on it
@@ -269,14 +265,14 @@ func TestPushNotifications(t *testing.T) {
 	})
 
 	if err != nil {
-		t.Fatalf("failed to create client: %v", err)
+		ef("failed to create client: %v", err)
 	}
 	// setup tracking for second client
 	tracker2 := NewTrackingNotificationsHook()
 	logger2 := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
 	setupNotificationHooks(client2, tracker2, logger2)
 	commandsRunner2, _ := NewCommandRunner(client2)
-	t.Log("Second client created")
+	p("Second client created")
 
 	// Use a channel to communicate errors from the goroutine
 	errChan := make(chan error, 1)
@@ -288,11 +284,16 @@ func TestPushNotifications(t *testing.T) {
 		}
 	}()
-		p("Waiting for MOVING notification on second client")
+		p("Waiting for MOVING notification on first client")
 		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 			return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING")
-		}, 2*time.Minute)
+		}, 3*time.Minute)
 		commandsRunner.Stop()
+		if !found {
+			errChan <- fmt.Errorf("MOVING notification was not received within 3 minutes ON A FIRST CLIENT")
+			return
+		}
+		// once moving is received, start a second client commands runner
 		p("Starting commands on second client")
 		go commandsRunner2.FireCommandsUntilStop(ctx)
@@ -302,52 +303,93 @@ func TestPushNotifications(t *testing.T) {
 			// destroy the second client
 			factory.Destroy("push-notification-client-2")
 		}()
-		// wait for moving on second client
-		// we know the maxconn is 15, assuming 16/17 was used to init the second client, so connID 18 should be from the second client
-		// also validate big enough relaxed timeout
-		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
-			return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING") && connID(s, 18)
-		}, 2*time.Minute)
-		if !found {
-			errChan <- fmt.Errorf("MOVING notification was not received within 2 minutes ON A SECOND CLIENT")
-			return
-		} else {
-			p("MOVING notification received on second client %v", logs2.ExtractDataFromLogMessage(match))
-		}
-		// wait for relaxation of 30m
-		match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
-			return strings.Contains(s, logs2.ApplyingRelaxedTimeoutDueToPostHandoffMessage) && strings.Contains(s, "30m")
-		}, 2*time.Minute)
-		if !found {
-			errChan <- fmt.Errorf("relaxed timeout was not applied within 2 minutes ON A SECOND CLIENT")
+
+		p("Waiting for MOVING notification on second client")
+		matchNotif, fnd := tracker2.FindOrWaitForNotification("MOVING", 3*time.Minute)
+		if !fnd {
+			errChan <- fmt.Errorf("MOVING notification was not received within 3 minutes ON A SECOND CLIENT")
 			return
 		} else {
-			p("Relaxed timeout applied on second client")
+			p("MOVING notification received on second client %v", matchNotif)
 		}
+
 		// Signal success
 		errChan <- nil
 	}()
 
 	commandsRunner.FireCommandsUntilStop(ctx)
-
+	// wait for moving on first client
+	// once the commandRunner stops, it means the wait
+	// on the logCollector match has completed and we can proceed
+	if !found {
+		ef("MOVING notification was not received within 3 minutes")
+	}
 	movingData := logs2.ExtractDataFromLogMessage(match)
 	p("MOVING notification received. %v", movingData)
 	seqIDToObserve = int64(movingData["seqID"].(float64))
 	connIDToObserve = uint64(movingData["connID"].(float64))
+	time.Sleep(3 * time.Second)
+	// start a third client and observe notifications while the move is in progress
+	p("Starting a third client to observe notification during moving...")
+	client3, err := factory.Create("push-notification-client-3", &CreateClientOptions{
+		Protocol:       3, // RESP3 required for push notifications
+		PoolSize:       poolSize,
+		MinIdleConns:   minIdleConns,
+		MaxActiveConns: maxConnections,
+		MaintNotificationsConfig: &maintnotifications.Config{
+			Mode:                       maintnotifications.ModeEnabled,
+			HandoffTimeout:             40 * time.Second, // 40 seconds handoff timeout
+			RelaxedTimeout:             30 * time.Minute, // 30 minutes relaxed timeout for third client
+			PostHandoffRelaxedDuration: 2 * time.Second,  // 2 seconds post-handoff relaxed duration
+			MaxWorkers:                 20,
+			EndpointType:               maintnotifications.EndpointTypeExternalIP, // Use external IP for enterprise
+		},
+		ClientName: "push-notification-test-client-3",
+	})
+
+	if err != nil {
+		ef("failed to create client: %v", err)
+	}
+	// setup tracking for third client
+	tracker3 := NewTrackingNotificationsHook()
+	logger3 := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
+	setupNotificationHooks(client3, tracker3, logger3)
+	commandsRunner3, _ := NewCommandRunner(client3)
+	p("Third client created")
+	go commandsRunner3.FireCommandsUntilStop(ctx)
+	// wait for moving on third client
+	movingNotification, found := tracker3.FindOrWaitForNotification("MOVING", 3*time.Minute)
+	if !found {
+		p("[NOTICE] MOVING notification was not received within 3 minutes ON A THIRD CLIENT")
+	} else {
+		p("MOVING notification received on third client. %v", movingNotification)
+		if len(movingNotification) != 4 {
+			p("[NOTICE] Invalid MOVING notification format: %v", movingNotification)
+		}
+		mNotifTimeS, ok := movingNotification[2].(int64)
+		if !ok {
+			p("[NOTICE] Invalid timeS in MOVING notification: %v", movingNotification)
+		}
+		// expect timeS to be less than 15
+		if mNotifTimeS >= 15 {
+			p("[NOTICE] Expected timeS < 15, got %d", mNotifTimeS)
+		}
+	}
+	commandsRunner3.Stop()
 
 	// Wait for the goroutine to complete and check for errors
 	if err := <-errChan; err != nil {
-		t.Fatalf("Second client goroutine error: %v", err)
+		ef("Second client goroutine error: %v", err)
 	}
 
 	// Wait for bind action to complete
 	bindStatus, err := faultInjector.WaitForAction(ctx, bindResp.ActionID,
-		WithMaxWaitTime(120*time.Second),
+		WithMaxWaitTime(240*time.Second),
 		WithPollInterval(2*time.Second))
 	if err != nil {
-		t.Fatalf("Bind action failed: %v", err)
+		ef("Bind action failed: %v", err)
 	}
-	p("Bind action completed: %s", bindStatus.Status)
+	p("Bind action completed: %s %s", bindStatus.Status, actionOutputIfFailed(bindStatus))
 
 	p("MOVING notification test completed successfully")
 
@@ -457,12 +499,10 @@ func TestPushNotifications(t *testing.T) {
 		trackerAnalysis.Print(t)
 		logCollector.Clear()
 		tracker.Clear()
-		t.Fatalf("[FAIL] Errors detected in push notification test")
+		ef("[FAIL] Errors detected in push notification test")
 	}
 
 	p("Analysis complete, no errors found")
-	// print analysis here, don't dump logs later
-	dump = false
 	allLogsAnalysis.Print(t)
 	trackerAnalysis.Print(t)
 	p("Command runner stats:")
diff --git a/maintnotifications/e2e/scenario_stress_test.go b/maintnotifications/e2e/scenario_stress_test.go
index 5a788ef18..2eea14448 100644
--- a/maintnotifications/e2e/scenario_stress_test.go
+++ b/maintnotifications/e2e/scenario_stress_test.go
@@ -16,49 +16,49 @@ import (
 
 // TestStressPushNotifications tests push notifications under extreme stress conditions
 func TestStressPushNotifications(t *testing.T) {
 	if os.Getenv("E2E_SCENARIO_TESTS") != "true" {
-		t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true")
+		t.Skip("[STRESS][SKIP] Scenario tests require E2E_SCENARIO_TESTS=true")
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Minute)
 	defer cancel()
 
+	// Setup: Create fresh database and client factory for this test
+	bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone")
+	defer cleanup()
+	t.Logf("[STRESS] Created test database with bdb_id: %d", bdbID)
+
+	// Wait for database to be fully ready
+	time.Sleep(10 * time.Second)
+
 	var dump = true
+	var errorsDetected = false
+
 	var p = func(format string, args ...interface{}) {
-		format = "[%s][STRESS] " + format
-		ts := time.Now().Format("15:04:05.000")
-		args = append([]interface{}{ts}, args...)
-		t.Logf(format, args...)
+		printLog("STRESS", false, format, args...)
 	}
 
 	var e = func(format string, args ...interface{}) {
-		format = "[%s][STRESS][ERROR] " + format
-		ts := time.Now().Format("15:04:05.000")
-		args = append([]interface{}{ts}, args...)
-		t.Errorf(format, args...)
+		errorsDetected = true
+		printLog("STRESS", true, format, args...)
+	}
+
+	var ef = func(format string, args ...interface{}) {
+		printLog("STRESS", true, format, args...)
+		t.FailNow()
 	}
 
 	logCollector.ClearLogs()
 	defer func() {
-		if dump {
-			p("Dumping logs...")
-			logCollector.DumpLogs()
-			p("Log Analysis:")
-			logCollector.GetAnalysis().Print(t)
-		}
 		logCollector.Clear()
 	}()
 
-	// Create client factory from configuration
-	factory, err := CreateTestClientFactory("standalone")
-	if err != nil {
-		t.Skipf("Enterprise cluster not available, skipping stress test: %v", err)
-	}
+	// Get endpoint config from factory (now connected to new database)
 	endpointConfig := factory.GetConfig()
 
 	// Create fault injector
 	faultInjector, err := CreateTestFaultInjector()
 	if err != nil {
-		t.Fatalf("Failed to create fault injector: %v", err)
+		ef("Failed to create fault injector: %v", err)
 	}
 
 	// Extreme stress configuration
@@ -90,7 +90,7 @@ func TestStressPushNotifications(t *testing.T) {
 			ClientName: fmt.Sprintf("stress-test-client-%d", i),
 		})
 		if err != nil {
-			t.Fatalf("Failed to create stress client %d: %v", i, err)
+			ef("Failed to create stress client %d: %v", i, err)
 		}
 		clients = append(clients, client)
@@ -109,10 +109,6 @@ func TestStressPushNotifications(t *testing.T) {
 		if dump {
 			p("Pool stats:")
 			factory.PrintPoolStats(t)
-			for i, tracker := range trackers {
-				p("Stress client %d analysis:", i)
-				tracker.GetAnalysis().Print(t)
-			}
 		}
 		for _, runner := range commandRunners {
 			runner.Stop()
@@ -124,7 +120,7 @@ func TestStressPushNotifications(t *testing.T) {
 	for i, client := range clients {
 		err = client.Ping(ctx).Err()
 		if err != nil {
-			t.Fatalf("Failed to ping Redis with stress client %d: %v", i, err)
+			ef("Failed to ping Redis with stress client %d: %v", i, err)
 		}
 	}
 
@@ -179,15 +175,14 @@ func TestStressPushNotifications(t *testing.T) {
 				resp, err = faultInjector.TriggerAction(ctx, ActionRequest{
 					Type: "failover",
 					Parameters: map[string]interface{}{
-						"cluster_index": "0",
-						"bdb_id":        endpointConfig.BdbID,
+						"bdb_id": endpointConfig.BdbID,
 					},
 				})
 			case "migrate":
 				resp, err = faultInjector.TriggerAction(ctx, ActionRequest{
 					Type: "migrate",
 					Parameters: map[string]interface{}{
-						"cluster_index": "0",
+						"bdb_id": endpointConfig.BdbID,
 					},
 				})
 			}
@@ -199,7 +194,7 @@ func TestStressPushNotifications(t *testing.T) {
 			// Wait for action to complete
 			status, err := faultInjector.WaitForAction(ctx, resp.ActionID,
-				WithMaxWaitTime(300*time.Second), // Very long wait for stress
+				WithMaxWaitTime(360*time.Second), // Longer wait time for stress
 				WithPollInterval(2*time.Second),
 			)
 			if err != nil {
@@ -208,10 +203,10 @@ func TestStressPushNotifications(t *testing.T) {
 			}
 
 			actionMutex.Lock()
-			actionResults = append(actionResults, fmt.Sprintf("%s: %s", actionName, status.Status))
+			actionResults = append(actionResults, fmt.Sprintf("%s: %s %s", actionName, status.Status, actionOutputIfFailed(status)))
 			actionMutex.Unlock()
 
-			p("[FI] %s action completed: %s", actionName, status.Status)
+			p("[FI] %s action completed: %s %s", actionName, status.Status, actionOutputIfFailed(status))
 		}(action.name, action.action, action.delay)
 	}
 
@@ -287,14 +282,27 @@ func TestStressPushNotifications(t *testing.T) {
 		e("Too many notification processing errors under stress: %d/%d", totalProcessingErrors, totalTrackerNotifications)
 	}
 
-	p("Stress test completed successfully!")
+	if errorsDetected {
+		logCollector.DumpLogs()
+		for i, tracker := range trackers {
+			p("=== Stress Client %d Analysis ===", i)
+			tracker.GetAnalysis().Print(t)
+		}
+		logCollector.Clear()
+		for _, tracker := range trackers {
+			tracker.Clear()
+		}
+		ef("Errors detected under stress")
+	}
+
+	dump = false
+	p("[SUCCESS] Stress test completed successfully!")
p("Processed %d operations across %d clients with %d connections", totalOperations, numClients, allLogsAnalysis.ConnectionCount) p("Error rate: %.2f%%, Notification processing errors: %d/%d", errorRate, totalProcessingErrors, totalTrackerNotifications) // Print final analysis - dump = false allLogsAnalysis.Print(t) for i, tracker := range trackers { p("=== Stress Client %d Analysis ===", i) diff --git a/maintnotifications/e2e/scenario_template.go.example b/maintnotifications/e2e/scenario_template.go.example index 963971502..50791aa65 100644 --- a/maintnotifications/e2e/scenario_template.go.example +++ b/maintnotifications/e2e/scenario_template.go.example @@ -130,7 +130,7 @@ func TestScenarioTemplate(t *testing.T) { // Step 8: Wait for fault injection to complete // status, err := faultInjector.WaitForAction(ctx, resp.ActionID, - // WithMaxWaitTime(120*time.Second), + // WithMaxWaitTime(240*time.Second), // WithPollInterval(2*time.Second)) // if err != nil { // t.Fatalf("Fault injection failed: %v", err) diff --git a/maintnotifications/e2e/scenario_timeout_configs_test.go b/maintnotifications/e2e/scenario_timeout_configs_test.go index 0477a53fb..ae7fcdb0d 100644 --- a/maintnotifications/e2e/scenario_timeout_configs_test.go +++ b/maintnotifications/e2e/scenario_timeout_configs_test.go @@ -19,15 +19,19 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) { t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true") } - ctx, cancel := context.WithTimeout(context.Background(), 25*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) defer cancel() var dump = true + + var errorsDetected = false var p = func(format string, args ...interface{}) { - format = "[%s][TIMEOUT-CONFIGS] " + format - ts := time.Now().Format("15:04:05.000") - args = append([]interface{}{ts}, args...) - t.Logf(format, args...) + printLog("TIMEOUT-CONFIGS", false, format, args...) + } + + var e = func(format string, args ...interface{}) { + errorsDetected = true + printLog("TIMEOUT-CONFIGS", true, format, args...) 
 	}
 
 	// Test different timeout configurations
@@ -42,8 +46,8 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 		{
 			name:                       "Conservative",
 			handoffTimeout:             60 * time.Second,
-			relaxedTimeout:             20 * time.Second,
-			postHandoffRelaxedDuration: 5 * time.Second,
+			relaxedTimeout:             30 * time.Second,
+			postHandoffRelaxedDuration: 2 * time.Minute,
 			description:                "Conservative timeouts for stable environments",
 			expectedBehavior:           "Longer timeouts, fewer timeout errors",
 		},
@@ -67,54 +71,39 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 	logCollector.ClearLogs()
 	defer func() {
-		if dump {
-			p("Dumping logs...")
-			logCollector.DumpLogs()
-			p("Log Analysis:")
-			logCollector.GetAnalysis().Print(t)
-		}
 		logCollector.Clear()
 	}()
 
-	// Create client factory from configuration
-	factory, err := CreateTestClientFactory("standalone")
-	if err != nil {
-		t.Skipf("Enterprise cluster not available, skipping timeout configs test: %v", err)
-	}
-	endpointConfig := factory.GetConfig()
-
-	// Create fault injector
-	faultInjector, err := CreateTestFaultInjector()
-	if err != nil {
-		t.Fatalf("Failed to create fault injector: %v", err)
-	}
-
-	defer func() {
-		if dump {
-			p("Pool stats:")
-			factory.PrintPoolStats(t)
-		}
-		factory.DestroyAll()
-	}()
-
-	// Test each timeout configuration
+	// Test each timeout configuration with its own fresh database
 	for _, timeoutTest := range timeoutConfigs {
 		t.Run(timeoutTest.name, func(t *testing.T) {
-			// redefine p and e for each test to get
-			// proper test name in logs and proper test failures
-			var p = func(format string, args ...interface{}) {
-				format = "[%s][ENDPOINT-TYPES] " + format
-				ts := time.Now().Format("15:04:05.000")
-				args = append([]interface{}{ts}, args...)
-				t.Logf(format, args...)
+			// Setup: Create fresh database and client factory for THIS timeout config test
+			bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone")
+			defer cleanup()
+			t.Logf("[TIMEOUT-CONFIGS-%s] Created test database with bdb_id: %d", timeoutTest.name, bdbID)
+
+			// Get endpoint config from factory (now connected to new database)
+			endpointConfig := factory.GetConfig()
+
+			// Create fault injector
+			faultInjector, err := CreateTestFaultInjector()
+			if err != nil {
+				t.Fatalf("[ERROR] Failed to create fault injector: %v", err)
 			}
-			var e = func(format string, args ...interface{}) {
-				format = "[%s][ENDPOINT-TYPES][ERROR] " + format
-				ts := time.Now().Format("15:04:05.000")
-				args = append([]interface{}{ts}, args...)
-				t.Errorf(format, args...)
+
+			defer func() {
+				if dump {
+					p("Pool stats:")
+					factory.PrintPoolStats(t)
+				}
+			}()
+
+			errorsDetected = false
+			var ef = func(format string, args ...interface{}) {
+				printLog("TIMEOUT-CONFIGS", true, format, args...)
+				t.FailNow()
 			}
+
 			p("Testing timeout configuration: %s - %s", timeoutTest.name, timeoutTest.description)
 			p("Expected behavior: %s", timeoutTest.expectedBehavior)
 			p("Handoff timeout: %v, Relaxed timeout: %v, Post-handoff duration: %v",
@@ -141,7 +130,7 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 				ClientName: fmt.Sprintf("timeout-test-%s", timeoutTest.name),
 			})
 			if err != nil {
-				t.Fatalf("Failed to create client for %s: %v", timeoutTest.name, err)
+				ef("Failed to create client for %s: %v", timeoutTest.name, err)
 			}
 
 			// Create timeout tracker
@@ -149,17 +138,13 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 			logger := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
 			setupNotificationHooks(client, tracker, logger)
 			defer func() {
-				if dump {
-					p("Tracker analysis for %s:", timeoutTest.name)
-					tracker.GetAnalysis().Print(t)
-				}
 				tracker.Clear()
 			}()
 
 			// Verify initial connectivity
 			err = client.Ping(ctx).Err()
 			if err != nil {
-				t.Fatalf("Failed to ping Redis with %s timeout config: %v", timeoutTest.name, err)
+				ef("Failed to ping Redis with %s timeout config: %v", timeoutTest.name, err)
 			}
 			p("Client connected successfully with %s timeout configuration", timeoutTest.name)
 
@@ -187,12 +172,11 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 			failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 				Type: "failover",
 				Parameters: map[string]interface{}{
-					"cluster_index": "0",
-					"bdb_id":        endpointConfig.BdbID,
+					"bdb_id": endpointConfig.BdbID,
 				},
 			})
 			if err != nil {
-				t.Fatalf("Failed to trigger failover action for %s: %v", timeoutTest.name, err)
+				ef("Failed to trigger failover action for %s: %v", timeoutTest.name, err)
 			}
 
 			// Wait for FAILING_OVER notification
@@ -200,7 +184,7 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 				return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "FAILING_OVER")
 			}, 3*time.Minute)
 			if !found {
-				t.Fatalf("FAILING_OVER notification was not received for %s timeout config", timeoutTest.name)
+				ef("FAILING_OVER notification was not received for %s timeout config", timeoutTest.name)
 			}
 			failingOverData := logs2.ExtractDataFromLogMessage(match)
 			p("FAILING_OVER notification received for %s. %v", timeoutTest.name, failingOverData)
@@ -212,7 +196,7 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 				return notificationType(s, "FAILED_OVER") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1)
 			}, 3*time.Minute)
 			if !found {
-				t.Fatalf("FAILED_OVER notification was not received for %s timeout config", timeoutTest.name)
+				ef("FAILED_OVER notification was not received for %s timeout config", timeoutTest.name)
 			}
 			failedOverData := logs2.ExtractDataFromLogMessage(match)
 			p("FAILED_OVER notification received for %s. %v", timeoutTest.name, failedOverData)
@@ -220,12 +204,12 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 			// Wait for failover to complete
 			status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID,
 				WithMaxWaitTime(180*time.Second),
-				WithPollInterval(1*time.Second),
+				WithPollInterval(2*time.Second),
 			)
 			if err != nil {
-				t.Fatalf("[FI] Failover action failed for %s: %v", timeoutTest.name, err)
+				ef("[FI] Failover action failed for %s: %v", timeoutTest.name, err)
 			}
-			p("[FI] Failover action completed for %s: %s", timeoutTest.name, status.Status)
+			p("[FI] Failover action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
 
 			// Continue traffic to observe timeout behavior
 			p("Continuing traffic for %v to observe timeout behavior...", timeoutTest.relaxedTimeout*2)
@@ -236,58 +220,59 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 			migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 				Type: "migrate",
 				Parameters: map[string]interface{}{
-					"cluster_index": "0",
+					"bdb_id": endpointConfig.BdbID,
 				},
 			})
 			if err != nil {
-				t.Fatalf("Failed to trigger migrate action for %s: %v", timeoutTest.name, err)
+				ef("Failed to trigger migrate action for %s: %v", timeoutTest.name, err)
+			}
+
+			// Wait for migration to complete
+			status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
+				WithMaxWaitTime(240*time.Second),
+				WithPollInterval(2*time.Second),
+			)
+			if err != nil {
+				ef("[FI] Migrate action failed for %s: %v", timeoutTest.name, err)
 			}
+			p("[FI] Migrate action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
 
 			// Wait for MIGRATING notification
-			match, found = logCollector.WaitForLogMatchFunc(func(s string) bool {
+			match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 				return strings.Contains(s, logs2.ProcessingNotificationMessage) && strings.Contains(s, "MIGRATING")
-			}, 30*time.Second)
+			}, 60*time.Second)
 			if !found {
-				t.Fatalf("MIGRATING notification was not received for %s timeout config", timeoutTest.name)
+				ef("MIGRATING notification was not received for %s timeout config", timeoutTest.name)
 			}
 			migrateData := logs2.ExtractDataFromLogMessage(match)
 			p("MIGRATING notification received for %s: %v", timeoutTest.name, migrateData)
 
-			// Wait for migration to complete
-			status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
-				WithMaxWaitTime(120*time.Second),
-				WithPollInterval(1*time.Second),
-			)
-			if err != nil {
-				t.Fatalf("[FI] Migrate action failed for %s: %v", timeoutTest.name, err)
-			}
-			p("[FI] Migrate action completed for %s: %s", timeoutTest.name, status.Status)
-
 			// do a bind action
 			bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 				Type: "bind",
 				Parameters: map[string]interface{}{
-					"cluster_index": "0",
-					"bdb_id":        endpointConfig.BdbID,
+					"bdb_id": endpointConfig.BdbID,
 				},
 			})
 			if err != nil {
-				t.Fatalf("Failed to trigger bind action for %s: %v", timeoutTest.name, err)
+				ef("Failed to trigger bind action for %s: %v", timeoutTest.name, err)
 			}
 
 			status, err = faultInjector.WaitForAction(ctx, bindResp.ActionID,
-				WithMaxWaitTime(120*time.Second),
-				WithPollInterval(1*time.Second),
+				WithMaxWaitTime(240*time.Second),
+				WithPollInterval(2*time.Second),
 			)
 			if err != nil {
-				t.Fatalf("[FI] Bind action failed for %s: %v", timeoutTest.name, err)
+				ef("[FI] Bind action failed for %s: %v", timeoutTest.name, err)
 			}
-			p("[FI] Bind action completed for %s: %s", timeoutTest.name, status.Status)
+			p("[FI] Bind action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
+
 			// waiting for moving notification
 			match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 				return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING")
-			}, 2*time.Minute)
+			}, 3*time.Minute)
 			if !found {
-				t.Fatalf("MOVING notification was not received for %s timeout config", timeoutTest.name)
+				ef("MOVING notification was not received for %s timeout config", timeoutTest.name)
 			}
 
 			movingData := logs2.ExtractDataFromLogMessage(match)
@@ -350,6 +335,13 @@ func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 				e("Expected successful handoffs with %s config, got none", timeoutTest.name)
 			}
 
+			if errorsDetected {
+				logCollector.DumpLogs()
+				trackerAnalysis.Print(t)
+				logCollector.Clear()
+				tracker.Clear()
+				ef("[FAIL] Errors detected with %s timeout config", timeoutTest.name)
+			}
 			p("Timeout configuration %s test completed successfully in %v", timeoutTest.name, testDuration)
 			p("Command runner stats:")
 			p("Operations: %d, Errors: %d, Timeout Errors: %d",
diff --git a/maintnotifications/e2e/scenario_tls_configs_test.go b/maintnotifications/e2e/scenario_tls_configs_test.go
index cbaec43a8..243ea3b7c 100644
--- a/maintnotifications/e2e/scenario_tls_configs_test.go
+++ b/maintnotifications/e2e/scenario_tls_configs_test.go
@@ -15,20 +15,23 @@ import (
 
 // TODO ADD TLS CONFIGS
 // TestTLSConfigurationsPushNotifications tests push notifications with different TLS configurations
-func TestTLSConfigurationsPushNotifications(t *testing.T) {
+func ТestTLSConfigurationsPushNotifications(t *testing.T) {
 	if os.Getenv("E2E_SCENARIO_TESTS") != "true" {
 		t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true")
 	}
 
-	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
+	ctx, cancel := context.WithTimeout(context.Background(), 25*time.Minute)
 	defer cancel()
 
 	var dump = true
+	var errorsDetected = false
 	var p = func(format string, args ...interface{}) {
-		format = "[%s][TLS-CONFIGS] " + format
-		ts := time.Now().Format("15:04:05.000")
-		args = append([]interface{}{ts}, args...)
-		t.Logf(format, args...)
+		printLog("TLS-CONFIGS", false, format, args...)
+	}
+
+	var e = func(format string, args ...interface{}) {
+		errorsDetected = true
+		printLog("TLS-CONFIGS", true, format, args...)
 	}
 
 	// Test different TLS configurations
@@ -64,54 +67,39 @@ func TestTLSConfigurationsPushNotifications(t *testing.T) {
 	logCollector.ClearLogs()
 	defer func() {
-		if dump {
-			p("Dumping logs...")
-			logCollector.DumpLogs()
-			p("Log Analysis:")
-			logCollector.GetAnalysis().Print(t)
-		}
 		logCollector.Clear()
 	}()
 
-	// Create client factory from configuration
-	factory, err := CreateTestClientFactory("standalone")
-	if err != nil {
-		t.Skipf("Enterprise cluster not available, skipping TLS configs test: %v", err)
-	}
-	endpointConfig := factory.GetConfig()
-
-	// Create fault injector
-	faultInjector, err := CreateTestFaultInjector()
-	if err != nil {
-		t.Fatalf("Failed to create fault injector: %v", err)
-	}
-
-	defer func() {
-		if dump {
-			p("Pool stats:")
-			factory.PrintPoolStats(t)
-		}
-		factory.DestroyAll()
-	}()
-
-	// Test each TLS configuration
+	// Test each TLS configuration with its own fresh database
 	for _, tlsTest := range tlsConfigs {
 		t.Run(tlsTest.name, func(t *testing.T) {
-			// redefine p and e for each test to get
-			// proper test name in logs and proper test failures
-			var p = func(format string, args ...interface{}) {
-				format = "[%s][ENDPOINT-TYPES] " + format
-				ts := time.Now().Format("15:04:05.000")
-				args = append([]interface{}{ts}, args...)
-				t.Logf(format, args...)
+			// Setup: Create fresh database and client factory for THIS TLS config test
+			bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone")
+			defer cleanup()
+			t.Logf("[TLS-CONFIGS-%s] Created test database with bdb_id: %d", tlsTest.name, bdbID)
+
+			// Get endpoint config from factory (now connected to new database)
+			endpointConfig := factory.GetConfig()
+
+			// Create fault injector
+			faultInjector, err := CreateTestFaultInjector()
+			if err != nil {
+				t.Fatalf("[ERROR] Failed to create fault injector: %v", err)
 			}
-			var e = func(format string, args ...interface{}) {
-				format = "[%s][ENDPOINT-TYPES][ERROR] " + format
-				ts := time.Now().Format("15:04:05.000")
-				args = append([]interface{}{ts}, args...)
-				t.Errorf(format, args...)
+
+			defer func() {
+				if dump {
+					p("Pool stats:")
+					factory.PrintPoolStats(t)
+				}
+			}()
+
+			errorsDetected = false
+			var ef = func(format string, args ...interface{}) {
+				printLog("TLS-CONFIGS", true, format, args...)
+				t.FailNow()
 			}
+
 			if tlsTest.skipReason != "" {
 				t.Skipf("Skipping %s: %s", tlsTest.name, tlsTest.skipReason)
 			}
@@ -144,7 +132,7 @@ func TestTLSConfigurationsPushNotifications(t *testing.T) {
 				if tlsTest.name == "TLSSecure" || tlsTest.name == "TLSStrict" {
 					t.Skipf("TLS configuration %s failed (expected in test environment): %v", tlsTest.name, err)
 				}
-				t.Fatalf("Failed to create client for %s: %v", tlsTest.name, err)
+				ef("Failed to create client for %s: %v", tlsTest.name, err)
 			}
 
 			// Create timeout tracker
@@ -152,10 +140,6 @@ func TestTLSConfigurationsPushNotifications(t *testing.T) {
 			logger := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
 			setupNotificationHooks(client, tracker, logger)
 			defer func() {
-				if dump {
-					p("Tracker analysis for %s:", tlsTest.name)
-					tracker.GetAnalysis().Print(t)
-				}
 				tracker.Clear()
 			}()
 
@@ -165,7 +149,7 @@ func TestTLSConfigurationsPushNotifications(t *testing.T) {
 				if tlsTest.name == "TLSSecure" || tlsTest.name == "TLSStrict" {
 					t.Skipf("TLS configuration %s ping failed (expected in test environment): %v", tlsTest.name, err)
 				}
-				t.Fatalf("Failed to ping Redis with %s TLS config: %v", tlsTest.name, err)
+				ef("Failed to ping Redis with %s TLS config: %v", tlsTest.name, err)
 			}
 			p("Client connected successfully with %s TLS configuration", tlsTest.name)
 
@@ -185,82 +169,37 @@ func TestTLSConfigurationsPushNotifications(t *testing.T) {
 				commandsRunner.FireCommandsUntilStop(ctx)
 			}()
 
-			// Test failover with this TLS configuration
-			p("Testing failover with %s TLS configuration...", tlsTest.name)
-			failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
-				Type: "failover",
-				Parameters: map[string]interface{}{
-					"cluster_index": "0",
-					"bdb_id":        endpointConfig.BdbID,
-				},
-			})
-			if err != nil {
-				t.Fatalf("Failed to trigger failover action for %s: %v", tlsTest.name, err)
-			}
-
-			// Wait for FAILING_OVER notification
-			match, found := logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
-				return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "FAILING_OVER")
-			}, 2*time.Minute)
-			if !found {
-				t.Fatalf("FAILING_OVER notification was not received for %s TLS config", tlsTest.name)
-			}
-			failingOverData := logs2.ExtractDataFromLogMessage(match)
-			p("FAILING_OVER notification received for %s. %v", tlsTest.name, failingOverData)
-
-			// Wait for FAILED_OVER notification
-			seqIDToObserve := int64(failingOverData["seqID"].(float64))
-			connIDToObserve := uint64(failingOverData["connID"].(float64))
-			match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
-				return notificationType(s, "FAILED_OVER") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1)
-			}, 2*time.Minute)
-			if !found {
-				t.Fatalf("FAILED_OVER notification was not received for %s TLS config", tlsTest.name)
-			}
-			failedOverData := logs2.ExtractDataFromLogMessage(match)
-			p("FAILED_OVER notification received for %s. %v", tlsTest.name, failedOverData)
-
-			// Wait for failover to complete
-			status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID,
-				WithMaxWaitTime(120*time.Second),
-				WithPollInterval(1*time.Second),
-			)
-			if err != nil {
-				t.Fatalf("[FI] Failover action failed for %s: %v", tlsTest.name, err)
-			}
-			p("[FI] Failover action completed for %s: %s", tlsTest.name, status.Status)
-
 			// Test migration with this TLS configuration
 			p("Testing migration with %s TLS configuration...", tlsTest.name)
 			migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 				Type: "migrate",
 				Parameters: map[string]interface{}{
-					"cluster_index": "0",
+					"bdb_id": endpointConfig.BdbID,
 				},
 			})
 			if err != nil {
-				t.Fatalf("Failed to trigger migrate action for %s: %v", tlsTest.name, err)
+				ef("Failed to trigger migrate action for %s: %v", tlsTest.name, err)
 			}
 
 			// Wait for MIGRATING notification
-			match, found = logCollector.WaitForLogMatchFunc(func(s string) bool {
+			match, found := logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 				return strings.Contains(s, logs2.ProcessingNotificationMessage) && strings.Contains(s, "MIGRATING")
-			}, 30*time.Second)
+			}, 60*time.Second)
 			if !found {
-				t.Fatalf("MIGRATING notification was not received for %s TLS config", tlsTest.name)
+				ef("MIGRATING notification was not received for %s TLS config", tlsTest.name)
 			}
 			migrateData := logs2.ExtractDataFromLogMessage(match)
 			p("MIGRATING notification received for %s: %v", tlsTest.name, migrateData)
 
 			// Wait for migration to complete
-			status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
-				WithMaxWaitTime(120*time.Second),
-				WithPollInterval(1*time.Second),
+			status, err := faultInjector.WaitForAction(ctx, migrateResp.ActionID,
+				WithMaxWaitTime(240*time.Second),
+				WithPollInterval(2*time.Second),
 			)
 			if err != nil {
-				t.Fatalf("[FI] Migrate action failed for %s: %v", tlsTest.name, err)
+				ef("[FI] Migrate action failed for %s: %v", tlsTest.name, err)
 			}
-			p("[FI] Migrate action completed for %s: %s", tlsTest.name, status.Status)
+			p("[FI] Migrate action completed for %s: %s %s", tlsTest.name, status.Status, actionOutputIfFailed(status))
 
 			// Continue traffic for a bit to observe TLS behavior
 			time.Sleep(5 * time.Second)
@@ -287,6 +226,13 @@ func TestTLSConfigurationsPushNotifications(t *testing.T) {
 				e("Expected MIGRATING notifications with %s TLS config, got none", tlsTest.name)
 			}
 
+			if errorsDetected {
+				logCollector.DumpLogs()
+				trackerAnalysis.Print(t)
+				logCollector.Clear()
+				tracker.Clear()
+				ef("[FAIL] Errors detected with %s TLS config", tlsTest.name)
+			}
 			// TLS-specific validations
 			stats := commandsRunner.GetStats()
 			switch tlsTest.name {
diff --git a/maintnotifications/e2e/utils_test.go b/maintnotifications/e2e/utils_test.go
index eb3cbe0b0..a60fac89f 100644
--- a/maintnotifications/e2e/utils_test.go
+++ b/maintnotifications/e2e/utils_test.go
@@ -1,5 +1,12 @@
 package e2e
 
+import (
+	"fmt"
+	"path/filepath"
+	"runtime"
+	"time"
+)
+
 func isTimeout(errMsg string) bool {
 	return contains(errMsg, "i/o timeout") ||
 		contains(errMsg, "deadline exceeded") ||
@@ -42,3 +49,33 @@ func min(a, b int) int {
 	}
 	return b
 }
+
+// printLog writes a timestamped log line to stdout, tagged with a group name
+// and the file:line of the calling test. runtime.Caller(2) skips this helper
+// and the per-test wrapper closure (p/e/ef) that invokes it.
+func printLog(group string, isError bool, format string, args ...interface{}) {
+	_, filename, line, _ := runtime.Caller(2)
+	filename = filepath.Base(filename)
+	finalFormat := "%s:%d [%s][%s] " + format + "\n"
+	if isError {
+		finalFormat = "%s:%d [%s][%s][ERROR] " + format + "\n"
+	}
+	ts := time.Now().Format("15:04:05.000")
+	args = append([]interface{}{filename, line, ts, group}, args...)
+	fmt.Printf(finalFormat, args...)
+}
+
+// actionOutputIfFailed renders the fault injector's error (or raw output)
+// for a failed action; it returns an empty string for any other status.
+func actionOutputIfFailed(status *ActionStatusResponse) string {
+	if status.Status != StatusFailed {
+		return ""
+	}
+	if status.Error != nil {
+		return fmt.Sprintf("%v", status.Error)
+	}
+	if status.Output == nil {
+		return ""
+	}
+	return fmt.Sprintf("%+v", status.Output)
+}
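For reviewers, a minimal sketch of how the two new helpers are meant to compose inside a scenario test. This is an illustrative example under assumptions, not part of the patch: `runScenarioStep` and `doStep` are hypothetical names, and the sketch assumes it lives in the same `e2e` package so that `printLog`, `actionOutputIfFailed`, `ActionStatusResponse`, and `StatusFailed` are in scope. Because each wrapper closure adds exactly one stack frame, `runtime.Caller(2)` inside `printLog` resolves to the test line that called `p`, `e`, or `ef`.

```go
package e2e

import "testing"

// doStep is a hypothetical stand-in for a single scenario action.
func doStep() error { return nil }

// runScenarioStep shows the intended wiring: printLog is always reached
// through a one-level wrapper closure, so it reports the caller's file:line.
func runScenarioStep(t *testing.T, status *ActionStatusResponse) {
	errorsDetected := false

	// p logs progress lines.
	p := func(format string, args ...interface{}) {
		printLog("MY-SCENARIO", false, format, args...)
	}
	// e records a soft failure but lets the scenario continue.
	e := func(format string, args ...interface{}) {
		errorsDetected = true
		printLog("MY-SCENARIO", true, format, args...)
	}
	// ef logs the error and aborts the test immediately.
	ef := func(format string, args ...interface{}) {
		printLog("MY-SCENARIO", true, format, args...)
		t.FailNow()
	}

	if err := doStep(); err != nil {
		e("step failed: %v", err) // soft failure, keep going
	}
	// actionOutputIfFailed appends detail only when the action failed,
	// so successful actions log as a plain status line.
	p("[FI] action completed: %s %s", status.Status, actionOutputIfFailed(status))
	if errorsDetected {
		ef("errors detected in scenario step") // hard stop after dumping context
	}
}
```

Note the ordering convention the patch relies on: any diagnostics (log dumps, analyses) must be emitted before calling `ef`, since `t.FailNow()` stops the test function at that line.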