package main

import (
	"flag"
	"fmt"
	"log"
	"math"
	"os"
	"os/signal"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	hdrhistogram "github.com/HdrHistogram/hdrhistogram-go"
	"github.com/mediocregopher/radix/v3"
	"github.com/redislabs/redisgraph-go"
	"golang.org/x/time/rate"
)
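
// Shared state updated by every client goroutine: request/error counters and
// the client-side and graph-internal latency histograms.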
var totalCommands uint64
var totalErrors uint64
var latencies *hdrhistogram.Histogram
var graphRunTimeLatencies *hdrhistogram.Histogram

const Inf = rate.Limit(math.MaxFloat64)
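
// ingestionRoutine runs in its own goroutine: it issues the query
// number_samples times (or forever when loop is set) over a dedicated
// RedisGraph connection and signals the WaitGroup when done.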
func ingestionRoutine(rg redisgraph.Graph, continueOnError bool, cmdS string, number_samples uint64, loop bool, debug_level int, wg *sync.WaitGroup, useLimiter bool, rateLimiter *rate.Limiter) {
	defer wg.Done()
	for i := 0; uint64(i) < number_samples || loop; i++ {
		sendCmdLogic(rg, cmdS, continueOnError, debug_level, useLimiter, rateLimiter)
	}
}
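
// sendCmdLogic optionally waits on the rate limiter, runs the query, and
// records both the end-to-end client latency and the RedisGraph internal
// execution time reported with the reply.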
func sendCmdLogic(rg redisgraph.Graph, query string, continueOnError bool, debug_level int, useRateLimiter bool, rateLimiter *rate.Limiter) {
	if useRateLimiter {
		r := rateLimiter.ReserveN(time.Now(), 1)
		time.Sleep(r.Delay())
	}
	var err error
	var queryResult *redisgraph.QueryResult
	startT := time.Now()
	queryResult, err = rg.Query(query)
	endT := time.Now()
	if err != nil {
		if continueOnError {
			atomic.AddUint64(&totalErrors, uint64(1))
			if debug_level > 0 {
				log.Printf("Received an error with the following query(s): %v, error: %v", query, err)
			}
		} else {
			log.Fatalf("Received an error with the following query(s): %v, error: %v", query, err)
		}
	}
	duration := endT.Sub(startT)
	if recordErr := latencies.RecordValue(duration.Microseconds()); recordErr != nil {
		log.Fatalf("Received an error while recording latencies: %v", recordErr)
	}
	// Only touch the query result when the query succeeded; on error queryResult
	// may be nil and calling RunTime() would panic.
	if err == nil {
		if recordErr := graphRunTimeLatencies.RecordValue(int64(queryResult.RunTime() * 1000)); recordErr != nil {
			log.Fatalf("Received an error while recording RedisGraph RunTime latencies: %v", recordErr)
		}
	}
	atomic.AddUint64(&totalCommands, uint64(1))
}
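
// Example invocation (the binary name and the query are illustrative only):
//
//	./redisgraph-benchmark-go -h 127.0.0.1 -p 6379 -c 50 -n 100000 -graph-key graph "MATCH (n) RETURN count(n)"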
func main() {
	host := flag.String("h", "127.0.0.1", "Server hostname.")
	port := flag.Int("p", 6379, "Server port.")
	rps := flag.Int64("rps", 0, "Max rps. If 0 no limit is applied and the DB is stressed up to maximum.")
	password := flag.String("a", "", "Password for Redis Auth.")
	clients := flag.Uint64("c", 50, "Number of clients.")
	numberRequests := flag.Uint64("n", 100000, "Total number of requests.")
	debug := flag.Int("debug", 0, "Client debug level.")
	loop := flag.Bool("l", false, "Loop. Run the tests forever.")
	graphKey := flag.String("graph-key", "graph", "Graph key.")
	flag.Parse()
	args := flag.Args()
	if len(args) < 1 {
		log.Fatalf("You need to specify a query after the flag command arguments.")
	}
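
	// With -rps 0 the limiter stays at an infinite rate (effectively disabled);
	// otherwise allow bursts up to the number of clients.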
	var requestRate = Inf
	var requestBurst = 1
	useRateLimiter := false
	if *rps != 0 {
		requestRate = rate.Limit(*rps)
		requestBurst = int(*clients)
		useRateLimiter = true
	}
	var rateLimiter = rate.NewLimiter(requestRate, requestBurst)
	samplesPerClient := *numberRequests / *clients
	client_update_tick := 1
	latencies = hdrhistogram.New(1, 90000000, 3)
	graphRunTimeLatencies = hdrhistogram.New(1, 90000000, 3)
	opts := make([]radix.DialOpt, 0)
	if *password != "" {
		opts = append(opts, radix.DialAuthPass(*password))
	}
	connectionStr := fmt.Sprintf("%s:%d", *host, *port)
	stopChan := make(chan struct{})
	// a WaitGroup for the goroutines to tell us they've stopped
	wg := sync.WaitGroup{}
	if !*loop {
		fmt.Printf("Total clients: %d. Commands per client: %d Total commands: %d\n", *clients, samplesPerClient, *numberRequests)
	} else {
		fmt.Printf("Running in loop until you hit Ctrl+C\n")
	}
	query := strings.Join(args, " ")
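
	// Spawn one goroutine per client, each with its own RedisGraph connection.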
	for channel_id := 1; uint64(channel_id) <= *clients; channel_id++ {
		wg.Add(1)
		go ingestionRoutine(getStandaloneConn(*graphKey, "tcp", connectionStr), true, query, samplesPerClient, *loop, int(*debug), &wg, useRateLimiter, rateLimiter)
	}

	// listen for C-c
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
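
	// Refresh the progress line once per second until the workload completes
	// or the process is interrupted.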
	tick := time.NewTicker(time.Duration(client_update_tick) * time.Second)
	closed, _, duration, totalMessages, _ := updateCLI(tick, c, *numberRequests, *loop)
	messageRate := float64(totalMessages) / duration.Seconds()
	p50IngestionMs := float64(latencies.ValueAtQuantile(50.0)) / 1000.0
	p95IngestionMs := float64(latencies.ValueAtQuantile(95.0)) / 1000.0
	p99IngestionMs := float64(latencies.ValueAtQuantile(99.0)) / 1000.0

	graph_p50IngestionMs := float64(graphRunTimeLatencies.ValueAtQuantile(50.0)) / 1000.0
	graph_p95IngestionMs := float64(graphRunTimeLatencies.ValueAtQuantile(95.0)) / 1000.0
	graph_p99IngestionMs := float64(graphRunTimeLatencies.ValueAtQuantile(99.0)) / 1000.0

	fmt.Printf("\n")
	fmt.Printf("#################################################\n")
	fmt.Printf("Total Duration %.3f Seconds\n", duration.Seconds())
	fmt.Printf("Total Errors %d\n", totalErrors)
	fmt.Printf("Throughput summary: %.0f requests per second\n", messageRate)
	fmt.Printf("Overall Client Latency summary (msec):\n")
	fmt.Printf("    %9s %9s %9s\n", "p50", "p95", "p99")
	fmt.Printf("    %9.3f %9.3f %9.3f\n", p50IngestionMs, p95IngestionMs, p99IngestionMs)
	fmt.Printf("Overall RedisGraph Internal Execution time Latency summary (msec):\n")
	fmt.Printf("    %9s %9s %9s\n", "p50", "p95", "p99")
	fmt.Printf("    %9.3f %9.3f %9.3f\n", graph_p50IngestionMs, graph_p95IngestionMs, graph_p99IngestionMs)

	if closed {
		return
	}

	// tell the goroutines to stop
	close(stopChan)
	// and wait for them to finish
	wg.Wait()
}
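
// updateCLI prints a per-second progress line (throughput, errors, p50
// latencies) and returns once the request limit is reached (when not looping)
// or when an interrupt arrives. It reports whether the run finished, the
// effective start time, the elapsed duration, the number of commands issued,
// and the per-tick message-rate samples.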
func updateCLI(tick *time.Ticker, c chan os.Signal, message_limit uint64, loop bool) (bool, time.Time, time.Duration, uint64, []float64) {
	start := time.Now()
	prevTime := time.Now()
	prevMessageCount := uint64(0)
	messageRateTs := []float64{}
	fmt.Printf("%26s %7s %25s %25s %7s %25s %25s %25s\n", "Test time", " ", "Total Commands", "Total Errors", "", "Command Rate", "Client p50 with RTT(ms)", "Graph Internal p50 with RTT(ms)")
	for {
		select {
		case <-tick.C:
			now := time.Now()
			took := now.Sub(prevTime)
			// Load the shared counters atomically: they are updated concurrently
			// by the client goroutines.
			currentCmds := atomic.LoadUint64(&totalCommands)
			currentErrs := atomic.LoadUint64(&totalErrors)
			messageRate := float64(currentCmds-prevMessageCount) / took.Seconds()
			completionPercentStr := "[----%]"
			if !loop {
				completionPercent := float64(currentCmds) / float64(message_limit) * 100.0
				completionPercentStr = fmt.Sprintf("[%3.1f%%]", completionPercent)
			}
			errorPercent := 0.0
			if currentCmds > 0 {
				errorPercent = float64(currentErrs) / float64(currentCmds) * 100.0
			}

			p50 := float64(latencies.ValueAtQuantile(50.0)) / 1000.0
			p50RunTimeGraph := float64(graphRunTimeLatencies.ValueAtQuantile(50.0)) / 1000.0

			if prevMessageCount == 0 && currentCmds != 0 {
				start = time.Now()
			}
			if currentCmds != 0 {
				messageRateTs = append(messageRateTs, messageRate)
			}
			prevMessageCount = currentCmds
			prevTime = now

			fmt.Printf("%25.0fs %s %25d %25d [%3.1f%%] %25.2f %25.3f %25.3f\t", time.Since(start).Seconds(), completionPercentStr, currentCmds, currentErrs, errorPercent, messageRate, p50, p50RunTimeGraph)
			fmt.Printf("\r")
			if message_limit > 0 && currentCmds >= message_limit && !loop {
				return true, start, time.Since(start), currentCmds, messageRateTs
			}

		case <-c:
			fmt.Println("\nreceived Ctrl-c - shutting down")
			return true, start, time.Since(start), atomic.LoadUint64(&totalCommands), messageRateTs
		}
	}
}