Skip to content

Commit 4676126

Browse files
authored
Soak test for go client (#617)
* Soak test: Soak test for go client. Measure performance for go clients using datadog.
1 parent 3dfb66a commit 4676126

File tree

6 files changed

+1431
-1
lines changed

6 files changed

+1431
-1
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ install:
8080

8181
script:
8282
# should be replaced with golangci-lint
83-
- if [[ -f .do_lint ]]; then golint -set_exit_status ./examples/... ./kafka/... ./kafkatest/... ; fi
83+
- if [[ -f .do_lint ]]; then golint -set_exit_status ./examples/... ./kafka/... ./kafkatest/... ./soaktest/... ; fi
8484
- for dir in kafka ; do (cd $dir && go test -timeout 180s -v ${BUILD_TYPE} ./...) ; done
8585
- go-kafkacat --help
8686
- library-version

soaktest/README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Information for soak test for go client
2+
The soak test contains 2 applications:
3+
soakclient.go for one application : Create Producer-1 to keep sending messages to the input-topic.
4+
Create Consumer-2 to receive messages from the output-topic.
5+
6+
soakclient_transaction.go for the other application: Create Consumer-1 to receive messages from the input-topic.
7+
Create transactional producer Producer-2 to sending the received messages to the output-topic.
8+
The initTransaction API registers a transactional.id when creating the producer.
9+
The producer sends the messages to the output-topic.
10+
Commit transaction after sending 100 messages each time or 1 second.
11+
12+
# Instruction on configuration and running
13+
14+
# Build soaktest API
15+
$ cd soaktest
16+
$ go mod init soaktest
17+
$ go mod tidy
18+
$ go install soaktest
19+
20+
# Build and run soakclient.go
21+
$ cd soaktest/soakclient
22+
$ go build
23+
$ ./soakclient -broker <broker> -inputTopic <..> -outTopic <..> -groupID <..> -inputTopicPartitionsNum <..> -outTopicPartitionsNum <..> -replicationFactor <..> -ccloudAPIKey <ccloudAPIKey> -ccloudAPISecret <ccloudAPISecret>
24+
25+
# Build and run soakclient_transaction.go
26+
$ cd soaktest/soakclient_transaction
27+
$ go build
28+
$ ./soakclient_transaction -broker <broker> -inputTopic <..> -outTopic <..> -outTopicPartitionsNum <..> -groupID <..> -transactionID <..> -ccloudAPIKey <ccloudAPIKey> -ccloudAPISecret <ccloudAPISecret>
29+
30+
### The default values if doesn't input from the command lines
31+
inputTopic: "inputTopic"
32+
outTopic: "outTopic"
33+
groupID : "groupID"
34+
inputTopicPartitionsNum: 1
35+
outTopicPartitionsNum: 1
36+
replicationFactor: 1
37+
transactionID: transactionID
38+
39+
# Recommend instance type and size
40+
At lease 4 CPUs and 8 GB volume size

soaktest/datadog.go

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package soaktest
2+
3+
/**
4+
* Copyright 2021 Confluent Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import (
20+
"github.com/DataDog/datadog-go/statsd"
21+
"github.com/shirou/gopsutil/process"
22+
"os"
23+
"sync"
24+
"syscall"
25+
"time"
26+
)
27+
28+
const datadogHost = "127.0.0.1:8125"
29+
const ddPfx = "kafka.client.soak.go."
30+
31+
var lastRusage syscall.Rusage
32+
var lastRusageTime time.Time
33+
34+
const memoryRss = "memory.rss."
35+
const cpuUser = "cpu.user"
36+
const cpuSystem = "cpu.system"
37+
const memoryRssMax = "memory.rss.max"
38+
39+
var client, err = statsd.New(datadogHost)
40+
var proc, _ = process.NewProcess(int32(os.Getpid()))
41+
42+
// DatadogIncrement submits increment type metrics
43+
func DatadogIncrement(metricsName string, incrval float64, tags []string) {
44+
IncreTags := []string{"environment:dev"}
45+
if tags != nil {
46+
IncreTags = tags
47+
}
48+
client.Incr(ddPfx+metricsName, IncreTags, incrval)
49+
}
50+
51+
// DatadogGauge submits gauge type metrics
52+
func DatadogGauge(metricsName string,
53+
val float64,
54+
tags []string) {
55+
gaugeTags := []string{"environment:dev"}
56+
if tags != nil {
57+
gaugeTags = tags
58+
}
59+
client.Gauge(ddPfx+metricsName, val, gaugeTags, 1)
60+
}
61+
62+
// calcRusageDeltas calculates user CPU usage, system CPU usage
63+
// and max rss memory
64+
func calcRusageDeltas(metricsName string,
65+
curr, prev syscall.Rusage,
66+
elapsed float64) {
67+
// User CPU %
68+
userCPU := ((float64)(curr.Utime.Sec - prev.Utime.Sec)) / elapsed * 100.0
69+
DatadogGauge(metricsName+cpuUser, userCPU, nil)
70+
71+
//System CPU %
72+
sysCPU := ((float64)(curr.Stime.Sec - prev.Stime.Sec)) / elapsed * 100.0
73+
DatadogGauge(metricsName+cpuSystem, sysCPU, nil)
74+
75+
//Max RSS memory (monotonic)
76+
maxRss := float64(curr.Maxrss) / 1024.0
77+
DatadogGauge(metricsName+memoryRssMax, maxRss, nil)
78+
79+
InfoLogger.Printf("User CPU: %f, System CPU: %f, MaxRSS %f MiB\n",
80+
userCPU, sysCPU, maxRss)
81+
82+
}
83+
84+
// GetRusageMetrics is the entrance for resource calculation
85+
// If caught a terminate signal, return, else call the GetRusage() function
86+
// to calculate resource usage every 10 seconds
87+
func GetRusageMetrics(metricsName string,
88+
wg *sync.WaitGroup,
89+
doneChan chan bool,
90+
errorChan chan error) {
91+
defer wg.Done()
92+
ticker := time.NewTicker(10000 * time.Millisecond)
93+
run := true
94+
for run {
95+
select {
96+
case <-doneChan:
97+
run = false
98+
case <-ticker.C:
99+
err := GetRusage(metricsName)
100+
if err != nil {
101+
ErrorLogger.Printf("Failed to get resource usage %s\n", err)
102+
errorChan <- err
103+
return
104+
}
105+
}
106+
}
107+
}
108+
109+
// GetRusage calculates RSS memory usage
110+
func GetRusage(metricsName string) error {
111+
var ru syscall.Rusage
112+
if err := syscall.Getrusage(syscall.RUSAGE_SELF, &ru); err != nil {
113+
ErrorLogger.Printf("Error: unable to gather resource usage data: %s\n",
114+
err)
115+
return err
116+
}
117+
118+
now := time.Now()
119+
if !lastRusageTime.IsZero() {
120+
calcRusageDeltas(metricsName,
121+
ru,
122+
lastRusage,
123+
float64(now.Sub(lastRusageTime)/time.Millisecond))
124+
}
125+
lastRusage = ru
126+
lastRusageTime = now
127+
128+
// Current RSS memory
129+
memoryInfo, err := proc.MemoryInfo()
130+
if err != nil {
131+
ErrorLogger.Printf("Error: unable to gather memory info: %s\n", err)
132+
return err
133+
}
134+
rss := float64(memoryInfo.RSS) / (1024.0 * 1024.0)
135+
DatadogGauge(memoryRss+metricsName, rss, nil)
136+
return nil
137+
}

0 commit comments

Comments
 (0)