-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworkload.go
More file actions
206 lines (191 loc) · 6.83 KB
/
workload.go
File metadata and controls
206 lines (191 loc) · 6.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package main
import (
"errors"
"fmt"
"log/slog"
"math/rand/v2"
"net"
"os"
"os/signal"
"path/filepath"
"strconv"
"sync"
"syscall"
"time"
"github.com/antithesishq/antithesis-sdk-go/assert"
"github.com/antithesishq/antithesis-sdk-go/lifecycle"
"github.com/antithesishq/valthree/internal/client"
"github.com/antithesishq/valthree/internal/proptest"
"github.com/spf13/cobra"
)
// init wires the workload subcommand into the root command and declares its
// configuration flags.
func init() {
	rootCmd.AddCommand(workloadCmd)
	flags := workloadCmd.Flags()
	flags.StringSlice("addrs", []string{":6379"}, "Valthree cluster address(es)")
	flags.Duration("check-timeout", time.Hour, "model checking timeout")
	flags.String("artifacts", ".", "directory for storing debugging artifacts")
}
// workloadCmd is the entry point for the Antithesis workload. It waits for
// every server in the cluster to become reachable, signals the platform that
// setup is complete, and then runs randomized workload iterations until it
// receives SIGINT or SIGTERM.
var workloadCmd = &cobra.Command{
	Use:   "workload",
	Short: "Start a continuous workload exercising a Valthree cluster",
	Run: func(cmd *cobra.Command, args []string) {
		// Validate the user-supplied flags first. Because we're optimizing
		// for brevity, we simply crash when flags are invalid.
		flags := cmd.Flags()
		logger := orFatal(newLogger(flags))
		clusterAddrs := orFatal(flags.GetStringSlice("addrs"))
		checkTimeout := orFatal(flags.GetDuration("check-timeout"))
		artifactDir := orFatal(flags.GetString("artifacts"))

		// Before injecting faults, the Antithesis platform lets us verify that
		// our system is up and running. Resolve each configured address and
		// wait for the server behind it to answer a PING.
		addrs := make([]net.Addr, 0, len(clusterAddrs))
		for _, serverAddr := range clusterAddrs {
			srvLogger := logger.With("server_addr", serverAddr)
			addr, err := net.ResolveTCPAddr("tcp", serverAddr)
			if err != nil {
				srvLogger.Error("server addr misconfigured", "err", err)
				os.Exit(1)
			}
			srvLogger.Debug("resolved server addr")
			pinger := dial(srvLogger, addr) // blocks until cluster is ready
			srvLogger.Debug("pinged server")
			pinger.CloseAndLog(srvLogger)
			addrs = append(addrs, addr)
		}

		// The cluster is up! Using the Antithesis SDK, tell the platform that
		// we're ready for fault injection.
		logger.Info("setup complete", "cluster_addrs", addrs)
		lifecycle.SetupComplete(map[string]any{"cluster_addrs": addrs})

		// Until the workload gets a signal to stop, exercise the cluster.
		// Each iteration generates a random, concurrent workload, records the
		// results, and verifies that the cluster is strict serializable.
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
		for iterations := 0; ; iterations++ {
			select {
			case <-sig:
				os.Exit(0)
			default:
				exerciseAndVerify(iterations, logger, addrs, checkTimeout, artifactDir)
			}
		}
	},
}
// exerciseAndVerify runs a single iteration of the randomized workload
// against the cluster and checks the recorded operation history for
// consistency violations.
//
// iteration is the zero-based run counter (used to gate when concurrency is
// enabled), addrs are the resolved server addresses, timeout bounds the model
// checker, and artifactDir receives debugging artifacts on failure.
func exerciseAndVerify(
	iteration int,
	logger *slog.Logger,
	addrs []net.Addr,
	timeout time.Duration,
	artifactDir string,
) {
	// Record the PRNG seeds so a failing run can be reproduced deterministically.
	seeds := []uint64{rand.Uint64(), rand.Uint64()}
	logger = logger.With("pcg_seeds", seeds, "cluster_addrs", addrs)
	// Before running this workload, return the cluster to a known state by
	// flushing it. This prevents an unclean shutdown from poisoning subsequent
	// runs. Retry forever: fault injection may make the cluster temporarily
	// unavailable.
	logger.Debug("flushing cluster")
	flusher := dial(logger, addrs[0])
	for {
		if err := flusher.FlushAll(); err != nil {
			logger.Debug("flush failed", "retry_after", time.Second, "err", err)
			time.Sleep(time.Second)
			continue
		}
		logger.Debug("flushed cluster")
		break
	}
	// Close the flush connection as soon as it's done its job; otherwise it
	// leaks for the remainder of the process (one connection per iteration).
	flusher.CloseAndLog(logger)
	// Next, generate a concurrent, randomized workload. The workload is a set of
	// instructions, telling each client to execute a series of GET, PUT, and DEL
	// commands on a small set of keys.
	logger.Debug("generating new workload")
	r := rand.New(rand.NewPCG(seeds[0], seeds[1]))
	workloads := proptest.GenWorkloads(r)
	// In each test run, start without concurrency. This is purely for
	// demonstration purposes - real workloads don't need this!
	const serialIterations = 16
	if iteration < serialIterations && len(workloads) > 1 {
		workloads = workloads[:1]
	}
	if iteration == serialIterations {
		logger.Info("allowing concurrent workloads")
	}
	// Run the workload, recording the timing and result of each operation. To
	// maximize concurrent work, we block each client until all the clients are
	// ready to begin.
	logger.Debug("running workload")
	var wg sync.WaitGroup
	start := make(chan struct{})
	for i, workload := range workloads {
		wg.Go(func() {
			// Spread clients across the cluster round-robin.
			addr := addrs[i%len(addrs)]
			logger := logger.With("client_id", i, "addr", addr)
			// conn (not "client") avoids shadowing the imported client package.
			conn := dial(logger, addr)
			defer conn.CloseAndLog(logger)
			<-start // hold until every client is dialed, maximizing overlap
			proptest.RunWorkload(logger, conn, workload)
		})
	}
	close(start)
	wg.Wait()
	logger.Debug("workload complete")
	// We've run the workload and collected the results. Using the porcupine
	// linearizability checker, verify that the operations on each key are
	// linearizable - and therefore, that the Valthree key-value store is strong
	// serializable. (Etcd, the strong serializable key-value store at the heart
	// of Kubernetes, also uses porcupine to check linearizability!)
	progress, err := proptest.CheckWorkloads(timeout, workloads)
	if err != nil {
		// Antithesis reports may include debugging artifacts. In this case,
		// porcupine produces an interactive visualization of the consistency bug
		// which we'd like to surface.
		var perr *proptest.Error
		if errors.As(err, &perr) {
			fname := fmt.Sprintf("consistency-failure-%s.html", perr.Key)
			fpath := filepath.Join(artifactDir, fname)
			if err := os.WriteFile(fpath, perr.Visualization.Bytes(), 0644); err != nil {
				logger.Error("write model visualization failed", "err", err, "key", perr.Key)
			}
		}
		// Using the Antithesis SDK, tell the platform that we've violated a
		// critical system property. Unreachable is the simplest assertion, so it
		// just takes a message and loosely-typed details.
		//
		// If integrating the SDK is difficult, Antithesis can also look for the
		// presence or absence of particular log lines.
		assert.Unreachable(
			// Formally, we've found a violation of strong serializability. But this
			// string appears directly in the Antithesis UI, so we avoid academic
			// terms to keep the demo accessible to a wide audience.
			"Clients can always read their own writes",
			map[string]any{"error": err.Error()},
		)
		logger.Error("strong serializability violated", "err", err)
	} else {
		percent := strconv.FormatFloat(100*progress, 'f', 1 /* precision */, 64 /* bitsize */)
		logger.Info("strong serializability verified", "percent_success", percent)
	}
}
// dial connects to the server at addr and blocks until it responds to a PING,
// retrying once per second. It never gives up: during Antithesis fault
// injection a server may be unreachable for an arbitrarily long time.
//
// The caller owns the returned client and must close it (CloseAndLog).
func dial(logger *slog.Logger, addr net.Addr) *client.Client {
	for {
		c, err := client.New(addr)
		if err != nil {
			logger.Debug("dial failed", "retry_after", time.Second, "err", err)
			time.Sleep(time.Second)
			continue
		}
		// Verify liveness on this connection. If the PING fails, discard the
		// connection and re-dial rather than retrying the same (possibly dead)
		// connection forever — a broken TCP connection can never heal, so
		// retrying PING on it would loop indefinitely.
		if err := c.Ping(); err != nil {
			logger.Debug("ping failed", "retry_after", time.Second, "err", err)
			c.CloseAndLog(logger)
			time.Sleep(time.Second)
			continue
		}
		return c
	}
}