Skip to content

Commit cb44ed8

Browse files
authored
Performance Test framework (#271)
* perf: add golden-query harness package
* metrics: add perf and flight ingress observability
* docs: add perf smoke/nightly runbooks and scripts
* remove obsolete local yaml
* perf: run smoke/nightly against real duckgres queries
* Add frozen DuckLake dataset support for perf harness
* remove completed plan
* perf: export nightly vars into child bash shells
* perf: pass pg password for external pgwire dsn
1 parent b03d2e1 commit cb44ed8

40 files changed

+2985
-33
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ Thumbs.db
2727
*.test
2828
*.out
2929
coverage.html
30+
duckgres.yaml
31+
artifacts/perf/
3032

3133
.gemini/
3234
gha-creds-*.json

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ A PostgreSQL wire protocol compatible server backed by DuckDB. Connect with any
1010

1111
- [Features](#features)
1212
- [Metrics](#metrics)
13+
- [Perf Runbook](#perf-runbook)
1314
- [Quick Start](#quick-start)
1415
- [Configuration](#configuration)
1516
- [YAML Configuration](#yaml-configuration)
@@ -66,15 +67,26 @@ Duckgres exposes Prometheus metrics on `:9090/metrics`. The metrics port is curr
6667
| `duckgres_rate_limited_ips` | Gauge | Number of currently rate-limited IP addresses |
6768
| `duckgres_flight_auth_sessions_active` | Gauge | Number of active Flight auth sessions on the control plane |
6869
| `duckgres_control_plane_workers_active` | Gauge | Number of active control-plane worker processes |
70+
| `duckgres_control_plane_worker_acquire_seconds` | Histogram | Time spent acquiring a worker for a new session |
71+
| `duckgres_control_plane_worker_queue_depth` | Gauge | Approximate number of session requests waiting on worker acquisition |
72+
| `duckgres_control_plane_worker_spawn_seconds` | Histogram | Time spent spawning and health-checking a new worker |
73+
| `duckgres_flight_rpc_duration_seconds{method}` | Histogram | Flight ingress RPC duration by method |
74+
| `duckgres_flight_ingress_sessions_total{outcome}` | Counter | Flight ingress session outcomes (`created|reused|auth_failed|rate_limited|create_failed|token_invalid`) |
6975
| `duckgres_flight_sessions_reaped_total{trigger}` | Counter | Number of Flight auth sessions reaped (`trigger=periodic|forced`) |
7076
| `duckgres_flight_max_workers_retry_total{outcome}` | Counter | Max-worker retry outcomes for Flight session creation (`outcome=attempted|succeeded|failed`) |
7177

7278
### Testing Metrics
7379

7480
- `scripts/test_metrics.sh` - Runs a quick sanity check (starts server, runs queries, verifies counts)
7581
- `scripts/load_generator.sh` - Generates continuous query load until Ctrl-C
82+
- `scripts/perf_smoke.sh` - Runs the golden-query perf harness and writes artifacts to `artifacts/perf/<run_id>`
83+
- `scripts/perf_nightly.sh` - Nightly wrapper with lock/timeout guards and optional artifact upload hook
7684
- `prometheus-docker-compose.yml` - Starts Prometheus locally to scrape metrics (UI at http://localhost:9091)
7785

86+
## Perf Runbook
87+
88+
See [docs/perf-harness-runbook.md](docs/perf-harness-runbook.md) and [tests/perf/README.md](tests/perf/README.md) for local smoke and nightly operations.
89+
7890
## Quick Start
7991

8092
### Build

controlplane/flight_ingress_metrics.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
package controlplane
22

3-
import "github.com/prometheus/client_golang/prometheus/promauto"
4-
import "github.com/prometheus/client_golang/prometheus"
3+
import (
4+
"sync/atomic"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
"github.com/prometheus/client_golang/prometheus/promauto"
9+
)
510

611
var flightAuthSessionsGauge = promauto.NewGauge(prometheus.GaugeOpts{
712
Name: "duckgres_flight_auth_sessions_active",
@@ -13,6 +18,25 @@ var controlPlaneWorkersGauge = promauto.NewGauge(prometheus.GaugeOpts{
1318
Help: "Number of active control-plane worker processes",
1419
})
1520

21+
var controlPlaneWorkerAcquireHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
22+
Name: "duckgres_control_plane_worker_acquire_seconds",
23+
Help: "Time spent acquiring a worker for a new session.",
24+
Buckets: prometheus.DefBuckets,
25+
})
26+
27+
var controlPlaneWorkerSpawnHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
28+
Name: "duckgres_control_plane_worker_spawn_seconds",
29+
Help: "Time spent spawning and health-checking a new control-plane worker.",
30+
Buckets: prometheus.DefBuckets,
31+
})
32+
33+
var controlPlaneWorkerQueueDepthGauge = promauto.NewGauge(prometheus.GaugeOpts{
34+
Name: "duckgres_control_plane_worker_queue_depth",
35+
Help: "Approximate number of session requests waiting on worker acquisition.",
36+
})
37+
38+
var controlPlaneWorkerQueueDepth atomic.Int64
39+
1640
var flightSessionsReapedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
1741
Name: "duckgres_flight_sessions_reaped_total",
1842
Help: "Number of Flight auth sessions reaped",
@@ -32,6 +56,29 @@ func observeControlPlaneWorkers(count int) {
3256
controlPlaneWorkersGauge.Set(float64(count))
3357
}
3458

59+
func observeControlPlaneWorkerAcquire(d time.Duration) {
60+
if d < 0 {
61+
d = 0
62+
}
63+
controlPlaneWorkerAcquireHistogram.Observe(d.Seconds())
64+
}
65+
66+
func observeControlPlaneWorkerSpawn(d time.Duration) {
67+
if d < 0 {
68+
d = 0
69+
}
70+
controlPlaneWorkerSpawnHistogram.Observe(d.Seconds())
71+
}
72+
73+
func observeControlPlaneWorkerQueueDepthDelta(delta int64) {
74+
newDepth := controlPlaneWorkerQueueDepth.Add(delta)
75+
if newDepth < 0 {
76+
controlPlaneWorkerQueueDepth.Store(0)
77+
newDepth = 0
78+
}
79+
controlPlaneWorkerQueueDepthGauge.Set(float64(newDepth))
80+
}
81+
3582
func observeFlightSessionsReaped(trigger string, count int) {
3683
if count <= 0 {
3784
return
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package controlplane
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/prometheus/client_golang/prometheus"
8+
dto "github.com/prometheus/client_model/go"
9+
)
10+
11+
func metricGaugeValue(t *testing.T, metricName string) float64 {
12+
t.Helper()
13+
families, err := prometheus.DefaultGatherer.Gather()
14+
if err != nil {
15+
t.Fatalf("failed to gather metrics: %v", err)
16+
}
17+
for _, fam := range families {
18+
if fam.GetName() != metricName {
19+
continue
20+
}
21+
if fam.GetType() != dto.MetricType_GAUGE {
22+
t.Fatalf("metric %q is not a gauge", metricName)
23+
}
24+
if len(fam.GetMetric()) == 0 {
25+
return 0
26+
}
27+
return fam.GetMetric()[0].GetGauge().GetValue()
28+
}
29+
t.Fatalf("metric %q not found", metricName)
30+
return 0
31+
}
32+
33+
func metricHistogramCount(t *testing.T, metricName string) uint64 {
34+
t.Helper()
35+
families, err := prometheus.DefaultGatherer.Gather()
36+
if err != nil {
37+
t.Fatalf("failed to gather metrics: %v", err)
38+
}
39+
for _, fam := range families {
40+
if fam.GetName() != metricName {
41+
continue
42+
}
43+
if fam.GetType() != dto.MetricType_HISTOGRAM {
44+
t.Fatalf("metric %q is not a histogram", metricName)
45+
}
46+
var total uint64
47+
for _, metric := range fam.GetMetric() {
48+
total += metric.GetHistogram().GetSampleCount()
49+
}
50+
return total
51+
}
52+
t.Fatalf("metric %q not found", metricName)
53+
return 0
54+
}
55+
56+
func TestControlPlaneWorkerAcquireAndSpawnMetrics(t *testing.T) {
57+
acquireBefore := metricHistogramCount(t, "duckgres_control_plane_worker_acquire_seconds")
58+
spawnBefore := metricHistogramCount(t, "duckgres_control_plane_worker_spawn_seconds")
59+
60+
observeControlPlaneWorkerAcquire(5 * time.Millisecond)
61+
observeControlPlaneWorkerSpawn(10 * time.Millisecond)
62+
63+
acquireAfter := metricHistogramCount(t, "duckgres_control_plane_worker_acquire_seconds")
64+
spawnAfter := metricHistogramCount(t, "duckgres_control_plane_worker_spawn_seconds")
65+
66+
if acquireAfter-acquireBefore != 1 {
67+
t.Fatalf("expected acquire histogram sample count delta 1, got %d", acquireAfter-acquireBefore)
68+
}
69+
if spawnAfter-spawnBefore != 1 {
70+
t.Fatalf("expected spawn histogram sample count delta 1, got %d", spawnAfter-spawnBefore)
71+
}
72+
}
73+
74+
func TestControlPlaneWorkerQueueDepthGauge(t *testing.T) {
75+
before := metricGaugeValue(t, "duckgres_control_plane_worker_queue_depth")
76+
77+
observeControlPlaneWorkerQueueDepthDelta(1)
78+
mid := metricGaugeValue(t, "duckgres_control_plane_worker_queue_depth")
79+
observeControlPlaneWorkerQueueDepthDelta(-1)
80+
after := metricGaugeValue(t, "duckgres_control_plane_worker_queue_depth")
81+
82+
if mid != before+1 {
83+
t.Fatalf("expected queue depth to increase by 1: before=%f mid=%f", before, mid)
84+
}
85+
if after != before {
86+
t.Fatalf("expected queue depth to return to baseline: before=%f after=%f", before, after)
87+
}
88+
}

controlplane/session_mgr.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ func (sm *SessionManager) CreateSession(ctx context.Context, username string, pi
5757

5858
// Acquire a worker: reuses idle pre-warmed workers or spawns a new one.
5959
// When max-workers is set, this blocks until a slot is available.
60+
observeControlPlaneWorkerQueueDepthDelta(1)
61+
defer observeControlPlaneWorkerQueueDepthDelta(-1)
62+
6063
worker, err := sm.pool.AcquireWorker(ctx)
6164
if err != nil {
6265
return 0, nil, fmt.Errorf("acquire worker: %w", err)

controlplane/worker_mgr.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,11 @@ func (p *FlightWorkerPool) ImportPrebound(sockets []*preboundSocket) {
239239
// binding a new socket (which may fail with EROFS under systemd's
240240
// ProtectSystem=strict after startup).
241241
func (p *FlightWorkerPool) SpawnWorker(id int) error {
242+
spawnStart := time.Now()
243+
defer func() {
244+
observeControlPlaneWorkerSpawn(time.Since(spawnStart))
245+
}()
246+
242247
token := generateToken()
243248

244249
// Try to use a pre-bound socket first. These are bound eagerly at startup
@@ -491,6 +496,11 @@ func (p *FlightWorkerPool) SpawnMinWorkers(count int) error {
491496
// This ensures the number of worker processes never exceeds maxWorkers while
492497
// allowing unlimited concurrent sessions across the fixed pool.
493498
func (p *FlightWorkerPool) AcquireWorker(ctx context.Context) (*ManagedWorker, error) {
499+
acquireStart := time.Now()
500+
defer func() {
501+
observeControlPlaneWorkerAcquire(time.Since(acquireStart))
502+
}()
503+
494504
p.mu.Lock()
495505
if p.shuttingDown {
496506
p.mu.Unlock()

controlplane/worker_mgr_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,11 +490,13 @@ func TestAcquireWorker_AtomicClaimRace(t *testing.T) {
490490
w := <-results
491491
if w == nil {
492492
t.Fatal("failed to acquire worker")
493+
return
493494
}
494-
if workers[w.ID] {
495-
t.Errorf("worker %d was assigned multiple times!", w.ID)
495+
workerID := w.ID
496+
if workers[workerID] {
497+
t.Errorf("worker %d was assigned multiple times!", workerID)
496498
}
497-
workers[w.ID] = true
499+
workers[workerID] = true
498500
}
499501
}
500502

@@ -660,6 +662,7 @@ func TestLeastLoadedWorkerLocked(t *testing.T) {
660662

661663
if best == nil {
662664
t.Fatal("expected a worker")
665+
return
663666
}
664667
if best.ID != 1 {
665668
t.Fatalf("expected worker 1 (least loaded with 2 sessions), got worker %d", best.ID)
@@ -840,6 +843,7 @@ func TestReleaseWorkerSocketReturnsPrebound(t *testing.T) {
840843
ps := pool.takePrebound()
841844
if ps == nil {
842845
t.Fatal("takePrebound returned nil")
846+
return
843847
}
844848

845849
w := &ManagedWorker{
@@ -971,6 +975,7 @@ func TestReleaseWorkerSocketIdempotent(t *testing.T) {
971975
ps := pool.takePrebound()
972976
if ps == nil {
973977
t.Fatal("takePrebound returned nil")
978+
return
974979
}
975980

976981
w := &ManagedWorker{
@@ -1068,6 +1073,7 @@ func TestImportPrebound(t *testing.T) {
10681073
ps := pool.takePrebound()
10691074
if ps == nil {
10701075
t.Fatal("takePrebound returned nil after import")
1076+
return
10711077
}
10721078
_ = ps.listener.Close()
10731079

@@ -1138,9 +1144,11 @@ func TestTakeAllPreboundThenImport(t *testing.T) {
11381144
ps := pool2.takePrebound()
11391145
if ps == nil {
11401146
t.Fatal("takePrebound from new pool returned nil")
1147+
return
11411148
}
11421149
if ps.listener == nil {
11431150
t.Fatal("listener is nil")
1151+
return
11441152
}
11451153
_ = ps.listener.Close()
11461154

0 commit comments

Comments
 (0)