From 17c243f418330a69c26fe26a576ef0afd71690c5 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Mon, 18 Aug 2025 08:51:18 -0700
Subject: [PATCH 01/18] Initial implementation of autoscaling-monitoring
 sample

---
 Makefile                                      |   5 +
 .../advanced/autoscaling-monitoring/Makefile  |  29 +++++
 .../autoscaling-monitoring/activities.go      |  40 +++++++
 .../advanced/autoscaling-monitoring/config.go |  52 +++++++++
 .../config/autoscaling.yaml                   |  28 +++++
 .../advanced/autoscaling-monitoring/main.go   | 108 ++++++++++++++++++
 .../prometheus_server.go                      | 108 ++++++++++++++++++
 .../autoscaling-monitoring/worker_config.go   |  58 ++++++++++
 .../autoscaling-monitoring/workflow.go        |  57 +++++++++
 9 files changed, 485 insertions(+)
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/Makefile
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/activities.go
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/config.go
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/main.go
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/worker_config.go
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/workflow.go

diff --git a/Makefile b/Makefile
index a8b8aa1b..4ee14986 100644
--- a/Makefile
+++ b/Makefile
@@ -37,6 +37,7 @@ PROGS = helloworld \
 	sideeffect \
 	sleep \
 	dataconverter \
+	autoscaling-monitoring \
 
 TEST_ARG ?= -race -v -timeout 5m
 BUILD := ./build
@@ -186,6 +187,9 @@ versioning:
 dataconverter:
 	go build -o bin/dataconverter cmd/samples/recipes/dataconverter/*.go
 
+autoscaling-monitoring:
+	go build -o bin/autoscaling-monitoring cmd/samples/advanced/autoscaling-monitoring/*.go
+
 bins: helloworld \
 	versioning \
 	delaystart \
@@ -219,6 +223,7 @@ bins: helloworld \
 	sideeffect \
 	sleep \
 	dataconverter \
+	autoscaling-monitoring \
 
 test: bins
 	@rm -f test
diff --git a/cmd/samples/advanced/autoscaling-monitoring/Makefile b/cmd/samples/advanced/autoscaling-monitoring/Makefile
new file mode 100644
index 00000000..879a4503
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/Makefile
@@ -0,0 +1,29 @@
+# Makefile for autoscaling monitoring sample
+
+.PHONY: build clean test
+
+# Build the autoscaling monitoring sample
+build:
+	go build -o ../../../bin/autoscaling-monitoring *.go
+
+# Clean build artifacts
+clean:
+	rm -f ../../../bin/autoscaling-monitoring
+
+# Run tests
+test:
+	go test -v .
+
+# Install dependencies
+deps:
+	go mod tidy
+
+# Run the sample in different modes
+run-worker: build
+	../../../bin/autoscaling-monitoring -m worker
+
+run-server: build
+	../../../bin/autoscaling-monitoring -m server
+
+run-trigger: build
+	../../../bin/autoscaling-monitoring -m trigger
diff --git a/cmd/samples/advanced/autoscaling-monitoring/activities.go b/cmd/samples/advanced/autoscaling-monitoring/activities.go
new file mode 100644
index 00000000..75d58d31
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/activities.go
@@ -0,0 +1,40 @@
+package main
+
+import (
+	"context"
+	"math/rand"
+	"time"
+
+	"go.uber.org/cadence/activity"
+	"go.uber.org/zap"
+)
+
+const (
+	loadGenerationActivityName = "loadGenerationActivity"
+)
+
+// LoadGenerationActivity simulates work that can be scaled
+// It includes random delays to simulate real-world processing time
+func LoadGenerationActivity(ctx context.Context, taskID int) error {
+	startTime := time.Now()
+	logger := activity.GetLogger(ctx)
+	logger.Info("Load generation activity started", zap.Int("taskID", taskID))
+
+	// Simulate variable processing time using configuration values
+	minTime := config.Autoscaling.LoadGeneration.MinProcessingTime
+	maxTime := config.Autoscaling.LoadGeneration.MaxProcessingTime
+	processingTime := time.Duration(rand.Intn(maxTime-minTime)+minTime) * time.Millisecond
+	time.Sleep(processingTime)
+
+	duration := time.Since(startTime)
+
+	// Record metrics for monitoring
+	RecordActivityCompleted("autoscaling-worker-1", "load_generation", duration)
+
+	logger.Info("Load generation activity completed",
+		zap.Int("taskID", taskID),
+		zap.Duration("processingTime", processingTime),
+		zap.Duration("totalDuration", duration))
+
+	return nil
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/config.go b/cmd/samples/advanced/autoscaling-monitoring/config.go
new file mode 100644
index 00000000..3eb245a3
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/config.go
@@ -0,0 +1,52 @@
+package main
+
+import (
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+)
+
+// AutoscalingConfiguration extends the base Configuration with autoscaling-specific settings
+type AutoscalingConfiguration struct {
+	common.Configuration
+	Autoscaling AutoscalingSettings `yaml:"autoscaling"`
+}
+
+// AutoscalingSettings contains the autoscaling configuration
+type AutoscalingSettings struct {
+	// Worker autoscaling settings
+	PollerMinCount  int `yaml:"pollerMinCount"`
+	PollerMaxCount  int `yaml:"pollerMaxCount"`
+	PollerInitCount int `yaml:"pollerInitCount"`
+
+	// Load generation settings
+	LoadGeneration LoadGenerationSettings `yaml:"loadGeneration"`
+}
+
+// LoadGenerationSettings contains the load generation configuration
+type LoadGenerationSettings struct {
+	Iterations        int `yaml:"iterations"`
+	BatchDelay        int `yaml:"batchDelay"`
+	MinProcessingTime int `yaml:"minProcessingTime"`
+	MaxProcessingTime int `yaml:"maxProcessingTime"`
+}
+
+// DefaultAutoscalingConfiguration returns default autoscaling settings
+func DefaultAutoscalingConfiguration() AutoscalingConfiguration {
+	return AutoscalingConfiguration{
+		Configuration: common.Configuration{
+			DomainName:      "default",
+			ServiceName:     "cadence-frontend",
+			HostNameAndPort: "localhost:7833",
+		},
+		Autoscaling: AutoscalingSettings{
+			PollerMinCount:  2,
+			PollerMaxCount:  8,
+			PollerInitCount: 4,
+			LoadGeneration: LoadGenerationSettings{
+				Iterations:        50,
+				BatchDelay:        2,
+				MinProcessingTime: 1000,
+				MaxProcessingTime: 6000,
+			},
+		},
+	}
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml
new file mode 100644
index 00000000..9c5b1670
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml
@@ -0,0 +1,28 @@
+# Configuration for autoscaling monitoring sample
+domain: "default"
+service: "cadence-frontend"
+host: "localhost:7833"
+
+# Prometheus configuration for metrics collection
+prometheus:
+  listenAddress: "127.0.0.1:8004"
+
+# Autoscaling configuration
+# These settings control the worker's concurrency and autoscaling behavior
+autoscaling:
+  # Worker autoscaling settings
+  pollerMinCount: 2
+  pollerMaxCount: 8
+  pollerInitCount: 4
+
+  # Worker load simulation settings
+  loadGeneration:
+    # Number of iterations in the autoscaling workflow
+    iterations: 50
+
+    # Delay between activity batches (seconds)
+    batchDelay: 2
+
+    # Activity processing time range (milliseconds)
+    minProcessingTime: 1000
+    maxProcessingTime: 6000
diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go
new file mode 100644
index 00000000..31a4bf8a
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/main.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"os/signal"
+	"syscall"
+	"time"
+
+	"github.com/pborman/uuid"
+	"go.uber.org/cadence/client"
+	"gopkg.in/yaml.v2"
+
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+)
+
+const (
+	ApplicationName = "autoscaling-monitoring"
+)
+
+// Global configuration
+var config AutoscalingConfiguration
+
+func main() {
+	var mode string
+	var configFile string
+	flag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger, or server.")
+	flag.StringVar(&configFile, "config", "config/autoscaling.yaml", "Configuration file path.")
+	flag.Parse()
+
+	// Load configuration
+	loadConfiguration(configFile)
+
+	// Setup common helper with our configuration
+	var h common.SampleHelper
+	h.Config = config.Configuration
+	h.SetupServiceConfig()
+
+	switch mode {
+	case "worker":
+		startWorkers(&h)
+
+		// The workers are supposed to be long-running processes that should not exit.
+		// Use select{} to block indefinitely for samples; you can quit with Ctrl+C.
+		select {}
+	case "trigger":
+		startWorkflow(&h)
+	case "server":
+		startPrometheusServer(&h)
+	default:
+		fmt.Printf("Unknown mode: %s\n", mode)
+		os.Exit(1)
+	}
+}
+
+// loadConfiguration loads the autoscaling configuration from file
+func loadConfiguration(configFile string) {
+	// Read config file
+	configData, err := ioutil.ReadFile(configFile)
+	if err != nil {
+		fmt.Printf("Failed to read config file: %v, using defaults\n", err)
+		config = DefaultAutoscalingConfiguration()
+		return
+	}
+
+	// Parse config
+	if err := yaml.Unmarshal(configData, &config); err != nil {
+		fmt.Printf("Error parsing configuration: %v, using defaults\n", err)
+		config = DefaultAutoscalingConfiguration()
+		return
+	}
+
+	fmt.Printf("Loaded configuration from %s\n", configFile)
+}
+
+func startWorkers(h *common.SampleHelper) {
+	startWorkersWithAutoscaling(h)
+}
+
+func startWorkflow(h *common.SampleHelper) {
+	workflowOptions := client.StartWorkflowOptions{
+		ID:                              "autoscaling_" + uuid.New(),
+		TaskList:                        ApplicationName,
+		ExecutionStartToCloseTimeout:    time.Minute * 10,
+		DecisionTaskStartToCloseTimeout: time.Minute,
+	}
+
+	// Use iterations from configuration
+	iterations := config.Autoscaling.LoadGeneration.Iterations
+	h.StartWorkflow(workflowOptions, autoscalingWorkflowName, iterations)
+
+	fmt.Printf("Started autoscaling workflow with %d iterations\n", iterations)
+	fmt.Println("Monitor the worker performance and autoscaling behavior in Grafana:")
+	fmt.Println("http://localhost:3000/d/dehkspwgabvuoc/cadence-client")
+}
+
+func startPrometheusServer(h *common.SampleHelper) {
+	startPrometheusHTTPServer(h)
+
+	// Block until interrupted
+	done := make(chan os.Signal, 1)
+	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
+	fmt.Println("Prometheus server started. Press Ctrl+C to stop...")
+	<-done
+	fmt.Println("Shutting down Prometheus server...")
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go b/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
new file mode 100644
index 00000000..1bf15dc6
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"os"
+	"time"
+
+	"github.com/m3db/prometheus_client_golang/prometheus"
+	"github.com/m3db/prometheus_client_golang/prometheus/promhttp"
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+	"go.uber.org/zap"
+)
+
+const (
+	defaultPrometheusPort = ":8004"
+)
+
+var (
+	// Custom metrics for autoscaling monitoring
+	autoscalingWorkflowsStarted = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "autoscaling_workflows_started_total",
+			Help: "Total number of autoscaling workflows started",
+		},
+		[]string{"worker_id"},
+	)
+
+	autoscalingActivitiesCompleted = prometheus.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "autoscaling_activities_completed_total",
+			Help: "Total number of autoscaling activities completed",
+		},
+		[]string{"worker_id", "activity_type"},
+	)
+
+	autoscalingActivityDuration = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name:    "autoscaling_activity_duration_seconds",
+			Help:    "Duration of autoscaling activities in seconds",
+			Buckets: prometheus.DefBuckets,
+		},
+		[]string{"worker_id", "activity_type"},
+	)
+
+	autoscalingWorkerLoad = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Name: "autoscaling_worker_load",
+			Help: "Current load on the autoscaling worker",
+		},
+		[]string{"worker_id", "load_type"},
+	)
+)
+
+func init() {
+	// Register custom metrics
+	prometheus.MustRegister(autoscalingWorkflowsStarted)
+	prometheus.MustRegister(autoscalingActivitiesCompleted)
+	prometheus.MustRegister(autoscalingActivityDuration)
+	prometheus.MustRegister(autoscalingWorkerLoad)
+}
+
+// startPrometheusHTTPServer starts an HTTP server to expose Prometheus metrics
+func startPrometheusHTTPServer(h *common.SampleHelper) {
+	port := defaultPrometheusPort
+	if h.Config.Prometheus != nil && h.Config.Prometheus.ListenAddress != "" {
+		port = h.Config.Prometheus.ListenAddress
+	}
+
+	h.Logger.Info("Starting Prometheus HTTP server", zap.String("port", port))
+
+	// Set up HTTP handlers for Prometheus metrics
+	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		fmt.Fprintf(w, "Autoscaling Monitoring Sample - Prometheus Metrics Server\n")
+		fmt.Fprintf(w, "Metrics available at: /metrics\n")
+		fmt.Fprintf(w, "Health check at: /health\n")
+		fmt.Fprintf(w, "\nDashboard: http://localhost:3000/d/dehkspwgabvuoc/cadence-client\n")
+	})
+
+	// Prometheus metrics endpoint
+	http.Handle("/metrics", promhttp.Handler())
+
+	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(http.StatusOK)
+		fmt.Fprintf(w, "OK\n")
+	})
+
+	// Start server in goroutine
+	go func() {
+		if err := http.ListenAndServe(port, nil); err != nil {
+			h.Logger.Error("Failed to start Prometheus server", zap.Error(err))
+			os.Exit(1)
+		}
+	}()
+
+	h.Logger.Info("Prometheus server started successfully", zap.String("port", port))
+}
+
+// RecordWorkflowStarted records when a workflow is started
+func RecordWorkflowStarted(workerID string) {
+	autoscalingWorkflowsStarted.WithLabelValues(workerID).Inc()
+}
+
+// RecordActivityCompleted records when an activity is completed
+func RecordActivityCompleted(workerID, activityType string, duration time.Duration) {
+	autoscalingActivitiesCompleted.WithLabelValues(workerID, activityType).Inc()
+	autoscalingActivityDuration.WithLabelValues(workerID, activityType).Observe(duration.Seconds())
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/worker_config.go b/cmd/samples/advanced/autoscaling-monitoring/worker_config.go
new file mode 100644
index 00000000..4de4c3fa
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/worker_config.go
@@ -0,0 +1,58 @@
+package main
+
+import (
+	"go.uber.org/cadence/activity"
+	"go.uber.org/cadence/client"
+	"go.uber.org/cadence/worker"
+	"go.uber.org/cadence/workflow"
+	"go.uber.org/zap"
+
+	"github.com/uber-common/cadence-samples/cmd/samples/common"
+)
+
+// startWorkersWithAutoscaling starts workers with autoscaling configuration
+func startWorkersWithAutoscaling(h *common.SampleHelper) worker.Worker {
+	// Configure worker options with autoscaling-friendly settings from config
+	workerOptions := worker.Options{
+		MetricsScope: h.WorkerMetricScope,
+		Logger:       h.Logger,
+		AutoScalerOptions: worker.AutoScalerOptions{
+			Enabled:         true,
+			PollerMinCount:  config.Autoscaling.PollerMinCount,
+			PollerMaxCount:  config.Autoscaling.PollerMaxCount,
+			PollerInitCount: config.Autoscaling.PollerInitCount,
+		},
+		FeatureFlags: client.FeatureFlags{
+			WorkflowExecutionAlreadyCompletedErrorEnabled: true,
+		},
+	}
+
+	h.Logger.Info("Starting workers with autoscaling configuration",
+		zap.Bool("AutoScalerEnabled", workerOptions.AutoScalerOptions.Enabled),
+		zap.Int("PollerMinCount", workerOptions.AutoScalerOptions.PollerMinCount),
+		zap.Int("PollerMaxCount", workerOptions.AutoScalerOptions.PollerMaxCount),
+		zap.Int("PollerInitCount", workerOptions.AutoScalerOptions.PollerInitCount))
+
+	// Use worker.NewV2 for autoscaling support
+	w, err := worker.NewV2(h.Service, h.Config.DomainName, ApplicationName, workerOptions)
+	if err != nil {
+		h.Logger.Fatal("Failed to create worker with autoscaling", zap.Error(err))
+	}
+
+	// Register workflows and activities
+	registerWorkflowAndActivityForAutoscaling(h, w)
+
+	// Start the worker
+	err = w.Start()
+	if err != nil {
+		h.Logger.Fatal("Failed to start worker", zap.Error(err))
+	}
+
+	return w
+}
+
+// registerWorkflowAndActivityForAutoscaling registers the workflow and activities
+func registerWorkflowAndActivityForAutoscaling(h *common.SampleHelper, w worker.Worker) {
+	w.RegisterWorkflowWithOptions(AutoscalingWorkflow, workflow.RegisterOptions{Name: autoscalingWorkflowName})
+	w.RegisterActivityWithOptions(LoadGenerationActivity, activity.RegisterOptions{Name: loadGenerationActivityName})
+}
diff --git a/cmd/samples/advanced/autoscaling-monitoring/workflow.go b/cmd/samples/advanced/autoscaling-monitoring/workflow.go
new file mode 100644
index 00000000..b3481c6b
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/workflow.go
@@ -0,0 +1,57 @@
+package main
+
+import (
+	"time"
+
+	"go.uber.org/cadence/workflow"
+	"go.uber.org/zap"
+)
+
+const (
+	autoscalingWorkflowName = "autoscalingWorkflow"
+)
+
+// AutoscalingWorkflow demonstrates a workflow that can generate load
+// to test worker poller autoscaling
+func AutoscalingWorkflow(ctx workflow.Context, iterations int) error {
+	logger := workflow.GetLogger(ctx)
+	logger.Info("Autoscaling workflow started", zap.Int("iterations", iterations))
+
+	// Record workflow start metrics
+	RecordWorkflowStarted("autoscaling-worker-1")
+
+	ao := workflow.ActivityOptions{
+		ScheduleToStartTimeout: time.Minute,
+		StartToCloseTimeout:    time.Minute,
+		HeartbeatTimeout:       time.Second * 20,
+	}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	// Generate load by executing activities in parallel
+	var futures []workflow.Future
+
+	// Execute activities in batches to create varying load
+	for i := 0; i < iterations; i++ {
+		future := workflow.ExecuteActivity(ctx, LoadGenerationActivity, i)
+		futures = append(futures, future)
+
+		// Add some delay between batches to simulate real-world patterns
+		// Use batch delay from configuration
+		if i > 0 && i%10 == 0 {
+			batchDelay := time.Duration(config.Autoscaling.LoadGeneration.BatchDelay) * time.Second
+			workflow.Sleep(ctx, batchDelay)
+		}
+	}
+
+	// Wait for all activities to complete
+	for i, future := range futures {
+		var result error
+		if err := future.Get(ctx, &result); err != nil {
+			logger.Error("Activity failed", zap.Int("taskID", i), zap.Error(err))
+			return err
+		}
+	}
+
+	logger.Info("Autoscaling workflow completed", zap.Int("totalActivities", len(futures)))
+	return nil
+}
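A quick way to sanity-check this first patch locally is to hit the endpoints that prometheus_server.go wires up; a minimal session sketch (assuming the new top-level Makefile target and a Cadence server already running):

```bash
# Build via the new top-level target, then start the standalone metrics server.
make autoscaling-monitoring
./bin/autoscaling-monitoring -m server &

# Endpoints registered in prometheus_server.go:
curl -s http://127.0.0.1:8004/           # status page
curl -s http://127.0.0.1:8004/health     # health check, returns "OK"
curl -s http://127.0.0.1:8004/metrics | head   # Prometheus exposition format
```

Note that the custom `autoscaling_*` collectors only move when `RecordWorkflowStarted`/`RecordActivityCompleted` run in the same process as the HTTP server; later patches in this series fold the reporter into the worker for exactly that reason.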
From 77b359b34aee9d0896003d28274a80892919af28 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Mon, 18 Aug 2025 08:52:24 -0700
Subject: [PATCH 02/18] Add Readme with instructions on how it works and how
 to run

---
 .../advanced/autoscaling-monitoring/README.md | 229 ++++++++++++++++++
 1 file changed, 229 insertions(+)
 create mode 100644 cmd/samples/advanced/autoscaling-monitoring/README.md

diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md
new file mode 100644
index 00000000..cf4fd0bb
--- /dev/null
+++ b/cmd/samples/advanced/autoscaling-monitoring/README.md
@@ -0,0 +1,229 @@
+# Autoscaling Monitoring Sample
+
+This sample demonstrates three advanced Cadence worker features:
+
+1. **Worker Poller Autoscaling** - Dynamic adjustment of worker poller goroutines based on workload
+2. **Prometheus Tally Reporter** - Metrics collection using Tally and Prometheus
+3. **HTTP Endpoint for Prometheus Scraping** - Exposing metrics for monitoring
+
+## Features
+
+### Worker Poller Autoscaling
+The worker uses `worker.NewV2` with `AutoScalerOptions` to enable true autoscaling behavior:
+- **AutoScalerOptions.Enabled**: true - Enables the autoscaling feature
+- **PollerMinCount**: 2 - Minimum number of poller goroutines
+- **PollerMaxCount**: 8 - Maximum number of poller goroutines
+- **PollerInitCount**: 4 - Initial number of poller goroutines
+
+The worker automatically adjusts the number of poller goroutines between the min and max values based on the current workload.
+
+### Prometheus Metrics
+The sample exposes comprehensive metrics about worker performance, activity execution, and autoscaling behavior:
+- `autoscaling_workflows_started_total` - Counter of workflows started
+- `autoscaling_activities_completed_total` - Counter of activities completed
+- `autoscaling_activity_duration_seconds` - Histogram of activity execution times
+- `autoscaling_worker_load` - Gauge showing current worker load
+
+### Monitoring Dashboards
+When running the Cadence server locally with Grafana, you can access the client dashboards at:
+
+**Client Dashboards**: http://localhost:3000/d/dehkspwgabvuoc/cadence-client
+
+## Prerequisites
+
+1. **Cadence Server**: Running locally with Docker Compose
+2. **Prometheus**: Configured to scrape metrics from the sample
+3. **Grafana**: With Cadence dashboards (included with default Cadence server setup)
+
+## Quick Start
+
+### 1. Start the Worker
+```bash
+./bin/autoscaling-monitoring -m worker
+```
+
+### 2. Start the Prometheus Server (Optional)
+```bash
+./bin/autoscaling-monitoring -m server
+```
+
+### 3. Generate Load
+```bash
+./bin/autoscaling-monitoring -m trigger
+```
+
+## Configuration
+
+The sample uses a custom configuration system that extends the base Cadence configuration. You can specify a configuration file using the `-config` flag:
+
+```bash
+./bin/autoscaling-monitoring -m worker -config /path/to/config.yaml
+```
+
+### Configuration File Structure
+
+```yaml
+# Cadence connection settings
+domain: "default"
+service: "cadence-frontend"
+host: "localhost:7833"
+
+# Prometheus configuration
+prometheus:
+  listenAddress: "127.0.0.1:8004"
+
+# Autoscaling configuration
+autoscaling:
+  # Worker autoscaling settings
+  pollerMinCount: 2
+  pollerMaxCount: 8
+  pollerInitCount: 4
+
+  # Load generation settings
+  loadGeneration:
+    iterations: 50          # Number of activities to execute
+    batchDelay: 2           # Delay between batches (seconds)
+    minProcessingTime: 1000 # Min activity time (ms)
+    maxProcessingTime: 6000 # Max activity time (ms)
+```
+
+### Configuration Usage
+
+The configuration values are used throughout the sample:
+
+1. **Worker Configuration** (`worker_config.go`):
+   - `pollerMinCount`, `pollerMaxCount`, `pollerInitCount` → `AutoScalerOptions`
+
+2. **Workflow Configuration** (`workflow.go`):
+   - `iterations` → Number of activities to execute
+   - `batchDelay` → Delay between activity batches
+
+3. **Activity Configuration** (`activities.go`):
+   - `minProcessingTime`, `maxProcessingTime` → Activity processing time range
+
+4. **Prometheus Configuration** (`prometheus_server.go`):
+   - `listenAddress` → HTTP server port
+
+### Default Configuration
+
+If no configuration file is provided or if the file cannot be read, the sample uses these defaults:
+
+```yaml
+domain: "default"
+service: "cadence-frontend"
+host: "localhost:7833"
+autoscaling:
+  pollerMinCount: 2
+  pollerMaxCount: 8
+  pollerInitCount: 4
+  loadGeneration:
+    iterations: 50
+    batchDelay: 2
+    minProcessingTime: 1000
+    maxProcessingTime: 6000
+```
+
+## Monitoring
+
+### Metrics Endpoints
+- **Prometheus Metrics**: http://localhost:8004/metrics
+- **Health Check**: http://localhost:8004/health
+- **Status Page**: http://localhost:8004/
+
+### Grafana Dashboard
+Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/cadence-client
+
+### Key Metrics to Monitor
+
+1. **Worker Performance**:
+   - Activity execution rate
+   - Decision task processing rate
+   - Worker load levels
+
+2. **Autoscaling Behavior**:
+   - Concurrent activity execution count
+   - Task queue depth
+   - Poller utilization
+   - Number of active poller goroutines
+
+3. **Custom Metrics**:
+   - `autoscaling_workflows_started_total`
+   - `autoscaling_activities_completed_total`
+   - `autoscaling_activity_duration_seconds`
+
+## How It Works
+
+### Load Generation
+The sample creates a workflow that executes activities in parallel, with each activity:
+- Taking 1-6 seconds to complete (configurable via `minProcessingTime`/`maxProcessingTime`)
+- Recording metrics about execution time
+- Creating varying load patterns with configurable batch delays
+
+### Autoscaling Demonstration
+The worker uses `worker.NewV2` with `AutoScalerOptions` to:
+- Start with configurable poller goroutines (`pollerInitCount`)
+- Scale down to minimum pollers (`pollerMinCount`) when load is low
+- Scale up to maximum pollers (`pollerMaxCount`) when load is high
+- Automatically adjust based on task queue depth and processing time
+
+### Metrics Collection
+The sample collects and exposes:
+- Workflow start/complete events
+- Activity execution times and counts
+- Worker load simulation metrics
+- Standard Cadence metrics via Tally
+
+## Production Considerations
+
+### Scaling
+- Adjust `pollerMinCount`, `pollerMaxCount`, and `pollerInitCount` based on your workload
+- Monitor worker performance and adjust autoscaling parameters
+- Use multiple worker instances for high availability
+
+### Monitoring
+- Configure Prometheus to scrape metrics regularly
+- Set up alerts for worker performance issues
+- Use Grafana dashboards to visualize autoscaling behavior
+- Monitor poller count changes to verify autoscaling is working
+
+### Security
+- Secure the Prometheus endpoint in production
+- Use authentication for metrics access
+- Consider using HTTPS for metrics endpoints
+
+## Troubleshooting
+
+### Common Issues
+
+1. **Worker Not Starting**:
+   - Check Cadence server is running
+   - Verify domain exists
+   - Check configuration file
+   - Ensure using compatible Cadence client version
+
+2. **Autoscaling Not Working**:
+   - Verify `worker.NewV2` is being used
+   - Check `AutoScalerOptions.Enabled` is true
+   - Monitor poller count changes in logs
+   - Ensure sufficient load is being generated
+
+3. **Configuration Issues**:
+   - Verify configuration file path is correct
+   - Check YAML syntax in configuration file
+   - Review default values if config file is not found
+
+4. **Metrics Not Appearing**:
+   - Verify Prometheus server is running
+   - Check metrics endpoint is accessible
+   - Ensure Prometheus is configured to scrape the endpoint
+
+5. **Dashboard Not Loading**:
+   - Verify Grafana is running
+   - Check dashboard URL is correct
+   - Ensure Prometheus data source is configured
+
+### Debug Mode
+Enable debug logging by setting the log level in your environment:
+```bash
+export CADENCE_LOG_LEVEL=debug
+```
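The Prerequisites above assume Prometheus already scrapes the sample. For a local setup that is not using the bundled Cadence server configuration, a minimal scrape job might look like the following (the job name is hypothetical; the target is the `listenAddress` from `config/autoscaling.yaml`):

```yaml
# prometheus.yml fragment - a sketch, not part of this patch series
scrape_configs:
  - job_name: "autoscaling-monitoring"   # hypothetical job name
    scrape_interval: 5s
    static_configs:
      - targets: ["127.0.0.1:8004"]      # listenAddress from config/autoscaling.yaml
```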
From 61fb2910e642b631c8e77591241b52aa86bf2b5f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 15:43:45 -0700
Subject: [PATCH 03/18] Fix makefile relative path issue

---
 cmd/samples/advanced/autoscaling-monitoring/Makefile | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/Makefile b/cmd/samples/advanced/autoscaling-monitoring/Makefile
index 879a4503..d4f2dbe3 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/Makefile
+++ b/cmd/samples/advanced/autoscaling-monitoring/Makefile
@@ -4,11 +4,11 @@
 
 # Build the autoscaling monitoring sample
 build:
-	go build -o ../../../bin/autoscaling-monitoring *.go
+	go build -o ../../../../bin/autoscaling-monitoring *.go
 
 # Clean build artifacts
 clean:
-	rm -f ../../../bin/autoscaling-monitoring
+	rm -f ../../../../bin/autoscaling-monitoring
 
 # Run tests
 test:
@@ -20,10 +20,10 @@ deps:
 
 # Run the sample in different modes
 run-worker: build
-	../../../bin/autoscaling-monitoring -m worker
+	../../../../bin/autoscaling-monitoring -m worker
 
 run-server: build
-	../../../bin/autoscaling-monitoring -m server
+	../../../../bin/autoscaling-monitoring -m server
 
 run-trigger: build
-	../../../bin/autoscaling-monitoring -m trigger
+	../../../../bin/autoscaling-monitoring -m trigger

From 3e62ecbb3ac06fb17a056435320a814eed8c4f41 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 15:46:20 -0700
Subject: [PATCH 04/18] Refactor global var config to be passed

---
 .../autoscaling-monitoring/activities.go      |  9 +--
 .../advanced/autoscaling-monitoring/main.go   | 75 +++++++++++++------
 .../autoscaling-monitoring/worker_config.go   | 12 ++-
 .../autoscaling-monitoring/workflow.go        | 10 +--
 4 files changed, 62 insertions(+), 44 deletions(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/activities.go b/cmd/samples/advanced/autoscaling-monitoring/activities.go
index 75d58d31..6e63f15f 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/activities.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/activities.go
@@ -15,22 +15,17 @@ const (
 
 // LoadGenerationActivity simulates work that can be scaled
 // It includes random delays to simulate real-world processing time
-func LoadGenerationActivity(ctx context.Context, taskID int) error {
+func LoadGenerationActivity(ctx context.Context, taskID int, minProcessingTime, maxProcessingTime int) error {
 	startTime := time.Now()
 	logger := activity.GetLogger(ctx)
 	logger.Info("Load generation activity started", zap.Int("taskID", taskID))
 
 	// Simulate variable processing time using configuration values
-	minTime := config.Autoscaling.LoadGeneration.MinProcessingTime
-	maxTime := config.Autoscaling.LoadGeneration.MaxProcessingTime
-	processingTime := time.Duration(rand.Intn(maxTime-minTime)+minTime) * time.Millisecond
+	processingTime := time.Duration(rand.Intn(maxProcessingTime-minProcessingTime)+minProcessingTime) * time.Millisecond
 	time.Sleep(processingTime)
 
 	duration := time.Since(startTime)
 
-	// Record metrics for monitoring
-	RecordActivityCompleted("autoscaling-worker-1", "load_generation", duration)
-
 	logger.Info("Load generation activity completed",
 		zap.Int("taskID", taskID),
 		zap.Duration("processingTime", processingTime),
diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go
index 31a4bf8a..73f7f9b5 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/main.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/main.go
@@ -3,7 +3,6 @@ package main
 import (
 	"flag"
 	"fmt"
-	"io/ioutil"
 	"os"
 	"os/signal"
 	"syscall"
@@ -14,15 +13,14 @@ import (
 	"gopkg.in/yaml.v2"
 
 	"github.com/uber-common/cadence-samples/cmd/samples/common"
+	"github.com/uber-go/tally"
+	"go.uber.org/zap"
 )
 
 const (
 	ApplicationName = "autoscaling-monitoring"
 )
 
-// Global configuration
-var config AutoscalingConfiguration
-
 func main() {
 	var mode string
 	var configFile string
@@ -31,22 +29,39 @@ func main() {
 	flag.Parse()
 
 	// Load configuration
-	loadConfiguration(configFile)
+	config := loadConfiguration(configFile)
 
 	// Setup common helper with our configuration
 	var h common.SampleHelper
 	h.Config = config.Configuration
-	h.SetupServiceConfig()
+
+	// Set up logging
+	logger, err := zap.NewDevelopment()
+	if err != nil {
+		panic(fmt.Sprintf("Failed to setup logger: %v", err))
+	}
+	h.Logger = logger
+
+	// Set up service client using our config
+	h.Builder = common.NewBuilder(logger).
+		SetHostPort(config.HostNameAndPort).
+		SetDomain(config.DomainName)
+
+	service, err := h.Builder.BuildServiceClient()
+	if err != nil {
+		panic(fmt.Sprintf("Failed to build service client: %v", err))
+	}
+	h.Service = service
+
+	// Set up metrics scope (noop for now, Prometheus will be set up separately)
+	h.WorkerMetricScope = tally.NoopScope
+	h.ServiceMetricScope = tally.NoopScope
 
 	switch mode {
 	case "worker":
-		startWorkers(&h)
-
-		// The workers are supposed to be long-running processes that should not exit.
-		// Use select{} to block indefinitely for samples; you can quit with Ctrl+C.
-		select {}
+		startWorkers(&h, &config)
 	case "trigger":
-		startWorkflow(&h)
+		startWorkflow(&h, &config)
 	case "server":
 		startPrometheusServer(&h)
 	default:
@@ -56,30 +71,41 @@
 }
 
 // loadConfiguration loads the autoscaling configuration from file
-func loadConfiguration(configFile string) {
+func loadConfiguration(configFile string) AutoscalingConfiguration {
 	// Read config file
-	configData, err := ioutil.ReadFile(configFile)
+	configData, err := os.ReadFile(configFile)
 	if err != nil {
 		fmt.Printf("Failed to read config file: %v, using defaults\n", err)
-		config = DefaultAutoscalingConfiguration()
-		return
+		return DefaultAutoscalingConfiguration()
 	}
 
 	// Parse config
+	var config AutoscalingConfiguration
 	if err := yaml.Unmarshal(configData, &config); err != nil {
 		fmt.Printf("Error parsing configuration: %v, using defaults\n", err)
-		config = DefaultAutoscalingConfiguration()
-		return
+		return DefaultAutoscalingConfiguration()
+	}
+
+	// Ensure base Configuration fields are populated
+	if config.DomainName == "" {
+		config.DomainName = "default"
+	}
+	if config.ServiceName == "" {
+		config.ServiceName = "cadence-frontend"
+	}
+	if config.HostNameAndPort == "" {
+		config.HostNameAndPort = "localhost:7833"
 	}
 
 	fmt.Printf("Loaded configuration from %s\n", configFile)
+	return config
 }
 
-func startWorkers(h *common.SampleHelper) {
-	startWorkersWithAutoscaling(h)
+func startWorkers(h *common.SampleHelper, config *AutoscalingConfiguration) {
+	startWorkersWithAutoscaling(h, config)
 }
 
-func startWorkflow(h *common.SampleHelper) {
+func startWorkflow(h *common.SampleHelper, config *AutoscalingConfiguration) {
 	workflowOptions := client.StartWorkflowOptions{
 		ID:       "autoscaling_" + uuid.New(),
 		TaskList: ApplicationName,
@@ -87,9 +113,12 @@ func startWorkflow(h *common.SampleHelper) {
 		DecisionTaskStartToCloseTimeout: time.Minute,
 	}
 
-	// Use iterations from configuration
+	// Use configuration values
 	iterations := config.Autoscaling.LoadGeneration.Iterations
-	h.StartWorkflow(workflowOptions, autoscalingWorkflowName, iterations)
+	batchDelay := config.Autoscaling.LoadGeneration.BatchDelay
+	minProcessingTime := config.Autoscaling.LoadGeneration.MinProcessingTime
+	maxProcessingTime := config.Autoscaling.LoadGeneration.MaxProcessingTime
+	h.StartWorkflow(workflowOptions, autoscalingWorkflowName, iterations, batchDelay, minProcessingTime, maxProcessingTime)
 
 	fmt.Printf("Started autoscaling workflow with %d iterations\n", iterations)
 	fmt.Println("Monitor the worker performance and autoscaling behavior in Grafana:")
diff --git a/cmd/samples/advanced/autoscaling-monitoring/worker_config.go b/cmd/samples/advanced/autoscaling-monitoring/worker_config.go
index 4de4c3fa..b74ddecd 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/worker_config.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/worker_config.go
@@ -11,7 +11,7 @@ import (
 )
 
 // startWorkersWithAutoscaling starts workers with autoscaling configuration
-func startWorkersWithAutoscaling(h *common.SampleHelper) worker.Worker {
+func startWorkersWithAutoscaling(h *common.SampleHelper, config *AutoscalingConfiguration) {
 	// Configure worker options with autoscaling-friendly settings from config
 	workerOptions := worker.Options{
 		MetricsScope: h.WorkerMetricScope,
@@ -40,19 +40,17 @@ func startWorkersWithAutoscaling(h *common.SampleHelper) worker.Worker {
 	}
 
 	// Register workflows and activities
-	registerWorkflowAndActivityForAutoscaling(h, w)
+	registerWorkflowAndActivityForAutoscaling(w)
 
 	// Start the worker
-	err = w.Start()
+	err = w.Run()
 	if err != nil {
-		h.Logger.Fatal("Failed to start worker", zap.Error(err))
+		h.Logger.Fatal("Failed to run worker", zap.Error(err))
 	}
-
-	return w
 }
 
 // registerWorkflowAndActivityForAutoscaling registers the workflow and activities
-func registerWorkflowAndActivityForAutoscaling(h *common.SampleHelper, w worker.Worker) {
+func registerWorkflowAndActivityForAutoscaling(w worker.Worker) {
 	w.RegisterWorkflowWithOptions(AutoscalingWorkflow, workflow.RegisterOptions{Name: autoscalingWorkflowName})
 	w.RegisterActivityWithOptions(LoadGenerationActivity, activity.RegisterOptions{Name: loadGenerationActivityName})
 }
diff --git a/cmd/samples/advanced/autoscaling-monitoring/workflow.go b/cmd/samples/advanced/autoscaling-monitoring/workflow.go
index b3481c6b..3b90fe65 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/workflow.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/workflow.go
@@ -13,13 +13,10 @@ const (
 
 // AutoscalingWorkflow demonstrates a workflow that can generate load
 // to test worker poller autoscaling
-func AutoscalingWorkflow(ctx workflow.Context, iterations int) error {
+func AutoscalingWorkflow(ctx workflow.Context, iterations int, batchDelay int, minProcessingTime, maxProcessingTime int) error {
 	logger := workflow.GetLogger(ctx)
 	logger.Info("Autoscaling workflow started", zap.Int("iterations", iterations))
 
-	// Record workflow start metrics
-	RecordWorkflowStarted("autoscaling-worker-1")
-
 	ao := workflow.ActivityOptions{
 		ScheduleToStartTimeout: time.Minute,
 		StartToCloseTimeout:    time.Minute,
@@ -32,14 +29,13 @@ func AutoscalingWorkflow(ctx workflow.Context, iterations int) error {
 
 	// Execute activities in batches to create varying load
 	for i := 0; i < iterations; i++ {
-		future := workflow.ExecuteActivity(ctx, LoadGenerationActivity, i)
+		future := workflow.ExecuteActivity(ctx, LoadGenerationActivity, i, minProcessingTime, maxProcessingTime)
 		futures = append(futures, future)
 
 		// Add some delay between batches to simulate real-world patterns
 		// Use batch delay from configuration
 		if i > 0 && i%10 == 0 {
-			batchDelay := time.Duration(config.Autoscaling.LoadGeneration.BatchDelay) * time.Second
-			workflow.Sleep(ctx, batchDelay)
+			workflow.Sleep(ctx, time.Duration(batchDelay)*time.Second)
 		}
 	}
 
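With the activity now taking its processing-time range as parameters instead of reading a global, it becomes straightforward to unit-test in isolation. A minimal sketch for the sample Makefile's `go test` target, assuming the `testsuite` package of the Cadence Go client already used by these samples (the test name and inputs are illustrative):

```go
package main

import (
	"testing"

	"go.uber.org/cadence/testsuite"
)

// TestLoadGenerationActivity exercises the refactored activity signature
// with a deliberately short processing-time range so the test stays fast.
func TestLoadGenerationActivity(t *testing.T) {
	var ts testsuite.WorkflowTestSuite
	env := ts.NewTestActivityEnvironment()
	env.RegisterActivity(LoadGenerationActivity)

	// taskID=1, minProcessingTime=10ms, maxProcessingTime=20ms
	if _, err := env.ExecuteActivity(LoadGenerationActivity, 1, 10, 20); err != nil {
		t.Fatalf("activity failed: %v", err)
	}
}
```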
From a5ef6b73e1dd981e789acf681b6c971c39c73fa3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 15:48:04 -0700
Subject: [PATCH 05/18] Minimize the prometheus server implementation to the
 bare minimum

---
 .../prometheus_server.go | 75 +------------------
 1 file changed, 2 insertions(+), 73 deletions(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go b/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
index 1bf15dc6..185287a4 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
@@ -1,65 +1,18 @@
 package main
 
 import (
-	"fmt"
 	"net/http"
 	"os"
-	"time"
 
-	"github.com/m3db/prometheus_client_golang/prometheus"
 	"github.com/m3db/prometheus_client_golang/prometheus/promhttp"
 	"github.com/uber-common/cadence-samples/cmd/samples/common"
 	"go.uber.org/zap"
 )
 
 const (
-	defaultPrometheusPort = ":8004"
+	defaultPrometheusPort = "127.0.0.1:8004"
 )
 
-var (
-	// Custom metrics for autoscaling monitoring
-	autoscalingWorkflowsStarted = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Name: "autoscaling_workflows_started_total",
-			Help: "Total number of autoscaling workflows started",
-		},
-		[]string{"worker_id"},
-	)
-
-	autoscalingActivitiesCompleted = prometheus.NewCounterVec(
-		prometheus.CounterOpts{
-			Name: "autoscaling_activities_completed_total",
-			Help: "Total number of autoscaling activities completed",
-		},
-		[]string{"worker_id", "activity_type"},
-	)
-
-	autoscalingActivityDuration = prometheus.NewHistogramVec(
-		prometheus.HistogramOpts{
-			Name:    "autoscaling_activity_duration_seconds",
-			Help:    "Duration of autoscaling activities in seconds",
-			Buckets: prometheus.DefBuckets,
-		},
-		[]string{"worker_id", "activity_type"},
-	)
-
-	autoscalingWorkerLoad = prometheus.NewGaugeVec(
-		prometheus.GaugeOpts{
-			Name: "autoscaling_worker_load",
-			Help: "Current load on the autoscaling worker",
-		},
-		[]string{"worker_id", "load_type"},
-	)
-)
-
-func init() {
-	// Register custom metrics
-	prometheus.MustRegister(autoscalingWorkflowsStarted)
-	prometheus.MustRegister(autoscalingActivitiesCompleted)
-	prometheus.MustRegister(autoscalingActivityDuration)
-	prometheus.MustRegister(autoscalingWorkerLoad)
-}
-
 // startPrometheusHTTPServer starts an HTTP server to expose Prometheus metrics
 func startPrometheusHTTPServer(h *common.SampleHelper) {
 	port := defaultPrometheusPort
@@ -69,22 +22,9 @@ func startPrometheusHTTPServer(h *common.SampleHelper) {
 
 	h.Logger.Info("Starting Prometheus HTTP server", zap.String("port", port))
 
-	// Set up HTTP handlers for Prometheus metrics
-	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
-		fmt.Fprintf(w, "Autoscaling Monitoring Sample - Prometheus Metrics Server\n")
-		fmt.Fprintf(w, "Metrics available at: /metrics\n")
-		fmt.Fprintf(w, "Health check at: /health\n")
-		fmt.Fprintf(w, "\nDashboard: http://localhost:3000/d/dehkspwgabvuoc/cadence-client\n")
-	})
-
-	// Prometheus metrics endpoint
+	// Prometheus metrics endpoint - exposes standard Cadence metrics
 	http.Handle("/metrics", promhttp.Handler())
 
-	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
-		w.WriteHeader(http.StatusOK)
-		fmt.Fprintf(w, "OK\n")
-	})
-
 	// Start server in goroutine
 	go func() {
 		if err := http.ListenAndServe(port, nil); err != nil {
@@ -95,14 +35,3 @@ func startPrometheusHTTPServer(h *common.SampleHelper) {
 
 	h.Logger.Info("Prometheus server started successfully", zap.String("port", port))
 }
-
-// RecordWorkflowStarted records when a workflow is started
-func RecordWorkflowStarted(workerID string) {
-	autoscalingWorkflowsStarted.WithLabelValues(workerID).Inc()
-}
-
-// RecordActivityCompleted records when an activity is completed
-func RecordActivityCompleted(workerID, activityType string, duration time.Duration) {
-	autoscalingActivitiesCompleted.WithLabelValues(workerID, activityType).Inc()
-	autoscalingActivityDuration.WithLabelValues(workerID, activityType).Observe(duration.Seconds())
-}

From 29524e992ffdde84f494d66af1815cfb1d7cd921 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 15:48:41 -0700
Subject: [PATCH 06/18] Add missing Prometheus section in the config

---
 cmd/samples/advanced/autoscaling-monitoring/config.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/config.go b/cmd/samples/advanced/autoscaling-monitoring/config.go
index 3eb245a3..de4802c1 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/config.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/config.go
@@ -2,12 +2,14 @@ package main
 
 import (
 	"github.com/uber-common/cadence-samples/cmd/samples/common"
+	"github.com/uber-go/tally/prometheus"
 )
 
 // AutoscalingConfiguration extends the base Configuration with autoscaling-specific settings
 type AutoscalingConfiguration struct {
 	common.Configuration
-	Autoscaling AutoscalingSettings `yaml:"autoscaling"`
+	Autoscaling AutoscalingSettings       `yaml:"autoscaling"`
+	Prometheus  *prometheus.Configuration `yaml:"prometheus"`
 }
 
 // AutoscalingSettings contains the autoscaling configuration
@@ -37,6 +39,9 @@ func DefaultAutoscalingConfiguration() AutoscalingConfiguration {
 			ServiceName:     "cadence-frontend",
 			HostNameAndPort: "localhost:7833",
 		},
+		Prometheus: &prometheus.Configuration{
+			ListenAddress: "127.0.0.1:8004",
+		},
 		Autoscaling: AutoscalingSettings{
 			PollerMinCount:  2,
 			PollerMaxCount:  8,

From 2356416a81a2c99f08650f802fbdd15d27fc2360 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 15:49:38 -0700
Subject: [PATCH 07/18] Update Readme to reflect refactored sample

---
 .../advanced/autoscaling-monitoring/README.md | 62 ++++++-----------
 1 file changed, 25 insertions(+), 37 deletions(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md
index cf4fd0bb..cfeaff6f 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/README.md
+++ b/cmd/samples/advanced/autoscaling-monitoring/README.md
@@ -18,11 +18,10 @@ The worker uses `worker.NewV2` with `AutoScalerOptions` to enable true autoscali
 The worker automatically adjusts the number of poller goroutines between the min and max values based on the current workload.
 
 ### Prometheus Metrics
-The sample exposes comprehensive metrics about worker performance, activity execution, and autoscaling behavior:
-- `autoscaling_workflows_started_total` - Counter of workflows started
-- `autoscaling_activities_completed_total` - Counter of activities completed
-- `autoscaling_activity_duration_seconds` - Histogram of activity execution times
-- `autoscaling_worker_load` - Gauge showing current worker load
+The sample exposes standard Cadence metrics via Prometheus, including comprehensive autoscaling behavior metrics:
+- Standard worker performance metrics (poller counts, task processing rates)
+- Autoscaling-specific metrics (`cadence_concurrency_auto_scaler_*`)
+- All metrics automatically emitted by the Cadence Go client
 
 ### Monitoring Dashboards
 When running the Cadence server locally with Grafana, you can access the client dashboards at:
@@ -31,9 +30,9 @@ When running the Cadence server locally with Grafana, you can access the client
 
 ## Prerequisites
 
-1. **Cadence Server**: Running locally with Docker Compose
-2. **Prometheus**: Configured to scrape metrics from the sample
-3. **Grafana**: With Cadence dashboards (included with default Cadence server setup)
+1. **Cadence Server**: Running locally with Docker Compose.
+2. **Prometheus**: Configured to scrape metrics from the sample.
+3. **Grafana**: With Cadence dashboards (included with default Cadence server setup). Dashboards are included in the latest version of the server.
 
 ## Quick Start
 
@@ -112,6 +111,8 @@ If no configuration file is provided or if the file cannot be read, the sample u
 domain: "default"
 service: "cadence-frontend"
 host: "localhost:7833"
+prometheus:
+  listenAddress: "127.0.0.1:8004"
 autoscaling:
   pollerMinCount: 2
   pollerMaxCount: 8
@@ -126,30 +127,23 @@ autoscaling:
 ## Monitoring
 
 ### Metrics Endpoints
-- **Prometheus Metrics**: http://localhost:8004/metrics
-- **Health Check**: http://localhost:8004/health
-- **Status Page**: http://localhost:8004/
+- **Prometheus Metrics**: http://127.0.0.1:8004/metrics
 
 ### Grafana Dashboard
 Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/cadence-client
 
 ### Key Metrics to Monitor
 
-1. **Worker Performance**:
-   - Activity execution rate
-   - Decision task processing rate
-   - Worker load levels
-
-2. **Autoscaling Behavior**:
-   - Concurrent activity execution count
-   - Task queue depth
-   - Poller utilization
-   - Number of active poller goroutines
+1. **Worker Performance Metrics**:
+   - `cadence_worker_decision_poll_success_count` - Successful decision task polls
+   - `cadence_worker_activity_poll_success_count` - Successful activity task polls
+   - `cadence_worker_decision_poll_count` - Total decision task poll attempts
+   - `cadence_worker_activity_poll_count` - Total activity task poll attempts
 
-3. **Custom Metrics**:
-   - `autoscaling_workflows_started_total`
-   - `autoscaling_activities_completed_total`
-   - `autoscaling_activity_duration_seconds`
+2. **Autoscaling Behavior Metrics**:
+   - `cadence_worker_poller_count` - Number of active poller goroutines (key autoscaling indicator)
+   - `cadence_concurrency_auto_scaler_poller_quota` - Current poller quota for autoscaling
+   - `cadence_concurrency_auto_scaler_poller_wait_time` - Time pollers wait for tasks
 
 ## How It Works
 
@@ -167,11 +161,11 @@ The worker uses `worker.NewV2` with `AutoScalerOptions` to:
 - Automatically adjust based on task queue depth and processing time
 
 ### Metrics Collection
-The sample collects and exposes:
-- Workflow start/complete events
-- Activity execution times and counts
-- Worker load simulation metrics
-- Standard Cadence metrics via Tally
+The sample exposes standard Cadence metrics via Prometheus:
+- Worker poller count and utilization
+- Decision and activity task processing rates
+- Task queue depth and processing times
+- All standard Cadence worker metrics via Tally
 
 ## Production Considerations
 
@@ -181,7 +175,7 @@
 - Use multiple worker instances for high availability
 
 ### Monitoring
-- Configure Prometheus to scrape metrics regularly
+- Configure Prometheus to scrape metrics regularly (latest version of Cadence server is configured to do this)
 - Set up alerts for worker performance issues
 - Use Grafana dashboards to visualize autoscaling behavior
 - Monitor poller count changes to verify autoscaling is working
@@ -221,9 +215,3 @@
    - Verify Grafana is running
    - Check dashboard URL is correct
    - Ensure Prometheus data source is configured
-
-### Debug Mode
-Enable debug logging by setting the log level in your environment:
-```bash
-export CADENCE_LOG_LEVEL=debug
-```

From 2356416a81a2c99f08650f802fbdd15d27fc2360 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 17:06:20 -0700
Subject: [PATCH 08/18] Add metrics scope with reporter and sanitize options

---
 .../advanced/autoscaling-monitoring/main.go   | 74 +++++++++++++++----
 1 file changed, 59 insertions(+), 15 deletions(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go
index 73f7f9b5..8dffe699 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/main.go
+++ b/cmd/samples/advanced/autoscaling-monitoring/main.go
@@ -3,6 +3,7 @@ package main
 import (
 	"flag"
 	"fmt"
+	"net/http"
 	"os"
 	"os/signal"
 	"syscall"
@@ -14,6 +15,7 @@ import (
 
 	"github.com/uber-common/cadence-samples/cmd/samples/common"
 	"github.com/uber-go/tally"
+	"github.com/uber-go/tally/prometheus"
 	"go.uber.org/zap"
 )
 
@@ -53,9 +55,52 @@ func main() {
 	}
 	h.Service = service
 
-	// Set up metrics scope (noop for now, Prometheus will be set up separately)
-	h.WorkerMetricScope = tally.NoopScope
-	h.ServiceMetricScope = tally.NoopScope
+	// Set up metrics scope with Tally Prometheus reporter
+	var (
+		safeCharacters  = []rune{'_'}
+		sanitizeOptions = tally.SanitizeOptions{
+			NameCharacters: tally.ValidCharacters{
+				Ranges:     tally.AlphanumericRange,
+				Characters: safeCharacters,
+			},
+			KeyCharacters: tally.ValidCharacters{
+				Ranges:     tally.AlphanumericRange,
+				Characters: safeCharacters,
+			},
+			ValueCharacters: tally.ValidCharacters{
+				Ranges:     tally.AlphanumericRange,
+				Characters: safeCharacters,
+			},
+			ReplacementCharacter: tally.DefaultReplacementCharacter,
+		}
+	)
+
+	// Create Prometheus reporter
+	reporter := prometheus.NewReporter(prometheus.Options{})
+
+	// Create root scope with proper options
+	scope, closer := tally.NewRootScope(tally.ScopeOptions{
+		Tags:            map[string]string{"service": "autoscaling-monitoring"},
+		SanitizeOptions: &sanitizeOptions,
+		CachedReporter:  reporter,
+	}, 10*time.Second)
+	defer closer.Close()
+
+	// Set up HTTP handler for metrics endpoint
+	if config.Prometheus != nil {
+		go func() {
+			http.Handle("/metrics", reporter.HTTPHandler())
+			logger.Info("Starting Prometheus metrics server",
+				zap.String("port", config.Prometheus.ListenAddress))
+			if err := http.ListenAndServe(config.Prometheus.ListenAddress, nil); err != nil {
+				logger.Error("Failed to start metrics server", zap.Error(err))
+			}
+		}()
+	}
+
+	// Set up metrics scope for helper
+	h.WorkerMetricScope = scope
+	h.ServiceMetricScope = scope
 
 	switch mode {
 	case "worker":
@@ -63,7 +108,17 @@
 		startWorkers(&h, &config)
 	case "trigger":
 		startWorkflow(&h, &config)
 	case "server":
-		startPrometheusServer(&h)
+		// Server mode - just block until interrupted
+		// Metrics are automatically exposed when running worker mode
+		fmt.Println("Server mode - metrics are automatically exposed when running worker mode")
+		fmt.Println("Access metrics at: http://127.0.0.1:8004/metrics")
+		fmt.Println("Press Ctrl+C to stop...")
+
+		// Block until interrupted
+		done := make(chan os.Signal, 1)
+		signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
+		<-done
+		fmt.Println("Shutting down server...")
 	default:
 		fmt.Printf("Unknown mode: %s\n", mode)
 		os.Exit(1)
@@ -124,14 +179,3 @@ func startWorkflow(h *common.SampleHelper, config *AutoscalingConfiguration) {
 	fmt.Println("Monitor the worker performance and autoscaling behavior in Grafana:")
 	fmt.Println("http://localhost:3000/d/dehkspwgabvuoc/cadence-client")
 }
-
-func startPrometheusServer(h *common.SampleHelper) {
-	startPrometheusHTTPServer(h)
-
-	// Block until interrupted
-	done := make(chan os.Signal, 1)
-	signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
-	fmt.Println("Prometheus server started. Press Ctrl+C to stop...")
-	<-done
-	fmt.Println("Shutting down Prometheus server...")
-}
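The reporter wiring added in this patch can be exercised on its own. The sketch below is a standalone toy program (the metric name is hypothetical) using only the tally and tally/prometheus packages the patch already imports; it shows why the sanitize options matter: characters such as '-' and '.' are outside the allowed set and are rewritten to '_':

```go
package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/uber-go/tally"
	"github.com/uber-go/tally/prometheus"
)

func main() {
	reporter := prometheus.NewReporter(prometheus.Options{})

	// Same sanitize options as the patch: alphanumerics plus '_' are allowed.
	sanitizeOptions := tally.SanitizeOptions{
		NameCharacters:       tally.ValidCharacters{Ranges: tally.AlphanumericRange, Characters: []rune{'_'}},
		KeyCharacters:        tally.ValidCharacters{Ranges: tally.AlphanumericRange, Characters: []rune{'_'}},
		ValueCharacters:      tally.ValidCharacters{Ranges: tally.AlphanumericRange, Characters: []rune{'_'}},
		ReplacementCharacter: tally.DefaultReplacementCharacter,
	}

	scope, closer := tally.NewRootScope(tally.ScopeOptions{
		Tags:            map[string]string{"service": "autoscaling-monitoring"},
		SanitizeOptions: &sanitizeOptions,
		CachedReporter:  reporter,
	}, 10*time.Second)
	defer closer.Close()

	// "demo.task-count" is exported as "demo_task_count" after sanitization.
	scope.Counter("demo.task-count").Inc(1)

	http.Handle("/metrics", reporter.HTTPHandler())
	fmt.Println("metrics on http://127.0.0.1:8004/metrics")
	if err := http.ListenAndServe("127.0.0.1:8004", nil); err != nil {
		fmt.Println(err)
	}
}
```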
From 1066542458b10d127a3e01b64c8c01e3f4349952 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 17:06:59 -0700
Subject: [PATCH 09/18] Remove prometheus server file

---
 .../prometheus_server.go | 37 ------------------
 1 file changed, 37 deletions(-)
 delete mode 100644 cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go

diff --git a/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go b/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
deleted file mode 100644
index 185287a4..00000000
--- a/cmd/samples/advanced/autoscaling-monitoring/prometheus_server.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package main
-
-import (
-	"net/http"
-	"os"
-
-	"github.com/m3db/prometheus_client_golang/prometheus/promhttp"
-	"github.com/uber-common/cadence-samples/cmd/samples/common"
-	"go.uber.org/zap"
-)
-
-const (
-	defaultPrometheusPort = "127.0.0.1:8004"
-)
-
-// startPrometheusHTTPServer starts an HTTP server to expose Prometheus metrics
-func startPrometheusHTTPServer(h *common.SampleHelper) {
-	port := defaultPrometheusPort
-	if h.Config.Prometheus != nil && h.Config.Prometheus.ListenAddress != "" {
-		port = h.Config.Prometheus.ListenAddress
-	}
-
-	h.Logger.Info("Starting Prometheus HTTP server", zap.String("port", port))
-
-	// Prometheus metrics endpoint - exposes standard Cadence metrics
-	http.Handle("/metrics", promhttp.Handler())
-
-	// Start server in goroutine
-	go func() {
-		if err := http.ListenAndServe(port, nil); err != nil {
-			h.Logger.Error("Failed to start Prometheus server", zap.Error(err))
-			os.Exit(1)
-		}
-	}()
-
-	h.Logger.Info("Prometheus server started successfully", zap.String("port", port))
-}

From d2bfad2bc28bc367e938323821496525fdb9f20a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”>
Date: Tue, 19 Aug 2025 17:08:30 -0700
Subject: [PATCH 10/18] Update Readme after refactor

---
 .../advanced/autoscaling-monitoring/README.md | 42 ++++++++++++-------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md
index cfeaff6f..42087804 100644
--- a/cmd/samples/advanced/autoscaling-monitoring/README.md
+++ b/cmd/samples/advanced/autoscaling-monitoring/README.md
@@ -3,8 +3,8 @@
 This sample demonstrates three advanced Cadence worker features:
 
 1. **Worker Poller Autoscaling** - Dynamic adjustment of worker poller goroutines based on workload
-2. **Prometheus Tally Reporter** - Metrics collection using Tally and Prometheus
-3. **HTTP Endpoint for Prometheus Scraping** - Exposing metrics for monitoring
+2. **Integrated Prometheus Metrics** - Real-time metrics collection using Tally with Prometheus reporter
+3. **Autoscaling Metrics** - Comprehensive autoscaling behavior metrics exposed via HTTP endpoint
 
 ## Features
 
@@ -18,10 +18,11 @@ The worker uses `worker.NewV2` with `AutoScalerOptions` to enable true autoscali
 The worker automatically adjusts the number of poller goroutines between the min and max values based on the current workload.
 
 ### Prometheus Metrics
-The sample exposes standard Cadence metrics via Prometheus, including comprehensive autoscaling behavior metrics:
-- Standard worker performance metrics (poller counts, task processing rates)
-- Autoscaling-specific metrics (`cadence_concurrency_auto_scaler_*`)
-- All metrics automatically emitted by the Cadence Go client
+The sample uses Tally with Prometheus reporter to expose comprehensive metrics:
+- **Real-time autoscaling metrics** - Poller count changes, quota adjustments, wait times
+- **Worker performance metrics** - Task processing rates, poller utilization, queue depths
+- **Standard Cadence metrics** - All metrics automatically emitted by the Cadence Go client
+- **Sanitized metric names** - Prometheus-compatible metric names and labels
 
 ### Monitoring Dashboards
 When running the Cadence server locally with Grafana, you can access the client dashboards at:
@@ -41,7 +42,10 @@ When running the Cadence server locally with Grafana, you can access the client
 ./bin/autoscaling-monitoring -m worker
 ```
 
-### 2. Start the Prometheus Server (Optional)
+The worker automatically exposes metrics at: http://127.0.0.1:8004/metrics
+
+### 2. Access Metrics (Optional)
+For dedicated metrics server:
 ```bash
 ./bin/autoscaling-monitoring -m server
 ```
@@ -100,8 +104,8 @@ The configuration values are used throughout the sample:
 3. **Activity Configuration** (`activities.go`):
    - `minProcessingTime`, `maxProcessingTime` → Activity processing time range
 
-4. **Prometheus Configuration** (`prometheus_server.go`):
-   - `listenAddress` → HTTP server port
+4. **Prometheus Configuration** (integrated):
+   - `listenAddress` → Metrics endpoint port (default: 127.0.0.1:8004)
 
 ### Default Configuration
 
@@ -128,6 +132,9 @@ autoscaling:
 
 ### Metrics Endpoints
 - **Prometheus Metrics**: http://127.0.0.1:8004/metrics
+  - Exposed automatically when running worker or server mode
+  - Real-time autoscaling and worker performance metrics
+  - Prometheus-compatible format with sanitized names
 
 ### Grafana Dashboard
 Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/cadence-client
@@ -144,6 +151,8 @@ Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/c
    - `cadence_worker_poller_count` - Number of active poller goroutines (key autoscaling indicator)
    - `cadence_concurrency_auto_scaler_poller_quota` - Current poller quota for autoscaling
    - `cadence_concurrency_auto_scaler_poller_wait_time` - Time pollers wait for tasks
+   - `cadence_concurrency_auto_scaler_scale_up_count` - Number of scale-up events
+   - `cadence_concurrency_auto_scaler_scale_down_count` - Number of scale-down events
 
 ## How It Works
 
@@ -161,11 +170,11 @@ The worker uses `worker.NewV2` with `AutoScalerOptions` to:
 - Automatically adjust based on task queue depth and processing time
 
 ### Metrics Collection
-The sample uses Tally with Prometheus reporter for comprehensive metrics... (replaced text below)
-The sample exposes standard Cadence metrics via Prometheus:
-- Worker poller count and utilization
-- Decision and activity task processing rates
-- Task queue depth and processing times
-- All standard Cadence worker metrics via Tally
+The sample uses Tally with Prometheus reporter for comprehensive metrics:
+- **Real-time autoscaling metrics** - Poller count changes, quota adjustments, scale events
+- **Worker performance metrics** - Task processing rates, poller utilization, queue depths
+- **Standard Cadence metrics** - All metrics automatically emitted by the Cadence Go client
+- **Sanitized metric names** - Prometheus-compatible format with proper character replacement
 
 ## Production Considerations
 
@@ -207,9 +216,10 @@ The sample exposes standard Cadence metrics via Prometheus:
    - Review default values if config file is not found
 
 4. **Metrics Not Appearing**:
-   - Verify Prometheus server is running
-   - Check metrics endpoint is accessible
+   - Verify worker is running (metrics are exposed automatically)
+   - Check metrics endpoint is accessible: http://127.0.0.1:8004/metrics
    - Ensure Prometheus is configured to scrape the endpoint
+   - Check for metric name sanitization issues
 
 5. **Dashboard Not Loading**:
    - Verify Grafana is running
    - Check dashboard URL is correct
    - Ensure Prometheus data source is configured
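To watch the autoscaler indicators this README documents while a load test is running, the worker's metrics endpoint can be polled directly (a sketch; metric names as listed above):

```bash
# Sample the poller count and autoscaler quota/wait-time series every 5s
watch -n 5 "curl -s http://127.0.0.1:8004/metrics \
  | grep -E 'cadence_(worker_poller_count|concurrency_auto_scaler)'"
```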
Considerations @@ -207,9 +216,10 @@ The sample exposes standard Cadence metrics via Prometheus: - Review default values if config file is not found 4. **Metrics Not Appearing**: - - Verify Prometheus server is running - - Check metrics endpoint is accessible + - Verify worker is running (metrics are exposed automatically) + - Check metrics endpoint is accessible: http://127.0.0.1:8004/metrics - Ensure Prometheus is configured to scrape the endpoint + - Check for metric name sanitization issues 5. **Dashboard Not Loading**: - Verify Grafana is running From dc8fbe7b1b430c17881f9daf90ae3687ddee4b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Tue, 19 Aug 2025 17:28:25 -0700 Subject: [PATCH 11/18] Remove prom server option --- cmd/samples/advanced/autoscaling-monitoring/Makefile | 3 --- .../advanced/autoscaling-monitoring/README.md | 8 +------- cmd/samples/advanced/autoscaling-monitoring/main.go | 12 ------------ 3 files changed, 1 insertion(+), 22 deletions(-) diff --git a/cmd/samples/advanced/autoscaling-monitoring/Makefile b/cmd/samples/advanced/autoscaling-monitoring/Makefile index d4f2dbe3..4527b608 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/Makefile +++ b/cmd/samples/advanced/autoscaling-monitoring/Makefile @@ -22,8 +22,5 @@ deps: run-worker: build ../../../../bin/autoscaling-monitoring -m worker -run-server: build - ../../../../bin/autoscaling-monitoring -m server - run-trigger: build ../../../../bin/autoscaling-monitoring -m trigger diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md index 42087804..eafe0abe 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/README.md +++ b/cmd/samples/advanced/autoscaling-monitoring/README.md @@ -44,13 +44,7 @@ When running the Cadence server locally with Grafana, you can access the client The worker automatically exposes metrics at: http://127.0.0.1:8004/metrics -### 2. Access Metrics (Optional) -For dedicated metrics server: -```bash -./bin/autoscaling-monitoring -m server -``` - -### 3. Generate Load +### 2. 
Generate Load ```bash ./bin/autoscaling-monitoring -m trigger ``` diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go index 8dffe699..6bc40871 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/main.go +++ b/cmd/samples/advanced/autoscaling-monitoring/main.go @@ -107,18 +107,6 @@ func main() { startWorkers(&h, &config) case "trigger": startWorkflow(&h, &config) - case "server": - // Server mode - just block until interrupted - // Metrics are automatically exposed when running worker mode - fmt.Println("Server mode - metrics are automatically exposed when running worker mode") - fmt.Println("Access metrics at: http://127.0.0.1:8004/metrics") - fmt.Println("Press Ctrl+C to stop...") - - // Block until interrupted - done := make(chan os.Signal, 1) - signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) - <-done - fmt.Println("Shutting down server...") default: fmt.Printf("Unknown mode: %s\n", mode) os.Exit(1) From cd80ffa14c300a599031095475f4ecae56662692 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Tue, 19 Aug 2025 17:29:21 -0700 Subject: [PATCH 12/18] Remove unused imports --- cmd/samples/advanced/autoscaling-monitoring/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go index 6bc40871..2fe67280 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/main.go +++ b/cmd/samples/advanced/autoscaling-monitoring/main.go @@ -5,8 +5,6 @@ import ( "fmt" "net/http" "os" - "os/signal" - "syscall" "time" "github.com/pborman/uuid" From da7afbe632191c0e7880b5f5c213db2f714cf18e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Wed, 20 Aug 2025 14:08:30 -0700 Subject: [PATCH 13/18] Add more load generating options to config and tests --- .../advanced/autoscaling-monitoring/README.md | 105 +++++- .../advanced/autoscaling-monitoring/config.go | 161 +++++++-- .../config/autoscaling.yaml | 10 +- .../autoscaling-monitoring/config_test.go | 305 ++++++++++++++++++ .../advanced/autoscaling-monitoring/main.go | 83 ++--- .../autoscaling-monitoring/workflow.go | 6 +- 6 files changed, 578 insertions(+), 92 deletions(-) create mode 100644 cmd/samples/advanced/autoscaling-monitoring/config_test.go diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md index eafe0abe..8118b52c 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/README.md +++ b/cmd/samples/advanced/autoscaling-monitoring/README.md @@ -78,10 +78,17 @@ autoscaling: # Load generation settings loadGeneration: - iterations: 50 # Number of activities to execute - batchDelay: 2 # Delay between batches (seconds) - minProcessingTime: 1000 # Min activity time (ms) - maxProcessingTime: 6000 # Max activity time (ms) + # Workflow-level settings + workflows: 3 # Number of workflows to start + workflowDelay: 2 # Delay between starting workflows (seconds) + + # Activity-level settings (per workflow) + activitiesPerWorkflow: 40 # Number of activities per workflow + batchDelay: 2 # Delay between activity batches within workflow (seconds) + + # Activity processing time range (milliseconds) + minProcessingTime: 1000 + maxProcessingTime: 6000 ``` ### Configuration Usage @@ -92,8 +99,8 @@ The configuration values are used throughout the sample: - `pollerMinCount`, `pollerMaxCount`, `pollerInitCount` → `AutoScalerOptions` 2. 
**Workflow Configuration** (`workflow.go`): - - `iterations` → Number of activities to execute - - `batchDelay` → Delay between activity batches + - `activitiesPerWorkflow` → Number of activities to execute per workflow + - `batchDelay` → Delay between activity batches within workflow 3. **Activity Configuration** (`activities.go`): - `minProcessingTime`, `maxProcessingTime` → Activity processing time range @@ -116,19 +123,62 @@ autoscaling: pollerMaxCount: 8 pollerInitCount: 4 loadGeneration: - iterations: 50 + workflows: 3 + workflowDelay: 2 + activitiesPerWorkflow: 40 batchDelay: 2 minProcessingTime: 1000 maxProcessingTime: 6000 ``` +### Load Pattern Examples + +The sample supports various load patterns for testing autoscaling behavior: + +#### **1. Gradual Ramp-up (Default)** +```yaml +loadGeneration: + workflows: 3 + workflowDelay: 2 + activitiesPerWorkflow: 40 +``` +**Result**: 3 workflows starting 2 seconds apart, each with 40 activities (120 total activities) + +#### **2. Burst Load** +```yaml +loadGeneration: + workflows: 10 + workflowDelay: 0 + activitiesPerWorkflow: 20 +``` +**Result**: 10 workflows all starting immediately (200 total activities) + +#### **3. Sustained Load** +```yaml +loadGeneration: + workflows: 5 + workflowDelay: 5 + activitiesPerWorkflow: 100 +``` +**Result**: 5 long-running workflows with 5-second delays between starts (500 total activities) + +#### **4. Light Load** +```yaml +loadGeneration: + workflows: 1 + workflowDelay: 0 + activitiesPerWorkflow: 20 +``` +**Result**: Single workflow with 20 activities for minimal load testing + ## Monitoring ### Metrics Endpoints - **Prometheus Metrics**: http://127.0.0.1:8004/metrics - - Exposed automatically when running worker or server mode + - Exposed automatically when running worker mode only - Real-time autoscaling and worker performance metrics - Prometheus-compatible format with sanitized names + - **Note**: Metrics server is not started in trigger mode ### Grafana Dashboard Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/cadence-client @@ -151,10 +201,12 @@ Access the Cadence client dashboard at: http://localhost:3000/d/dehkspwgabvuoc/c ## How It Works ### Load Generation -The sample creates a workflow that executes activities in parallel, with each activity: -- Taking 1-6 seconds to complete (configurable via `minProcessingTime`/`maxProcessingTime`) +The sample creates multiple workflows that execute activities in parallel, with each workflow: +- Starting with configurable delays (`workflowDelay`) to create sustained load patterns +- Executing a configurable number of activities (`activitiesPerWorkflow`) per workflow +- Each activity taking 1-6 seconds to complete (configurable via `minProcessingTime`/`maxProcessingTime`) - Recording metrics about execution time -- Creating varying load patterns with configurable batch delays +- Creating varying load patterns with configurable batch delays within each workflow ### Autoscaling Demonstration The worker uses `worker.NewV2` with `AutoScalerOptions` to: @@ -188,6 +240,37 @@ The sample uses Tally with Prometheus reporter for comprehensive metrics: - Use authentication for metrics access - Consider using HTTPS for metrics endpoints +## Testing + +The sample includes comprehensive unit tests for the configuration loading functionality: + +### Running Tests +```bash +# Run all tests +go test -v + +# Run specific test +go test -v -run TestLoadConfiguration_SuccessfulLoading + +# Run tests with coverage +go test -v -cover +``` + +### Test 
Coverage +The tests cover: +- **Successful configuration loading** - Complete YAML files with all fields +- **Missing file fallback** - Graceful handling when config file doesn't exist +- **Partial configuration** - YAML files with only some fields specified +- **Malformed YAML handling** - Invalid YAML syntax and field types +- **Default value application** - Ensuring all fields have sensible defaults + +### Configuration Testing +The tests validate that the improved configuration system: +- Handles embedded struct issues properly +- Applies defaults correctly for missing fields +- Provides clear error messages for configuration problems +- Maintains backward compatibility + ## Troubleshooting ### Common Issues diff --git a/cmd/samples/advanced/autoscaling-monitoring/config.go b/cmd/samples/advanced/autoscaling-monitoring/config.go index de4802c1..92f0d114 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/config.go +++ b/cmd/samples/advanced/autoscaling-monitoring/config.go @@ -1,15 +1,24 @@ package main import ( + "fmt" + "os" + "github.com/uber-common/cadence-samples/cmd/samples/common" "github.com/uber-go/tally/prometheus" + "gopkg.in/yaml.v3" ) -// AutoscalingConfiguration extends the base Configuration with autoscaling-specific settings +// AutoscalingConfiguration uses a flattened structure to avoid embedded struct issues type AutoscalingConfiguration struct { - common.Configuration - Autoscaling AutoscalingSettings `yaml:"autoscaling"` - Prometheus *prometheus.Configuration `yaml:"prometheus"` + // Base configuration fields (explicit, not embedded) + DomainName string `yaml:"domain"` + ServiceName string `yaml:"service"` + HostNameAndPort string `yaml:"host"` + Prometheus *prometheus.Configuration `yaml:"prometheus"` + + // Autoscaling-specific fields + Autoscaling AutoscalingSettings `yaml:"autoscaling"` } // AutoscalingSettings contains the autoscaling configuration @@ -25,33 +34,141 @@ type AutoscalingSettings struct { // LoadGenerationSettings contains the load generation configuration type LoadGenerationSettings struct { - Iterations int `yaml:"iterations"` - BatchDelay int `yaml:"batchDelay"` - MinProcessingTime int `yaml:"minProcessingTime"` - MaxProcessingTime int `yaml:"maxProcessingTime"` + // Workflow-level settings + Workflows int `yaml:"workflows"` + WorkflowDelay int `yaml:"workflowDelay"` + + // Activity-level settings (per workflow) + ActivitiesPerWorkflow int `yaml:"activitiesPerWorkflow"` + BatchDelay int `yaml:"batchDelay"` + MinProcessingTime int `yaml:"minProcessingTime"` + MaxProcessingTime int `yaml:"maxProcessingTime"` } -// DefaultAutoscalingConfiguration returns default autoscaling settings +// Default values as constants for easy maintenance +const ( + DefaultDomainName = "default" + DefaultServiceName = "cadence-frontend" + DefaultHostNameAndPort = "localhost:7833" + DefaultPrometheusAddr = "127.0.0.1:8004" + + DefaultPollerMinCount = 2 + DefaultPollerMaxCount = 8 + DefaultPollerInitCount = 4 + + DefaultWorkflows = 3 + DefaultWorkflowDelay = 2 + DefaultActivitiesPerWorkflow = 40 + DefaultBatchDelay = 2 + DefaultMinProcessingTime = 1000 + DefaultMaxProcessingTime = 6000 +) + +// DefaultAutoscalingConfiguration returns default configuration func DefaultAutoscalingConfiguration() AutoscalingConfiguration { return AutoscalingConfiguration{ - Configuration: common.Configuration{ - DomainName: "default", - ServiceName: "cadence-frontend", - HostNameAndPort: "localhost:7833", - }, + DomainName: DefaultDomainName, + ServiceName: DefaultServiceName, + 
HostNameAndPort: DefaultHostNameAndPort, Prometheus: &prometheus.Configuration{ - ListenAddress: "127.0.0.1:8004", + ListenAddress: DefaultPrometheusAddr, }, Autoscaling: AutoscalingSettings{ - PollerMinCount: 2, - PollerMaxCount: 8, - PollerInitCount: 4, + PollerMinCount: DefaultPollerMinCount, + PollerMaxCount: DefaultPollerMaxCount, + PollerInitCount: DefaultPollerInitCount, LoadGeneration: LoadGenerationSettings{ - Iterations: 50, - BatchDelay: 2, - MinProcessingTime: 1000, - MaxProcessingTime: 6000, + Workflows: DefaultWorkflows, + WorkflowDelay: DefaultWorkflowDelay, + ActivitiesPerWorkflow: DefaultActivitiesPerWorkflow, + BatchDelay: DefaultBatchDelay, + MinProcessingTime: DefaultMinProcessingTime, + MaxProcessingTime: DefaultMaxProcessingTime, }, }, } } + +// loadConfiguration loads the autoscaling configuration from file +func loadConfiguration(configFile string) AutoscalingConfiguration { + // Start with defaults + config := DefaultAutoscalingConfiguration() + + // Read config file + configData, err := os.ReadFile(configFile) + if err != nil { + fmt.Printf("Failed to read config file: %v, using defaults\n", err) + return config + } + + // Unmarshal into the config struct + if err := yaml.Unmarshal(configData, &config); err != nil { + fmt.Printf("Error parsing configuration: %v, using defaults\n", err) + return DefaultAutoscalingConfiguration() + } + + // Apply defaults for any missing fields + config.applyDefaults() + + return config +} + +// applyDefaults ensures all fields have sensible values +func (c *AutoscalingConfiguration) applyDefaults() { + // Base configuration defaults + if c.DomainName == "" { + c.DomainName = DefaultDomainName + } + if c.ServiceName == "" { + c.ServiceName = DefaultServiceName + } + if c.HostNameAndPort == "" { + c.HostNameAndPort = DefaultHostNameAndPort + } + if c.Prometheus == nil { + c.Prometheus = &prometheus.Configuration{ + ListenAddress: DefaultPrometheusAddr, + } + } + + // Autoscaling defaults + if c.Autoscaling.PollerMinCount == 0 { + c.Autoscaling.PollerMinCount = DefaultPollerMinCount + } + if c.Autoscaling.PollerMaxCount == 0 { + c.Autoscaling.PollerMaxCount = DefaultPollerMaxCount + } + if c.Autoscaling.PollerInitCount == 0 { + c.Autoscaling.PollerInitCount = DefaultPollerInitCount + } + + // Load generation defaults + if c.Autoscaling.LoadGeneration.Workflows == 0 { + c.Autoscaling.LoadGeneration.Workflows = DefaultWorkflows + } + if c.Autoscaling.LoadGeneration.WorkflowDelay == 0 { + c.Autoscaling.LoadGeneration.WorkflowDelay = DefaultWorkflowDelay + } + if c.Autoscaling.LoadGeneration.ActivitiesPerWorkflow == 0 { + c.Autoscaling.LoadGeneration.ActivitiesPerWorkflow = DefaultActivitiesPerWorkflow + } + if c.Autoscaling.LoadGeneration.BatchDelay == 0 { + c.Autoscaling.LoadGeneration.BatchDelay = DefaultBatchDelay + } + if c.Autoscaling.LoadGeneration.MinProcessingTime == 0 { + c.Autoscaling.LoadGeneration.MinProcessingTime = DefaultMinProcessingTime + } + if c.Autoscaling.LoadGeneration.MaxProcessingTime == 0 { + c.Autoscaling.LoadGeneration.MaxProcessingTime = DefaultMaxProcessingTime + } +} + +// ToCommonConfiguration converts to the common.Configuration type for compatibility +func (c *AutoscalingConfiguration) ToCommonConfiguration() common.Configuration { + return common.Configuration{ + DomainName: c.DomainName, + ServiceName: c.ServiceName, + HostNameAndPort: c.HostNameAndPort, + Prometheus: c.Prometheus, + } +} diff --git a/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml 
b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml index 9c5b1670..d5910a5a 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml +++ b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml @@ -17,11 +17,13 @@ autoscaling: # Worker load simulation settings loadGeneration: - # Number of iterations in the autoscaling workflow - iterations: 50 + # Workflow-level settings + workflows: 3 # Number of workflows to start + workflowDelay: 2 # Delay between starting workflows (seconds) - # Delay between activity batches (seconds) - batchDelay: 2 + # Activity-level settings (per workflow) + activitiesPerWorkflow: 40 # Number of activities per workflow + batchDelay: 2 # Delay between activity batches within workflow (seconds) # Activity processing time range (milliseconds) minProcessingTime: 1000 diff --git a/cmd/samples/advanced/autoscaling-monitoring/config_test.go b/cmd/samples/advanced/autoscaling-monitoring/config_test.go new file mode 100644 index 00000000..569b85ba --- /dev/null +++ b/cmd/samples/advanced/autoscaling-monitoring/config_test.go @@ -0,0 +1,305 @@ +package main + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Test the improved configuration loader for regressions +func TestLoadConfiguration_SuccessfulLoading(t *testing.T) { + // Create a temporary configuration file with all fields populated + configContent := ` +domain: "test-domain" +service: "test-service" +host: "test-host:7833" +prometheus: + listenAddress: "127.0.0.1:9000" +autoscaling: + pollerMinCount: 3 + pollerMaxCount: 10 + pollerInitCount: 5 + loadGeneration: + workflows: 5 + workflowDelay: 3 + activitiesPerWorkflow: 100 + batchDelay: 5 + minProcessingTime: 2000 + maxProcessingTime: 8000 +` + + // Create temporary file + tmpFile, err := os.CreateTemp("", "test-config-*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(configContent) + require.NoError(t, err) + tmpFile.Close() + + // Load configuration + config := loadConfiguration(tmpFile.Name()) + + // Validate all fields are populated correctly + assert.Equal(t, "test-domain", config.DomainName) + assert.Equal(t, "test-service", config.ServiceName) + assert.Equal(t, "test-host:7833", config.HostNameAndPort) + require.NotNil(t, config.Prometheus) + assert.Equal(t, "127.0.0.1:9000", config.Prometheus.ListenAddress) + assert.Equal(t, 3, config.Autoscaling.PollerMinCount) + assert.Equal(t, 10, config.Autoscaling.PollerMaxCount) + assert.Equal(t, 5, config.Autoscaling.PollerInitCount) + assert.Equal(t, 5, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, 3, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, 100, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, 5, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, 2000, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, 8000, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} + +func TestLoadConfiguration_MissingFileFallback(t *testing.T) { + // Use a non-existent file path + config := loadConfiguration("/non/existent/path/config.yaml") + + // Validate that default configuration is returned + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + 
assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} + +func TestLoadConfiguration_PartialConfiguration(t *testing.T) { + testCases := []struct { + name string + configContent string + expectedDomain string + expectedService string + expectedHost string + expectedPollerMin int + expectedPollerMax int + expectedPollerInit int + description string + }{ + { + name: "Only domain specified", + configContent: ` +domain: "custom-domain" +`, + expectedDomain: "custom-domain", + expectedService: DefaultServiceName, + expectedHost: DefaultHostNameAndPort, + expectedPollerMin: DefaultPollerMinCount, + expectedPollerMax: DefaultPollerMaxCount, + expectedPollerInit: DefaultPollerInitCount, + description: "Only domain field provided", + }, + { + name: "Only autoscaling settings specified", + configContent: ` +autoscaling: + pollerMinCount: 5 + pollerMaxCount: 15 + pollerInitCount: 8 +`, + expectedDomain: DefaultDomainName, + expectedService: DefaultServiceName, + expectedHost: DefaultHostNameAndPort, + expectedPollerMin: 5, + expectedPollerMax: 15, + expectedPollerInit: 8, + description: "Only autoscaling fields provided", + }, + { + name: "Mixed configuration", + configContent: ` +domain: "mixed-domain" +service: "mixed-service" +autoscaling: + pollerMinCount: 1 + loadGeneration: + workflows: 5 + activitiesPerWorkflow: 75 + batchDelay: 3 +`, + expectedDomain: "mixed-domain", + expectedService: "mixed-service", + expectedHost: DefaultHostNameAndPort, + expectedPollerMin: 1, + expectedPollerMax: DefaultPollerMaxCount, + expectedPollerInit: DefaultPollerInitCount, + description: "Mix of provided and default values", + }, + { + name: "Empty file", + configContent: "", + expectedDomain: DefaultDomainName, + expectedService: DefaultServiceName, + expectedHost: DefaultHostNameAndPort, + expectedPollerMin: DefaultPollerMinCount, + expectedPollerMax: DefaultPollerMaxCount, + expectedPollerInit: DefaultPollerInitCount, + description: "Empty YAML file", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create temporary file + tmpFile, err := os.CreateTemp("", "test-config-*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(tc.configContent) + require.NoError(t, err) + tmpFile.Close() + + // Load configuration + config := loadConfiguration(tmpFile.Name()) + + // Validate expected values + assert.Equal(t, tc.expectedDomain, config.DomainName) + assert.Equal(t, tc.expectedService, config.ServiceName) + assert.Equal(t, tc.expectedHost, config.HostNameAndPort) + assert.Equal(t, tc.expectedPollerMin, config.Autoscaling.PollerMinCount) + assert.Equal(t, tc.expectedPollerMax, config.Autoscaling.PollerMaxCount) + assert.Equal(t, tc.expectedPollerInit, config.Autoscaling.PollerInitCount) + + // Validate that other fields have appropriate defaults + if tc.configContent == "" || tc.configContent == 
` +domain: "custom-domain" +` { + // Should have all defaults for load generation + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) + } + }) + } +} + +func TestLoadConfiguration_MalformedYAML(t *testing.T) { + testCases := []struct { + name string + configContent string + description string + }{ + { + name: "Invalid YAML syntax", + configContent: ` +domain: "test-domain" +service: "test-service" +host: "test-host:7833" +autoscaling: + pollerMinCount: 3 + pollerMaxCount: 10 + pollerInitCount: 5 + loadGeneration: + workflows: 5 + activitiesPerWorkflow: 100 + batchDelay: 5 + minProcessingTime: 2000 + maxProcessingTime: 8000 +invalid: yaml: syntax: here +`, + description: "YAML with syntax error", + }, + { + name: "Invalid field types", + configContent: ` +domain: "test-domain" +service: "test-service" +host: "test-host:7833" +autoscaling: + pollerMinCount: "not-a-number" + pollerMaxCount: 10 + pollerInitCount: 5 + loadGeneration: + workflows: 5 + activitiesPerWorkflow: 100 + batchDelay: 5 + minProcessingTime: 2000 + maxProcessingTime: 8000 +`, + description: "YAML with invalid field types", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create temporary file + tmpFile, err := os.CreateTemp("", "test-config-*.yaml") + require.NoError(t, err) + defer os.Remove(tmpFile.Name()) + + _, err = tmpFile.WriteString(tc.configContent) + require.NoError(t, err) + tmpFile.Close() + + // Load configuration - should not panic and should return defaults + config := loadConfiguration(tmpFile.Name()) + + // Validate that default configuration is returned + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + }) + } +} + +func TestDefaultAutoscalingConfiguration(t *testing.T) { + config := DefaultAutoscalingConfiguration() + + // Validate all default values + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + require.NotNil(t, config.Prometheus) + assert.Equal(t, DefaultPrometheusAddr, config.Prometheus.ListenAddress) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + 
assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} + +func TestApplyDefaults(t *testing.T) { + // Test with empty configuration + config := AutoscalingConfiguration{} + config.applyDefaults() + + // Validate that all defaults are applied + assert.Equal(t, DefaultDomainName, config.DomainName) + assert.Equal(t, DefaultServiceName, config.ServiceName) + assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) + require.NotNil(t, config.Prometheus) + assert.Equal(t, DefaultPrometheusAddr, config.Prometheus.ListenAddress) + assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) + assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) + assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) + assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) + assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) + assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) + assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) + assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) + assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) +} diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go index 2fe67280..ec94ef1f 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/main.go +++ b/cmd/samples/advanced/autoscaling-monitoring/main.go @@ -9,7 +9,6 @@ import ( "github.com/pborman/uuid" "go.uber.org/cadence/client" - "gopkg.in/yaml.v2" "github.com/uber-common/cadence-samples/cmd/samples/common" "github.com/uber-go/tally" @@ -22,18 +21,18 @@ const ( ) func main() { + // Parse command line arguments var mode string - var configFile string - flag.StringVar(&mode, "m", "trigger", "Mode is worker, trigger, or server.") - flag.StringVar(&configFile, "config", "config/autoscaling.yaml", "Configuration file path.") + flag.StringVar(&mode, "m", "worker", "Mode: worker or trigger") flag.Parse() // Load configuration + configFile := "config/autoscaling.yaml" config := loadConfiguration(configFile) // Setup common helper with our configuration var h common.SampleHelper - h.Config = config.Configuration + h.Config = config.ToCommonConfiguration() // Set up logging logger, err := zap.NewDevelopment() @@ -84,24 +83,23 @@ func main() { }, 10) defer closer.Close() - // Set up HTTP handler for metrics endpoint - if config.Prometheus != nil { - go func() { - http.Handle("/metrics", reporter.HTTPHandler()) - logger.Info("Starting Prometheus metrics server", - zap.String("port", config.Prometheus.ListenAddress)) - if err := http.ListenAndServe(config.Prometheus.ListenAddress, nil); err != nil { - logger.Error("Failed to start metrics server", zap.Error(err)) - } - }() - } - // Set up metrics scope for helper h.WorkerMetricScope = scope h.ServiceMetricScope = scope switch mode { case "worker": + // Start metrics server only in worker mode + if config.Prometheus != nil { + go func() { + http.Handle("/metrics", reporter.HTTPHandler()) + logger.Info("Starting Prometheus metrics server", + zap.String("port", config.Prometheus.ListenAddress)) + if err := http.ListenAndServe(config.Prometheus.ListenAddress, nil); err != nil { + logger.Error("Failed to 
start metrics server", zap.Error(err)) + } + }() + } startWorkers(&h, &config) case "trigger": startWorkflow(&h, &config) @@ -111,57 +109,38 @@ func main() { } } -// loadConfiguration loads the autoscaling configuration from file -func loadConfiguration(configFile string) AutoscalingConfiguration { - // Read config file - configData, err := os.ReadFile(configFile) - if err != nil { - fmt.Printf("Failed to read config file: %v, using defaults\n", err) - return DefaultAutoscalingConfiguration() - } - - // Parse config - var config AutoscalingConfiguration - if err := yaml.Unmarshal(configData, &config); err != nil { - fmt.Printf("Error parsing configuration: %v, using defaults\n", err) - return DefaultAutoscalingConfiguration() - } - - // Ensure base Configuration fields are populated - if config.DomainName == "" { - config.DomainName = "default" - } - if config.ServiceName == "" { - config.ServiceName = "cadence-frontend" - } - if config.HostNameAndPort == "" { - config.HostNameAndPort = "localhost:7833" - } - - fmt.Printf("Loaded configuration from %s\n", configFile) - return config -} - func startWorkers(h *common.SampleHelper, config *AutoscalingConfiguration) { startWorkersWithAutoscaling(h, config) } func startWorkflow(h *common.SampleHelper, config *AutoscalingConfiguration) { workflowOptions := client.StartWorkflowOptions{ - ID: "autoscaling_" + uuid.New(), + ID: fmt.Sprintf("autoscaling_%s", uuid.New()), TaskList: ApplicationName, ExecutionStartToCloseTimeout: time.Minute * 10, DecisionTaskStartToCloseTimeout: time.Minute, } // Use configuration values - iterations := config.Autoscaling.LoadGeneration.Iterations + workflows := config.Autoscaling.LoadGeneration.Workflows + workflowDelay := config.Autoscaling.LoadGeneration.WorkflowDelay + activitiesPerWorkflow := config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow batchDelay := config.Autoscaling.LoadGeneration.BatchDelay minProcessingTime := config.Autoscaling.LoadGeneration.MinProcessingTime maxProcessingTime := config.Autoscaling.LoadGeneration.MaxProcessingTime - h.StartWorkflow(workflowOptions, autoscalingWorkflowName, iterations, batchDelay, minProcessingTime, maxProcessingTime) - fmt.Printf("Started autoscaling workflow with %d iterations\n", iterations) + // Start multiple workflows with delays + for i := 0; i < workflows; i++ { + workflowOptions.ID = fmt.Sprintf("autoscaling_%d_%s", i, uuid.New()) + h.StartWorkflow(workflowOptions, autoscalingWorkflowName, activitiesPerWorkflow, batchDelay, minProcessingTime, maxProcessingTime) + + // Add delay between workflows (except for the last one) + if i < workflows-1 { + time.Sleep(time.Duration(workflowDelay) * time.Second) + } + } + + fmt.Printf("Started %d autoscaling workflows with %d activities each\n", workflows, activitiesPerWorkflow) fmt.Println("Monitor the worker performance and autoscaling behavior in Grafana:") fmt.Println("http://localhost:3000/d/dehkspwgabvuoc/cadence-client") } diff --git a/cmd/samples/advanced/autoscaling-monitoring/workflow.go b/cmd/samples/advanced/autoscaling-monitoring/workflow.go index 3b90fe65..1e7283fa 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/workflow.go +++ b/cmd/samples/advanced/autoscaling-monitoring/workflow.go @@ -13,9 +13,9 @@ const ( // AutoscalingWorkflow demonstrates a workflow that can generate load // to test worker poller autoscaling -func AutoscalingWorkflow(ctx workflow.Context, iterations int, batchDelay int, minProcessingTime, maxProcessingTime int) error { +func AutoscalingWorkflow(ctx workflow.Context, 
activitiesPerWorkflow int, batchDelay int, minProcessingTime, maxProcessingTime int) error { logger := workflow.GetLogger(ctx) - logger.Info("Autoscaling workflow started", zap.Int("iterations", iterations)) + logger.Info("Autoscaling workflow started", zap.Int("activitiesPerWorkflow", activitiesPerWorkflow)) ao := workflow.ActivityOptions{ ScheduleToStartTimeout: time.Minute, @@ -28,7 +28,7 @@ func AutoscalingWorkflow(ctx workflow.Context, iterations int, batchDelay int, m var futures []workflow.Future // Execute activities in batches to create varying load - for i := 0; i < iterations; i++ { + for i := 0; i < activitiesPerWorkflow; i++ { future := workflow.ExecuteActivity(ctx, LoadGenerationActivity, i, minProcessingTime, maxProcessingTime) futures = append(futures, future) From 3968f6fb3f96f46b46f851b07c7920a7c6f50c4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Fri, 22 Aug 2025 10:35:13 -0700 Subject: [PATCH 14/18] Change workflowDelay and batchDelay config from seconds to ms --- .../advanced/autoscaling-monitoring/README.md | 14 +++++++------- .../advanced/autoscaling-monitoring/config.go | 4 ++-- .../autoscaling-monitoring/config/autoscaling.yaml | 4 ++-- .../advanced/autoscaling-monitoring/main.go | 2 +- .../advanced/autoscaling-monitoring/workflow.go | 4 ++-- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md index 8118b52c..6aaf44d6 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/README.md +++ b/cmd/samples/advanced/autoscaling-monitoring/README.md @@ -80,11 +80,11 @@ autoscaling: loadGeneration: # Workflow-level settings workflows: 3 # Number of workflows to start - workflowDelay: 2 # Delay between starting workflows (seconds) + workflowDelay: 1000 # Delay between starting workflows (milliseconds) # Activity-level settings (per workflow) activitiesPerWorkflow: 40 # Number of activities per workflow - batchDelay: 2 # Delay between activity batches within workflow (seconds) + batchDelay: 2000 # Delay between activity batches within workflow (milliseconds) # Activity processing time range (milliseconds) minProcessingTime: 1000 @@ -124,9 +124,9 @@ autoscaling: pollerInitCount: 4 loadGeneration: workflows: 3 - workflowDelay: 2 + workflowDelay: 1000 activitiesPerWorkflow: 40 - batchDelay: 2 + batchDelay: 2000 minProcessingTime: 1000 maxProcessingTime: 6000 ``` @@ -139,10 +139,10 @@ The sample supports various load patterns for testing autoscaling behavior: ```yaml loadGeneration: workflows: 3 - workflowDelay: 2 + workflowDelay: 1000 activitiesPerWorkflow: 40 ``` -**Result**: 3 workflows starting 2 seconds apart, each with 40 activities (120 total activities) +**Result**: 3 workflows starting 1 second apart, each with 40 activities (120 total activities) #### **2. 
Burst Load** ```yaml @@ -157,7 +157,7 @@ loadGeneration: ```yaml loadGeneration: workflows: 5 - workflowDelay: 5 + workflowDelay: 5000 activitiesPerWorkflow: 100 ``` **Result**: 5 long-running workflows with 5-second delays between starts (500 total activities) diff --git a/cmd/samples/advanced/autoscaling-monitoring/config.go b/cmd/samples/advanced/autoscaling-monitoring/config.go index 92f0d114..1dd150cd 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/config.go +++ b/cmd/samples/advanced/autoscaling-monitoring/config.go @@ -57,9 +57,9 @@ const ( DefaultPollerInitCount = 4 DefaultWorkflows = 3 - DefaultWorkflowDelay = 2 + DefaultWorkflowDelay = 1000 DefaultActivitiesPerWorkflow = 40 - DefaultBatchDelay = 2 + DefaultBatchDelay = 2000 DefaultMinProcessingTime = 1000 DefaultMaxProcessingTime = 6000 ) diff --git a/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml index d5910a5a..ceb2f0be 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml +++ b/cmd/samples/advanced/autoscaling-monitoring/config/autoscaling.yaml @@ -19,11 +19,11 @@ autoscaling: loadGeneration: # Workflow-level settings workflows: 3 # Number of workflows to start - workflowDelay: 2 # Delay between starting workflows (seconds) + workflowDelay: 400 # Delay between starting workflows (milliseconds) # Activity-level settings (per workflow) activitiesPerWorkflow: 40 # Number of activities per workflow - batchDelay: 2 # Delay between activity batches within workflow (seconds) + batchDelay: 750 # Delay between activity batches within workflow (milliseconds) # Activity processing time range (milliseconds) minProcessingTime: 1000 diff --git a/cmd/samples/advanced/autoscaling-monitoring/main.go b/cmd/samples/advanced/autoscaling-monitoring/main.go index ec94ef1f..5740477d 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/main.go +++ b/cmd/samples/advanced/autoscaling-monitoring/main.go @@ -136,7 +136,7 @@ func startWorkflow(h *common.SampleHelper, config *AutoscalingConfiguration) { // Add delay between workflows (except for the last one) if i < workflows-1 { - time.Sleep(time.Duration(workflowDelay) * time.Second) + time.Sleep(time.Duration(workflowDelay) * time.Millisecond) } } diff --git a/cmd/samples/advanced/autoscaling-monitoring/workflow.go b/cmd/samples/advanced/autoscaling-monitoring/workflow.go index 1e7283fa..d5ee057d 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/workflow.go +++ b/cmd/samples/advanced/autoscaling-monitoring/workflow.go @@ -34,8 +34,8 @@ func AutoscalingWorkflow(ctx workflow.Context, activitiesPerWorkflow int, batchD // Add some delay between batches to simulate real-world patterns // Use batch delay from configuration - if i > 0 && i%10 == 0 { - workflow.Sleep(ctx, time.Duration(batchDelay)*time.Second) + if i > 0 && i % 10 == 0 { + workflow.Sleep(ctx, time.Duration(batchDelay)*time.Millisecond) } } From 1bd7270c59ac646eedf2c29acf570fbe5c54a437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Thu, 28 Aug 2025 09:12:34 -0700 Subject: [PATCH 15/18] Add a little whitespace between arithmetic operators --- cmd/samples/advanced/autoscaling-monitoring/activities.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/samples/advanced/autoscaling-monitoring/activities.go b/cmd/samples/advanced/autoscaling-monitoring/activities.go index 6e63f15f..b896a79a 100644 ---
a/cmd/samples/advanced/autoscaling-monitoring/activities.go +++ b/cmd/samples/advanced/autoscaling-monitoring/activities.go @@ -21,7 +21,7 @@ func LoadGenerationActivity(ctx context.Context, taskID int, minProcessingTime, logger.Info("Load generation activity started", zap.Int("taskID", taskID)) // Simulate variable processing time using configuration values - processingTime := time.Duration(rand.Intn(maxProcessingTime-minProcessingTime)+minProcessingTime) * time.Millisecond + processingTime := time.Duration(rand.Intn(maxProcessingTime - minProcessingTime) + minProcessingTime) * time.Millisecond time.Sleep(processingTime) duration := time.Since(startTime) From 3effba8b1c85ea6e423846137e6d99b17a6d4b97 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Thu, 28 Aug 2025 09:14:19 -0700 Subject: [PATCH 16/18] Remove unneeded unit tests, keeping just the essential 4 tests --- .../advanced/autoscaling-monitoring/README.md | 7 +- .../autoscaling-monitoring/config_test.go | 184 ------------------ 2 files changed, 4 insertions(+), 187 deletions(-) diff --git a/cmd/samples/advanced/autoscaling-monitoring/README.md b/cmd/samples/advanced/autoscaling-monitoring/README.md index 6aaf44d6..e2c82bbe 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/README.md +++ b/cmd/samples/advanced/autoscaling-monitoring/README.md @@ -29,6 +29,9 @@ When running the Cadence server locally with Grafana, you can access the client **Client Dashboards**: http://localhost:3000/d/dehkspwgabvuoc/cadence-client +> **Note**: Make sure to select a Domain in Grafana for the dashboards to display data. The dashboards will be empty until a domain is selected from the dropdown. + + ## Prerequisites 1. **Cadence Server**: Running locally with Docker Compose. @@ -242,7 +245,7 @@ The sample uses Tally with Prometheus reporter for comprehensive metrics: ## Testing -The sample includes comprehensive unit tests for the configuration loading functionality: +The sample includes unit tests for the configuration loading functionality.
Run these tests if you make any changes to the config: ### Running Tests ```bash @@ -260,8 +263,6 @@ go test -v -cover The tests cover: - **Successful configuration loading** - Complete YAML files with all fields - **Missing file fallback** - Graceful handling when config file doesn't exist -- **Partial configuration** - YAML files with only some fields specified -- **Malformed YAML handling** - Invalid YAML syntax and field types - **Default value application** - Ensuring all fields have sensible defaults ### Configuration Testing diff --git a/cmd/samples/advanced/autoscaling-monitoring/config_test.go b/cmd/samples/advanced/autoscaling-monitoring/config_test.go index 569b85ba..a9207d9c 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/config_test.go +++ b/cmd/samples/advanced/autoscaling-monitoring/config_test.go @@ -78,190 +78,6 @@ func TestLoadConfiguration_MissingFileFallback(t *testing.T) { assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) } -func TestLoadConfiguration_PartialConfiguration(t *testing.T) { - testCases := []struct { - name string - configContent string - expectedDomain string - expectedService string - expectedHost string - expectedPollerMin int - expectedPollerMax int - expectedPollerInit int - description string - }{ - { - name: "Only domain specified", - configContent: ` -domain: "custom-domain" -`, - expectedDomain: "custom-domain", - expectedService: DefaultServiceName, - expectedHost: DefaultHostNameAndPort, - expectedPollerMin: DefaultPollerMinCount, - expectedPollerMax: DefaultPollerMaxCount, - expectedPollerInit: DefaultPollerInitCount, - description: "Only domain field provided", - }, - { - name: "Only autoscaling settings specified", - configContent: ` -autoscaling: - pollerMinCount: 5 - pollerMaxCount: 15 - pollerInitCount: 8 -`, - expectedDomain: DefaultDomainName, - expectedService: DefaultServiceName, - expectedHost: DefaultHostNameAndPort, - expectedPollerMin: 5, - expectedPollerMax: 15, - expectedPollerInit: 8, - description: "Only autoscaling fields provided", - }, - { - name: "Mixed configuration", - configContent: ` -domain: "mixed-domain" -service: "mixed-service" -autoscaling: - pollerMinCount: 1 - loadGeneration: - workflows: 5 - activitiesPerWorkflow: 75 - batchDelay: 3 -`, - expectedDomain: "mixed-domain", - expectedService: "mixed-service", - expectedHost: DefaultHostNameAndPort, - expectedPollerMin: 1, - expectedPollerMax: DefaultPollerMaxCount, - expectedPollerInit: DefaultPollerInitCount, - description: "Mix of provided and default values", - }, - { - name: "Empty file", - configContent: "", - expectedDomain: DefaultDomainName, - expectedService: DefaultServiceName, - expectedHost: DefaultHostNameAndPort, - expectedPollerMin: DefaultPollerMinCount, - expectedPollerMax: DefaultPollerMaxCount, - expectedPollerInit: DefaultPollerInitCount, - description: "Empty YAML file", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Create temporary file - tmpFile, err := os.CreateTemp("", "test-config-*.yaml") - require.NoError(t, err) - defer os.Remove(tmpFile.Name()) - - _, err = tmpFile.WriteString(tc.configContent) - require.NoError(t, err) - tmpFile.Close() - - // Load configuration - config := loadConfiguration(tmpFile.Name()) - - // Validate expected values - assert.Equal(t, tc.expectedDomain, config.DomainName) - assert.Equal(t, tc.expectedService, config.ServiceName) - assert.Equal(t, tc.expectedHost, config.HostNameAndPort) - assert.Equal(t, 
tc.expectedPollerMin, config.Autoscaling.PollerMinCount) - assert.Equal(t, tc.expectedPollerMax, config.Autoscaling.PollerMaxCount) - assert.Equal(t, tc.expectedPollerInit, config.Autoscaling.PollerInitCount) - - // Validate that other fields have appropriate defaults - if tc.configContent == "" || tc.configContent == ` -domain: "custom-domain" -` { - // Should have all defaults for load generation - assert.Equal(t, DefaultWorkflows, config.Autoscaling.LoadGeneration.Workflows) - assert.Equal(t, DefaultWorkflowDelay, config.Autoscaling.LoadGeneration.WorkflowDelay) - assert.Equal(t, DefaultActivitiesPerWorkflow, config.Autoscaling.LoadGeneration.ActivitiesPerWorkflow) - assert.Equal(t, DefaultBatchDelay, config.Autoscaling.LoadGeneration.BatchDelay) - assert.Equal(t, DefaultMinProcessingTime, config.Autoscaling.LoadGeneration.MinProcessingTime) - assert.Equal(t, DefaultMaxProcessingTime, config.Autoscaling.LoadGeneration.MaxProcessingTime) - } - }) - } -} - -func TestLoadConfiguration_MalformedYAML(t *testing.T) { - testCases := []struct { - name string - configContent string - description string - }{ - { - name: "Invalid YAML syntax", - configContent: ` -domain: "test-domain" -service: "test-service" -host: "test-host:7833" -autoscaling: - pollerMinCount: 3 - pollerMaxCount: 10 - pollerInitCount: 5 - loadGeneration: - workflows: 5 - activitiesPerWorkflow: 100 - batchDelay: 5 - minProcessingTime: 2000 - maxProcessingTime: 8000 -invalid: yaml: syntax: here -`, - description: "YAML with syntax error", - }, - { - name: "Invalid field types", - configContent: ` -domain: "test-domain" -service: "test-service" -host: "test-host:7833" -autoscaling: - pollerMinCount: "not-a-number" - pollerMaxCount: 10 - pollerInitCount: 5 - loadGeneration: - workflows: 5 - activitiesPerWorkflow: 100 - batchDelay: 5 - minProcessingTime: 2000 - maxProcessingTime: 8000 -`, - description: "YAML with invalid field types", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - // Create temporary file - tmpFile, err := os.CreateTemp("", "test-config-*.yaml") - require.NoError(t, err) - defer os.Remove(tmpFile.Name()) - - _, err = tmpFile.WriteString(tc.configContent) - require.NoError(t, err) - tmpFile.Close() - - // Load configuration - should not panic and should return defaults - config := loadConfiguration(tmpFile.Name()) - - // Validate that default configuration is returned - assert.Equal(t, DefaultDomainName, config.DomainName) - assert.Equal(t, DefaultServiceName, config.ServiceName) - assert.Equal(t, DefaultHostNameAndPort, config.HostNameAndPort) - assert.Equal(t, DefaultPollerMinCount, config.Autoscaling.PollerMinCount) - assert.Equal(t, DefaultPollerMaxCount, config.Autoscaling.PollerMaxCount) - assert.Equal(t, DefaultPollerInitCount, config.Autoscaling.PollerInitCount) - }) - } -} - func TestDefaultAutoscalingConfiguration(t *testing.T) { config := DefaultAutoscalingConfiguration() From b331eea0e16855fa0f380f0842837b922759e1e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Thu, 28 Aug 2025 12:15:08 -0700 Subject: [PATCH 17/18] Merge in changes from master branch --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 4483671c..6a19054b 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/uber-go/tally v3.4.3+incompatible github.com/uber/cadence-idl v0.0.0-20250616185004-cc6f52f87bc6 github.com/uber/jaeger-client-go v2.30.0+incompatible - 
go.uber.org/cadence v1.3.1-rc.1 + go.uber.org/cadence v1.3.1-rc.8 go.uber.org/yarpc v1.60.0 go.uber.org/zap v1.23.0 gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index c61eecda..13e7a7e6 100644 --- a/go.sum +++ b/go.sum @@ -239,8 +239,8 @@ go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/cadence v1.3.1-rc.1 h1:73TllFy8BNih2/sjGpFv9vz5wXQBfk643vKxsAy1/EM= -go.uber.org/cadence v1.3.1-rc.1/go.mod h1:/Lv4o2eahjro+LKjXmuYm20wg5+84t+h2pu0xm7gUfM= +go.uber.org/cadence v1.3.1-rc.8 h1:9m1h7KZsPqJsNW87iVd3Mgi85PojCPyaUeIdzTjVu3Y= +go.uber.org/cadence v1.3.1-rc.8/go.mod h1:/Lv4o2eahjro+LKjXmuYm20wg5+84t+h2pu0xm7gUfM= go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= go.uber.org/dig v1.10.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw= go.uber.org/dig v1.17.0 h1:5Chju+tUvcC+N7N6EV08BJz41UZuO3BmHcN4A287ZLI= From a37ceb73dc399a476dfbc1c90a300a28613b880d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CKevin=E2=80=9D?= <“kburns2@ext.uber.com”> Date: Thu, 28 Aug 2025 13:24:57 -0700 Subject: [PATCH 18/18] Adjust activity timeouts up --- cmd/samples/advanced/autoscaling-monitoring/workflow.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/samples/advanced/autoscaling-monitoring/workflow.go b/cmd/samples/advanced/autoscaling-monitoring/workflow.go index d5ee057d..22c6c17c 100644 --- a/cmd/samples/advanced/autoscaling-monitoring/workflow.go +++ b/cmd/samples/advanced/autoscaling-monitoring/workflow.go @@ -18,8 +18,8 @@ func AutoscalingWorkflow(ctx workflow.Context, activitiesPerWorkflow int, batchD logger.Info("Autoscaling workflow started", zap.Int("activitiesPerWorkflow", activitiesPerWorkflow)) ao := workflow.ActivityOptions{ - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, + ScheduleToStartTimeout: time.Minute * 20, + StartToCloseTimeout: time.Minute * 20, HeartbeatTimeout: time.Second * 20, } ctx = workflow.WithActivityOptions(ctx, ao)
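For reference, a condensed, self-contained sketch of the Tally → Prometheus wiring that main.go converges on across these patches (not part of any patch; error handling shortened, the listen address is the sample's default, and the `setupMetrics` helper name is illustrative):

```go
// setupMetrics builds a Tally root scope backed by a Prometheus reporter
// and serves /metrics, mirroring what the sample's worker mode does.
func setupMetrics(logger *zap.Logger) (tally.Scope, func()) {
	promCfg := prometheus.Configuration{ListenAddress: "127.0.0.1:8004"}
	reporter, err := promCfg.NewReporter(prometheus.ConfigurationOptions{
		OnError: func(err error) {
			logger.Error("prometheus reporter error", zap.Error(err))
		},
	})
	if err != nil {
		logger.Fatal("failed to create prometheus reporter", zap.Error(err))
	}

	scope, closer := tally.NewRootScope(tally.ScopeOptions{
		CachedReporter: reporter,
		// The Prometheus separator sanitizes metric names into a
		// Prometheus-compatible form, as noted in the README.
		Separator: prometheus.DefaultSeparator,
	}, time.Second)

	// Worker mode only: expose /metrics for Prometheus scraping.
	go func() {
		http.Handle("/metrics", reporter.HTTPHandler())
		if err := http.ListenAndServe(promCfg.ListenAddress, nil); err != nil {
			logger.Error("metrics server failed", zap.Error(err))
		}
	}()

	return scope, func() { closer.Close() }
}
```

The returned scope is what gets assigned to the helper's `WorkerMetricScope` and `ServiceMetricScope` in main.go, so both worker internals and the autoscaler report through the same Prometheus endpoint.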