diff --git a/Makefile b/Makefile
index a8b8aa1b..4ee14986 100644
--- a/Makefile
+++ b/Makefile
@@ -37,6 +37,7 @@ PROGS = helloworld \
 	sideeffect \
 	sleep \
 	dataconverter \
+	autoscaling-monitoring \
 
 TEST_ARG ?= -race -v -timeout 5m
 BUILD := ./build
@@ -186,6 +187,9 @@ versioning:
 dataconverter:
 	go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go
 
+autoscaling-monitoring:
+	go build -o bin/autoscaling-monitoring cmd/samples/advanced/autoscaling-monitoring/*.go
+
 bins: helloworld \
 	versioning \
 	delaystart \
@@ -219,6 +223,7 @@ bins: helloworld \
 	sideeffect \
 	sleep \
 	dataconverter \
+	autoscaling-monitoring \
 
 test: bins
 	@rm -f test
diff --git a/cmd/samples/advanced/autoscaling-monitoring/Makefile b/cmd/samples/advanced/autoscaling-monitoring/Makefile
new file mode 100644
index 00000000..4527b608
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/Makefile
@@ -0,0 +1,26 @@
+# Makefile for autoscaling monitoring sample
+
+.PHONY: build clean test deps run-worker run-trigger
+
+# Build the autoscaling monitoring sample
+build:
+	go build -o ../../../../bin/autoscaling-monitoring *.go
+
+# Clean build artifacts
+clean:
+	rm -f ../../../../bin/autoscaling-monitoring
+
+# Run tests
+test:
+	go test -v .
+
+# Install dependencies
+deps:
+	go mod tidy
+
+# Run the sample in different modes
+run-worker: build
+	../../../../bin/autoscaling-monitoring -m worker
+
+run-trigger: build
+	../../../../bin/autoscaling-monitoring -m trigger
diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md
new file mode 100644
index 00000000..e2c82bbe
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/README.md
@@ -0,0 +1,305 @@
+# Autoscaling Monitoring Sample
+
+This sample demonstrates three advanced Cadence worker features:
+
+1. **Worker Poller Autoscaling** - Dynamic adjustment of worker poller goroutines based on workload
+2. **Integrated Prometheus Metrics** - Real-time metrics collection using Tally with a Prometheus reporter
+3. **Autoscaling Metrics** - Comprehensive autoscaling behavior metrics exposed via an HTTP endpoint
+
+## Features
+
+### Worker Poller Autoscaling
+The worker uses `worker.NewV2` with `AutoScalerOptions` to enable autoscaling:
+- **AutoScalerOptions.Enabled**: true - Enables the autoscaling feature
+- **PollerMinCount**: 2 - Minimum number of poller goroutines
+- **PollerMaxCount**: 8 - Maximum number of poller goroutines
+- **PollerInitCount**: 4 - Initial number of poller goroutines
+
+The worker automatically adjusts the number of poller goroutines between the min and max values based on the current workload (see the sketch at the end of this section).
+
+### Prometheus Metrics
+The sample uses Tally with a Prometheus reporter to expose comprehensive metrics:
+- **Real-time autoscaling metrics** - Poller count changes, quota adjustments, wait times
+- **Worker performance metrics** - Task processing rates, poller utilization, queue depths
+- **Standard Cadence metrics** - All metrics automatically emitted by the Cadence Go client
+- **Sanitized metric names** - Prometheus-compatible metric names and labels
+
+### Monitoring Dashboards
+When running the Cadence server locally with Grafana, you can access the client dashboards at:
+
+**Client Dashboards**: http://localhost:3000/d/dehkspwgabvuoc/cadence-client
+
+> **Note**: Make sure to select a Domain in Grafana for the dashboards to display data. The dashboards will be empty until a domain is selected from the dropdown.
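+
+### Worker Options at a Glance
+
+A condensed sketch of how these settings are wired into `worker.Options` (this mirrors `worker_config.go`, shown in full later in this change; the values come from the YAML configuration):
+
+```go
+workerOptions := worker.Options{
+	MetricsScope: h.WorkerMetricScope,
+	Logger:       h.Logger,
+	AutoScalerOptions: worker.AutoScalerOptions{
+		Enabled:         true,
+		PollerMinCount:  config.Autoscaling.PollerMinCount,  // e.g. 2
+		PollerMaxCount:  config.Autoscaling.PollerMaxCount,  // e.g. 8
+		PollerInitCount: config.Autoscaling.PollerInitCount, // e.g. 4
+	},
+}
+
+// The sample uses worker.NewV2, which is required for autoscaling support.
+w, err := worker.NewV2(h.Service, h.Config.DomainName, ApplicationName, workerOptions)
+```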
+
+## Prerequisites
+
+1. **Cadence Server**: Running locally with Docker Compose.
+2. **Prometheus**: Configured to scrape metrics from the sample.
+3. **Grafana**: With the Cadence client dashboards included in the default Cadence server setup (the latest server version ships with them).
+
+## Quick Start
+
+### 1. Start the Worker
+```bash
+./bin/autoscaling-monitoring -m worker
+```
+
+The worker automatically exposes metrics at: http://127.0.0.1:8004/metrics
+
+### 2. Generate Load
+```bash
+./bin/autoscaling-monitoring -m trigger
+```
+
+## Configuration
+
+The sample uses a custom configuration system that extends the base Cadence configuration. You can specify a configuration file using the `-config` flag:
+
+```bash
+./bin/autoscaling-monitoring -m worker -config /path/to/config.yaml
+```
+
+### Configuration File Structure
+
+```yaml
+# Cadence connection settings
+domain: "default"
+service: "cadence-frontend"
+host: "localhost:7833"
+
+# Prometheus configuration
+prometheus:
+  listenAddress: "127.0.0.1:8004"
+
+# Autoscaling configuration
+autoscaling:
+  # Worker autoscaling settings
+  pollerMinCount: 2
+  pollerMaxCount: 8
+  pollerInitCount: 4
+
+  # Load generation settings
+  loadGeneration:
+    # Workflow-level settings
+    workflows: 3          # Number of workflows to start
+    workflowDelay: 1000   # Delay between starting workflows (milliseconds)
+
+    # Activity-level settings (per workflow)
+    activitiesPerWorkflow: 40   # Number of activities per workflow
+    batchDelay: 2000            # Delay between activity batches within a workflow (milliseconds)
+
+    # Activity processing time range (milliseconds)
+    minProcessingTime: 1000
+    maxProcessingTime: 6000
+```
+
+### Configuration Usage
+
+The configuration values are used throughout the sample:
+
+1. **Worker Configuration** (`worker_config.go`):
+   - `pollerMinCount`, `pollerMaxCount`, `pollerInitCount` → `AutoScalerOptions`
+
+2. **Workflow Configuration** (`workflow.go`):
+   - `activitiesPerWorkflow` → Number of activities to execute per workflow
+   - `batchDelay` → Delay between activity batches within a workflow
+
+3. **Activity Configuration** (`activities.go`):
+   - `minProcessingTime`, `maxProcessingTime` → Activity processing time range
+
+4. **Prometheus Configuration** (integrated):
+   - `listenAddress` → Metrics endpoint address (default: 127.0.0.1:8004)
+
+### Default Configuration
+
+If no configuration file is provided or the file cannot be read, the sample uses these defaults:
+
+```yaml
+domain: "default"
+service: "cadence-frontend"
+host: "localhost:7833"
+prometheus:
+  listenAddress: "127.0.0.1:8004"
+autoscaling:
+  pollerMinCount: 2
+  pollerMaxCount: 8
+  pollerInitCount: 4
+  loadGeneration:
+    workflows: 3
+    workflowDelay: 1000
+    activitiesPerWorkflow: 40
+    batchDelay: 2000
+    minProcessingTime: 1000
+    maxProcessingTime: 6000
+```
+
+### Load Pattern Examples
+
+The sample supports various load patterns for testing autoscaling behavior (a sketch of how trigger mode consumes these settings follows the examples):
+
+#### **1. Gradual Ramp-up (Default)**
+```yaml
+loadGeneration:
+  workflows: 3
+  workflowDelay: 1000
+  activitiesPerWorkflow: 40
+```
+**Result**: 3 workflows starting 1 second apart, each with 40 activities (120 total activities)
+
+#### **2. Burst Load**
+```yaml
+loadGeneration:
+  workflows: 10
+  workflowDelay: 0
+  activitiesPerWorkflow: 20
+```
+**Result**: 10 workflows all starting immediately (200 total activities)
+
+#### **3. Sustained Load**
+```yaml
+loadGeneration:
+  workflows: 5
+  workflowDelay: 5000
+  activitiesPerWorkflow: 100
+```
+**Result**: 5 long-running workflows with 5-second delays between starts (500 total activities)
+
+#### **4. Light Load**
+```yaml
+loadGeneration:
+  workflows: 1
+  workflowDelay: 0
+  activitiesPerWorkflow: 20
+```
+**Result**: Single workflow with 20 activities for minimal load testing
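+
+In trigger mode, these settings drive the workflow start loop. A condensed sketch (mirroring `startWorkflow` in `main.go`, shown in full later in this change):
+
+```go
+// Start `workflows` executions, spaced `workflowDelay` milliseconds apart.
+for i := 0; i < workflows; i++ {
+	workflowOptions.ID = fmt.Sprintf("autoscaling_%d_%s", i, uuid.New())
+	h.StartWorkflow(workflowOptions, autoscalingWorkflowName,
+		activitiesPerWorkflow, batchDelay, minProcessingTime, maxProcessingTime)
+	if i < workflows-1 {
+		time.Sleep(time.Duration(workflowDelay) * time.Millisecond)
+	}
+}
+```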
+
+## Monitoring
+
+### Metrics Endpoints
+- **Prometheus Metrics**: http://127.0.0.1:8004/metrics
+  - Exposed automatically, but only when running in worker mode
+  - Real-time autoscaling and worker performance metrics
+  - Prometheus-compatible format with sanitized names
+  - **Note**: The metrics server is not started in trigger mode
+
+### Grafana Dashboard
+Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/cadence-client
+
+### Key Metrics to Monitor
+
+1. **Worker Performance Metrics**:
+   - `cadence_worker_decision_poll_success_count` - Successful decision task polls
+   - `cadence_worker_activity_poll_success_count` - Successful activity task polls
+   - `cadence_worker_decision_poll_count` - Total decision task poll attempts
+   - `cadence_worker_activity_poll_count` - Total activity task poll attempts
+
+2. **Autoscaling Behavior Metrics**:
+   - `cadence_worker_poller_count` - Number of active poller goroutines (key autoscaling indicator)
+   - `cadence_concurrency_auto_scaler_poller_quota` - Current poller quota for autoscaling
+   - `cadence_concurrency_auto_scaler_poller_wait_time` - Time pollers wait for tasks
+   - `cadence_concurrency_auto_scaler_scale_up_count` - Number of scale-up events
+   - `cadence_concurrency_auto_scaler_scale_down_count` - Number of scale-down events
+
+## How It Works
+
+### Load Generation
+The sample creates multiple workflows that execute activities in parallel, with each workflow:
+- Starting with a configurable delay (`workflowDelay`) to create sustained load patterns
+- Executing a configurable number of activities (`activitiesPerWorkflow`)
+- Running activities that each take 1-6 seconds to complete (configurable via `minProcessingTime`/`maxProcessingTime`)
+- Recording metrics about execution time
+- Creating varying load patterns with configurable batch delays within each workflow
+
+### Autoscaling Demonstration
+The worker uses `worker.NewV2` with `AutoScalerOptions` to:
+- Start with a configurable number of poller goroutines (`pollerInitCount`)
+- Scale down to the minimum (`pollerMinCount`) when load is low
+- Scale up to the maximum (`pollerMaxCount`) when load is high
+- Automatically adjust based on task queue depth and processing time
+
+### Metrics Collection
+The sample uses Tally with a Prometheus reporter for comprehensive metrics (the scope setup is sketched after this list):
+- **Real-time autoscaling metrics** - Poller count changes, quota adjustments, scale events
+- **Worker performance metrics** - Task processing rates, poller utilization, queue depths
+- **Standard Cadence metrics** - All metrics automatically emitted by the Cadence Go client
+- **Sanitized metric names** - Prometheus-compatible format with proper character replacement
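+
+A condensed sketch of the metrics wiring in `main.go` (the sanitize options are elided here; the full setup appears later in this change):
+
+```go
+// Create the Prometheus reporter and a cached Tally root scope.
+reporter := prometheus.NewReporter(prometheus.Options{})
+scope, closer := tally.NewRootScope(tally.ScopeOptions{
+	Tags:            map[string]string{"service": "autoscaling-monitoring"},
+	SanitizeOptions: &sanitizeOptions, // Prometheus-safe names and labels
+	CachedReporter:  reporter,
+}, 10*time.Second)
+defer closer.Close()
+
+// Expose the scrape endpoint (worker mode only).
+http.Handle("/metrics", reporter.HTTPHandler())
+go http.ListenAndServe(config.Prometheus.ListenAddress, nil)
+```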
+
+## Production Considerations
+
+### Scaling
+- Adjust `pollerMinCount`, `pollerMaxCount`, and `pollerInitCount` based on your workload
+- Monitor worker performance and adjust autoscaling parameters
+- Use multiple worker instances for high availability
+
+### Monitoring
+- Configure Prometheus to scrape metrics regularly (the latest Cadence server version is configured to do this)
+- Set up alerts for worker performance issues
+- Use Grafana dashboards to visualize autoscaling behavior
+- Monitor poller count changes to verify autoscaling is working
+
+### Security
+- Secure the Prometheus endpoint in production
+- Use authentication for metrics access
+- Consider using HTTPS for metrics endpoints
+
+## Testing
+
+The sample includes unit tests for the configuration loading functionality. Run these tests if you make any changes to the config:
+
+### Running Tests
+```bash
+# Run all tests
+go test -v
+
+# Run a specific test
+go test -v -run TestLoadConfiguration_SuccessfulLoading
+
+# Run tests with coverage
+go test -v -cover
+```
+
+### Test Coverage
+The tests cover:
+- **Successful configuration loading** - Complete YAML files with all fields
+- **Missing file fallback** - Graceful handling when the config file doesn't exist
+- **Default value application** - Ensuring all fields have sensible defaults
+
+### Configuration Testing
+The tests validate that the configuration system:
+- Handles embedded struct issues properly
+- Applies defaults correctly for missing fields
+- Provides clear error messages for configuration problems
+- Maintains backward compatibility
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Worker Not Starting**:
+   - Check that the Cadence server is running
+   - Verify the domain exists
+   - Check the configuration file
+   - Ensure a compatible Cadence client version is used
+
+2. **Autoscaling Not Working**:
+   - Verify `worker.NewV2` is being used
+   - Check `AutoScalerOptions.Enabled` is true
+   - Monitor poller count changes in logs
+   - Ensure sufficient load is being generated
+
+3. **Configuration Issues**:
+   - Verify the configuration file path is correct
+   - Check YAML syntax in the configuration file
+   - Review default values if the config file is not found
+
+4. **Metrics Not Appearing**:
+   - Verify the worker is running (metrics are exposed automatically)
+   - Check the metrics endpoint is accessible: http://127.0.0.1:8004/metrics
+   - Ensure Prometheus is configured to scrape the endpoint
+   - Check for metric name sanitization issues
+
+5. **Dashboard Not Loading**:
+   - Verify Grafana is running
+   - Check the dashboard URL is correct
+   - Ensure the Prometheus data source is configured
diff --git a/cmd/samples/advanced/autoscaling-monitoring/activities.go b/cmd/samples/advanced/autoscaling-monitoring/activities.go
new file mode 100644
index 00000000..b896a79a
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/activities.go
@@ -0,0 +1,35 @@
+package main
+
+import (
+	"context"
+	"math/rand"
+	"time"
+
+	"go.uber.org/cadence/activity"
+	"go.uber.org/zap"
+)
+
+const (
+	loadGenerationActivityName = "loadGenerationActivity"
+)
+
+// LoadGenerationActivity simulates work that can be scaled.
+// It includes random delays to simulate real-world processing time.
+func LoadGenerationActivity(ctx context.Context, taskID int, minProcessingTime, maxProcessingTime int) error {
+	startTime := time.Now()
+	logger := activity.GetLogger(ctx)
+	logger.Info("Load generation activity started", zap.Int("taskID", taskID))
+
+	// Simulate variable processing time using configuration values.
+	// Guard the range so rand.Intn never receives a non-positive argument
+	// (it panics when maxProcessingTime <= minProcessingTime).
+	processingRange := maxProcessingTime - minProcessingTime
+	if processingRange <= 0 {
+		processingRange = 1
+	}
+	processingTime := time.Duration(rand.Intn(processingRange)+minProcessingTime) * time.Millisecond
+	time.Sleep(processingTime)
+
+	duration := time.Since(startTime)
+
+	logger.Info("Load generation activity completed",
+		zap.Int("taskID", taskID),
+		zap.Duration("processingTime", processingTime),
+		zap.Duration("totalDuration", duration))
+
+	return nil
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/config.go b/cmd/samples/advanced/autoscaling-monitoring/config.go
new file mode 100644
index 00000000..1dd150cd
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/config.go
@@ -0,0 +1,174 @@
+package main
+
+import (
+	"fmt"
+	"os"
+
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+	"github.com/uber-go/tally/prometheus"
+	"gopkg.in/yaml.v3"
+)
+
+// AutoscalingConfiguration uses a flattened structure to avoid embedded struct issues
+type AutoscalingConfiguration struct {
+	// Base configuration fields (explicit, not embedded)
+	DomainName      string                    `yaml:"domain"`
+	ServiceName     string                    `yaml:"service"`
+	HostNameAndPort string                    `yaml:"host"`
+	Prometheus      *prometheus.Configuration `yaml:"prometheus"`
+
+	// Autoscaling-specific fields
+	Autoscaling AutoscalingSettings `yaml:"autoscaling"`
+}
+
+// AutoscalingSettings contains the autoscaling configuration
+type AutoscalingSettings struct {
+	// Worker autoscaling settings
+	PollerMinCount  int `yaml:"pollerMinCount"`
+	PollerMaxCount  int `yaml:"pollerMaxCount"`
+	PollerInitCount int `yaml:"pollerInitCount"`
+
+	// Load generation settings
+	LoadGeneration LoadGenerationSettings `yaml:"loadGeneration"`
+}
+
+// LoadGenerationSettings contains the load generation configuration
+type LoadGenerationSettings struct {
+	// Workflow-level settings
+	Workflows     int `yaml:"workflows"`
+	WorkflowDelay int `yaml:"workflowDelay"`
+
+	// Activity-level settings (per workflow)
+	ActivitiesPerWorkflow int `yaml:"activitiesPerWorkflow"`
+	BatchDelay            int `yaml:"batchDelay"`
+	MinProcessingTime     int `yaml:"minProcessingTime"`
+	MaxProcessingTime     int `yaml:"maxProcessingTime"`
+}
+
+// Default values as constants for easy maintenance
+const (
+	DefaultDomainName      = "default"
+	DefaultServiceName     = "cadence-frontend"
+	DefaultHostNameAndPort = "localhost:7833"
+	DefaultPrometheusAddr  = "127.0.0.1:8004"
+
+	DefaultPollerMinCount  = 2
+	DefaultPollerMaxCount  = 8
+	DefaultPollerInitCount = 4
+
+	DefaultWorkflows             = 3
+	DefaultWorkflowDelay         = 1000
+	DefaultActivitiesPerWorkflow = 40
+	
DefaultBatchDelay = 2000 + DefaultMinProcessingTime = 1000 + DefaultMaxProcessingTime = 6000 +) + +// DefaultAutoscalingConfiguration returns default configuration +func DefaultAutoscalingConfiguration() AutoscalingConfiguration { + return AutoscalingConfiguration{ + DomainName: DefaultDomainName, + ServiceName: DefaultServiceName, + HostNameAndPort: DefaultHostNameAndPort, + Prometheus: &prometheus.Configuration{ + ListenAddress: DefaultPrometheusAddr, + }, + Autoscaling: AutoscalingSettings{ + PollerMinCount: DefaultPollerMinCount, + PollerMaxCount: DefaultPollerMaxCount, + PollerInitCount: DefaultPollerInitCount, + LoadGeneration: LoadGenerationSettings{ + Workflows: DefaultWorkflows, + WorkflowDelay: DefaultWorkflowDelay, + ActivitiesPerWorkflow: DefaultActivitiesPerWorkflow, + BatchDelay: DefaultBatchDelay, + MinProcessingTime: DefaultMinProcessingTime, + MaxProcessingTime: DefaultMaxProcessingTime, + }, + }, + } +} + +// loadConfiguration loads the autoscaling configuration from file +func loadConfiguration(configFile string) AutoscalingConfiguration { + // Start with defaults + config := DefaultAutoscalingConfiguration() + + // Read config file + configData, err := os.ReadFile(configFile) + if err != nil { + fmt.Printf("Failed to read config file: %v, using defaults\n", err) + return config + } + + // Unmarshal into the config struct + if err := yaml.Unmarshal(configData, &config); err != nil { + fmt.Printf("Error parsing configuration: %v, using defaults\n", err) + return DefaultAutoscalingConfiguration() + } + + // Apply defaults for any missing fields + config.applyDefaults() + + return config +} + +// applyDefaults ensures all fields have sensible values +func (c *AutoscalingConfiguration) applyDefaults() { + // Base configuration defaults + if c.DomainName == "" { + c.DomainName = DefaultDomainName + } + if c.ServiceName == "" { + c.ServiceName = DefaultServiceName + } + if c.HostNameAndPort == "" { + c.HostNameAndPort = DefaultHostNameAndPort + } + if c.Prometheus == nil { + c.Prometheus = &prometheus.Configuration{ + ListenAddress: DefaultPrometheusAddr, + } + } + + // Autoscaling defaults + if c.Autoscaling.PollerMinCount == 0 { + c.Autoscaling.PollerMinCount = DefaultPollerMinCount + } + if c.Autoscaling.PollerMaxCount == 0 { + c.Autoscaling.PollerMaxCount = DefaultPollerMaxCount + } + if c.Autoscaling.PollerInitCount == 0 { + c.Autoscaling.PollerInitCount = DefaultPollerInitCount + } + + // Load generation defaults + if c.Autoscaling.LoadGeneration.Workflows == 0 { + c.Autoscaling.LoadGeneration.Workflows = DefaultWorkflows + } + if c.Autoscaling.LoadGeneration.WorkflowDelay == 0 { + c.Autoscaling.LoadGeneration.WorkflowDelay = DefaultWorkflowDelay + } + if c.Autoscaling.LoadGeneration.ActivitiesPerWorkflow == 0 { + c.Autoscaling.LoadGeneration.ActivitiesPerWorkflow = DefaultActivitiesPerWorkflow + } + if c.Autoscaling.LoadGeneration.BatchDelay == 0 { + c.Autoscaling.LoadGeneration.BatchDelay = DefaultBatchDelay + } + if c.Autoscaling.LoadGeneration.MinProcessingTime == 0 { + c.Autoscaling.LoadGeneration.MinProcessingTime = DefaultMinProcessingTime + } + if c.Autoscaling.LoadGeneration.MaxProcessingTime == 0 { + c.Autoscaling.LoadGeneration.MaxProcessingTime = DefaultMaxProcessingTime + } +} + +// ToCommonConfiguration converts to the common.Configuration type for compatibility +func (c *AutoscalingConfiguration) ToCommonConfiguration() common.Configuration { + return common.Configuration{ + DomainName: c.DomainName, + ServiceName: c.ServiceName, + HostNameAndPort: 
c.HostNameAndPort, + Prometheus: c.Prometheus, + } +} diff --git a/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml new file mode 100644 index 00000000..ceb2f0be --- /dev/null +++ b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml @@ -0,0 +1,30 @@ +# Configuration for autoscaling monitoring sample +domain: "default" +service: "cadence-frontend" +host: "localhost:7833" + +# Prometheus configuration for metrics collection +prometheus: + listenAddress: "127.0.0.1:8004" + +# Autoscaling configuration +# These settings control the worker's concurrency and autoscaling behavior +autoscaling: + # Worker autoscaling settings + pollerMinCount: 2 + pollerMaxCount: 8 + pollerInitCount: 4 + + # Worker load simulation settings + loadGeneration: + # Workflow-level settings + workflows: 3 # Number of workflows to start + workflowDelay: 400 # Delay between starting workflows (milliseconds) + + # Activity-level settings (per workflow) + activitiesPerWorkflow: 40 # Number of activities per workflow + batchDelay: 750 # Delay between activity batches within workflow (milliseconds) + + # Activity processing time range (milliseconds) + minProcessingTime: 1000 + maxProcessingTime: 6000 diff --git a/cmd/samples/advanced/autoscaling-monitoring/config_test.go b/cmd/samples/advanced/autoscaling-monitoring/config_test.go new file mode 100644 index 00000000..a9207d9c --- /dev/null +++ b/cmd/samples/advanced/autoscaling-monitoring/config_test.go @@ -0,0 +1,121 @@ +package main + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test the improved configuration loader for regressions +func TestLoadConfiguration_SuccessfulLoading(t *testing.T) { + // Create a temporary configuration file with all fields populated + configContent := ` +domain: "test-domain" +service: "test-service" +host: "test-host:7833" +prometheus: + listenAddress: "127.0.0.1:9000" +autoscaling: + pollerMinCount: 3 + pollerMaxCount: 10 + pollerInitCount: 5 + loadGeneration: + workflows: 5 + workflowDelay: 3 + activitiesPerWorkflow: 100 + batchDelay: 5 + minProcessingTime: 2000 + maxProcessingTime: 8000 +` + + // Create temporary file + tmpFile, err := os.CreateTemp("", "test-config-*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(configContent) + require.NoError(t, err) + tmpFile.Close() + + // Load configuration + config := loadConfiguration(tmpFile.Name()) + + // Validate all fields are populated correctly + assert.Equal(t, "test-domain", config.DomainName) + assert.Equal(t, "test-service", config.ServiceName) + assert.Equal(t, "test-host:7833", config.HostNameAndPort) + require.NotNil(t, config.Prometheus) + assert.Equal(t, "127.0.0.1:9000", config.Prometheus.ListenAddress) + assert.Equal(t, 3, config.Autoscaling.PollerMinCount) + assert.Equal(t, 10, config.Autoscaling.PollerMaxCount) + assert.Equal(t, 5, config.Autoscaling.PollerInitCount) + assert.Equal(t, 5, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, 3, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, 100, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, 5, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, 2000, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, 8000, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} + +func TestLoadConfiguration_MissingFileFallback(t 
*testing.T) { + // Use a non-existent file path + config := loadConfiguration("/non/existent/path/config.yaml") + + // Validate that default configuration is returned + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} + +func TestDefaultAutoscalingConfiguration(t *testing.T) { + config := DefaultAutoscalingConfiguration() + + // Validate all default values + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + require.NotNil(t, config.Prometheus) + assert.Equal(t, DefaultPrometheusAddr, config.Prometheus.ListenAddress) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} + +func TestApplyDefaults(t *testing.T) { + // Test with empty configuration + config := AutoscalingConfiguration{} + config.applyDefaults() + + // Validate that all defaults are applied + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + require.NotNil(t, config.Prometheus) + assert.Equal(t, DefaultPrometheusAddr, config.Prometheus.ListenAddress) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, 
DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime)
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go
new file mode 100644
index 00000000..5740477d
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/main.go
@@ -0,0 +1,146 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/pborman/uuid"
+	"go.uber.org/cadence/client"
+
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+	"github.com/uber-go/tally"
+	"github.com/uber-go/tally/prometheus"
+	"go.uber.org/zap"
+)
+
+const (
+	ApplicationName = "autoscaling-monitoring"
+)
+
+func main() {
+	// Parse command line arguments
+	var mode string
+	var configFile string
+	flag.StringVar(&mode, "m", "worker", "Mode: worker or trigger")
+	flag.StringVar(&configFile, "config", "config/autoscaling.yaml", "Path to the configuration file")
+	flag.Parse()
+
+	// Load configuration
+	config := loadConfiguration(configFile)
+
+	// Setup common helper with our configuration
+	var h common.SampleHelper
+	h.Config = config.ToCommonConfiguration()
+
+	// Set up logging
+	logger, err := zap.NewDevelopment()
+	if err != nil {
+		panic(fmt.Sprintf("Failed to setup logger: %v", err))
+	}
+	h.Logger = logger
+
+	// Set up service client using our config
+	h.Builder = common.NewBuilder(logger).
+		SetHostPort(config.HostNameAndPort).
+		SetDomain(config.DomainName)
+
+	service, err := h.Builder.BuildServiceClient()
+	if err != nil {
+		panic(fmt.Sprintf("Failed to build service client: %v", err))
+	}
+	h.Service = service
+
+	// Set up metrics scope with Tally Prometheus reporter
+	var (
+		safeCharacters = []rune{'_'}
+
+		sanitizeOptions = tally.SanitizeOptions{
+			NameCharacters: tally.ValidCharacters{
+				Ranges:     tally.AlphanumericRange,
+				Characters: safeCharacters,
+			},
+			KeyCharacters: tally.ValidCharacters{
+				Ranges:     tally.AlphanumericRange,
+				Characters: safeCharacters,
+			},
+			ValueCharacters: tally.ValidCharacters{
+				Ranges:     tally.AlphanumericRange,
+				Characters: safeCharacters,
+			},
+			ReplacementCharacter: tally.DefaultReplacementCharacter,
+		}
+	)
+
+	// Create Prometheus reporter
+	reporter := prometheus.NewReporter(prometheus.Options{})
+
+	// Create root scope; the second argument is the reporting interval
+	// and must be a time.Duration (a bare 10 would mean 10 nanoseconds)
+	scope, closer := tally.NewRootScope(tally.ScopeOptions{
+		Tags:            map[string]string{"service": "autoscaling-monitoring"},
+		SanitizeOptions: &sanitizeOptions,
+		CachedReporter:  reporter,
+	}, 10*time.Second)
+	defer closer.Close()
+
+	// Set up metrics scope for helper
+	h.WorkerMetricScope = scope
+	h.ServiceMetricScope = scope
+
+	switch mode {
+	case "worker":
+		// Start metrics server only in worker mode
+		if config.Prometheus != nil {
+			go func() {
+				http.Handle("/metrics", reporter.HTTPHandler())
+				logger.Info("Starting Prometheus metrics server",
+					zap.String("address", config.Prometheus.ListenAddress))
+				if err := http.ListenAndServe(config.Prometheus.ListenAddress, nil); err != nil {
+					logger.Error("Failed to start metrics server", zap.Error(err))
+				}
+			}()
+		}
+		startWorkers(&h, &config)
+	case "trigger":
+		startWorkflow(&h, &config)
+	default:
+		fmt.Printf("Unknown mode: %s\n", mode)
+		os.Exit(1)
+	}
+}
+
+func startWorkers(h *common.SampleHelper, config *AutoscalingConfiguration) {
+	startWorkersWithAutoscaling(h, config)
+}
+
+func startWorkflow(h *common.SampleHelper, config *AutoscalingConfiguration) {
+	workflowOptions := client.StartWorkflowOptions{
+		ID:                              fmt.Sprintf("autoscaling_%s", uuid.New()),
+		TaskList:                        ApplicationName,
+		ExecutionStartToCloseTimeout:    time.Minute * 10,
+		DecisionTaskStartToCloseTimeout: 
time.Minute, + } + + // Use configuration values + workflows := config.Autoscaling.LoadGeneration.Workflows + workflowDelay := config.Autoscaling.LoadGeneration.WorkflowDelay + activitiesPerWorkflow := config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow + batchDelay := config.Autoscaling.LoadGeneration.BatchDelay + minProcessingTime := config.Autoscaling.LoadGeneration.MinProcessingTime + maxProcessingTime := config.Autoscaling.LoadGeneration.MaxProcessingTime + + // Start multiple workflows with delays + for i := 0; i < workflows; i++ { + workflowOptions.ID = fmt.Sprintf("autoscaling_%d_%s", i, uuid.New()) + h.StartWorkflow(workflowOptions, autoscalingWorkflowName, activitiesPerWorkflow, batchDelay, minProcessingTime, maxProcessingTime) + + // Add delay between workflows (except for the last one) + if i < workflows-1 { + time.Sleep(time.Duration(workflowDelay) * time.Millisecond) + } + } + + fmt.Printf("Started %d autoscaling workflows with %d activities each\n", workflows, activitiesPerWorkflow) + fmt.Println("Monitor the worker performance and autoscaling behavior in Grafana:") + fmt.Println("http://localhost:3000/d/dehkspwgabvuoc/cadence-client") +} diff --git a/cmd/samples/advanced/autoscaling-monitoring/worker_config.go b/cmd/samples/advanced/autoscaling-monitoring/worker_config.go new file mode 100644 index 00000000..b74ddecd --- /dev/null +++ b/cmd/samples/advanced/autoscaling-monitoring/worker_config.go @@ -0,0 +1,56 @@ +package main + +import ( + "go.uber.org/cadence/activity" + "go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" + + "github.com/uber-common/cadence-samples/cmd/samples/common" +) + +// startWorkersWithAutoscaling starts workers with autoscaling configuration +func startWorkersWithAutoscaling(h *common.SampleHelper, config *AutoscalingConfiguration) { + // Configure worker options with autoscaling-friendly settings from config + workerOptions := worker.Options{ + MetricsScope: h.WorkerMetricScope, + Logger: h.Logger, + AutoScalerOptions: worker.AutoScalerOptions{ + Enabled: true, + PollerMinCount: config.Autoscaling.PollerMinCount, + PollerMaxCount: config.Autoscaling.PollerMaxCount, + PollerInitCount: config.Autoscaling.PollerInitCount, + }, + FeatureFlags: client.FeatureFlags{ + WorkflowExecutionAlreadyCompletedErrorEnabled: true, + }, + } + + h.Logger.Info("Starting workers with autoscaling configuration", + zap.Bool("AutoScalerEnabled", workerOptions.AutoScalerOptions.Enabled), + zap.Int("PollerMinCount", workerOptions.AutoScalerOptions.PollerMinCount), + zap.Int("PollerMaxCount", workerOptions.AutoScalerOptions.PollerMaxCount), + zap.Int("PollerInitCount", workerOptions.AutoScalerOptions.PollerInitCount)) + + // Use worker.NewV2 for autoscaling support + w, err := worker.NewV2(h.Service, h.Config.DomainName, ApplicationName, workerOptions) + if err != nil { + h.Logger.Fatal("Failed to create worker with autoscaling", zap.Error(err)) + } + + // Register workflows and activities + registerWorkflowAndActivityForAutoscaling(w) + + // Start the worker + err = w.Run() + if err != nil { + h.Logger.Fatal("Failed to run worker", zap.Error(err)) + } +} + +// registerWorkflowAndActivityForAutoscaling registers the workflow and activities +func registerWorkflowAndActivityForAutoscaling(w worker.Worker) { + w.RegisterWorkflowWithOptions(AutoscalingWorkflow, workflow.RegisterOptions{Name: autoscalingWorkflowName}) + w.RegisterActivityWithOptions(LoadGenerationActivity, activity.RegisterOptions{Name: 
loadGenerationActivityName})
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/workflow.go b/cmd/samples/advanced/autoscaling-monitoring/workflow.go
new file mode 100644
index 00000000..22c6c17c
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/workflow.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+	"time"
+
+	"go.uber.org/cadence/workflow"
+	"go.uber.org/zap"
+)
+
+const (
+	autoscalingWorkflowName = "autoscalingWorkflow"
+)
+
+// AutoscalingWorkflow demonstrates a workflow that can generate load
+// to test worker poller autoscaling
+func AutoscalingWorkflow(ctx workflow.Context, activitiesPerWorkflow int, batchDelay int, minProcessingTime, maxProcessingTime int) error {
+	logger := workflow.GetLogger(ctx)
+	logger.Info("Autoscaling workflow started", zap.Int("activitiesPerWorkflow", activitiesPerWorkflow))
+
+	ao := workflow.ActivityOptions{
+		ScheduleToStartTimeout: time.Minute * 20,
+		StartToCloseTimeout:    time.Minute * 20,
+		HeartbeatTimeout:       time.Second * 20,
+	}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	// Generate load by executing activities in parallel
+	var futures []workflow.Future
+
+	// Execute activities in batches to create varying load
+	for i := 0; i < activitiesPerWorkflow; i++ {
+		future := workflow.ExecuteActivity(ctx, LoadGenerationActivity, i, minProcessingTime, maxProcessingTime)
+		futures = append(futures, future)
+
+		// Pause between batches of 10 activities to simulate real-world
+		// patterns, using the batch delay from configuration
+		if i > 0 && i%10 == 0 {
+			if err := workflow.Sleep(ctx, time.Duration(batchDelay)*time.Millisecond); err != nil {
+				return err
+			}
+		}
+	}
+
+	// Wait for all activities to complete; the activity has no return
+	// value, so pass nil as the result pointer
+	for i, future := range futures {
+		if err := future.Get(ctx, nil); err != nil {
+			logger.Error("Activity failed", zap.Int("taskID", i), zap.Error(err))
+			return err
+		}
+	}
+
+	logger.Info("Autoscaling workflow completed", zap.Int("totalActivities", len(futures)))
+	return nil
+}
diff --git a/go.mod b/go.mod
index 4483671c..6a19054b 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,7 @@ require (
 	github.com/uber-go/tally v3.4.3+incompatible
 	github.com/uber/cadence-idl v0.0.0-20250616185004-cc6f52f87bc6
 	github.com/uber/jaeger-client-go v2.30.0+incompatible
-	go.uber.org/cadence v1.3.1-rc.1
+	go.uber.org/cadence v1.3.1-rc.8
 	go.uber.org/yarpc v1.60.0
 	go.uber.org/zap v1.23.0
 	gopkg.in/yaml.v2 v2.4.0
diff --git a/go.sum b/go.sum
index c61eecda..13e7a7e6 100644
--- a/go.sum
+++ b/go.sum
@@ -239,8 +239,8 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
 go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
-go.uber.org/cadence v1.3.1-rc.1 h1:73TllFy8BNih2/sjGpFv9vz5wXQBfk643vKxsAy1/EM=
-go.uber.org/cadence v1.3.1-rc.1/go.mod h1:/Lv4o2eahjro+LKjXmuYm20wg5+84t+h2pu0xm7gUfM=
+go.uber.org/cadence v1.3.1-rc.8 h1:9m1h7KZsPqJsNW87iVd3Mgi85PojCPyaUeIdzTjVu3Y=
+go.uber.org/cadence v1.3.1-rc.8/go.mod h1:/Lv4o2eahjro+LKjXmuYm20wg5+84t+h2pu0xm7gUfM=
 go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
 go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
 go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI=