Skip to content

Commit d11a832

Browse files
committed
Merge branch 'main' into feat-o11y-overhaul
2 parents 6ae4a40 + db140b7 commit d11a832

File tree

17 files changed

+362
-67
lines changed

17 files changed

+362
-67
lines changed

.github/workflows/sdk-typescript.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ jobs:
264264
- name: Run e2e tests
265265
env:
266266
HATCHET_CLIENT_TLS_STRATEGY: none
267-
HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED: "true"
267+
HATCHET_CLIENT_WORKER_HEALTHCHECK_ENABLED: "false"
268268
NODE_TLS_REJECT_UNAUTHORIZED: "0"
269269
run: |
270270
echo "Testing current SDK against engine ${{ env.LATEST_TAG }}"

cmd/hatchet-engine/engine/run.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,6 @@ func Run(ctx context.Context, cf *loader.ConfigLoader, version string) error {
102102
return fmt.Errorf("could not run with config: %w", err)
103103
}
104104

105-
time.Sleep(server.Runtime.ShutdownWait)
106-
107105
l.Debug().Msgf("interrupt received, shutting down")
108106

109107
err = cleanup.Run()

cmd/hatchet-loadtest/do.go

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,74 @@ import (
44
"context"
55
"fmt"
66
"log"
7+
"os"
8+
"path/filepath"
79
"time"
10+
11+
"github.com/vicanso/go-charts/v2"
812
)
913

14+
// LatencySnapshot is a single latency observation: a measured latency paired
// with the timestamp the observation is attributed to.
type LatencySnapshot struct {
	// t is the timestamp of the observation; plots use it for the x axis.
	t time.Time
	// latency is the measured duration for this observation.
	latency time.Duration
}
18+
19+
type LatencyResult struct {
20+
snapshots []LatencySnapshot
21+
}
22+
23+
func (lr *LatencyResult) GeneratePlot(plotPath string, plotName string) error {
24+
bytes, err := lr.PlotBytes(plotName)
25+
if err != nil {
26+
return err
27+
}
28+
29+
// save to file
30+
f, err := os.Create(filepath.Join(plotPath, fmt.Sprintf("%s_plot.png", plotName)))
31+
if err != nil {
32+
return err
33+
}
34+
defer f.Close()
35+
_, err = f.Write(bytes)
36+
return err
37+
}
38+
39+
func (lr *LatencyResult) PlotBytes(plotName string) ([]byte, error) {
40+
if len(lr.snapshots) == 0 {
41+
return nil, fmt.Errorf("no snapshots available")
42+
}
43+
44+
xvals := make([]string, 0, len(lr.snapshots))
45+
yvals := make([]float64, 0, len(lr.snapshots))
46+
47+
start := lr.snapshots[0].t
48+
49+
for _, s := range lr.snapshots {
50+
elapsed := s.t.Sub(start).Seconds()
51+
xvals = append(xvals, fmt.Sprintf("%.2f", elapsed))
52+
53+
latencyMs := float64(s.latency.Microseconds()) / 1000.0
54+
yvals = append(yvals, latencyMs)
55+
}
56+
57+
p, err := charts.LineRender(
58+
[][]float64{yvals},
59+
charts.TitleTextOptionFunc(fmt.Sprintf("Task %s (ms)", plotName)),
60+
charts.XAxisDataOptionFunc(xvals),
61+
charts.LegendLabelsOptionFunc([]string{"Latency"}),
62+
charts.HeightOptionFunc(500),
63+
charts.WidthOptionFunc(1000),
64+
)
65+
if err != nil {
66+
return nil, err
67+
}
68+
return p.Bytes()
69+
}
70+
1071
type avgResult struct {
11-
count int64
12-
avg time.Duration
72+
count int64
73+
avg time.Duration
74+
latencyResult LatencyResult
1375
}
1476

1577
func do(config LoadTestConfig) error {
@@ -25,22 +87,28 @@ func do(config LoadTestConfig) error {
2587
defer cancel()
2688

2789
ch := make(chan int64, 2)
28-
durations := make(chan time.Duration, config.Events)
90+
durations := make(chan executionEvent, config.Events)
2991

3092
// Compute running average for executed durations using a rolling average.
3193
durationsResult := make(chan avgResult)
3294
go func() {
3395
var count int64
3496
var avg time.Duration
97+
var snapshots []LatencySnapshot
98+
3599
for d := range durations {
36100
count++
37101
if count == 1 {
38-
avg = d
102+
avg = d.duration
39103
} else {
40-
avg += (d - avg) / time.Duration(count)
104+
avg += (d.duration - avg) / time.Duration(count)
41105
}
106+
snapshots = append(snapshots, LatencySnapshot{
107+
t: d.startedAt,
108+
latency: d.duration,
109+
})
42110
}
43-
durationsResult <- avgResult{count: count, avg: avg}
111+
durationsResult <- avgResult{count: count, avg: avg, latencyResult: LatencyResult{snapshots: snapshots}}
44112
}()
45113

46114
// Start worker and ensure it has time to register
@@ -77,15 +145,20 @@ func do(config LoadTestConfig) error {
77145
go func() {
78146
var count int64
79147
var avg time.Duration
148+
var snapshots []LatencySnapshot
80149
for d := range scheduled {
81150
count++
82151
if count == 1 {
83152
avg = d
84153
} else {
85154
avg += (d - avg) / time.Duration(count)
86155
}
156+
snapshots = append(snapshots, LatencySnapshot{
157+
t: time.Now(),
158+
latency: d,
159+
})
87160
}
88-
scheduledResult <- avgResult{count: count, avg: avg}
161+
scheduledResult <- avgResult{count: count, avg: avg, latencyResult: LatencyResult{snapshots: snapshots}}
89162
}()
90163

91164
emitted := emit(ctx, config.Namespace, config.Events, config.Duration, scheduled, config.PayloadSize)
@@ -118,7 +191,37 @@ func do(config LoadTestConfig) error {
118191

119192
log.Printf("ℹ️ final average duration per executed event: %s", finalDurationResult.avg)
120193
log.Printf("ℹ️ final average scheduling time per event: %s", finalScheduledResult.avg)
121-
194+
if ShouldSendSlack() {
195+
log.Printf("ℹ️ sending scheduling/duration plots to Slack")
196+
slackSender := NewSlackSender("hatchet-staging-loadtest-us-west-2")
197+
durationBytes, err := finalDurationResult.latencyResult.PlotBytes("duration")
198+
if err != nil {
199+
log.Printf("❌ failed to generate duration plot: %v ", err)
200+
}
201+
schedulingBytes, err := finalScheduledResult.latencyResult.PlotBytes("scheduling")
202+
if err != nil {
203+
log.Printf("❌ failed to generate scheduling plot: %v ", err)
204+
}
205+
err = slackSender.Send(durationBytes, schedulingBytes, finalDurationResult.avg, finalScheduledResult.avg)
206+
if err != nil {
207+
log.Printf("❌ failed to send duration plots to slack: %v ", err)
208+
}
209+
log.Printf("ℹ️ scheduling/duration successfully plots to Slack")
210+
} else {
211+
log.Printf("ℹ️ not all environment vars for sending plots to Slack enabled...skipping")
212+
}
213+
if config.PlotDir != "" {
214+
log.Printf("ℹ️ exporting scheduling/duration snapshot data")
215+
err := finalScheduledResult.latencyResult.GeneratePlot(config.PlotDir, "scheduling")
216+
if err != nil {
217+
return err
218+
}
219+
err = finalDurationResult.latencyResult.GeneratePlot(config.PlotDir, "duration")
220+
if err != nil {
221+
return err
222+
}
223+
log.Printf("ℹ️ exported scheduling/duration snapshot data")
224+
}
122225
if expected != executed {
123226
log.Printf("⚠️ warning: pushed and executed counts do not match: expected=%d got=%d", expected, executed)
124227
}

cmd/hatchet-loadtest/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type LoadTestConfig struct {
3535
RlLimit int
3636
RlDurationUnit string
3737
AverageDurationThreshold time.Duration
38+
PlotDir string
3839
}
3940

4041
func main() {
@@ -83,7 +84,7 @@ func main() {
8384
loadtest.Flags().StringVar(&config.RlDurationUnit, "rlDurationUnit", "second", "rlDurationUnit specifies the duration unit for the rate limit (second, minute, hour)")
8485
loadtest.Flags().StringVarP(&logLevel, "level", "l", "info", "logLevel specifies the log level (debug, info, warn, error)")
8586
loadtest.Flags().DurationVar(&config.AverageDurationThreshold, "averageDurationThreshold", 100*time.Millisecond, "averageDurationThreshold specifies the threshold for the average duration per executed event to be considered a success")
86-
87+
loadtest.Flags().StringVar(&config.PlotDir, "plotDirectory", "", "plotDirectory specifies where to put the generated plots for latency and task duration")
8788
cmd := &cobra.Command{Use: "app"}
8889
cmd.AddCommand(loadtest)
8990
if err := cmd.Execute(); err != nil {

cmd/hatchet-loadtest/run.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,12 @@ type stepOneOutput struct {
2222
Message string `json:"message"`
2323
}
2424

25-
func run(ctx context.Context, config LoadTestConfig, executions chan<- time.Duration) (int64, int64) {
25+
// executionEvent reports one executed task: a start timestamp and how long
// the execution took.
//
// Field order is significant: the type is constructed with an unkeyed
// composite literal, so startedAt must remain first and duration second.
type executionEvent struct {
	// startedAt is the timestamp associated with the start of the event.
	startedAt time.Time
	// duration is the measured time for the event.
	duration time.Duration
}
29+
30+
func run(ctx context.Context, config LoadTestConfig, executions chan<- executionEvent) (int64, int64) {
2631
hatchet, err := v1.NewHatchetClient(
2732
v1.Config{
2833
Namespace: config.Namespace,
@@ -44,7 +49,7 @@ func run(ctx context.Context, config LoadTestConfig, executions chan<- time.Dura
4449
l.Info().Msgf("executing %d took %s", input.ID, took)
4550

4651
mx.Lock()
47-
executions <- took
52+
executions <- executionEvent{input.CreatedAt, took}
4853
// detect duplicate in executed slice
4954
var duplicate bool
5055
// for i := 0; i < len(executed)-1; i++ {

cmd/hatchet-loadtest/slack.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"os"
8+
"time"
9+
10+
"github.com/aws/aws-sdk-go-v2/aws"
11+
"github.com/aws/aws-sdk-go-v2/config"
12+
"github.com/aws/aws-sdk-go-v2/service/s3"
13+
"github.com/google/uuid"
14+
"github.com/slack-go/slack"
15+
)
16+
17+
type SlackSender struct {
18+
s3Client *s3.Client
19+
s3Bucket string
20+
Token string
21+
Channel string
22+
Thread string
23+
}
24+
25+
func NewS3Client(ctx context.Context) (*s3.Client, error) {
26+
cfg, err := config.LoadDefaultConfig(ctx,
27+
config.WithRegion("us-west-2"),
28+
)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
client := s3.NewFromConfig(cfg)
34+
return client, nil
35+
}
36+
37+
// ShouldSendSlack reports whether every environment variable needed to upload
// plots and post them to Slack is set to a non-empty value.
func ShouldSendSlack() bool {
	required := [...]string{
		"SLACK_BOT_TOKEN",
		"SLACK_THREAD_TS",
		"SLACK_CHANNEL_ID",
		"AWS_ACCESS_KEY_ID",
		"AWS_SECRET_ACCESS_KEY",
	}

	for _, name := range required {
		// An unset variable and an empty one are treated the same.
		if os.Getenv(name) == "" {
			return false
		}
	}
	return true
}
44+
45+
func NewSlackSender(s3Bucket string) *SlackSender {
46+
s3Client, _ := NewS3Client(context.Background())
47+
return &SlackSender{
48+
s3Client: s3Client,
49+
s3Bucket: s3Bucket,
50+
Token: os.Getenv("SLACK_BOT_TOKEN"),
51+
Thread: os.Getenv("SLACK_THREAD_TS"),
52+
Channel: os.Getenv("SLACK_CHANNEL_ID"),
53+
}
54+
}
55+
56+
func (s *SlackSender) SendMessage(durationPlotUrl string, schedulingPlotUrl string, avgDuration time.Duration, avgScheduling time.Duration) error {
57+
text := fmt.Sprintf(
58+
":star:Load test results:star:\nAverage task duration: %s\nAverage task scheduling: %s",
59+
avgDuration.String(),
60+
avgScheduling.String(),
61+
)
62+
63+
section := slack.NewSectionBlock(
64+
slack.NewTextBlockObject("mrkdwn", text, false, false),
65+
nil,
66+
nil,
67+
)
68+
69+
image1 := slack.NewImageBlock(
70+
schedulingPlotUrl,
71+
"Scheduling test graph",
72+
"",
73+
nil,
74+
)
75+
76+
image2 := slack.NewImageBlock(
77+
durationPlotUrl,
78+
"Duration test graph",
79+
"",
80+
nil,
81+
)
82+
83+
blocks := []slack.Block{section, image1, image2}
84+
client := slack.New(s.Token)
85+
_, _, err := client.PostMessage(
86+
s.Channel,
87+
slack.MsgOptionBlocks(blocks...),
88+
slack.MsgOptionTS(s.Thread), // 👈 this attaches it to a thread
89+
)
90+
return err
91+
}
92+
93+
func (s *SlackSender) UploadS3(imageBytes []byte) (*string, error) {
94+
key := fmt.Sprintf("%s-%s-%s", "loadtest-plot", uuid.New(), time.Now().Format("20060102150405"))
95+
_, err := s.s3Client.PutObject(context.Background(), &s3.PutObjectInput{
96+
Bucket: aws.String(s.s3Bucket),
97+
Key: &key,
98+
Body: bytes.NewReader(imageBytes),
99+
})
100+
if err != nil {
101+
return nil, err
102+
}
103+
presigner := s3.NewPresignClient(s.s3Client)
104+
req, err := presigner.PresignGetObject(context.Background(),
105+
&s3.GetObjectInput{
106+
Bucket: &s.s3Bucket,
107+
Key: &key,
108+
},
109+
s3.WithPresignExpires(time.Hour*24*7),
110+
)
111+
if err != nil {
112+
return nil, err
113+
}
114+
115+
uploadedUrl := req.URL
116+
return &uploadedUrl, nil
117+
}
118+
119+
func (s *SlackSender) Send(durationBytes []byte, schedulingBytes []byte, avgDuration time.Duration, avgScheduling time.Duration) error {
120+
uploadedDurationFileUrl, err := s.UploadS3(durationBytes)
121+
if err != nil {
122+
return err
123+
}
124+
uploadedSchedulingFileUrl, err := s.UploadS3(schedulingBytes)
125+
if err != nil {
126+
return err
127+
}
128+
return s.SendMessage(*uploadedDurationFileUrl, *uploadedSchedulingFileUrl, avgDuration, avgScheduling)
129+
}

frontend/app/src/pages/main/v1/logs/components/logs-chart.tsx

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,6 @@ export function LogsChart({ metrics, since, until, onZoom }: LogsChartProps) {
136136
/>
137137
}
138138
/>
139-
<Bar
140-
dataKey="DEBUG"
141-
stackId="normal"
142-
fill={CHART_CONFIG.DEBUG.color}
143-
isAnimationActive={false}
144-
/>
145139
<Bar
146140
dataKey="INFO"
147141
stackId="normal"
@@ -153,12 +147,21 @@ export function LogsChart({ metrics, since, until, onZoom }: LogsChartProps) {
153147
stackId="normal"
154148
fill={CHART_CONFIG.WARN.color}
155149
isAnimationActive={false}
150+
minPointSize={(value) => (value != null && value > 0 ? 2 : 0)}
151+
/>
152+
<Bar
153+
dataKey="DEBUG"
154+
stackId="normal"
155+
fill={CHART_CONFIG.DEBUG.color}
156+
isAnimationActive={false}
157+
minPointSize={(value) => (value != null && value > 0 ? 2 : 0)}
156158
/>
157159
<Bar
158160
dataKey="ERROR"
159161
stackId="error"
160162
fill={CHART_CONFIG.ERROR.color}
161163
isAnimationActive={false}
164+
minPointSize={(value) => (value != null && value > 0 ? 2 : 0)}
162165
/>
163166

164167
{refAreaLeft && refAreaRight && (

0 commit comments

Comments
 (0)