5 | 5 | "fmt" |
6 | 6 | hdrhistogram "github.com/HdrHistogram/hdrhistogram-go" |
7 | 7 | "github.com/RedisGraph/redisgraph-go" |
| 8 | + "github.com/gomodule/redigo/redis" |
8 | 9 | "golang.org/x/time/rate" |
9 | 10 | "log" |
10 | 11 | "math" |
@@ -32,14 +33,14 @@ var graphRunTimeLatencies *hdrhistogram.Histogram |
32 | 33 |
33 | 34 | const Inf = rate.Limit(math.MaxFloat64) |
34 | 35 |
35 | | -func ingestionRoutine(rg redisgraph.Graph, continueOnError bool, cmdS string, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter) { |
| 36 | +func ingestionRoutine(rg *redisgraph.Graph, continueOnError bool, cmdS string, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter) { |
36 | 37 | defer wg.Done() |
37 | 38 | for i := 0; uint64(i) < number_samples || loop; i++ { |
38 | 39 | sendCmdLogic(rg, cmdS, continueOnError, debug_level, useLimiter, rateLimiter) |
39 | 40 | } |
40 | 41 | } |
41 | 42 |
42 | | -func sendCmdLogic(rg redisgraph.Graph, query string, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) { |
| 43 | +func sendCmdLogic(rg *redisgraph.Graph, query string, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) { |
43 | 44 | if useRateLimiter { |
44 | 45 | r := rateLimiter.ReserveN(time.Now(), int(1)) |
45 | 46 | time.Sleep(r.Delay()) |
@@ -128,20 +129,30 @@ func main() { |
128 | 129 | fmt.Printf("Running in loop until you hit Ctrl+C\n") |
129 | 130 | } |
130 | 131 | query := strings.Join(args, " ") |
| 132 | + rgs := make([]redisgraph.Graph, *clients, *clients) |
| 133 | + conns := make([]redis.Conn, *clients, *clients) |
131 | 134 |
132 | | - for channel_id := 1; uint64(channel_id) <= *clients; channel_id++ { |
| 135 | + for client_id := 0; uint64(client_id) < *clients; client_id++ { |
133 | 136 | wg.Add(1) |
134 | 137 | cmd := make([]string, len(args)) |
135 | 138 | copy(cmd, args) |
136 | | - go ingestionRoutine(getStandaloneConn(*graphKey, "tcp", connectionStr), true, query, samplesPerClient, *loop, *debug, &wg, useRateLimiter, rateLimiter) |
| 139 | + rgs[client_id], conns[client_id] = getStandaloneConn(*graphKey, "tcp", connectionStr) |
| 140 | + go ingestionRoutine(&rgs[client_id], true, query, samplesPerClient, *loop, *debug, &wg, useRateLimiter, rateLimiter) |
137 | 141 | } |
138 | 142 |
139 | 143 | // listen for C-c |
140 | 144 | c := make(chan os.Signal, 1) |
141 | 145 | signal.Notify(c, os.Interrupt) |
142 | 146 |
143 | 147 | tick := time.NewTicker(time.Duration(client_update_tick) * time.Second) |
| 148 | + |
| 149 | + // enter the update loop |
144 | 150 | closed, _, duration, totalMessages, _ := updateCLI(tick, c, *numberRequests, *loop) |
| 151 | + |
| 153 | + // benchmark ended, close the connections |
| 153 | + for _, standaloneConn := range conns { |
| 154 | + standaloneConn.Close() |
| 155 | + } |
145 | 156 | messageRate := float64(totalMessages) / float64(duration.Seconds()) |
146 | 157 | p50IngestionMs := float64(latencies.ValueAtQuantile(50.0)) / 1000.0 |
147 | 158 | p95IngestionMs := float64(latencies.ValueAtQuantile(95.0)) / 1000.0 |
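
The updated call site assigns two values from getStandaloneConn, so the helper is now expected to return both the redisgraph.Graph wrapper and its underlying redigo connection. getStandaloneConn itself is not part of this diff; the code below is only a minimal sketch, under that assumption, of what the helper could look like so that each client's connection can be closed explicitly after updateCLI returns.

// Assumed helper, not shown in this diff: opens one dedicated redigo connection
// per benchmark client, wraps it in a redisgraph.Graph, and returns both so the
// caller can close the raw connection once the benchmark finishes.
package main

import (
	"log"

	"github.com/RedisGraph/redisgraph-go"
	"github.com/gomodule/redigo/redis"
)

func getStandaloneConn(graphName, network, addr string) (redisgraph.Graph, redis.Conn) {
	// One dedicated connection per benchmark client.
	conn, err := redis.Dial(network, addr)
	if err != nil {
		log.Fatalf("error establishing connection to %s: %v", addr, err)
	}
	// GraphNew wraps the connection; the caller keeps conn so it can Close() it later.
	return redisgraph.GraphNew(graphName, conn), conn
}

Returning the raw redis.Conn alongside the Graph is what allows the new cleanup loop at the end of main to close every client connection explicitly instead of leaving them open until the process exits.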