diff --git a/README.md b/README.md index 0c4d283..4a8a1ea 100644 --- a/README.md +++ b/README.md @@ -40,26 +40,37 @@ curl https://raw.githubusercontent.com/ConduitIO/benchi/main/install.sh | sh ### Running Benchmarks -The repository includes an [example benchmark](./example). Use the following -command to run the benchmark: +Run benchi and point `-config` to a benchmark [configuration file](#configuration). +The repository includes an [example benchmark](./example), which can be run +using the following command: ```sh benchi -config ./example/bench-kafka-kafka/bench.yml ``` -Running the benchmark will store the results in the `results` folder. Inside the -results folder, you will find a folder named after the current date and time -(e.g. `results/20060102_150405`). This folder will contain logs and results: +### Results + +Running the benchmark will store the results in a folder named after the current +date and time inside of `results` (e.g. `results/20060102_150405`). You can +adjust the output folder using the `-out` flag. + +The output folder will contain two files: + +- `benchi.log`: Log file containing the full output of benchi. +- `aggregated-results.csv`: Aggregated metric results from all collectors and + all tests. The results are aggregated using a + [trimmed mean](https://en.wikipedia.org/wiki/Truncated_mean), where samples + more than two standard deviations from the mean are removed. Benchi also disregards any 0 values + from the start and end of the test, to account for warm-up and cool-down + periods. + +The output folder will also contain one folder per benchmark run (i.e. per test +and tool combination). Each benchmark run folder will contain: -- `benchi.log`: Log file containing the output of the benchmark run. - `infra.log`: Log file containing the output of the infrastructure docker containers. - `tools.log`: Log file containing the output of the tools docker containers. -- `conduit.csv`: Metrics collected using the [Conduit](#conduit) collector. 
-- `docker.csv`: Metrics collected using the [Docker](#docker) collector. -- `kafka.csv`: Metrics collected using the [Kafka](#kafka) collector. - -For details about the example benchmark, see the -[example README](./example/README.md). +- `COLLECTOR.csv`: Raw metrics collected using the corresponding + [metrics collector](#collectors). ### Command-Line Flags @@ -90,6 +101,27 @@ networks: external: true ``` +### Environment variables + +Benchi runs all Docker Compose commands using the same environment variables as +the current shell. This means that you can use environment variables to pass +values to your services. + +For instance, having the following Docker Compose configuration: + +```yaml +services: + my-service: + environment: + - MY_ENV_VAR=${MY_ENV_VAR} +``` + +You can inject the environment variable by running Benchi as follows: + +```sh +MY_ENV_VAR=my-value benchi -config ./my-benchmark.yml +``` + ### Configuration Benchi uses a YAML configuration file to define the benchmark in combination diff --git a/cmd/benchi/main.go b/cmd/benchi/main.go index 505bd11..9d13278 100644 --- a/cmd/benchi/main.go +++ b/cmd/benchi/main.go @@ -142,9 +142,11 @@ type mainModel struct { config config.Config resultsDir string dockerClient client.APIClient + testRunners benchi.TestRunners tests []testModel currentTestIndex int + executedTests []int // Indexes of tests that have been executed // Log models for the CLI. 
infoLogModel internal.LogModel @@ -168,6 +170,10 @@ type mainModelMsgNextTest struct { testIndex int } +type mainModelMsgExportedAggregatedMetrics struct { + err error +} + func newMainModel(infoReader, errorReader io.Reader) mainModel { ctx, ctxCancel := context.WithCancel(context.Background()) cleanupCtx, cleanupCtxCancel := context.WithCancel(context.Background()) @@ -294,6 +300,23 @@ func (mainModel) runTestCmd(index int) tea.Cmd { } } +func (m mainModel) exportAggregatedMetricsCmd() tea.Cmd { + return func() tea.Msg { + if len(m.executedTests) == 0 { + return mainModelMsgExportedAggregatedMetrics{} + } + + // Collect executed test runners + testRunners := make(benchi.TestRunners, 0, len(m.executedTests)) + for _, i := range m.executedTests { + testRunners = append(testRunners, m.testRunners[i]) + } + + err := testRunners.ExportAggregatedMetrics(m.resultsDir) + return mainModelMsgExportedAggregatedMetrics{err: err} + } +} + func (m mainModel) quitCmd() tea.Cmd { return func() tea.Msg { if m.dockerClient != nil { @@ -321,6 +344,7 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { m.config = msg.config m.resultsDir = msg.resultsDir m.dockerClient = msg.dockerClient + m.testRunners = msg.testRunners tests := make([]testModel, len(msg.testRunners)) for i, tr := range msg.testRunners { @@ -347,7 +371,7 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { return m, m.runTestCmd(0) case mainModelMsgNextTest: if msg.testIndex >= len(m.tests) { - return m, m.quitCmd() + return m, m.exportAggregatedMetricsCmd() } m.currentTestIndex = msg.testIndex return m, m.tests[m.currentTestIndex].Init() @@ -358,8 +382,21 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) { // Main context is cancelled, skip to the end. nextIndex = len(m.tests) } + if msg.err != nil { + m.lastError = errors.Join(m.lastError, msg.err) + } else { + // Only store the index of tests that have been executed successfully. 
+ m.executedTests = append(m.executedTests, m.currentTestIndex) + } return m, m.runTestCmd(nextIndex) + case mainModelMsgExportedAggregatedMetrics: + if msg.err != nil { + slog.Error("Failed to export aggregated metrics", "error", msg.err) + m.lastError = errors.Join(m.lastError, msg.err) + } + return m, m.quitCmd() + case tea.KeyMsg: if msg.String() == "ctrl+c" { switch { @@ -446,7 +483,9 @@ type testModelMsgStep struct { err error } -type testModelMsgDone struct{} +type testModelMsgDone struct { + err error +} func newTestModel(ctx context.Context, cleanupCtx context.Context, client client.APIClient, runner *benchi.TestRunner) (testModel, error) { collectorModels := make([]internal.CollectorMonitorModel, 0, len(runner.Collectors())) @@ -489,9 +528,11 @@ func (m testModel) stepCmd(ctx context.Context) tea.Cmd { } } -func (m testModel) doneCmd() tea.Cmd { +func (m testModel) doneCmd(err error) tea.Cmd { return func() tea.Msg { - return testModelMsgDone{} + return testModelMsgDone{ + err: err, + } } } @@ -508,7 +549,7 @@ func (m testModel) Update(msg tea.Msg) (testModel, tea.Cmd) { switch { case m.currentStep == benchi.StepDone: - return m, m.doneCmd() + return m, m.doneCmd(errors.Join(m.errors...)) case m.currentStep >= benchi.StepPreCleanup: // Cleanup steps use the cleanup context. return m, m.stepCmd(m.cleanupCtx) diff --git a/example/README.md b/example/README.md index 92c842c..e657222 100644 --- a/example/README.md +++ b/example/README.md @@ -30,13 +30,15 @@ and Docker, displaying them in the CLI. After the benchmark is complete, the metrics will be exported to CSV files in the output folder (e.g. `results/20060102_150405`). -Benchi will store the results in the `results` folder. Inside the results folder, -you will find a folder named after the current date and time (e.g. -`results/20060102_150405`). This folder will contain logs and results: +The output folder will contain logs and results: - `benchi.log`: Log file containing the output of the benchmark run. 
-- `infra.log`: Log file containing the output of the infrastructure docker containers. -- `tools.log`: Log file containing the output of the tools docker containers. -- `conduit.csv`: Metrics collected using the [Conduit](../README.md#conduit) collector. -- `docker.csv`: Metrics collected using the [Docker](../README.md##docker) collector. -- `kafka.csv`: Metrics collected using the [Kafka](../README.md##kafka) collector. +- `aggregated-results.csv`: Aggregated metric results from all collectors. +- `kafka-to-kafka_conduit`: Folder containing the logs and metrics for the + `kafka-to-kafka` test and the `conduit` tool. + - `infra.log`: Log file containing the output of the infrastructure docker + containers. + - `tools.log`: Log file containing the output of the tools docker containers. + - `conduit.csv`: Metrics collected using the [Conduit](../README.md#conduit) collector. + - `docker.csv`: Metrics collected using the [Docker](../README.md#docker) collector. + - `kafka.csv`: Metrics collected using the [Kafka](../README.md#kafka) collector. diff --git a/metrics/collector.go b/metrics/collector.go index c5a5094..48623ba 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -32,8 +32,7 @@ type Collector interface { // cancelled. The function should block until the context is cancelled, an // error occurs, or the Stop function is called. Run(ctx context.Context) error - // Results returns the collected metrics. If the collector is collecting - // multiple metrics, the key should be the name of the metric. + // Results returns the collected metrics. Results() []Results } diff --git a/runner.go b/runner.go index 6594b13..192ec54 100644 --- a/runner.go +++ b/runner.go @@ -24,6 +24,7 @@ import ( "fmt" "log/slog" "maps" + "math" "os" "path/filepath" "slices" @@ -49,8 +50,6 @@ const ( DefaultHookImage = "alpine:latest" ) -type TestRunners []*TestRunner - type TestRunnerOptions struct { // ResultsDir is the directory where the test results are stored. 
ResultsDir string @@ -227,6 +226,138 @@ func findContainerNames(ctx context.Context, files []string) ([]string, error) { return containers, nil } +type TestRunners []*TestRunner + +func (runners TestRunners) ExportAggregatedMetrics(resultsDir string) error { + path := filepath.Join(resultsDir, "aggregated-results.csv") + slog.Info("Exporting aggregated metrics", "path", path) + + headers, records := runners.aggregatedMetrics() + + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("error creating file: %w", err) + } + defer f.Close() + + writer := csv.NewWriter(f) + err = writer.Write(headers) + if err != nil { + return fmt.Errorf("error writing CSV header: %w", err) + } + + err = writer.WriteAll(records) + if err != nil { + return fmt.Errorf("error writing CSV records: %w", err) + } + + err = writer.Error() + if err != nil { + return fmt.Errorf("error writing CSV records: %w", err) + } + + return nil +} + +func (runners TestRunners) aggregatedMetrics() (headers []string, records [][]string) { + headers = []string{"test", "tool"} + + // colIndexes maps the collector name and metric name to the column index in + // the records and headers slices. + colIndexes := make(map[string]map[string]int) + + for _, tr := range runners { + recs := make([]string, len(headers)) + recs[0] = tr.Name() + recs[1] = tr.Tool() + + for _, c := range tr.Collectors() { + indexes := colIndexes[c.Name()] + if indexes == nil { + indexes = make(map[string]int) + colIndexes[c.Name()] = indexes + } + + for _, r := range c.Results() { + idx := indexes[r.Name] + if idx == 0 { + idx = len(headers) + indexes[r.Name] = idx + headers = append(headers, r.Name) + // Backfill records with empty values. + for i := 0; i < len(records); i++ { + records[i] = append(records[i], "") + } + recs = append(recs, "") //nolint:makezero // False positive. 
+ } + + mean, ok := runners.trimmedMean(r.Samples) + if ok { + recs[idx] = fmt.Sprintf("%.2f", mean) + } + } + } + records = append(records, recs) + } + + return headers, records +} + +// trimmedMean calculates the trimmed mean of the samples. It trims any zeros +// from the start and end of the samples, then trims any samples that are more +// than 2 standard deviations from the mean. It returns the trimmed mean and a +// boolean indicating if the trimmed mean was calculated successfully. +func (runners TestRunners) trimmedMean(samples []metrics.Sample) (float64, bool) { + if len(samples) == 0 { + return 0, false + } + + // Trim any zeros from the start and end of the samples. + for len(samples) > 0 && samples[0].Value == 0 { + samples = samples[1:] + } + for len(samples) > 0 && samples[len(samples)-1].Value == 0 { + samples = samples[:len(samples)-1] + } + + if len(samples) == 0 { + return 0, true + } + + n := float64(len(samples)) // Number of samples as a float. + + // Calculate mean and standard deviation + var sum float64 + for _, s := range samples { + sum += s.Value + } + mean := sum / n + + var variance float64 + for _, s := range samples { + variance += (s.Value - mean) * (s.Value - mean) + } + stddev := math.Sqrt(variance / n) + + // Trim samples that are more than 2 standard deviations from the mean. + var trimmed []float64 + lowerBound := mean - 2*stddev + upperBound := mean + 2*stddev + for _, s := range samples { + if s.Value >= lowerBound && s.Value <= upperBound { + trimmed = append(trimmed, s.Value) + } + } + + // Calculate the trimmed mean. + var trimmedSum float64 + for _, s := range trimmed { + trimmedSum += s + } + + return trimmedSum / float64(len(trimmed)), true +} + // TestRunner is a single test run for a single tool. type TestRunner struct { step Step