9
9
"log"
10
10
"math"
11
11
"math/rand"
12
+ "net"
12
13
"os"
13
14
"os/signal"
14
15
"sync"
@@ -171,7 +172,8 @@ func main() {
171
172
if * password != "" {
172
173
opts = append (opts , radix .DialAuthPass (* password ))
173
174
}
174
- connectionStr := fmt .Sprintf ("%s:%d" , * host , * port )
175
+ ips , _ := net .LookupIP (* host )
176
+
175
177
stopChan := make (chan struct {})
176
178
// a WaitGroup for the goroutines to tell us they've stopped
177
179
wg := sync.WaitGroup {}
@@ -183,15 +185,15 @@ func main() {
183
185
fmt .Printf ("Using random seed: %d\n " , * seed )
184
186
rand .Seed (* seed )
185
187
var cluster * radix.Cluster
186
- var standalone * radix.Pool
187
- if * clusterMode {
188
- cluster = getOSSClusterConn (connectionStr , opts , * clients )
189
- } else {
190
- standalone = getStandaloneConn (connectionStr , opts , * clients )
191
- }
188
+
192
189
datapointsChan := make (chan datapoint , * numberRequests )
193
190
for channel_id := 1 ; uint64 (channel_id ) <= * clients ; channel_id ++ {
194
191
wg .Add (1 )
192
+ connectionStr := fmt .Sprintf ("%s:%d" , ips [rand .Int63n (int64 (len (ips )))], * port )
193
+ if * clusterMode {
194
+ cluster = getOSSClusterConn (connectionStr , opts , * clients )
195
+ }
196
+ fmt .Printf ("Using connection string %s for client %d\n " , connectionStr , channel_id )
195
197
cmd := make ([]string , len (args ))
196
198
copy (cmd , args )
197
199
if * clusterMode {
@@ -200,7 +202,8 @@ func main() {
200
202
if * multi {
201
203
go ingestionRoutine (getStandaloneConn (connectionStr , opts , 1 ), * multi , datapointsChan , true , cmd , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , keyPlaceOlderPos , dataPlaceOlderPos , useRateLimiter , rateLimiter )
202
204
} else {
203
- go ingestionRoutine (standalone , * multi , datapointsChan , true , cmd , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , keyPlaceOlderPos , dataPlaceOlderPos , useRateLimiter , rateLimiter )
205
+ go ingestionRoutine (getStandaloneConn (connectionStr , opts , 1 ), * multi , datapointsChan , true , cmd , * keyspacelen , * datasize , samplesPerClient , * loop , int (* debug ), & wg , keyPlaceOlderPos , dataPlaceOlderPos , useRateLimiter , rateLimiter )
206
+ time .Sleep (time .Millisecond * 10 )
204
207
}
205
208
}
206
209
}
@@ -212,6 +215,7 @@ func main() {
212
215
tick := time .NewTicker (time .Duration (client_update_tick ) * time .Second )
213
216
closed , _ , duration , totalMessages , _ := updateCLI (tick , c , * numberRequests , * loop , datapointsChan )
214
217
messageRate := float64 (totalMessages ) / float64 (duration .Seconds ())
218
+ avgMs := float64 (latencies .Mean ()) / 1000.0
215
219
p50IngestionMs := float64 (latencies .ValueAtQuantile (50.0 )) / 1000.0
216
220
p95IngestionMs := float64 (latencies .ValueAtQuantile (95.0 )) / 1000.0
217
221
p99IngestionMs := float64 (latencies .ValueAtQuantile (99.0 )) / 1000.0
@@ -222,8 +226,8 @@ func main() {
222
226
fmt .Printf ("Total Errors %d\n " , totalErrors )
223
227
fmt .Printf ("Throughput summary: %.0f requests per second\n " , messageRate )
224
228
fmt .Printf ("Latency summary (msec):\n " )
225
- fmt .Printf (" %9s %9s %9s\n " , "p50" , "p95" , "p99" )
226
- fmt .Printf (" %9.3f %9.3f %9.3f\n " , p50IngestionMs , p95IngestionMs , p99IngestionMs )
229
+ fmt .Printf (" %9s %9s %9s %9s \n " , "avg " , "p50" , "p95" , "p99" )
230
+ fmt .Printf (" %9.3f %9.3f %9.3f %9.3f \n " , avgMs , p50IngestionMs , p95IngestionMs , p99IngestionMs )
227
231
228
232
if closed {
229
233
return
0 commit comments