Skip to content

Commit f2a2eb5

Browse files
Merge pull request #3 from filipecosta90/standalone.support
Wip on MULTI EXEC
2 parents 767c14c + d0bf0da commit f2a2eb5

File tree

1 file changed

+46
-6
lines changed

1 file changed

+46
-6
lines changed

redis-bechmark-go.go

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ func stringWithCharset(length int, charset string) string {
3232
return string(b)
3333
}
3434

35-
func ingestionRoutine(conn radix.Client, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter) {
35+
func ingestionRoutine(conn radix.Client, enableMultiExec, continueOnError bool, cmdS []string, keyspacelen, datasize, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, keyplace, dataplace int, useLimiter bool, rateLimiter *rate.Limiter) {
3636
defer wg.Done()
3737
for i := 0; uint64(i) < number_samples || loop; i++ {
3838
rawCurrentCmd, _, _ := keyBuildLogic(keyplace, dataplace, datasize, keyspacelen, cmdS)
39-
sendCmdLogic(conn, rawCurrentCmd, continueOnError, debug_level, useLimiter, rateLimiter)
39+
sendCmdLogic(conn, rawCurrentCmd, enableMultiExec, continueOnError, debug_level, useLimiter, rateLimiter)
4040
}
4141
}
4242

@@ -53,13 +53,48 @@ func keyBuildLogic(keyPos int, dataPos int, datasize, keyspacelen uint64, cmdS [
5353
return rawCmd, key, radix.ClusterSlot([]byte(newCmdS[1]))
5454
}
5555

56-
func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) {
56+
func sendCmdLogic(conn radix.Client, cmd radix.CmdAction, enableMultiExec bool, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) {
5757
if useRateLimiter {
5858
r := rateLimiter.ReserveN(time.Now(), int(1))
5959
time.Sleep(r.Delay())
6060
}
61+
var err error
6162
startT := time.Now()
62-
var err = conn.Do(cmd)
63+
if enableMultiExec {
64+
key := "userFriends"
65+
err = conn.Do(radix.WithConn(key, func(c radix.Conn) error {
66+
67+
// Begin the transaction with a MULTI command
68+
if err := conn.Do(radix.Cmd(nil, "MULTI")); err != nil {
69+
log.Fatalf("Received an error while preparing for MULTI: %v, error: %v", cmd, err)
70+
}
71+
72+
// If any of the calls after the MULTI call error it's important that
73+
// the transaction is discarded. This isn't strictly necessary if the
74+
// only possible error is a network error, as the connection would be
75+
// closed by the client anyway.
76+
var err error
77+
defer func() {
78+
if err != nil {
79+
// The return from DISCARD doesn't matter. If it's an error then
80+
// it's a network error and the Conn will be closed by the
81+
// client.
82+
conn.Do(radix.Cmd(nil, "DISCARD"))
83+
log.Fatalf("Received an error while in multi: %v, error: %v", cmd, err)
84+
}
85+
}()
86+
87+
// queue up the transaction's commands
88+
err = conn.Do(cmd)
89+
90+
// execute the transaction, capturing the result in a Tuple. We only
91+
// care about the first element (the result from GET), so we discard the
92+
// second by setting nil.
93+
return conn.Do(radix.Cmd(nil, "EXEC"))
94+
}))
95+
} else {
96+
err = conn.Do(cmd)
97+
}
6398
endT := time.Now()
6499
if err != nil {
65100
if continueOnError {
@@ -90,6 +125,7 @@ func main() {
90125
datasize := flag.Uint64("d", 3, "Data size of the expanded string __data__ value in bytes. The benchmark will expand the string __data__ inside an argument with a charset with length specified by this parameter. The substitution changes every time a command is executed.")
91126
numberRequests := flag.Uint64("n", 10000000, "Total number of requests")
92127
debug := flag.Int("debug", 0, "Client debug level.")
128+
multi := flag.Bool("multi", false, "Run each command in multi-exec.")
93129
clusterMode := flag.Bool("oss-cluster", false, "Enable OSS cluster mode.")
94130
loop := flag.Bool("l", false, "Loop. Run the tests forever.")
95131
flag.Parse()
@@ -149,9 +185,13 @@ func main() {
149185
cmd := make([]string, len(args))
150186
copy(cmd, args)
151187
if *clusterMode {
152-
go ingestionRoutine(cluster, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter)
188+
go ingestionRoutine(cluster, *multi, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter)
153189
} else {
154-
go ingestionRoutine(standalone, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter)
190+
if *multi {
191+
go ingestionRoutine(getStandaloneConn(connectionStr, opts, 1), *multi, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter)
192+
} else {
193+
go ingestionRoutine(standalone, *multi, true, cmd, *keyspacelen, *datasize, samplesPerClient, *loop, int(*debug), &wg, keyPlaceOlderPos, dataPlaceOlderPos, useRateLimiter, rateLimiter)
194+
}
155195
}
156196
}
157197

0 commit comments

Comments
 (0)