Skip to content

Commit 4fb24e1

Browse files
authored
[synthetics][SYNTH-24584]: Support On-demand Network Tests (#46789)
### What does this PR do? Adds support for On-demand Network tests * Creates a new "OnDemandPoller" and do a double queue with Scheduled and On-demand tests * On-demand tests have higher priority on execution ### Describe how you validated your changes * Unit Tests * Local running of the Agent ### Additional Notes Co-authored-by: rafael.oliveira <rafael.oliveira@datadoghq.com>
1 parent d9be88a commit 4fb24e1

File tree

9 files changed

+566
-61
lines changed

9 files changed

+566
-61
lines changed

comp/syntheticstestscheduler/common/data.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ type SyntheticsTestConfig struct {
7878
OrgID int `json:"org_id"`
7979
MainDC string `json:"main_dc"`
8080
PublicID string `json:"public_id"`
81+
ResultID string `json:"result_id"`
8182
RunType string `json:"run_type"`
8283
}
8384

@@ -148,6 +149,7 @@ func (c *SyntheticsTestConfig) UnmarshalJSON(data []byte) error {
148149
OrgID int `json:"org_id"`
149150
MainDC string `json:"main_dc"`
150151
PublicID string `json:"public_id"`
152+
ResultID string `json:"result_id"`
151153
RunType string `json:"run_type"`
152154
Interval int `json:"tick_every"`
153155
}
@@ -162,6 +164,7 @@ func (c *SyntheticsTestConfig) UnmarshalJSON(data []byte) error {
162164
c.OrgID = tmp.OrgID
163165
c.MainDC = tmp.MainDC
164166
c.PublicID = tmp.PublicID
167+
c.ResultID = tmp.ResultID
165168
c.RunType = tmp.RunType
166169
c.Interval = tmp.Interval
167170
c.Config.Assertions = tmp.Config.Assertions

comp/syntheticstestscheduler/impl/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,15 @@
66
package syntheticstestschedulerimpl
77

88
import (
9+
"net/http"
910
"time"
1011

1112
"github.com/DataDog/datadog-agent/comp/core/config"
13+
httputils "github.com/DataDog/datadog-agent/pkg/util/http"
1214
)
1315

16+
const defaultSite = "datadoghq.com"
17+
1418
type schedulerConfigs struct {
1519
workers int
1620
flushInterval time.Duration
@@ -24,3 +28,22 @@ func newSchedulerConfigs(agentConfig config.Component) *schedulerConfigs {
2428
flushInterval: agentConfig.GetDuration("synthetics.collector.flush_interval"),
2529
}
2630
}
31+
32+
type onDemandPollerConfig struct {
33+
site string
34+
apiKey string
35+
httpTransport *http.Transport
36+
}
37+
38+
func newOnDemandPollerConfig(agentConfig config.Component) *onDemandPollerConfig {
39+
site := agentConfig.GetString("site")
40+
if site == "" {
41+
site = defaultSite
42+
}
43+
44+
return &onDemandPollerConfig{
45+
site: site,
46+
apiKey: agentConfig.GetString("api_key"),
47+
httpTransport: httputils.CreateHTTPTransport(agentConfig),
48+
}
49+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025-present Datadog, Inc.
5+
6+
package syntheticstestschedulerimpl
7+
8+
import (
9+
"context"
10+
"encoding/json"
11+
"fmt"
12+
"net/http"
13+
"net/url"
14+
"time"
15+
16+
"github.com/DataDog/datadog-agent/comp/core/hostname"
17+
log "github.com/DataDog/datadog-agent/comp/core/log/def"
18+
"github.com/DataDog/datadog-agent/comp/syntheticstestscheduler/common"
19+
)
20+
21+
const (
22+
pollingFrequency = 2 * time.Second
23+
httpRequestTimeout = 10 * time.Second
24+
)
25+
26+
type onDemandTestResponse struct {
27+
Tests []common.SyntheticsTestConfig `json:"tests"`
28+
}
29+
30+
type onDemandPoller struct {
31+
httpClient *http.Client
32+
endpoint string
33+
apiKey string
34+
hostNameService hostname.Component
35+
log log.Component
36+
timeNowFn func() time.Time
37+
TestsChan chan SyntheticsTestCtx
38+
done chan struct{}
39+
}
40+
41+
func newOnDemandPoller(config *onDemandPollerConfig, hostNameService hostname.Component, logger log.Component, timeNowFn func() time.Time) *onDemandPoller {
42+
return &onDemandPoller{
43+
httpClient: &http.Client{Transport: config.httpTransport, Timeout: httpRequestTimeout},
44+
endpoint: "https://intake.synthetics." + config.site + "/api/unstable/synthetics/agents/tests",
45+
apiKey: config.apiKey,
46+
hostNameService: hostNameService,
47+
log: logger,
48+
timeNowFn: timeNowFn,
49+
TestsChan: make(chan SyntheticsTestCtx, 100),
50+
done: make(chan struct{}),
51+
}
52+
}
53+
54+
func (p *onDemandPoller) start(ctx context.Context) {
55+
go p.pollLoop(ctx)
56+
}
57+
58+
func (p *onDemandPoller) stop() {
59+
<-p.done
60+
}
61+
62+
func (p *onDemandPoller) pollLoop(ctx context.Context) {
63+
defer close(p.done)
64+
65+
ticker := time.NewTicker(pollingFrequency)
66+
defer ticker.Stop()
67+
68+
for {
69+
select {
70+
case <-ctx.Done():
71+
return
72+
case <-ticker.C:
73+
tests, err := p.fetchTests(ctx)
74+
if err != nil {
75+
p.log.Debugf("error fetching on-demand tests: %s", err)
76+
continue
77+
}
78+
for _, test := range tests {
79+
select {
80+
case p.TestsChan <- SyntheticsTestCtx{
81+
nextRun: p.timeNowFn(),
82+
cfg: test,
83+
}:
84+
case <-ctx.Done():
85+
return
86+
}
87+
}
88+
}
89+
}
90+
}
91+
92+
func (p *onDemandPoller) fetchTests(ctx context.Context) ([]common.SyntheticsTestConfig, error) {
93+
hostname, err := p.hostNameService.Get(ctx)
94+
if err != nil {
95+
return nil, fmt.Errorf("error getting hostname: %w", err)
96+
}
97+
98+
rawURL := p.endpoint + "?agent_hostname=" + url.QueryEscape(hostname)
99+
u, err := url.Parse(rawURL)
100+
if err != nil {
101+
return nil, fmt.Errorf("invalid endpoint URL: %w", err)
102+
}
103+
104+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
105+
if err != nil {
106+
return nil, err
107+
}
108+
req.Header.Set("DD-API-KEY", p.apiKey)
109+
110+
resp, err := p.httpClient.Do(req)
111+
if err != nil {
112+
return nil, err
113+
}
114+
defer resp.Body.Close()
115+
116+
if resp.StatusCode != http.StatusOK {
117+
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
118+
}
119+
120+
var response onDemandTestResponse
121+
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
122+
return nil, err
123+
}
124+
125+
return response.Tests, nil
126+
}

0 commit comments

Comments
 (0)