Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 44 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,37 @@ curl https://raw.githubusercontent.com/ConduitIO/benchi/main/install.sh | sh

### Running Benchmarks

The repository includes an [example benchmark](./example). Use the following
command to run the benchmark:
Run benchi and point `-config` to a benchmark [configuration file](#configuration).
The repository includes an [example benchmark](./example), which can be run
using the following command:

```sh
benchi -config ./example/bench-kafka-kafka/bench.yml
```

Running the benchmark will store the results in the `results` folder. Inside the
results folder, you will find a folder named after the current date and time
(e.g. `results/20060102_150405`). This folder will contain logs and results:
### Results

Running the benchmark will store the results in a folder named after the current
date and time inside of `results` (e.g. `results/20060102_150405`). You can
adjust the output folder using the `-out` flag.

The output folder will contain two files:

- `benchi.log`: Log file containing the full output of benchi.
- `aggregated-results.csv`: Aggregated metric results from all collectors and
  all tests. The results are aggregated using a
  [trimmed mean](https://en.wikipedia.org/wiki/Truncated_mean): samples more
  than two standard deviations from the mean are removed. Benchi also
  disregards any 0 values from the start and end of the test, to accommodate
  warm-up and cool-down periods.

The output folder will also contain one folder per benchmark run (i.e. per test
and tool combination). Each benchmark run folder will contain:

- `benchi.log`: Log file containing the output of the benchmark run.
- `infra.log`: Log file containing the output of the infrastructure docker containers.
- `tools.log`: Log file containing the output of the tools docker containers.
- `conduit.csv`: Metrics collected using the [Conduit](#conduit) collector.
- `docker.csv`: Metrics collected using the [Docker](#docker) collector.
- `kafka.csv`: Metrics collected using the [Kafka](#kafka) collector.

For details about the example benchmark, see the
[example README](./example/README.md).
- `COLLECTOR.csv`: Raw metrics collected using the corresponding
[metrics collector](#collectors).

### Command-Line Flags

Expand Down Expand Up @@ -90,6 +101,27 @@ networks:
external: true
```

### Environment variables

Benchi runs all Docker Compose commands using the same environment variables as
the current shell. This means that you can use environment variables to pass
values to your services.

For instance, given the following Docker Compose configuration:

```yaml
services:
my-service:
environment:
- MY_ENV_VAR=${MY_ENV_VAR}
```

You can inject the environment variable by running Benchi as follows:

```sh
MY_ENV_VAR=my-value benchi -config ./my-benchmark.yml
```

### Configuration

Benchi uses a YAML configuration file to define the benchmark in combination
Expand Down
51 changes: 46 additions & 5 deletions cmd/benchi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ type mainModel struct {
config config.Config
resultsDir string
dockerClient client.APIClient
testRunners benchi.TestRunners

tests []testModel
currentTestIndex int
executedTests []int // Indexes of tests that have been executed

// Log models for the CLI.
infoLogModel internal.LogModel
Expand All @@ -168,6 +170,10 @@ type mainModelMsgNextTest struct {
testIndex int
}

type mainModelMsgExportedAggregatedMetrics struct {
err error
}

func newMainModel(infoReader, errorReader io.Reader) mainModel {
ctx, ctxCancel := context.WithCancel(context.Background())
cleanupCtx, cleanupCtxCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -294,6 +300,23 @@ func (mainModel) runTestCmd(index int) tea.Cmd {
}
}

// exportAggregatedMetricsCmd returns a command that aggregates the metrics of
// all successfully executed tests and exports them to the results directory.
// The resulting message carries any export error.
func (m mainModel) exportAggregatedMetricsCmd() tea.Cmd {
	return func() tea.Msg {
		if len(m.executedTests) == 0 {
			// Nothing ran successfully, so there is nothing to export.
			return mainModelMsgExportedAggregatedMetrics{}
		}

		// Gather the runners belonging to the tests that actually executed.
		executed := make(benchi.TestRunners, 0, len(m.executedTests))
		for _, idx := range m.executedTests {
			executed = append(executed, m.testRunners[idx])
		}

		return mainModelMsgExportedAggregatedMetrics{
			err: executed.ExportAggregatedMetrics(m.resultsDir),
		}
	}
}

func (m mainModel) quitCmd() tea.Cmd {
return func() tea.Msg {
if m.dockerClient != nil {
Expand Down Expand Up @@ -321,6 +344,7 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
m.config = msg.config
m.resultsDir = msg.resultsDir
m.dockerClient = msg.dockerClient
m.testRunners = msg.testRunners

tests := make([]testModel, len(msg.testRunners))
for i, tr := range msg.testRunners {
Expand All @@ -347,7 +371,7 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
return m, m.runTestCmd(0)
case mainModelMsgNextTest:
if msg.testIndex >= len(m.tests) {
return m, m.quitCmd()
return m, m.exportAggregatedMetricsCmd()
}
m.currentTestIndex = msg.testIndex
return m, m.tests[m.currentTestIndex].Init()
Expand All @@ -358,8 +382,21 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
// Main context is cancelled, skip to the end.
nextIndex = len(m.tests)
}
if msg.err != nil {
m.lastError = errors.Join(m.lastError, msg.err)
} else {
// Only store the index of tests that have been executed successfully.
m.executedTests = append(m.executedTests, m.currentTestIndex)
}
return m, m.runTestCmd(nextIndex)

case mainModelMsgExportedAggregatedMetrics:
if msg.err != nil {
slog.Error("Failed to export aggregated metrics", "error", msg.err)
m.lastError = errors.Join(m.lastError, msg.err)
}
return m, m.quitCmd()

case tea.KeyMsg:
if msg.String() == "ctrl+c" {
switch {
Expand Down Expand Up @@ -446,7 +483,9 @@ type testModelMsgStep struct {
err error
}

type testModelMsgDone struct{}
type testModelMsgDone struct {
err error
}

func newTestModel(ctx context.Context, cleanupCtx context.Context, client client.APIClient, runner *benchi.TestRunner) (testModel, error) {
collectorModels := make([]internal.CollectorMonitorModel, 0, len(runner.Collectors()))
Expand Down Expand Up @@ -489,9 +528,11 @@ func (m testModel) stepCmd(ctx context.Context) tea.Cmd {
}
}

func (m testModel) doneCmd() tea.Cmd {
func (m testModel) doneCmd(err error) tea.Cmd {
return func() tea.Msg {
return testModelMsgDone{}
return testModelMsgDone{
err: err,
}
}
}

Expand All @@ -508,7 +549,7 @@ func (m testModel) Update(msg tea.Msg) (testModel, tea.Cmd) {

switch {
case m.currentStep == benchi.StepDone:
return m, m.doneCmd()
return m, m.doneCmd(errors.Join(m.errors...))
case m.currentStep >= benchi.StepPreCleanup:
// Cleanup steps use the cleanup context.
return m, m.stepCmd(m.cleanupCtx)
Expand Down
18 changes: 10 additions & 8 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ and Docker, displaying them in the CLI. After the benchmark is complete, the
metrics will be exported to CSV files in the output folder (e.g.
`results/20060102_150405`).

Benchi will store the results in the `results` folder. Inside the results folder,
you will find a folder named after the current date and time (e.g.
`results/20060102_150405`). This folder will contain logs and results:
The output folder will contain logs and results:

- `benchi.log`: Log file containing the output of the benchmark run.
- `infra.log`: Log file containing the output of the infrastructure docker containers.
- `tools.log`: Log file containing the output of the tools docker containers.
- `conduit.csv`: Metrics collected using the [Conduit](../README.md#conduit) collector.
- `docker.csv`: Metrics collected using the [Docker](../README.md#docker) collector.
- `kafka.csv`: Metrics collected using the [Kafka](../README.md#kafka) collector.
- `aggregated-results.csv`: Aggregated metric results from all collectors.
- `kafka-to-kafka_conduit`: Folder containing the logs and metrics for the
`kafka-to-kafka` test and the `conduit` tool.
- `infra.log`: Log file containing the output of the infrastructure docker
containers.
- `tools.log`: Log file containing the output of the tools docker containers.
- `conduit.csv`: Metrics collected using the [Conduit](../README.md#conduit) collector.
  - `docker.csv`: Metrics collected using the [Docker](../README.md#docker) collector.
  - `kafka.csv`: Metrics collected using the [Kafka](../README.md#kafka) collector.
3 changes: 1 addition & 2 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ type Collector interface {
// cancelled. The function should block until the context is cancelled, an
// error occurs, or the Stop function is called.
Run(ctx context.Context) error
// Results returns the collected metrics. If the collector is collecting
// multiple metrics, the key should be the name of the metric.
// Results returns the collected metrics.
Results() []Results
}

Expand Down
135 changes: 133 additions & 2 deletions runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"log/slog"
"maps"
"math"
"os"
"path/filepath"
"slices"
Expand All @@ -49,8 +50,6 @@ const (
DefaultHookImage = "alpine:latest"
)

type TestRunners []*TestRunner

type TestRunnerOptions struct {
// ResultsDir is the directory where the test results are stored.
ResultsDir string
Expand Down Expand Up @@ -227,6 +226,138 @@ func findContainerNames(ctx context.Context, files []string) ([]string, error) {
return containers, nil
}

type TestRunners []*TestRunner

// ExportAggregatedMetrics aggregates the metrics of all test runners and
// writes them as CSV to "aggregated-results.csv" inside resultsDir. Each
// record holds the test name, the tool name, and the trimmed mean of every
// collected metric. The file is created (or truncated) on every call.
func (runners TestRunners) ExportAggregatedMetrics(resultsDir string) (err error) {
	path := filepath.Join(resultsDir, "aggregated-results.csv")
	slog.Info("Exporting aggregated metrics", "path", path)

	headers, records := runners.aggregatedMetrics()

	f, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("error creating file: %w", err)
	}
	defer func() {
		// Closing flushes buffered data to disk; report a failed close
		// (e.g. a full disk) unless an earlier error takes precedence.
		if cerr := f.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("error closing file: %w", cerr)
		}
	}()

	writer := csv.NewWriter(f)

	if werr := writer.Write(headers); werr != nil {
		return fmt.Errorf("error writing CSV header: %w", werr)
	}

	// WriteAll flushes the writer after writing the records.
	if werr := writer.WriteAll(records); werr != nil {
		return fmt.Errorf("error writing CSV records: %w", werr)
	}

	if werr := writer.Error(); werr != nil {
		return fmt.Errorf("error writing CSV records: %w", werr)
	}

	return nil
}

// aggregatedMetrics builds the header row and one record per test runner for
// the aggregated CSV export. The first two columns are always the test name
// and the tool name; every further column is one metric, added lazily the
// first time any collector reports it. Metric cells hold the trimmed mean of
// the collected samples formatted with two decimals; metrics a runner did not
// report stay empty.
func (runners TestRunners) aggregatedMetrics() (headers []string, records [][]string) {
	headers = []string{"test", "tool"}

	// colIndexes maps the collector name and metric name to the column index in
	// the records and headers slices.
	colIndexes := make(map[string]map[string]int)

	for _, tr := range runners {
		// recs is this runner's record, sized to the columns known so far.
		recs := make([]string, len(headers))
		recs[0] = tr.Name()
		recs[1] = tr.Tool()

		for _, c := range tr.Collectors() {
			indexes := colIndexes[c.Name()]
			if indexes == nil {
				indexes = make(map[string]int)
				colIndexes[c.Name()] = indexes
			}

			for _, r := range c.Results() {
				idx := indexes[r.Name]
				if idx == 0 {
					// Index 0 can never belong to a metric (columns 0 and 1
					// are "test" and "tool"), so 0 means "not seen yet":
					// register a new column for this metric.
					idx = len(headers)
					indexes[r.Name] = idx
					headers = append(headers, r.Name)
					// Backfill records with empty values.
					for i := 0; i < len(records); i++ {
						records[i] = append(records[i], "")
					}
					recs = append(recs, "") //nolint:makezero // False positive.
				}

				mean, ok := runners.trimmedMean(r.Samples)
				if ok {
					recs[idx] = fmt.Sprintf("%.2f", mean)
				}
			}
		}
		records = append(records, recs)
	}

	return headers, records
}

// trimmedMean calculates the trimmed mean of the samples. It first trims any
// zeros from the start and end of the samples (to accommodate warm-up and
// cool-down periods), then trims any samples that are more than 2 standard
// deviations from the mean. It returns the trimmed mean and a boolean
// indicating if the trimmed mean was calculated successfully.
func (runners TestRunners) trimmedMean(samples []metrics.Sample) (float64, bool) {
	if len(samples) == 0 {
		return 0, false
	}

	// Trim any zeros from the start and end of the samples.
	for len(samples) > 0 && samples[0].Value == 0 {
		samples = samples[1:]
	}
	for len(samples) > 0 && samples[len(samples)-1].Value == 0 {
		samples = samples[:len(samples)-1]
	}

	// Every sample was zero; report a mean of 0.
	if len(samples) == 0 {
		return 0, true
	}

	n := float64(len(samples)) // Number of samples as a float.

	// Calculate the mean and (population) standard deviation.
	var sum float64
	for _, s := range samples {
		sum += s.Value
	}
	mean := sum / n

	var variance float64
	for _, s := range samples {
		variance += (s.Value - mean) * (s.Value - mean)
	}
	stddev := math.Sqrt(variance / n)

	// Keep only samples within 2 standard deviations of the mean. Chebyshev's
	// inequality guarantees at least 75% of the samples survive this cut, but
	// guard against an empty result anyway (e.g. NaN values fail every bounds
	// comparison) to avoid a 0/0 division yielding NaN.
	lowerBound := mean - 2*stddev
	upperBound := mean + 2*stddev

	var trimmedSum float64
	var trimmedCount int
	for _, s := range samples {
		if s.Value >= lowerBound && s.Value <= upperBound {
			trimmedSum += s.Value
			trimmedCount++
		}
	}
	if trimmedCount == 0 {
		return 0, false
	}

	return trimmedSum / float64(trimmedCount), true
}

// TestRunner is a single test run for a single tool.
type TestRunner struct {
step Step
Expand Down