Skip to content

Commit 17531fd

Browse files
committed
correct propagation of context, no mutex -> use errgroup instead, one more test for 2 loki queries; query executor and loki parallelization; added todo
1 parent a21de6f commit 17531fd

File tree

7 files changed

+584
-63
lines changed

7 files changed

+584
-63
lines changed

wasp/benchspy/TO_DO.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
Known things to do:
2+
- [ ] add logger
3+
- [ ] add tests
4+
- [ ] write documentation
5+
- [ ] test with Docker app (focus: resources)
6+
- [ ] test with k8s app (focus: resources)
7+
- [ ] add report builder (?)
8+
- [ ] add wrapper function for executing some code and then creating a report
9+
- [ ] think what to do with errors... do we really need a slice?
10+
- [ ] if TestEnd is zero, default to `time.Now()`?
11+
- [ ] add helper method for a profile what would create a report based on all generators?

wasp/benchspy/loki.go

Lines changed: 42 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/pkg/errors"
1212
"github.com/smartcontractkit/chainlink-testing-framework/lib/client"
1313
"github.com/smartcontractkit/chainlink-testing-framework/wasp"
14+
"golang.org/x/sync/errgroup"
1415
)
1516

1617
func NewLokiQuery(queries map[string]string, lokiConfig *wasp.LokiConfig) *LokiQuery {
@@ -65,7 +66,7 @@ func (l *LokiQuery) Validate() error {
6566
return nil
6667
}
6768

68-
func (l *LokiQuery) Execute() error {
69+
func (l *LokiQuery) Execute(ctx context.Context) error {
6970
splitAuth := strings.Split(l.LokiConfig.BasicAuth, ":")
7071
var basicAuth client.LokiBasicAuth
7172
if len(splitAuth) == 2 {
@@ -76,41 +77,54 @@ func (l *LokiQuery) Execute() error {
7677
}
7778

7879
l.QueryResults = make(map[string][]string)
80+
resultCh := make(chan map[string][]string, len(l.Queries))
81+
errGroup, errCtx := errgroup.WithContext(ctx)
7982

80-
// TODO this can also be parallelized, just remember to add a mutex for writing to results map
8183
for name, query := range l.Queries {
82-
queryParams := client.LokiQueryParams{
83-
Query: query,
84-
StartTime: l.StartTime,
85-
EndTime: l.EndTime,
86-
Limit: 1000, //TODO make this configurable
87-
}
84+
errGroup.Go(func() error {
85+
queryParams := client.LokiQueryParams{
86+
Query: query,
87+
StartTime: l.StartTime,
88+
EndTime: l.EndTime,
89+
Limit: 1000, //TODO make this configurable
90+
}
8891

89-
parsedLokiUrl, err := url.Parse(l.LokiConfig.URL)
90-
if err != nil {
91-
return errors.Wrapf(err, "failed to parse Loki URL %s", l.LokiConfig.URL)
92-
}
92+
parsedLokiUrl, err := url.Parse(l.LokiConfig.URL)
93+
if err != nil {
94+
return errors.Wrapf(err, "failed to parse Loki URL %s", l.LokiConfig.URL)
95+
}
9396

94-
lokiUrl := parsedLokiUrl.Scheme + "://" + parsedLokiUrl.Host
95-
lokiClient := client.NewLokiClient(lokiUrl, l.LokiConfig.TenantID, basicAuth, queryParams)
97+
lokiUrl := parsedLokiUrl.Scheme + "://" + parsedLokiUrl.Host
98+
lokiClient := client.NewLokiClient(lokiUrl, l.LokiConfig.TenantID, basicAuth, queryParams)
9699

97-
ctx, cancelFn := context.WithTimeout(context.Background(), l.LokiConfig.Timeout)
98-
rawLogs, err := lokiClient.QueryLogs(ctx)
99-
if err != nil {
100-
l.Errors = append(l.Errors, err)
101-
cancelFn()
102-
continue
103-
}
100+
rawLogs, err := lokiClient.QueryLogs(errCtx)
101+
if err != nil {
102+
return errors.Wrapf(err, "failed to query logs for %s", name)
103+
}
104104

105-
cancelFn()
106-
l.QueryResults[name] = []string{}
107-
for _, log := range rawLogs {
108-
l.QueryResults[name] = append(l.QueryResults[name], log.Log)
109-
}
105+
resultMap := make(map[string][]string)
106+
for _, log := range rawLogs {
107+
resultMap[name] = append(resultMap[name], log.Log)
108+
}
109+
110+
select {
111+
case resultCh <- resultMap:
112+
return nil
113+
case <-errCtx.Done():
114+
return errCtx.Err() // Allows goroutine to exit if timeout occurs
115+
}
116+
})
110117
}
111118

112-
if len(l.Errors) > 0 {
113-
return errors.New("there were errors while fetching the results. Please check the errors and try again")
119+
if err := errGroup.Wait(); err != nil {
120+
return errors.Wrap(err, "failed to execute Loki queries")
121+
}
122+
123+
for i := 0; i < len(l.Queries); i++ {
124+
result := <-resultCh
125+
for name, logs := range result {
126+
l.QueryResults[name] = logs
127+
}
114128
}
115129

116130
return nil

wasp/benchspy/report.go

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package benchspy
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
7+
8+
"golang.org/x/sync/errgroup"
69
)
710

811
// StandardReport is a report that contains all the necessary data for a performance test
@@ -21,26 +24,35 @@ func (b *StandardReport) Load() error {
2124
return b.LocalReportStorage.Load(b.TestName, b.CommitOrTag, b)
2225
}
2326

24-
func (b *StandardReport) Fetch() error {
27+
func (b *StandardReport) Fetch(ctx context.Context) error {
2528
basicErr := b.BasicData.Validate()
2629
if basicErr != nil {
2730
return basicErr
2831
}
2932

30-
// TODO parallelize it
33+
errGroup, errCtx := errgroup.WithContext(ctx)
34+
3135
for _, queryExecutor := range b.QueryExecutors {
32-
queryExecutor.TimeRange(b.TestStart, b.TestEnd)
36+
errGroup.Go(func() error {
37+
queryExecutor.TimeRange(b.TestStart, b.TestEnd)
3338

34-
if validateErr := queryExecutor.Validate(); validateErr != nil {
35-
return validateErr
36-
}
39+
if validateErr := queryExecutor.Validate(); validateErr != nil {
40+
return validateErr
41+
}
3742

38-
if execErr := queryExecutor.Execute(); execErr != nil {
39-
return execErr
40-
}
43+
if execErr := queryExecutor.Execute(errCtx); execErr != nil {
44+
return execErr
45+
}
46+
47+
return nil
48+
})
49+
}
50+
51+
if err := errGroup.Wait(); err != nil {
52+
return err
4153
}
4254

43-
resourceErr := b.FetchResources()
55+
resourceErr := b.FetchResources(ctx)
4456
if resourceErr != nil {
4557
return resourceErr
4658
}

wasp/benchspy/resources.go

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ import (
55
"fmt"
66
"os"
77
"regexp"
8-
"sync"
9-
"time"
108

119
"github.com/docker/docker/api/types/container"
1210
"github.com/pkg/errors"
@@ -49,14 +47,14 @@ type PodResources struct {
4947
LimitsMemory int64
5048
}
5149

52-
func (r *ResourceReporter) FetchResources() error {
50+
func (r *ResourceReporter) FetchResources(ctx context.Context) error {
5351
if r.ExecutionEnvironment == ExecutionEnvironment_Docker {
54-
err := r.fetchDockerResources()
52+
err := r.fetchDockerResources(ctx)
5553
if err != nil {
5654
return err
5755
}
5856
} else {
59-
err := r.fetchK8sResources()
57+
err := r.fetchK8sResources(ctx)
6058
if err != nil {
6159
return err
6260
}
@@ -65,7 +63,7 @@ func (r *ResourceReporter) FetchResources() error {
6563
return nil
6664
}
6765

68-
func (r *ResourceReporter) fetchK8sResources() error {
66+
func (r *ResourceReporter) fetchK8sResources(ctx context.Context) error {
6967
config, err := rest.InClusterConfig()
7068
if err != nil {
7169
return errors.Wrapf(err, "failed to get in-cluster config, are you sure this is running in a k8s cluster?")
@@ -82,9 +80,9 @@ func (r *ResourceReporter) fetchK8sResources() error {
8280
return errors.Wrapf(err, "failed to read namespace file %s", namespaceFile)
8381
}
8482

85-
pods, err := clientset.CoreV1().Pods(string(namespace)).List(context.TODO(), metav1.ListOptions{})
83+
pods, err := clientset.CoreV1().Pods(string(namespace)).List(ctx, metav1.ListOptions{})
8684
if err != nil {
87-
panic(err)
85+
return errors.Wrapf(err, "failed to list pods in namespace %s", namespace)
8886
}
8987

9088
r.PodsResources = make(map[string]*PodResources)
@@ -101,22 +99,21 @@ func (r *ResourceReporter) fetchK8sResources() error {
10199
return nil
102100
}
103101

104-
func (r *ResourceReporter) fetchDockerResources() error {
102+
func (r *ResourceReporter) fetchDockerResources(ctx context.Context) error {
105103
provider, err := tc.NewDockerProvider()
106104
if err != nil {
107105
return fmt.Errorf("failed to create Docker provider: %w", err)
108106
}
109107

110-
containers, err := provider.Client().ContainerList(context.Background(), container.ListOptions{All: true})
108+
containers, err := provider.Client().ContainerList(ctx, container.ListOptions{All: true})
111109
if err != nil {
112110
return fmt.Errorf("failed to list Docker containers: %w", err)
113111
}
114112

115-
eg := &errgroup.Group{}
113+
eg, errCtx := errgroup.WithContext(ctx)
116114
pattern := regexp.MustCompile(r.ResourceSelectionPattern)
117115

118-
var dockerResources = make(map[string]*DockerResources)
119-
resourceMutex := sync.Mutex{}
116+
resultCh := make(chan map[string]*DockerResources, len(containers))
120117

121118
for _, containerInfo := range containers {
122119
eg.Go(func() error {
@@ -125,32 +122,40 @@ func (r *ResourceReporter) fetchDockerResources() error {
125122
return nil
126123
}
127124

128-
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
129-
info, err := provider.Client().ContainerInspect(ctx, containerInfo.ID)
125+
info, err := provider.Client().ContainerInspect(errCtx, containerInfo.ID)
130126
if err != nil {
131-
cancelFn()
132127
return errors.Wrapf(err, "failed to inspect container %s", containerName)
133128
}
134129

135-
cancelFn()
136-
resourceMutex.Lock()
137-
dockerResources[containerName] = &DockerResources{
130+
result := make(map[string]*DockerResources)
131+
result[containerName] = &DockerResources{
138132
NanoCPUs: info.HostConfig.NanoCPUs,
139133
CpuShares: info.HostConfig.CPUShares,
140134
Memory: info.HostConfig.Memory,
141135
MemorySwap: info.HostConfig.MemorySwap,
142136
}
143-
resourceMutex.Unlock()
144137

145-
return nil
138+
select {
139+
case resultCh <- result:
140+
return nil
141+
case <-errCtx.Done():
142+
return errCtx.Err() // Allows goroutine to exit if timeout occurs
143+
}
146144
})
147145
}
148146

149147
if err := eg.Wait(); err != nil {
150148
return errors.Wrapf(err, "failed to fetch Docker resources")
151149
}
152150

153-
r.ContainerResources = dockerResources
151+
r.ContainerResources = make(map[string]*DockerResources)
152+
153+
for i := 0; i < len(containers); i++ {
154+
result := <-resultCh
155+
for name, res := range result {
156+
r.ContainerResources[name] = res
157+
}
158+
}
154159

155160
return nil
156161
}

wasp/benchspy/types.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package benchspy
22

3-
import "time"
3+
import (
4+
"context"
5+
"time"
6+
)
47

58
type Report interface {
69
// Store stores the report in a persistent storage and returns the path to it, or an error
@@ -17,7 +20,7 @@ type QueryExecutor interface {
1720
// Validate checks if the QueryExecutor has all the necessary data and configuration to execute the queries
1821
Validate() error
1922
// Execute executes the queries and populates the QueryExecutor with the results
20-
Execute() error
23+
Execute(ctx context.Context) error
2124
// Results returns the results of the queries, where key is the name of the query and value is the result
2225
Results() map[string][]string
2326
// IsComparable checks whether both QueryExecutors can be compared (e.g. they have the same type, queries are the same, etc.), and returns an error (if any difference is found)

0 commit comments

Comments
 (0)